Refactor App struct

This commit is contained in:
Marc Plano-Lesay 2025-05-21 15:32:43 +10:00
parent f9692b8ddf
commit ee7d7971be
Signed by: kernald
GPG key ID: 66A41B08CC62A6CF
13 changed files with 1147 additions and 163 deletions

View file

@ -1,4 +1,5 @@
use crate::models::Magnet;
use crate::services::database::DatabaseService;
use async_trait::async_trait;
use color_eyre::eyre::Result;
@ -21,6 +22,6 @@ pub trait Action {
/// Process all unprocessed magnet links and return the list of processed magnets
async fn process_unprocessed_magnets(
&mut self,
db: &mut crate::db::Database,
db_service: &mut DatabaseService,
) -> Result<ProcessedMagnets>;
}

View file

@ -1,7 +1,8 @@
use crate::actions::action::{Action, ProcessedMagnets};
use crate::actions::bitmagnet::client::BitmagnetClient;
use crate::actions::bitmagnet::config::BitmagnetConfig;
use crate::db::{BitmagnetProcessedTable, Database};
use crate::db::BitmagnetProcessedTable;
use crate::services::database::DatabaseService;
use color_eyre::eyre::Result;
use log::{debug, warn};
@ -30,14 +31,17 @@ impl Action for BitmagnetAction {
}
/// Process all unprocessed magnet links and return the list of processed magnets
async fn process_unprocessed_magnets(&mut self, db: &mut Database) -> Result<ProcessedMagnets> {
async fn process_unprocessed_magnets(
&mut self,
db_service: &mut DatabaseService,
) -> Result<ProcessedMagnets> {
let unprocessed_magnets =
db.get_unprocessed_magnets_for_table::<BitmagnetProcessedTable>()?;
db_service.get_unprocessed_magnets::<BitmagnetProcessedTable>()?;
let mut processed_magnets = Vec::new();
let mut failed_magnets = Vec::new();
for magnet in unprocessed_magnets {
let tags = db.get_tags_for_magnet(magnet.id)?;
let tags = db_service.get_tags_for_magnet(magnet.id)?;
let tag_refs: Vec<&str> = tags.iter().map(|s| s.as_str()).collect();
match self
@ -60,7 +64,7 @@ impl Action for BitmagnetAction {
if !tags.is_empty() {
debug!("Tags: {:?}", tags);
}
db.mark_magnet_processed_for_table::<BitmagnetProcessedTable>(magnet.id)?;
db_service.mark_magnet_processed::<BitmagnetProcessedTable>(magnet.id)?;
processed_magnets.push(magnet);
}
Err(e) => {

View file

@ -37,6 +37,7 @@ mod tests {
use crate::actions::action::ProcessedMagnets;
use crate::db::Database;
use crate::models::Magnet;
use crate::services::database::DatabaseService;
use async_trait::async_trait;
use color_eyre::eyre::{eyre, Result};
use std::sync::atomic::{AtomicBool, Ordering};
@ -66,7 +67,7 @@ mod tests {
async fn process_unprocessed_magnets(
&mut self,
_db: &mut Database,
_db_service: &mut DatabaseService,
) -> Result<ProcessedMagnets> {
Ok(ProcessedMagnets {
success: Vec::<Magnet>::new(),

View file

@ -1,7 +1,8 @@
use crate::actions::action::{Action, ProcessedMagnets};
use crate::actions::transmission::client::TransmissionClient;
use crate::actions::transmission::config::TransmissionConfig;
use crate::db::{Database, TransmissionProcessedTable};
use crate::db::TransmissionProcessedTable;
use crate::services::database::DatabaseService;
use color_eyre::eyre::Result;
use log::{debug, warn};
@ -30,14 +31,17 @@ impl Action for TransmissionAction {
}
/// Process all unprocessed magnet links and return the list of processed magnets
async fn process_unprocessed_magnets(&mut self, db: &mut Database) -> Result<ProcessedMagnets> {
async fn process_unprocessed_magnets(
&mut self,
db_service: &mut DatabaseService,
) -> Result<ProcessedMagnets> {
let unprocessed_magnets =
db.get_unprocessed_magnets_for_table::<TransmissionProcessedTable>()?;
db_service.get_unprocessed_magnets::<TransmissionProcessedTable>()?;
let mut processed_magnets = Vec::new();
let mut failed_magnets = Vec::new();
for magnet in unprocessed_magnets {
let tags = db.get_tags_for_magnet(magnet.id)?;
let tags = db_service.get_tags_for_magnet(magnet.id)?;
let tag_refs: Vec<&str> = tags.iter().map(|s| s.as_str()).collect();
match self.client.submit_magnet(&magnet.link, tag_refs).await {
@ -51,7 +55,7 @@ impl Action for TransmissionAction {
if !tags.is_empty() {
debug!("Tags: {:?}", tags);
}
db.mark_magnet_processed_for_table::<TransmissionProcessedTable>(magnet.id)?;
db_service.mark_magnet_processed::<TransmissionProcessedTable>(magnet.id)?;
processed_magnets.push(magnet);
}
Err(e) => {

View file

@ -1,19 +1,18 @@
use crate::actions::action::{Action, ProcessedMagnets};
use crate::actions::bitmagnet::BitmagnetAction;
use crate::actions::factory::init_action;
use crate::actions::transmission::TransmissionAction;
use crate::actions::action::ProcessedMagnets;
use crate::config::Config;
use crate::db::Database;
use crate::notifications::factory::init_notification;
use crate::notifications::notification::Notification;
use crate::notifications::ntfy::NtfyNotification;
use crate::reddit_client::{RedditClient, RedditPost};
use crate::report;
use color_eyre::eyre::{Result, WrapErr};
use log::{debug, info, warn};
use crate::reddit_client::RedditPost;
use crate::services::action::ActionService;
use crate::services::database::DatabaseService;
use crate::services::notification::NotificationService;
use crate::services::post_processor::PostProcessorService;
use crate::services::reddit::RedditService;
use crate::services::report::ReportService;
use color_eyre::eyre::Result;
use multimap::MultiMap;
use regex::Regex;
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::rc::Rc;
/// Trait for configurations that can be enabled or disabled
pub trait Enableable {
@ -23,44 +22,37 @@ pub trait Enableable {
/// Application state and behavior
pub struct App {
db: Database,
config: Config,
actions: Vec<Box<dyn Action>>,
notifications: Vec<Box<dyn Notification>>,
reddit_client: RedditClient,
action_service: ActionService,
notification_service: NotificationService,
reddit_service: RedditService,
post_processor: PostProcessorService,
report_service: ReportService,
}
impl App {
/// Create a new App instance
pub fn new(db: Database, config: Config) -> Self {
let db_service = Rc::new(RefCell::new(DatabaseService::new(db)));
Self {
db,
config,
actions: Vec::new(),
notifications: Vec::new(),
reddit_client: RedditClient::new(),
action_service: ActionService::new(Rc::clone(&db_service)),
notification_service: NotificationService::new(),
reddit_service: RedditService::new(),
post_processor: PostProcessorService::new(Rc::clone(&db_service)),
report_service: ReportService::new(),
}
}
/// Initialize actions based on configuration
pub fn init_actions(&mut self) -> Result<()> {
if let Some(action) = init_action(&self.config.bitmagnet, BitmagnetAction::new)? {
self.actions.push(action);
}
if let Some(action) = init_action(&self.config.transmission, TransmissionAction::new)? {
self.actions.push(action);
}
Ok(())
self.action_service.init_actions(&self.config)
}
/// Initialize notifications based on configuration
pub fn init_notifications(&mut self) -> Result<()> {
if let Some(notification) = init_notification(&self.config.ntfy, NtfyNotification::new)? {
self.notifications.push(notification);
}
Ok(())
self.notification_service.init_notifications(&self.config)
}
/// Fetch posts from Reddit
@ -70,15 +62,10 @@ impl App {
unique_usernames.insert(source_config.username.clone());
}
let mut user_posts = MultiMap::new();
for username in unique_usernames {
info!("Fetching posts from user [{}]", username);
let submissions = self
.reddit_client
.fetch_user_submissions(&username, post_count)
.await?;
user_posts.insert_many(username, submissions);
}
let user_posts = self
.reddit_service
.fetch_posts_from_users(unique_usernames, post_count)
.await?;
Ok(user_posts)
}
@ -88,86 +75,13 @@ impl App {
&mut self,
user_posts: &MultiMap<String, RedditPost>,
) -> Result<usize> {
let mut total_new_links = 0;
for (source_name, source_config) in &self.config.sources {
info!("Processing source [{}]", source_name);
let username = source_config.username.clone();
let title_filter = match &source_config.title_filter {
Some(filter) => Some(
Regex::new(filter.as_str())
.context(format!("Invalid regex pattern: {}", filter))?,
),
None => None,
};
if let Some(submissions) = user_posts.get_vec(&username) {
for post in submissions
.iter()
.filter(|s| filter_post(&s.title, title_filter.clone()))
{
let title = &post.title;
let body = &post.body;
let subreddit = &post.subreddit;
let magnet_links = crate::magnet::extract_magnet_links(body);
if !magnet_links.is_empty() {
let post_info = crate::PostInfo {
title: title.to_string(),
submitter: username.clone(),
subreddit: subreddit.to_string(),
magnet_links,
timestamp: post.created,
imdb_id: source_config.imdb_id.clone(),
tags: source_config.tags.clone(),
};
// Store the post info in the database
match self.db.store_magnets(&post_info) {
Ok(count) => {
total_new_links += count;
}
Err(e) => {
warn!("Failed to store post info in database: {}", e);
}
}
}
}
}
}
Ok(total_new_links)
self.post_processor
.process_sources(user_posts, &self.config.sources)
}
/// Process magnet links with actions
pub async fn process_actions(&mut self) -> Result<HashMap<String, ProcessedMagnets>> {
let mut action_results = HashMap::new();
for action in &mut self.actions {
debug!("Processing magnet links with {}", action.get_name());
match action.process_unprocessed_magnets(&mut self.db).await {
Ok(processed_magnets) => {
let count = processed_magnets.success.len();
debug!(
"Successfully processed {} magnet links with {}",
count,
action.get_name()
);
action_results.insert(action.get_name().to_string(), processed_magnets);
}
Err(e) => {
warn!(
"Failed to process magnet links with {}: {}",
action.get_name(),
e
);
}
}
}
Ok(action_results)
self.action_service.process_actions().await
}
/// Send notifications
@ -176,28 +90,9 @@ impl App {
action_results: &HashMap<String, ProcessedMagnets>,
total_new_links: usize,
) -> Result<()> {
for notification in &self.notifications {
match notification
.send_notification(action_results, total_new_links)
.await
{
Ok(_) => {
debug!(
"Successfully sent notification to {}",
notification.get_name()
);
}
Err(e) => {
warn!(
"Failed to send notification to {}: {}",
notification.get_name(),
e
);
}
}
}
Ok(())
self.notification_service
.send_notifications(action_results, total_new_links)
.await
}
/// Generate and display report
@ -206,9 +101,8 @@ impl App {
action_results: &HashMap<String, ProcessedMagnets>,
total_new_links: usize,
) {
for line in report::generate_report(action_results, total_new_links, true).lines() {
info!("{}", line);
}
self.report_service
.generate_report(action_results, total_new_links);
}
pub async fn run(&mut self, post_count: u32) -> Result<()> {
@ -234,11 +128,3 @@ impl App {
Ok(())
}
}
/// Filters posts based on a title filter pattern
fn filter_post(title: &str, title_filter: Option<Regex>) -> bool {
match title_filter {
Some(pattern) => pattern.is_match(title),
None => true,
}
}

View file

@ -18,6 +18,7 @@ mod notifications;
mod reddit_client;
mod report;
mod schema;
mod services;
/// Post information with magnet links
#[derive(Debug)]

312
src/services/action.rs Normal file
View file

@ -0,0 +1,312 @@
use crate::actions::action::{Action, ProcessedMagnets};
use crate::actions::bitmagnet::BitmagnetAction;
use crate::actions::factory::init_action;
use crate::actions::transmission::TransmissionAction;
use crate::config::Config;
use crate::services::database::DatabaseService;
use color_eyre::eyre::Result;
use log::{debug, warn};
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
/// Service for managing actions
pub struct ActionService {
actions: Vec<Box<dyn Action>>,
db_service: Rc<RefCell<DatabaseService>>,
}
impl ActionService {
/// Create a new ActionService
pub fn new(db_service: Rc<RefCell<DatabaseService>>) -> Self {
Self {
actions: Vec::new(),
db_service,
}
}
/// Initialize actions based on configuration
pub fn init_actions(&mut self, config: &Config) -> Result<()> {
if let Some(action) = init_action(&config.bitmagnet, BitmagnetAction::new)? {
self.actions.push(action);
}
if let Some(action) = init_action(&config.transmission, TransmissionAction::new)? {
self.actions.push(action);
}
Ok(())
}
/// Process magnet links with actions
#[allow(clippy::await_holding_refcell_ref)] // Not sure how to address this
pub async fn process_actions(&mut self) -> Result<HashMap<String, ProcessedMagnets>> {
let mut action_results = HashMap::new();
for action in &mut self.actions {
debug!("Processing magnet links with {}", action.get_name());
match action
.process_unprocessed_magnets(&mut self.db_service.borrow_mut())
.await
{
Ok(processed_magnets) => {
let count = processed_magnets.success.len();
debug!(
"Successfully processed {} magnet links with {}",
count,
action.get_name()
);
action_results.insert(action.get_name().to_string(), processed_magnets);
}
Err(e) => {
warn!(
"Failed to process magnet links with {}: {}",
action.get_name(),
e
);
}
}
}
Ok(action_results)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::actions::action::ProcessedMagnets;
use crate::db::Database;
use crate::models::Magnet;
use async_trait::async_trait;
use chrono::NaiveDateTime;
use color_eyre::eyre::{eyre, Result};
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use tempfile::tempdir;
struct MockAction {
name: &'static str,
process_called: AtomicBool,
should_fail: bool,
success_count: usize,
failed_count: usize,
}
impl MockAction {
fn new(name: &'static str, should_fail: bool) -> Self {
Self {
name,
process_called: AtomicBool::new(false),
should_fail,
success_count: 0,
failed_count: 0,
}
}
fn with_success_count(mut self, count: usize) -> Self {
self.success_count = count;
self
}
fn with_failed_count(mut self, count: usize) -> Self {
self.failed_count = count;
self
}
}
#[async_trait]
impl Action for MockAction {
fn name() -> &'static str
where
Self: Sized,
{
"MockAction"
}
fn get_name(&self) -> &'static str {
self.name
}
async fn process_unprocessed_magnets(
&mut self,
_db_service: &mut DatabaseService,
) -> Result<ProcessedMagnets> {
self.process_called.store(true, Ordering::SeqCst);
if self.should_fail {
return Err(eyre!("Mock action failed"));
}
let mut success = Vec::new();
for i in 0..self.success_count {
success.push(create_test_magnet(
i as i32,
&format!("Success Magnet {}", i),
));
}
let mut failed = Vec::new();
for i in 0..self.failed_count {
failed.push(create_test_magnet(
i as i32 + 100,
&format!("Failed Magnet {}", i),
));
}
Ok(ProcessedMagnets { success, failed })
}
}
fn create_test_magnet(id: i32, title: &str) -> Magnet {
Magnet {
id,
title: title.to_string(),
submitter: "test_user".to_string(),
subreddit: "test_subreddit".to_string(),
link: format!("magnet:?xt=urn:btih:{}", id),
published_at: NaiveDateTime::default(),
imdb_id: None,
}
}
fn setup_test_db() -> Rc<RefCell<DatabaseService>> {
let temp_dir = tempdir().unwrap();
let db_path = temp_dir.path().join("test.db");
let db = Database::new(&db_path).unwrap();
let db_service = DatabaseService::new(db);
Rc::new(RefCell::new(db_service))
}
#[test]
fn test_new_action_service() {
let db_service = setup_test_db();
let action_service = ActionService::new(db_service);
assert_eq!(action_service.actions.len(), 0);
}
#[test]
fn test_init_actions_with_empty_config() {
let db_service = setup_test_db();
let mut action_service = ActionService::new(db_service);
// Create an empty config
let config = Config {
bitmagnet: None,
transmission: None,
ntfy: None,
sources: HashMap::new(),
};
let result = action_service.init_actions(&config);
assert!(result.is_ok());
assert_eq!(action_service.actions.len(), 0);
}
#[tokio::test]
async fn test_process_actions_with_no_actions() {
let db_service = setup_test_db();
let mut action_service = ActionService::new(db_service);
let result = action_service.process_actions().await;
assert!(result.is_ok());
let action_results = result.unwrap();
assert_eq!(action_results.len(), 0);
}
#[tokio::test]
async fn test_process_actions_with_successful_action() {
let db_service = setup_test_db();
let mut action_service = ActionService::new(db_service);
// Create a mock action that succeeds
let mock_action = MockAction::new("SuccessAction", false).with_success_count(2);
action_service.actions.push(Box::new(mock_action));
let result = action_service.process_actions().await;
assert!(result.is_ok());
let action_results = result.unwrap();
assert_eq!(action_results.len(), 1);
assert!(action_results.contains_key("SuccessAction"));
let processed_magnets = &action_results["SuccessAction"];
assert_eq!(processed_magnets.success.len(), 2);
assert_eq!(processed_magnets.failed.len(), 0);
}
#[tokio::test]
async fn test_process_actions_with_failing_action() {
let db_service = setup_test_db();
let mut action_service = ActionService::new(db_service);
// Create a mock action that fails
let mock_action = MockAction::new("FailingAction", true);
action_service.actions.push(Box::new(mock_action));
let result = action_service.process_actions().await;
assert!(result.is_ok());
let action_results = result.unwrap();
assert_eq!(action_results.len(), 0); // No results for failing action
}
#[tokio::test]
async fn test_process_actions_with_mixed_results() {
let db_service = setup_test_db();
let mut action_service = ActionService::new(db_service);
// Create a mock action with both successful and failed magnets
let mock_action = MockAction::new("MixedAction", false)
.with_success_count(1)
.with_failed_count(1);
action_service.actions.push(Box::new(mock_action));
let result = action_service.process_actions().await;
assert!(result.is_ok());
let action_results = result.unwrap();
assert_eq!(action_results.len(), 1);
assert!(action_results.contains_key("MixedAction"));
let processed_magnets = &action_results["MixedAction"];
assert_eq!(processed_magnets.success.len(), 1);
assert_eq!(processed_magnets.failed.len(), 1);
}
#[tokio::test]
async fn test_process_actions_with_multiple_actions() {
let db_service = setup_test_db();
let mut action_service = ActionService::new(db_service);
// Create two mock actions
let action1 = MockAction::new("Action1", false).with_success_count(1);
let action2 = MockAction::new("Action2", false).with_success_count(1);
action_service.actions.push(Box::new(action1));
action_service.actions.push(Box::new(action2));
let result = action_service.process_actions().await;
assert!(result.is_ok());
let action_results = result.unwrap();
assert_eq!(action_results.len(), 2);
assert!(action_results.contains_key("Action1"));
assert!(action_results.contains_key("Action2"));
let processed_magnets1 = &action_results["Action1"];
assert_eq!(processed_magnets1.success.len(), 1);
let processed_magnets2 = &action_results["Action2"];
assert_eq!(processed_magnets2.success.len(), 1);
}
}

140
src/services/database.rs Normal file
View file

@ -0,0 +1,140 @@
use crate::db::{Database, ProcessedTable};
use crate::models;
use crate::PostInfo;
use color_eyre::eyre::{Result, WrapErr};
use log::debug;
/// Service for database operations
pub struct DatabaseService {
db: Database,
}
impl DatabaseService {
/// Create a new DatabaseService
pub fn new(db: Database) -> Self {
Self { db }
}
/// Store magnet links from a post
///
/// # Arguments
///
/// * `post_info` - Information about the post containing magnet links
///
/// # Returns
///
/// The number of new magnet links stored
pub fn store_magnets(&mut self, post_info: &PostInfo) -> Result<usize> {
debug!(
"Storing {} magnet links from post: {}",
post_info.magnet_links.len(),
post_info.title
);
self.db
.store_magnets(post_info)
.wrap_err_with(|| format!("Failed to store magnets for post: {}", post_info.title))
}
/// Get all magnet links from the database
///
/// # Returns
///
/// A vector of all magnet links
#[allow(dead_code)] // Used in tests
pub fn get_all_magnets(&mut self) -> Result<Vec<models::Magnet>> {
debug!("Retrieving all magnet links from database");
self.db
.get_all_magnets()
.wrap_err("Failed to retrieve all magnet links from database")
}
/// Get tags for a magnet link
///
/// # Arguments
///
/// * `magnet_id` - The ID of the magnet link
///
/// # Returns
///
/// A vector of tags associated with the magnet link
pub fn get_tags_for_magnet(&mut self, magnet_id: i32) -> Result<Vec<String>> {
debug!("Retrieving tags for magnet ID: {}", magnet_id);
self.db
.get_tags_for_magnet(magnet_id)
.wrap_err_with(|| format!("Failed to retrieve tags for magnet ID: {}", magnet_id))
}
/// Get unprocessed magnet links for a specific action
///
/// # Returns
///
/// A vector of unprocessed magnet links
pub fn get_unprocessed_magnets<T: ProcessedTable>(&mut self) -> Result<Vec<models::Magnet>> {
debug!("Retrieving unprocessed magnet links");
self.db
.get_unprocessed_magnets_for_table::<T>()
.wrap_err("Failed to retrieve unprocessed magnet links")
}
/// Mark a magnet link as processed for a specific action
///
/// # Arguments
///
/// * `magnet_id` - The ID of the magnet link
pub fn mark_magnet_processed<T: ProcessedTable>(&mut self, magnet_id: i32) -> Result<()> {
debug!("Marking magnet ID {} as processed", magnet_id);
self.db
.mark_magnet_processed_for_table::<T>(magnet_id)
.wrap_err_with(|| format!("Failed to mark magnet ID {} as processed", magnet_id))
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use tempfile::tempdir;
fn create_test_post_info() -> PostInfo {
PostInfo {
title: "Test Post".to_string(),
submitter: "test_user".to_string(),
subreddit: "test_subreddit".to_string(),
magnet_links: vec![
"magnet:?xt=urn:btih:test1".to_string(),
"magnet:?xt=urn:btih:test2".to_string(),
],
timestamp: Utc::now(),
imdb_id: Some("tt1234567".to_string()),
tags: vec!["tag1".to_string(), "tag2".to_string()],
}
}
#[test]
fn test_store_and_retrieve_magnets() -> Result<()> {
let temp_dir = tempdir()?;
let db_path = temp_dir.path().join("test.db");
let db = Database::new(&db_path)?;
let mut service = DatabaseService::new(db);
let post_info = create_test_post_info();
let count = service.store_magnets(&post_info)?;
assert_eq!(count, 2, "Should have stored 2 new magnet links");
let magnets = service.get_all_magnets()?;
assert_eq!(magnets.len(), 2, "Should have retrieved 2 magnet links");
let magnet_links: Vec<String> = magnets.iter().map(|m| m.link.clone()).collect();
assert!(magnet_links.contains(&"magnet:?xt=urn:btih:test1".to_string()));
assert!(magnet_links.contains(&"magnet:?xt=urn:btih:test2".to_string()));
Ok(())
}
}

10
src/services/mod.rs Normal file
View file

@ -0,0 +1,10 @@
/// Service layer for the application
///
/// This module contains service implementations for various parts of the application.
/// Services encapsulate business logic and provide a clean API for the rest of the application.
pub mod action;
pub mod database;
pub mod notification;
pub mod post_processor;
pub mod reddit;
pub mod report;

View file

@ -0,0 +1,305 @@
use crate::actions::action::ProcessedMagnets;
use crate::config::Config;
use crate::notifications::factory::init_notification;
use crate::notifications::notification::Notification;
use crate::notifications::ntfy::NtfyNotification;
use color_eyre::eyre::Result;
use log::{debug, warn};
use std::collections::HashMap;
/// Service for managing notifications
pub struct NotificationService {
notifications: Vec<Box<dyn Notification>>,
}
impl NotificationService {
/// Create a new NotificationService
pub fn new() -> Self {
Self {
notifications: Vec::new(),
}
}
/// Initialize notifications based on configuration
pub fn init_notifications(&mut self, config: &Config) -> Result<()> {
if let Some(notification) = init_notification(&config.ntfy, NtfyNotification::new)? {
self.notifications.push(notification);
}
Ok(())
}
/// Send notifications
pub async fn send_notifications(
&self,
action_results: &HashMap<String, ProcessedMagnets>,
total_new_links: usize,
) -> Result<()> {
for notification in &self.notifications {
match notification
.send_notification(action_results, total_new_links)
.await
{
Ok(_) => {
debug!(
"Successfully sent notification to {}",
notification.get_name()
);
}
Err(e) => {
warn!(
"Failed to send notification to {}: {}",
notification.get_name(),
e
);
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::actions::action::ProcessedMagnets;
use crate::models::Magnet;
use async_trait::async_trait;
use chrono::NaiveDateTime;
use color_eyre::eyre::{eyre, Result};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
struct MockNotification {
name: &'static str,
should_notify_result: bool,
send_called: Arc<AtomicBool>,
should_fail: bool,
}
impl MockNotification {
fn new(name: &'static str, should_notify: bool, should_fail: bool) -> Self {
Self {
name,
should_notify_result: should_notify,
send_called: Arc::new(AtomicBool::new(false)),
should_fail,
}
}
}
#[async_trait]
impl Notification for MockNotification {
fn name() -> &'static str
where
Self: Sized,
{
"MockNotification"
}
fn get_name(&self) -> &'static str {
self.name
}
async fn send_notification(
&self,
_action_results: &HashMap<String, ProcessedMagnets>,
_total_new_links: usize,
) -> Result<()> {
self.send_called.store(true, Ordering::SeqCst);
if self.should_fail {
return Err(eyre!("Mock notification failed"));
}
Ok(())
}
fn should_notify(
&self,
_action_results: &HashMap<String, ProcessedMagnets>,
_total_new_links: usize,
) -> bool {
self.should_notify_result
}
}
fn create_test_magnet(id: i32, title: &str) -> Magnet {
Magnet {
id,
title: title.to_string(),
submitter: "test_user".to_string(),
subreddit: "test_subreddit".to_string(),
link: format!("magnet:?xt=urn:btih:{}", id),
published_at: NaiveDateTime::default(),
imdb_id: None,
}
}
fn create_processed_magnets(success_count: usize, failed_count: usize) -> ProcessedMagnets {
let mut success = Vec::new();
for i in 0..success_count {
success.push(create_test_magnet(
i as i32,
&format!("Success Magnet {}", i),
));
}
let mut failed = Vec::new();
for i in 0..failed_count {
failed.push(create_test_magnet(
i as i32 + 100,
&format!("Failed Magnet {}", i),
));
}
ProcessedMagnets { success, failed }
}
#[test]
fn test_new_notification_service() {
let notification_service = NotificationService::new();
assert_eq!(notification_service.notifications.len(), 0);
}
#[test]
fn test_init_notifications_with_empty_config() {
let mut notification_service = NotificationService::new();
// Create an empty config
let config = Config {
bitmagnet: None,
transmission: None,
ntfy: None,
sources: HashMap::new(),
};
let result = notification_service.init_notifications(&config);
assert!(result.is_ok());
assert_eq!(notification_service.notifications.len(), 0);
}
#[tokio::test]
async fn test_send_notifications_with_no_notifications() {
let notification_service = NotificationService::new();
let action_results = HashMap::new();
let result = notification_service
.send_notifications(&action_results, 0)
.await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_send_notifications_with_successful_notification() {
let mut notification_service = NotificationService::new();
// Create a mock notification that succeeds
let mock_notification = MockNotification::new("SuccessNotification", true, false);
let send_called = mock_notification.send_called.clone();
notification_service
.notifications
.push(Box::new(mock_notification));
// Create some action results
let mut action_results = HashMap::new();
action_results.insert("TestAction".to_string(), create_processed_magnets(1, 0));
let result = notification_service
.send_notifications(&action_results, 1)
.await;
assert!(result.is_ok());
assert!(send_called.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_send_notifications_with_failing_notification() {
let mut notification_service = NotificationService::new();
// Create a mock notification that fails
let mock_notification = MockNotification::new("FailingNotification", true, true);
let send_called = mock_notification.send_called.clone();
notification_service
.notifications
.push(Box::new(mock_notification));
// Create some action results
let mut action_results = HashMap::new();
action_results.insert("TestAction".to_string(), create_processed_magnets(1, 0));
let result = notification_service
.send_notifications(&action_results, 1)
.await;
assert!(result.is_ok()); // The service should still return Ok even if a notification fails
assert!(send_called.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_send_notifications_with_multiple_notifications() {
let mut notification_service = NotificationService::new();
// Create two mock notifications
let mock_notification1 = MockNotification::new("Notification1", true, false);
let send_called1 = mock_notification1.send_called.clone();
let mock_notification2 = MockNotification::new("Notification2", true, false);
let send_called2 = mock_notification2.send_called.clone();
notification_service
.notifications
.push(Box::new(mock_notification1));
notification_service
.notifications
.push(Box::new(mock_notification2));
// Create some action results
let mut action_results = HashMap::new();
action_results.insert("TestAction".to_string(), create_processed_magnets(1, 0));
let result = notification_service
.send_notifications(&action_results, 1)
.await;
assert!(result.is_ok());
assert!(send_called1.load(Ordering::SeqCst));
assert!(send_called2.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_send_notifications_with_mixed_results() {
let mut notification_service = NotificationService::new();
// Create one successful and one failing notification
let mock_notification1 = MockNotification::new("SuccessNotification", true, false);
let send_called1 = mock_notification1.send_called.clone();
let mock_notification2 = MockNotification::new("FailingNotification", true, true);
let send_called2 = mock_notification2.send_called.clone();
notification_service
.notifications
.push(Box::new(mock_notification1));
notification_service
.notifications
.push(Box::new(mock_notification2));
// Create some action results with both successful and failed magnets
let mut action_results = HashMap::new();
action_results.insert("TestAction".to_string(), create_processed_magnets(1, 1));
let result = notification_service
.send_notifications(&action_results, 1)
.await;
assert!(result.is_ok());
assert!(send_called1.load(Ordering::SeqCst));
assert!(send_called2.load(Ordering::SeqCst));
}
}

View file

@ -0,0 +1,146 @@
use crate::services::database::DatabaseService;
use color_eyre::eyre::{Result, WrapErr};
use log::{info, warn};
use regex::Regex;
use std::cell::RefCell;
use std::rc::Rc;
/// Service for processing posts and extracting magnet links
pub struct PostProcessorService {
db_service: Rc<RefCell<DatabaseService>>,
}
impl PostProcessorService {
/// Create a new PostProcessorService
pub fn new(db_service: Rc<RefCell<DatabaseService>>) -> Self {
Self { db_service }
}
/// Process a collection of posts from various sources
///
/// # Arguments
///
/// * `user_posts` - A map of usernames to their posts
/// * `source_configs` - A map of source names to their configurations
///
/// # Returns
///
/// The number of new magnet links stored
pub fn process_sources(
&mut self,
user_posts: &multimap::MultiMap<String, crate::reddit_client::RedditPost>,
source_configs: &std::collections::HashMap<String, crate::config::SourceConfig>,
) -> Result<usize> {
let mut total_new_links = 0;
for (source_name, source_config) in source_configs {
info!("Processing source [{}]", source_name);
let username = source_config.username.clone();
let title_filter = match &source_config.title_filter {
Some(filter) => Some(
Regex::new(filter.as_str())
.wrap_err_with(|| format!("Invalid regex pattern: {}", filter))?,
),
None => None,
};
if let Some(submissions) = user_posts.get_vec(&username) {
for post in submissions
.iter()
.filter(|s| filter_post(&s.title, title_filter.clone()))
{
let title = &post.title;
let body = &post.body;
let subreddit = &post.subreddit;
let magnet_links = crate::magnet::extract_magnet_links(body);
if !magnet_links.is_empty() {
let post_info = crate::PostInfo {
title: title.to_string(),
submitter: username.clone(),
subreddit: subreddit.to_string(),
magnet_links,
timestamp: post.created,
imdb_id: source_config.imdb_id.clone(),
tags: source_config.tags.clone(),
};
// Store the post info in the database
match self.db_service.borrow_mut().store_magnets(&post_info) {
Ok(count) => {
total_new_links += count;
info!(
"Stored {} new magnet links from post: {}",
count, post_info.title
);
}
Err(e) => {
warn!("Failed to store post info in database: {}", e);
}
}
}
}
}
}
Ok(total_new_links)
}
}
/// Filters posts based on a title filter pattern
fn filter_post(title: &str, title_filter: Option<Regex>) -> bool {
match title_filter {
Some(pattern) => pattern.is_match(title),
None => true,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::SourceConfig;
use crate::db::Database;
use crate::reddit_client::RedditPost;
use chrono::Utc;
use multimap::MultiMap;
use std::collections::HashMap;
use tempfile::tempdir;
#[test]
fn test_process_sources() -> Result<()> {
let temp_dir = tempdir()?;
let db_path = temp_dir.path().join("test.db");
let db = Database::new(&db_path)?;
let db_service = DatabaseService::new(db);
let db_service = Rc::new(RefCell::new(db_service));
let mut service = PostProcessorService::new(db_service);
let mut user_posts = MultiMap::new();
let mut source_configs = HashMap::new();
let post = RedditPost {
title: "Test Post".to_string(),
body: "Here are some magnet links: magnet:?xt=urn:btih:test1 and magnet:?xt=urn:btih:test2".to_string(),
subreddit: "test_subreddit".to_string(),
created: Utc::now(),
};
user_posts.insert("test_user".to_string(), post);
let source_config = SourceConfig {
username: "test_user".to_string(),
title_filter: None,
imdb_id: Some("tt1234567".to_string()),
tags: vec!["tag1".to_string(), "tag2".to_string()],
};
source_configs.insert("test_source".to_string(), source_config);
let count = service.process_sources(&mut user_posts, &mut source_configs)?;
assert_eq!(count, 2, "Should have stored 2 new magnet links");
Ok(())
}
}

134
src/services/reddit.rs Normal file
View file

@ -0,0 +1,134 @@
use crate::reddit_client::{RedditClient, RedditPost};
use color_eyre::eyre::{Result, WrapErr};
use log::{debug, error, info, warn};
use multimap::MultiMap;
use std::collections::HashSet;
use std::time::Duration;
use tokio::time::sleep;
/// Configuration for the Reddit service
#[derive(Debug, Clone)]
pub struct RedditServiceConfig {
/// Maximum number of retries for API calls
pub max_retries: u32,
/// Delay between retries in milliseconds
pub retry_delay_ms: u64,
}
impl Default for RedditServiceConfig {
fn default() -> Self {
Self {
max_retries: 3,
retry_delay_ms: 1000,
}
}
}
/// Service for interacting with Reddit
pub struct RedditService {
client: RedditClient,
config: RedditServiceConfig,
}
impl RedditService {
/// Create a new RedditService with default configuration
pub fn new() -> Self {
Self::with_config(RedditServiceConfig::default())
}
/// Create a new RedditService with custom configuration
pub fn with_config(config: RedditServiceConfig) -> Self {
Self {
client: RedditClient::new(),
config,
}
}
/// Fetch posts from multiple Reddit users
///
/// # Arguments
///
/// * `usernames` - A set of Reddit usernames to fetch posts from
/// * `post_count` - The maximum number of posts to fetch per user
///
/// # Returns
///
/// A MultiMap mapping usernames to their posts
pub async fn fetch_posts_from_users(
&self,
usernames: HashSet<String>,
post_count: u32,
) -> Result<MultiMap<String, RedditPost>> {
let mut user_posts = MultiMap::new();
for username in usernames {
info!("Fetching posts from user [{}]", username);
match self
.fetch_user_posts_with_retry(&username, post_count)
.await
{
Ok(posts) => {
debug!("Found {} posts for user [{}]", posts.len(), username);
user_posts.insert_many(username, posts);
}
Err(e) => {
error!("Failed to fetch posts for user [{}]: {}", username, e);
// Continue with other users even if one fails
}
}
}
Ok(user_posts)
}
/// Fetch posts from a single Reddit user with retry logic
///
/// # Arguments
///
/// * `username` - The Reddit username to fetch posts from
/// * `post_count` - The maximum number of posts to fetch
///
/// # Returns
///
/// A vector of RedditPost objects
async fn fetch_user_posts_with_retry(
&self,
username: &str,
post_count: u32,
) -> Result<Vec<RedditPost>> {
let mut attempts = 0;
let max_retries = self.config.max_retries;
let retry_delay = Duration::from_millis(self.config.retry_delay_ms);
loop {
attempts += 1;
match self
.client
.fetch_user_submissions(username, post_count)
.await
{
Ok(posts) => return Ok(posts),
Err(e) => {
if attempts > max_retries {
return Err(e).wrap_err_with(|| {
format!(
"Failed to fetch posts for user [{}] after {} attempts",
username, attempts
)
});
}
warn!(
"Attempt {}/{} failed for user [{}]: {}. Retrying in {}ms...",
attempts,
max_retries + 1,
username,
e,
self.config.retry_delay_ms
);
sleep(retry_delay).await;
}
}
}
}
}

40
src/services/report.rs Normal file
View file

@ -0,0 +1,40 @@
use crate::actions::action::ProcessedMagnets;
use crate::report;
use log::info;
use std::collections::HashMap;
/// Service for generating reports
pub struct ReportService;
impl ReportService {
/// Create a new ReportService
pub fn new() -> Self {
Self
}
/// Generate and display report
pub fn generate_report(
&self,
action_results: &HashMap<String, ProcessedMagnets>,
total_new_links: usize,
) {
for line in report::generate_report(action_results, total_new_links, true).lines() {
info!("{}", line);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::actions::action::ProcessedMagnets;
use crate::models::Magnet;
use std::collections::HashMap;
#[test]
fn test_new_report_service() {
let report_service = ReportService::new();
// Just verify that we can create the service
// The actual report generation is tested in the report module
}
}