From ee7d7971beb0325bf6395f0101f42e91c2da135b Mon Sep 17 00:00:00 2001 From: Marc Plano-Lesay Date: Wed, 21 May 2025 15:32:43 +1000 Subject: [PATCH] Refactor App struct --- src/actions/action.rs | 3 +- src/actions/bitmagnet/action.rs | 14 +- src/actions/factory.rs | 3 +- src/actions/transmission/action.rs | 14 +- src/app.rs | 188 ++++------------- src/main.rs | 1 + src/services/action.rs | 312 +++++++++++++++++++++++++++++ src/services/database.rs | 140 +++++++++++++ src/services/mod.rs | 10 + src/services/notification.rs | 305 ++++++++++++++++++++++++++++ src/services/post_processor.rs | 146 ++++++++++++++ src/services/reddit.rs | 134 +++++++++++++ src/services/report.rs | 40 ++++ 13 files changed, 1147 insertions(+), 163 deletions(-) create mode 100644 src/services/action.rs create mode 100644 src/services/database.rs create mode 100644 src/services/mod.rs create mode 100644 src/services/notification.rs create mode 100644 src/services/post_processor.rs create mode 100644 src/services/reddit.rs create mode 100644 src/services/report.rs diff --git a/src/actions/action.rs b/src/actions/action.rs index f8e381b..f3f5b5e 100644 --- a/src/actions/action.rs +++ b/src/actions/action.rs @@ -1,4 +1,5 @@ use crate::models::Magnet; +use crate::services::database::DatabaseService; use async_trait::async_trait; use color_eyre::eyre::Result; @@ -21,6 +22,6 @@ pub trait Action { /// Process all unprocessed magnet links and return the list of processed magnets async fn process_unprocessed_magnets( &mut self, - db: &mut crate::db::Database, + db_service: &mut DatabaseService, ) -> Result; } diff --git a/src/actions/bitmagnet/action.rs b/src/actions/bitmagnet/action.rs index b121b2d..483e07a 100644 --- a/src/actions/bitmagnet/action.rs +++ b/src/actions/bitmagnet/action.rs @@ -1,7 +1,8 @@ use crate::actions::action::{Action, ProcessedMagnets}; use crate::actions::bitmagnet::client::BitmagnetClient; use crate::actions::bitmagnet::config::BitmagnetConfig; -use crate::db::{BitmagnetProcessedTable, Database}; +use crate::db::BitmagnetProcessedTable; +use crate::services::database::DatabaseService; use color_eyre::eyre::Result; use log::{debug, warn}; @@ -30,14 +31,17 @@ impl Action for BitmagnetAction { } /// Process all unprocessed magnet links and return the list of processed magnets - async fn process_unprocessed_magnets(&mut self, db: &mut Database) -> Result { + async fn process_unprocessed_magnets( + &mut self, + db_service: &mut DatabaseService, + ) -> Result { let unprocessed_magnets = - db.get_unprocessed_magnets_for_table::()?; + db_service.get_unprocessed_magnets::()?; let mut processed_magnets = Vec::new(); let mut failed_magnets = Vec::new(); for magnet in unprocessed_magnets { - let tags = db.get_tags_for_magnet(magnet.id)?; + let tags = db_service.get_tags_for_magnet(magnet.id)?; let tag_refs: Vec<&str> = tags.iter().map(|s| s.as_str()).collect(); match self @@ -60,7 +64,7 @@ impl Action for BitmagnetAction { if !tags.is_empty() { debug!("Tags: {:?}", tags); } - db.mark_magnet_processed_for_table::(magnet.id)?; + db_service.mark_magnet_processed::(magnet.id)?; processed_magnets.push(magnet); } Err(e) => { diff --git a/src/actions/factory.rs b/src/actions/factory.rs index 41360ce..4301b70 100644 --- a/src/actions/factory.rs +++ b/src/actions/factory.rs @@ -37,6 +37,7 @@ mod tests { use crate::actions::action::ProcessedMagnets; use crate::db::Database; use crate::models::Magnet; + use crate::services::database::DatabaseService; use async_trait::async_trait; use color_eyre::eyre::{eyre, Result}; use std::sync::atomic::{AtomicBool, Ordering}; @@ -66,7 +67,7 @@ mod tests { async fn process_unprocessed_magnets( &mut self, - _db: &mut Database, + _db_service: &mut DatabaseService, ) -> Result { Ok(ProcessedMagnets { success: Vec::::new(), diff --git a/src/actions/transmission/action.rs b/src/actions/transmission/action.rs index 554dc2b..0456ca6 100644 --- a/src/actions/transmission/action.rs +++ b/src/actions/transmission/action.rs @@ -1,7 +1,8 @@ use crate::actions::action::{Action, ProcessedMagnets}; use crate::actions::transmission::client::TransmissionClient; use crate::actions::transmission::config::TransmissionConfig; -use crate::db::{Database, TransmissionProcessedTable}; +use crate::db::TransmissionProcessedTable; +use crate::services::database::DatabaseService; use color_eyre::eyre::Result; use log::{debug, warn}; @@ -30,14 +31,17 @@ impl Action for TransmissionAction { } /// Process all unprocessed magnet links and return the list of processed magnets - async fn process_unprocessed_magnets(&mut self, db: &mut Database) -> Result { + async fn process_unprocessed_magnets( + &mut self, + db_service: &mut DatabaseService, + ) -> Result { let unprocessed_magnets = - db.get_unprocessed_magnets_for_table::()?; + db_service.get_unprocessed_magnets::()?; let mut processed_magnets = Vec::new(); let mut failed_magnets = Vec::new(); for magnet in unprocessed_magnets { - let tags = db.get_tags_for_magnet(magnet.id)?; + let tags = db_service.get_tags_for_magnet(magnet.id)?; let tag_refs: Vec<&str> = tags.iter().map(|s| s.as_str()).collect(); match self.client.submit_magnet(&magnet.link, tag_refs).await { @@ -51,7 +55,7 @@ impl Action for TransmissionAction { if !tags.is_empty() { debug!("Tags: {:?}", tags); } - db.mark_magnet_processed_for_table::(magnet.id)?; + db_service.mark_magnet_processed::(magnet.id)?; processed_magnets.push(magnet); } Err(e) => { diff --git a/src/app.rs b/src/app.rs index b4e4509..e5d97c9 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1,19 +1,18 @@ -use crate::actions::action::{Action, ProcessedMagnets}; -use crate::actions::bitmagnet::BitmagnetAction; -use crate::actions::factory::init_action; -use crate::actions::transmission::TransmissionAction; +use crate::actions::action::ProcessedMagnets; use crate::config::Config; use crate::db::Database; -use crate::notifications::factory::init_notification; -use crate::notifications::notification::Notification; -use crate::notifications::ntfy::NtfyNotification; -use crate::reddit_client::{RedditClient, RedditPost}; -use crate::report; -use color_eyre::eyre::{Result, WrapErr}; -use log::{debug, info, warn}; +use crate::reddit_client::RedditPost; +use crate::services::action::ActionService; +use crate::services::database::DatabaseService; +use crate::services::notification::NotificationService; +use crate::services::post_processor::PostProcessorService; +use crate::services::reddit::RedditService; +use crate::services::report::ReportService; +use color_eyre::eyre::Result; use multimap::MultiMap; -use regex::Regex; +use std::cell::RefCell; use std::collections::{HashMap, HashSet}; +use std::rc::Rc; /// Trait for configurations that can be enabled or disabled pub trait Enableable { @@ -23,44 +22,37 @@ pub trait Enableable { /// Application state and behavior pub struct App { - db: Database, config: Config, - actions: Vec>, - notifications: Vec>, - reddit_client: RedditClient, + action_service: ActionService, + notification_service: NotificationService, + reddit_service: RedditService, + post_processor: PostProcessorService, + report_service: ReportService, } impl App { /// Create a new App instance pub fn new(db: Database, config: Config) -> Self { + let db_service = Rc::new(RefCell::new(DatabaseService::new(db))); + Self { - db, config, - actions: Vec::new(), - notifications: Vec::new(), - reddit_client: RedditClient::new(), + action_service: ActionService::new(Rc::clone(&db_service)), + notification_service: NotificationService::new(), + reddit_service: RedditService::new(), + post_processor: PostProcessorService::new(Rc::clone(&db_service)), + report_service: ReportService::new(), } } /// Initialize actions based on configuration pub fn init_actions(&mut self) -> Result<()> { - if let Some(action) = init_action(&self.config.bitmagnet, BitmagnetAction::new)? { - self.actions.push(action); - } - if let Some(action) = init_action(&self.config.transmission, TransmissionAction::new)? { - self.actions.push(action); - } - - Ok(()) + self.action_service.init_actions(&self.config) } /// Initialize notifications based on configuration pub fn init_notifications(&mut self) -> Result<()> { - if let Some(notification) = init_notification(&self.config.ntfy, NtfyNotification::new)? { - self.notifications.push(notification); - } - - Ok(()) + self.notification_service.init_notifications(&self.config) } /// Fetch posts from Reddit @@ -70,15 +62,10 @@ impl App { unique_usernames.insert(source_config.username.clone()); } - let mut user_posts = MultiMap::new(); - for username in unique_usernames { - info!("Fetching posts from user [{}]", username); - let submissions = self - .reddit_client - .fetch_user_submissions(&username, post_count) - .await?; - user_posts.insert_many(username, submissions); - } + let user_posts = self + .reddit_service + .fetch_posts_from_users(unique_usernames, post_count) + .await?; Ok(user_posts) } @@ -88,86 +75,13 @@ impl App { &mut self, user_posts: &MultiMap, ) -> Result { - let mut total_new_links = 0; - - for (source_name, source_config) in &self.config.sources { - info!("Processing source [{}]", source_name); - - let username = source_config.username.clone(); - let title_filter = match &source_config.title_filter { - Some(filter) => Some( - Regex::new(filter.as_str()) - .context(format!("Invalid regex pattern: {}", filter))?, - ), - None => None, - }; - - if let Some(submissions) = user_posts.get_vec(&username) { - for post in submissions - .iter() - .filter(|s| filter_post(&s.title, title_filter.clone())) - { - let title = &post.title; - let body = &post.body; - let subreddit = &post.subreddit; - - let magnet_links = crate::magnet::extract_magnet_links(body); - if !magnet_links.is_empty() { - let post_info = crate::PostInfo { - title: title.to_string(), - submitter: username.clone(), - subreddit: subreddit.to_string(), - magnet_links, - timestamp: post.created, - imdb_id: source_config.imdb_id.clone(), - tags: source_config.tags.clone(), - }; - - // Store the post info in the database - match self.db.store_magnets(&post_info) { - Ok(count) => { - total_new_links += count; - } - Err(e) => { - warn!("Failed to store post info in database: {}", e); - } - } - } - } - } - } - - Ok(total_new_links) + self.post_processor + .process_sources(user_posts, &self.config.sources) } /// Process magnet links with actions pub async fn process_actions(&mut self) -> Result> { - let mut action_results = HashMap::new(); - - for action in &mut self.actions { - debug!("Processing magnet links with {}", action.get_name()); - - match action.process_unprocessed_magnets(&mut self.db).await { - Ok(processed_magnets) => { - let count = processed_magnets.success.len(); - debug!( - "Successfully processed {} magnet links with {}", - count, - action.get_name() - ); - action_results.insert(action.get_name().to_string(), processed_magnets); - } - Err(e) => { - warn!( - "Failed to process magnet links with {}: {}", - action.get_name(), - e - ); - } - } - } - - Ok(action_results) + self.action_service.process_actions().await } /// Send notifications @@ -176,28 +90,9 @@ impl App { action_results: &HashMap, total_new_links: usize, ) -> Result<()> { - for notification in &self.notifications { - match notification - .send_notification(action_results, total_new_links) - .await - { - Ok(_) => { - debug!( - "Successfully sent notification to {}", - notification.get_name() - ); - } - Err(e) => { - warn!( - "Failed to send notification to {}: {}", - notification.get_name(), - e - ); - } - } - } - - Ok(()) + self.notification_service + .send_notifications(action_results, total_new_links) + .await } /// Generate and display report @@ -206,9 +101,8 @@ impl App { action_results: &HashMap, total_new_links: usize, ) { - for line in report::generate_report(action_results, total_new_links, true).lines() { - info!("{}", line); - } + self.report_service + .generate_report(action_results, total_new_links); } pub async fn run(&mut self, post_count: u32) -> Result<()> { @@ -234,11 +128,3 @@ impl App { Ok(()) } } - -/// Filters posts based on a title filter pattern -fn filter_post(title: &str, title_filter: Option) -> bool { - match title_filter { - Some(pattern) => pattern.is_match(title), - None => true, - } -} diff --git a/src/main.rs b/src/main.rs index c0d4709..ec85f86 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,6 +18,7 @@ mod notifications; mod reddit_client; mod report; mod schema; +mod services; /// Post information with magnet links #[derive(Debug)] diff --git a/src/services/action.rs b/src/services/action.rs new file mode 100644 index 0000000..a613372 --- /dev/null +++ b/src/services/action.rs @@ -0,0 +1,312 @@ +use crate::actions::action::{Action, ProcessedMagnets}; +use crate::actions::bitmagnet::BitmagnetAction; +use crate::actions::factory::init_action; +use crate::actions::transmission::TransmissionAction; +use crate::config::Config; +use crate::services::database::DatabaseService; +use color_eyre::eyre::Result; +use log::{debug, warn}; +use std::cell::RefCell; +use std::collections::HashMap; +use std::rc::Rc; + +/// Service for managing actions +pub struct ActionService { + actions: Vec>, + db_service: Rc>, +} + +impl ActionService { + /// Create a new ActionService + pub fn new(db_service: Rc>) -> Self { + Self { + actions: Vec::new(), + db_service, + } + } + + /// Initialize actions based on configuration + pub fn init_actions(&mut self, config: &Config) -> Result<()> { + if let Some(action) = init_action(&config.bitmagnet, BitmagnetAction::new)? { + self.actions.push(action); + } + if let Some(action) = init_action(&config.transmission, TransmissionAction::new)? { + self.actions.push(action); + } + + Ok(()) + } + + /// Process magnet links with actions + #[allow(clippy::await_holding_refcell_ref)] // Not sure how to address this + pub async fn process_actions(&mut self) -> Result> { + let mut action_results = HashMap::new(); + + for action in &mut self.actions { + debug!("Processing magnet links with {}", action.get_name()); + + match action + .process_unprocessed_magnets(&mut self.db_service.borrow_mut()) + .await + { + Ok(processed_magnets) => { + let count = processed_magnets.success.len(); + debug!( + "Successfully processed {} magnet links with {}", + count, + action.get_name() + ); + action_results.insert(action.get_name().to_string(), processed_magnets); + } + Err(e) => { + warn!( + "Failed to process magnet links with {}: {}", + action.get_name(), + e + ); + } + } + } + + Ok(action_results) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::actions::action::ProcessedMagnets; + use crate::db::Database; + use crate::models::Magnet; + use async_trait::async_trait; + use chrono::NaiveDateTime; + use color_eyre::eyre::{eyre, Result}; + use std::collections::HashMap; + use std::sync::atomic::{AtomicBool, Ordering}; + use tempfile::tempdir; + + struct MockAction { + name: &'static str, + process_called: AtomicBool, + should_fail: bool, + success_count: usize, + failed_count: usize, + } + + impl MockAction { + fn new(name: &'static str, should_fail: bool) -> Self { + Self { + name, + process_called: AtomicBool::new(false), + should_fail, + success_count: 0, + failed_count: 0, + } + } + + fn with_success_count(mut self, count: usize) -> Self { + self.success_count = count; + self + } + + fn with_failed_count(mut self, count: usize) -> Self { + self.failed_count = count; + self + } + } + + #[async_trait] + impl Action for MockAction { + fn name() -> &'static str + where + Self: Sized, + { + "MockAction" + } + + fn get_name(&self) -> &'static str { + self.name + } + + async fn process_unprocessed_magnets( + &mut self, + _db_service: &mut DatabaseService, + ) -> Result { + self.process_called.store(true, Ordering::SeqCst); + + if self.should_fail { + return Err(eyre!("Mock action failed")); + } + + let mut success = Vec::new(); + for i in 0..self.success_count { + success.push(create_test_magnet( + i as i32, + &format!("Success Magnet {}", i), + )); + } + + let mut failed = Vec::new(); + for i in 0..self.failed_count { + failed.push(create_test_magnet( + i as i32 + 100, + &format!("Failed Magnet {}", i), + )); + } + + Ok(ProcessedMagnets { success, failed }) + } + } + + fn create_test_magnet(id: i32, title: &str) -> Magnet { + Magnet { + id, + title: title.to_string(), + submitter: "test_user".to_string(), + subreddit: "test_subreddit".to_string(), + link: format!("magnet:?xt=urn:btih:{}", id), + published_at: NaiveDateTime::default(), + imdb_id: None, + } + } + + fn setup_test_db() -> Rc> { + let temp_dir = tempdir().unwrap(); + let db_path = temp_dir.path().join("test.db"); + + let db = Database::new(&db_path).unwrap(); + let db_service = DatabaseService::new(db); + Rc::new(RefCell::new(db_service)) + } + + #[test] + fn test_new_action_service() { + let db_service = setup_test_db(); + + let action_service = ActionService::new(db_service); + + assert_eq!(action_service.actions.len(), 0); + } + + #[test] + fn test_init_actions_with_empty_config() { + let db_service = setup_test_db(); + let mut action_service = ActionService::new(db_service); + + // Create an empty config + let config = Config { + bitmagnet: None, + transmission: None, + ntfy: None, + sources: HashMap::new(), + }; + + let result = action_service.init_actions(&config); + + assert!(result.is_ok()); + assert_eq!(action_service.actions.len(), 0); + } + + #[tokio::test] + async fn test_process_actions_with_no_actions() { + let db_service = setup_test_db(); + let mut action_service = ActionService::new(db_service); + + let result = action_service.process_actions().await; + + assert!(result.is_ok()); + let action_results = result.unwrap(); + assert_eq!(action_results.len(), 0); + } + + #[tokio::test] + async fn test_process_actions_with_successful_action() { + let db_service = setup_test_db(); + let mut action_service = ActionService::new(db_service); + + // Create a mock action that succeeds + let mock_action = MockAction::new("SuccessAction", false).with_success_count(2); + + action_service.actions.push(Box::new(mock_action)); + + let result = action_service.process_actions().await; + + assert!(result.is_ok()); + let action_results = result.unwrap(); + assert_eq!(action_results.len(), 1); + assert!(action_results.contains_key("SuccessAction")); + + let processed_magnets = &action_results["SuccessAction"]; + assert_eq!(processed_magnets.success.len(), 2); + assert_eq!(processed_magnets.failed.len(), 0); + } + + #[tokio::test] + async fn test_process_actions_with_failing_action() { + let db_service = setup_test_db(); + let mut action_service = ActionService::new(db_service); + + // Create a mock action that fails + let mock_action = MockAction::new("FailingAction", true); + + action_service.actions.push(Box::new(mock_action)); + + let result = action_service.process_actions().await; + + assert!(result.is_ok()); + let action_results = result.unwrap(); + assert_eq!(action_results.len(), 0); // No results for failing action + } + + #[tokio::test] + async fn test_process_actions_with_mixed_results() { + let db_service = setup_test_db(); + let mut action_service = ActionService::new(db_service); + + // Create a mock action with both successful and failed magnets + let mock_action = MockAction::new("MixedAction", false) + .with_success_count(1) + .with_failed_count(1); + + action_service.actions.push(Box::new(mock_action)); + + let result = action_service.process_actions().await; + + assert!(result.is_ok()); + let action_results = result.unwrap(); + assert_eq!(action_results.len(), 1); + assert!(action_results.contains_key("MixedAction")); + + let processed_magnets = &action_results["MixedAction"]; + assert_eq!(processed_magnets.success.len(), 1); + assert_eq!(processed_magnets.failed.len(), 1); + } + + #[tokio::test] + async fn test_process_actions_with_multiple_actions() { + let db_service = setup_test_db(); + let mut action_service = ActionService::new(db_service); + + // Create two mock actions + let action1 = MockAction::new("Action1", false).with_success_count(1); + + let action2 = MockAction::new("Action2", false).with_success_count(1); + + action_service.actions.push(Box::new(action1)); + action_service.actions.push(Box::new(action2)); + + let result = action_service.process_actions().await; + + assert!(result.is_ok()); + let action_results = result.unwrap(); + assert_eq!(action_results.len(), 2); + assert!(action_results.contains_key("Action1")); + assert!(action_results.contains_key("Action2")); + + let processed_magnets1 = &action_results["Action1"]; + assert_eq!(processed_magnets1.success.len(), 1); + + let processed_magnets2 = &action_results["Action2"]; + assert_eq!(processed_magnets2.success.len(), 1); + } +} diff --git a/src/services/database.rs b/src/services/database.rs new file mode 100644 index 0000000..871a8d1 --- /dev/null +++ b/src/services/database.rs @@ -0,0 +1,140 @@ +use crate::db::{Database, ProcessedTable}; +use crate::models; +use crate::PostInfo; +use color_eyre::eyre::{Result, WrapErr}; +use log::debug; + +/// Service for database operations +pub struct DatabaseService { + db: Database, +} + +impl DatabaseService { + /// Create a new DatabaseService + pub fn new(db: Database) -> Self { + Self { db } + } + + /// Store magnet links from a post + /// + /// # Arguments + /// + /// * `post_info` - Information about the post containing magnet links + /// + /// # Returns + /// + /// The number of new magnet links stored + pub fn store_magnets(&mut self, post_info: &PostInfo) -> Result { + debug!( + "Storing {} magnet links from post: {}", + post_info.magnet_links.len(), + post_info.title + ); + + self.db + .store_magnets(post_info) + .wrap_err_with(|| format!("Failed to store magnets for post: {}", post_info.title)) + } + + /// Get all magnet links from the database + /// + /// # Returns + /// + /// A vector of all magnet links + #[allow(dead_code)] // Used in tests + pub fn get_all_magnets(&mut self) -> Result> { + debug!("Retrieving all magnet links from database"); + + self.db + .get_all_magnets() + .wrap_err("Failed to retrieve all magnet links from database") + } + + /// Get tags for a magnet link + /// + /// # Arguments + /// + /// * `magnet_id` - The ID of the magnet link + /// + /// # Returns + /// + /// A vector of tags associated with the magnet link + pub fn get_tags_for_magnet(&mut self, magnet_id: i32) -> Result> { + debug!("Retrieving tags for magnet ID: {}", magnet_id); + + self.db + .get_tags_for_magnet(magnet_id) + .wrap_err_with(|| format!("Failed to retrieve tags for magnet ID: {}", magnet_id)) + } + + /// Get unprocessed magnet links for a specific action + /// + /// # Returns + /// + /// A vector of unprocessed magnet links + pub fn get_unprocessed_magnets(&mut self) -> Result> { + debug!("Retrieving unprocessed magnet links"); + + self.db + .get_unprocessed_magnets_for_table::() + .wrap_err("Failed to retrieve unprocessed magnet links") + } + + /// Mark a magnet link as processed for a specific action + /// + /// # Arguments + /// + /// * `magnet_id` - The ID of the magnet link + pub fn mark_magnet_processed(&mut self, magnet_id: i32) -> Result<()> { + debug!("Marking magnet ID {} as processed", magnet_id); + + self.db + .mark_magnet_processed_for_table::(magnet_id) + .wrap_err_with(|| format!("Failed to mark magnet ID {} as processed", magnet_id)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Utc; + use tempfile::tempdir; + + fn create_test_post_info() -> PostInfo { + PostInfo { + title: "Test Post".to_string(), + submitter: "test_user".to_string(), + subreddit: "test_subreddit".to_string(), + magnet_links: vec![ + "magnet:?xt=urn:btih:test1".to_string(), + "magnet:?xt=urn:btih:test2".to_string(), + ], + timestamp: Utc::now(), + imdb_id: Some("tt1234567".to_string()), + tags: vec!["tag1".to_string(), "tag2".to_string()], + } + } + + #[test] + fn test_store_and_retrieve_magnets() -> Result<()> { + let temp_dir = tempdir()?; + let db_path = temp_dir.path().join("test.db"); + + let db = Database::new(&db_path)?; + let mut service = DatabaseService::new(db); + + let post_info = create_test_post_info(); + + let count = service.store_magnets(&post_info)?; + assert_eq!(count, 2, "Should have stored 2 new magnet links"); + + let magnets = service.get_all_magnets()?; + assert_eq!(magnets.len(), 2, "Should have retrieved 2 magnet links"); + + let magnet_links: Vec = magnets.iter().map(|m| m.link.clone()).collect(); + assert!(magnet_links.contains(&"magnet:?xt=urn:btih:test1".to_string())); + assert!(magnet_links.contains(&"magnet:?xt=urn:btih:test2".to_string())); + + Ok(()) + } +} diff --git a/src/services/mod.rs b/src/services/mod.rs new file mode 100644 index 0000000..4fb7fcf --- /dev/null +++ b/src/services/mod.rs @@ -0,0 +1,10 @@ +/// Service layer for the application +/// +/// This module contains service implementations for various parts of the application. +/// Services encapsulate business logic and provide a clean API for the rest of the application. +pub mod action; +pub mod database; +pub mod notification; +pub mod post_processor; +pub mod reddit; +pub mod report; diff --git a/src/services/notification.rs b/src/services/notification.rs new file mode 100644 index 0000000..a63a232 --- /dev/null +++ b/src/services/notification.rs @@ -0,0 +1,305 @@ +use crate::actions::action::ProcessedMagnets; +use crate::config::Config; +use crate::notifications::factory::init_notification; +use crate::notifications::notification::Notification; +use crate::notifications::ntfy::NtfyNotification; +use color_eyre::eyre::Result; +use log::{debug, warn}; +use std::collections::HashMap; + +/// Service for managing notifications +pub struct NotificationService { + notifications: Vec>, +} + +impl NotificationService { + /// Create a new NotificationService + pub fn new() -> Self { + Self { + notifications: Vec::new(), + } + } + + /// Initialize notifications based on configuration + pub fn init_notifications(&mut self, config: &Config) -> Result<()> { + if let Some(notification) = init_notification(&config.ntfy, NtfyNotification::new)? { + self.notifications.push(notification); + } + + Ok(()) + } + + /// Send notifications + pub async fn send_notifications( + &self, + action_results: &HashMap, + total_new_links: usize, + ) -> Result<()> { + for notification in &self.notifications { + match notification + .send_notification(action_results, total_new_links) + .await + { + Ok(_) => { + debug!( + "Successfully sent notification to {}", + notification.get_name() + ); + } + Err(e) => { + warn!( + "Failed to send notification to {}: {}", + notification.get_name(), + e + ); + } + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::actions::action::ProcessedMagnets; + use crate::models::Magnet; + use async_trait::async_trait; + use chrono::NaiveDateTime; + use color_eyre::eyre::{eyre, Result}; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + + struct MockNotification { + name: &'static str, + should_notify_result: bool, + send_called: Arc, + should_fail: bool, + } + + impl MockNotification { + fn new(name: &'static str, should_notify: bool, should_fail: bool) -> Self { + Self { + name, + should_notify_result: should_notify, + send_called: Arc::new(AtomicBool::new(false)), + should_fail, + } + } + } + + #[async_trait] + impl Notification for MockNotification { + fn name() -> &'static str + where + Self: Sized, + { + "MockNotification" + } + + fn get_name(&self) -> &'static str { + self.name + } + + async fn send_notification( + &self, + _action_results: &HashMap, + _total_new_links: usize, + ) -> Result<()> { + self.send_called.store(true, Ordering::SeqCst); + + if self.should_fail { + return Err(eyre!("Mock notification failed")); + } + + Ok(()) + } + + fn should_notify( + &self, + _action_results: &HashMap, + _total_new_links: usize, + ) -> bool { + self.should_notify_result + } + } + + fn create_test_magnet(id: i32, title: &str) -> Magnet { + Magnet { + id, + title: title.to_string(), + submitter: "test_user".to_string(), + subreddit: "test_subreddit".to_string(), + link: format!("magnet:?xt=urn:btih:{}", id), + published_at: NaiveDateTime::default(), + imdb_id: None, + } + } + + fn create_processed_magnets(success_count: usize, failed_count: usize) -> ProcessedMagnets { + let mut success = Vec::new(); + for i in 0..success_count { + success.push(create_test_magnet( + i as i32, + &format!("Success Magnet {}", i), + )); + } + + let mut failed = Vec::new(); + for i in 0..failed_count { + failed.push(create_test_magnet( + i as i32 + 100, + &format!("Failed Magnet {}", i), + )); + } + + ProcessedMagnets { success, failed } + } + + #[test] + fn test_new_notification_service() { + let notification_service = NotificationService::new(); + assert_eq!(notification_service.notifications.len(), 0); + } + + #[test] + fn test_init_notifications_with_empty_config() { + let mut notification_service = NotificationService::new(); + + // Create an empty config + let config = Config { + bitmagnet: None, + transmission: None, + ntfy: None, + sources: HashMap::new(), + }; + + let result = notification_service.init_notifications(&config); + + assert!(result.is_ok()); + assert_eq!(notification_service.notifications.len(), 0); + } + + #[tokio::test] + async fn test_send_notifications_with_no_notifications() { + let notification_service = NotificationService::new(); + let action_results = HashMap::new(); + + let result = notification_service + .send_notifications(&action_results, 0) + .await; + + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_send_notifications_with_successful_notification() { + let mut notification_service = NotificationService::new(); + + // Create a mock notification that succeeds + let mock_notification = MockNotification::new("SuccessNotification", true, false); + let send_called = mock_notification.send_called.clone(); + + notification_service + .notifications + .push(Box::new(mock_notification)); + + // Create some action results + let mut action_results = HashMap::new(); + action_results.insert("TestAction".to_string(), create_processed_magnets(1, 0)); + + let result = notification_service + .send_notifications(&action_results, 1) + .await; + + assert!(result.is_ok()); + assert!(send_called.load(Ordering::SeqCst)); + } + + #[tokio::test] + async fn test_send_notifications_with_failing_notification() { + let mut notification_service = NotificationService::new(); + + // Create a mock notification that fails + let mock_notification = MockNotification::new("FailingNotification", true, true); + let send_called = mock_notification.send_called.clone(); + + notification_service + .notifications + .push(Box::new(mock_notification)); + + // Create some action results + let mut action_results = HashMap::new(); + action_results.insert("TestAction".to_string(), create_processed_magnets(1, 0)); + + let result = notification_service + .send_notifications(&action_results, 1) + .await; + + assert!(result.is_ok()); // The service should still return Ok even if a notification fails + assert!(send_called.load(Ordering::SeqCst)); + } + + #[tokio::test] + async fn test_send_notifications_with_multiple_notifications() { + let mut notification_service = NotificationService::new(); + + // Create two mock notifications + let mock_notification1 = MockNotification::new("Notification1", true, false); + let send_called1 = mock_notification1.send_called.clone(); + + let mock_notification2 = MockNotification::new("Notification2", true, false); + let send_called2 = mock_notification2.send_called.clone(); + + notification_service + .notifications + .push(Box::new(mock_notification1)); + notification_service + .notifications + .push(Box::new(mock_notification2)); + + // Create some action results + let mut action_results = HashMap::new(); + action_results.insert("TestAction".to_string(), create_processed_magnets(1, 0)); + + let result = notification_service + .send_notifications(&action_results, 1) + .await; + + assert!(result.is_ok()); + assert!(send_called1.load(Ordering::SeqCst)); + assert!(send_called2.load(Ordering::SeqCst)); + } + + #[tokio::test] + async fn test_send_notifications_with_mixed_results() { + let mut notification_service = NotificationService::new(); + + // Create one successful and one failing notification + let mock_notification1 = MockNotification::new("SuccessNotification", true, false); + let send_called1 = mock_notification1.send_called.clone(); + + let mock_notification2 = MockNotification::new("FailingNotification", true, true); + let send_called2 = mock_notification2.send_called.clone(); + + notification_service + .notifications + .push(Box::new(mock_notification1)); + notification_service + .notifications + .push(Box::new(mock_notification2)); + + // Create some action results with both successful and failed magnets + let mut action_results = HashMap::new(); + action_results.insert("TestAction".to_string(), create_processed_magnets(1, 1)); + + let result = notification_service + .send_notifications(&action_results, 1) + .await; + + assert!(result.is_ok()); + assert!(send_called1.load(Ordering::SeqCst)); + assert!(send_called2.load(Ordering::SeqCst)); + } +} diff --git a/src/services/post_processor.rs b/src/services/post_processor.rs new file mode 100644 index 0000000..348a4da --- /dev/null +++ b/src/services/post_processor.rs @@ -0,0 +1,146 @@ +use crate::services::database::DatabaseService; +use color_eyre::eyre::{Result, WrapErr}; +use log::{info, warn}; +use regex::Regex; + +use std::cell::RefCell; +use std::rc::Rc; + +/// Service for processing posts and extracting magnet links +pub struct PostProcessorService { + db_service: Rc>, +} + +impl PostProcessorService { + /// Create a new PostProcessorService + pub fn new(db_service: Rc>) -> Self { + Self { db_service } + } + + /// Process a collection of posts from various sources + /// + /// # Arguments + /// + /// * `user_posts` - A map of usernames to their posts + /// * `source_configs` - A map of source names to their configurations + /// + /// # Returns + /// + /// The number of new magnet links stored + pub fn process_sources( + &mut self, + user_posts: &multimap::MultiMap, + source_configs: &std::collections::HashMap, + ) -> Result { + let mut total_new_links = 0; + + for (source_name, source_config) in source_configs { + info!("Processing source [{}]", source_name); + + let username = source_config.username.clone(); + let title_filter = match &source_config.title_filter { + Some(filter) => Some( + Regex::new(filter.as_str()) + .wrap_err_with(|| format!("Invalid regex pattern: {}", filter))?, + ), + None => None, + }; + + if let Some(submissions) = user_posts.get_vec(&username) { + for post in submissions + .iter() + .filter(|s| filter_post(&s.title, title_filter.clone())) + { + let title = &post.title; + let body = &post.body; + let subreddit = &post.subreddit; + + let magnet_links = crate::magnet::extract_magnet_links(body); + if !magnet_links.is_empty() { + let post_info = crate::PostInfo { + title: title.to_string(), + submitter: username.clone(), + subreddit: subreddit.to_string(), + magnet_links, + timestamp: post.created, + imdb_id: source_config.imdb_id.clone(), + tags: source_config.tags.clone(), + }; + + // Store the post info in the database + match self.db_service.borrow_mut().store_magnets(&post_info) { + Ok(count) => { + total_new_links += count; + info!( + "Stored {} new magnet links from post: {}", + count, post_info.title + ); + } + Err(e) => { + warn!("Failed to store post info in database: {}", e); + } + } + } + } + } + } + + Ok(total_new_links) + } +} + +/// Filters posts based on a title filter pattern +fn filter_post(title: &str, title_filter: Option) -> bool { + match title_filter { + Some(pattern) => pattern.is_match(title), + None => true, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::SourceConfig; + use crate::db::Database; + use crate::reddit_client::RedditPost; + use chrono::Utc; + use multimap::MultiMap; + use std::collections::HashMap; + use tempfile::tempdir; + + #[test] + fn test_process_sources() -> Result<()> { + let temp_dir = tempdir()?; + let db_path = temp_dir.path().join("test.db"); + + let db = Database::new(&db_path)?; + let db_service = DatabaseService::new(db); + let db_service = Rc::new(RefCell::new(db_service)); + let mut service = PostProcessorService::new(db_service); + + let mut user_posts = MultiMap::new(); + let mut source_configs = HashMap::new(); + + let post = RedditPost { + title: "Test Post".to_string(), + body: "Here are some magnet links: magnet:?xt=urn:btih:test1 and magnet:?xt=urn:btih:test2".to_string(), + subreddit: "test_subreddit".to_string(), + created: Utc::now(), + }; + user_posts.insert("test_user".to_string(), post); + + let source_config = SourceConfig { + username: "test_user".to_string(), + title_filter: None, + imdb_id: Some("tt1234567".to_string()), + tags: vec!["tag1".to_string(), "tag2".to_string()], + }; + source_configs.insert("test_source".to_string(), source_config); + + let count = service.process_sources(&mut user_posts, &mut source_configs)?; + + assert_eq!(count, 2, "Should have stored 2 new magnet links"); + + Ok(()) + } +} diff --git a/src/services/reddit.rs b/src/services/reddit.rs new file mode 100644 index 0000000..ec238c0 --- /dev/null +++ b/src/services/reddit.rs @@ -0,0 +1,134 @@ +use crate::reddit_client::{RedditClient, RedditPost}; +use color_eyre::eyre::{Result, WrapErr}; +use log::{debug, error, info, warn}; +use multimap::MultiMap; +use std::collections::HashSet; +use std::time::Duration; +use tokio::time::sleep; + +/// Configuration for the Reddit service +#[derive(Debug, Clone)] +pub struct RedditServiceConfig { + /// Maximum number of retries for API calls + pub max_retries: u32, + /// Delay between retries in milliseconds + pub retry_delay_ms: u64, +} + +impl Default for RedditServiceConfig { + fn default() -> Self { + Self { + max_retries: 3, + retry_delay_ms: 1000, + } + } +} + +/// Service for interacting with Reddit +pub struct RedditService { + client: RedditClient, + config: RedditServiceConfig, +} + +impl RedditService { + /// Create a new RedditService with default configuration + pub fn new() -> Self { + Self::with_config(RedditServiceConfig::default()) + } + + /// Create a new RedditService with custom configuration + pub fn with_config(config: RedditServiceConfig) -> Self { + Self { + client: RedditClient::new(), + config, + } + } + + /// Fetch posts from multiple Reddit users + /// + /// # Arguments + /// + /// * `usernames` - A set of Reddit usernames to fetch posts from + /// * `post_count` - The maximum number of posts to fetch per user + /// + /// # Returns + /// + /// A MultiMap mapping usernames to their posts + pub async fn fetch_posts_from_users( + &self, + usernames: HashSet, + post_count: u32, + ) -> Result> { + let mut user_posts = MultiMap::new(); + + for username in usernames { + info!("Fetching posts from user [{}]", username); + match self + .fetch_user_posts_with_retry(&username, post_count) + .await + { + Ok(posts) => { + debug!("Found {} posts for user [{}]", posts.len(), username); + user_posts.insert_many(username, posts); + } + Err(e) => { + error!("Failed to fetch posts for user [{}]: {}", username, e); + // Continue with other users even if one fails + } + } + } + + Ok(user_posts) + } + + /// Fetch posts from a single Reddit user with retry logic + /// + /// # Arguments + /// + /// * `username` - The Reddit username to fetch posts from + /// * `post_count` - The maximum number of posts to fetch + /// + /// # Returns + /// + /// A vector of RedditPost objects + async fn fetch_user_posts_with_retry( + &self, + username: &str, + post_count: u32, + ) -> Result> { + let mut attempts = 0; + let max_retries = self.config.max_retries; + let retry_delay = Duration::from_millis(self.config.retry_delay_ms); + + loop { + attempts += 1; + match self + .client + .fetch_user_submissions(username, post_count) + .await + { + Ok(posts) => return Ok(posts), + Err(e) => { + if attempts > max_retries { + return Err(e).wrap_err_with(|| { + format!( + "Failed to fetch posts for user [{}] after {} attempts", + username, attempts + ) + }); + } + + warn!( + "Attempt {}/{} failed for user [{}]: {}. Retrying in {}ms...", + attempts, + max_retries + 1, + username, + e, + self.config.retry_delay_ms + ); + sleep(retry_delay).await; + } + } + } + } +} diff --git a/src/services/report.rs b/src/services/report.rs new file mode 100644 index 0000000..2b5b7c0 --- /dev/null +++ b/src/services/report.rs @@ -0,0 +1,40 @@ +use crate::actions::action::ProcessedMagnets; +use crate::report; +use log::info; +use std::collections::HashMap; + +/// Service for generating reports +pub struct ReportService; + +impl ReportService { + /// Create a new ReportService + pub fn new() -> Self { + Self + } + + /// Generate and display report + pub fn generate_report( + &self, + action_results: &HashMap, + total_new_links: usize, + ) { + for line in report::generate_report(action_results, total_new_links, true).lines() { + info!("{}", line); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::actions::action::ProcessedMagnets; + use crate::models::Magnet; + use std::collections::HashMap; + + #[test] + fn test_new_report_service() { + let report_service = ReportService::new(); + // Just verify that we can create the service + // The actual report generation is tested in the report module + } +}