Merge branch 'app-cleanup' into 'main'
Refactor App struct See merge request kernald/reddit-magnet!33
This commit is contained in:
commit
c26fcc4b2a
13 changed files with 1147 additions and 163 deletions
|
|
@ -1,4 +1,5 @@
|
||||||
use crate::models::Magnet;
|
use crate::models::Magnet;
|
||||||
|
use crate::services::database::DatabaseService;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use color_eyre::eyre::Result;
|
use color_eyre::eyre::Result;
|
||||||
|
|
||||||
|
|
@ -21,6 +22,6 @@ pub trait Action {
|
||||||
/// Process all unprocessed magnet links and return the list of processed magnets
|
/// Process all unprocessed magnet links and return the list of processed magnets
|
||||||
async fn process_unprocessed_magnets(
|
async fn process_unprocessed_magnets(
|
||||||
&mut self,
|
&mut self,
|
||||||
db: &mut crate::db::Database,
|
db_service: &mut DatabaseService,
|
||||||
) -> Result<ProcessedMagnets>;
|
) -> Result<ProcessedMagnets>;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,8 @@
|
||||||
use crate::actions::action::{Action, ProcessedMagnets};
|
use crate::actions::action::{Action, ProcessedMagnets};
|
||||||
use crate::actions::bitmagnet::client::BitmagnetClient;
|
use crate::actions::bitmagnet::client::BitmagnetClient;
|
||||||
use crate::actions::bitmagnet::config::BitmagnetConfig;
|
use crate::actions::bitmagnet::config::BitmagnetConfig;
|
||||||
use crate::db::{BitmagnetProcessedTable, Database};
|
use crate::db::BitmagnetProcessedTable;
|
||||||
|
use crate::services::database::DatabaseService;
|
||||||
use color_eyre::eyre::Result;
|
use color_eyre::eyre::Result;
|
||||||
use log::{debug, warn};
|
use log::{debug, warn};
|
||||||
|
|
||||||
|
|
@ -30,14 +31,17 @@ impl Action for BitmagnetAction {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process all unprocessed magnet links and return the list of processed magnets
|
/// Process all unprocessed magnet links and return the list of processed magnets
|
||||||
async fn process_unprocessed_magnets(&mut self, db: &mut Database) -> Result<ProcessedMagnets> {
|
async fn process_unprocessed_magnets(
|
||||||
|
&mut self,
|
||||||
|
db_service: &mut DatabaseService,
|
||||||
|
) -> Result<ProcessedMagnets> {
|
||||||
let unprocessed_magnets =
|
let unprocessed_magnets =
|
||||||
db.get_unprocessed_magnets_for_table::<BitmagnetProcessedTable>()?;
|
db_service.get_unprocessed_magnets::<BitmagnetProcessedTable>()?;
|
||||||
let mut processed_magnets = Vec::new();
|
let mut processed_magnets = Vec::new();
|
||||||
let mut failed_magnets = Vec::new();
|
let mut failed_magnets = Vec::new();
|
||||||
|
|
||||||
for magnet in unprocessed_magnets {
|
for magnet in unprocessed_magnets {
|
||||||
let tags = db.get_tags_for_magnet(magnet.id)?;
|
let tags = db_service.get_tags_for_magnet(magnet.id)?;
|
||||||
let tag_refs: Vec<&str> = tags.iter().map(|s| s.as_str()).collect();
|
let tag_refs: Vec<&str> = tags.iter().map(|s| s.as_str()).collect();
|
||||||
|
|
||||||
match self
|
match self
|
||||||
|
|
@ -60,7 +64,7 @@ impl Action for BitmagnetAction {
|
||||||
if !tags.is_empty() {
|
if !tags.is_empty() {
|
||||||
debug!("Tags: {:?}", tags);
|
debug!("Tags: {:?}", tags);
|
||||||
}
|
}
|
||||||
db.mark_magnet_processed_for_table::<BitmagnetProcessedTable>(magnet.id)?;
|
db_service.mark_magnet_processed::<BitmagnetProcessedTable>(magnet.id)?;
|
||||||
processed_magnets.push(magnet);
|
processed_magnets.push(magnet);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|
|
||||||
|
|
@ -37,6 +37,7 @@ mod tests {
|
||||||
use crate::actions::action::ProcessedMagnets;
|
use crate::actions::action::ProcessedMagnets;
|
||||||
use crate::db::Database;
|
use crate::db::Database;
|
||||||
use crate::models::Magnet;
|
use crate::models::Magnet;
|
||||||
|
use crate::services::database::DatabaseService;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use color_eyre::eyre::{eyre, Result};
|
use color_eyre::eyre::{eyre, Result};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
|
@ -66,7 +67,7 @@ mod tests {
|
||||||
|
|
||||||
async fn process_unprocessed_magnets(
|
async fn process_unprocessed_magnets(
|
||||||
&mut self,
|
&mut self,
|
||||||
_db: &mut Database,
|
_db_service: &mut DatabaseService,
|
||||||
) -> Result<ProcessedMagnets> {
|
) -> Result<ProcessedMagnets> {
|
||||||
Ok(ProcessedMagnets {
|
Ok(ProcessedMagnets {
|
||||||
success: Vec::<Magnet>::new(),
|
success: Vec::<Magnet>::new(),
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,8 @@
|
||||||
use crate::actions::action::{Action, ProcessedMagnets};
|
use crate::actions::action::{Action, ProcessedMagnets};
|
||||||
use crate::actions::transmission::client::TransmissionClient;
|
use crate::actions::transmission::client::TransmissionClient;
|
||||||
use crate::actions::transmission::config::TransmissionConfig;
|
use crate::actions::transmission::config::TransmissionConfig;
|
||||||
use crate::db::{Database, TransmissionProcessedTable};
|
use crate::db::TransmissionProcessedTable;
|
||||||
|
use crate::services::database::DatabaseService;
|
||||||
use color_eyre::eyre::Result;
|
use color_eyre::eyre::Result;
|
||||||
use log::{debug, warn};
|
use log::{debug, warn};
|
||||||
|
|
||||||
|
|
@ -30,14 +31,17 @@ impl Action for TransmissionAction {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process all unprocessed magnet links and return the list of processed magnets
|
/// Process all unprocessed magnet links and return the list of processed magnets
|
||||||
async fn process_unprocessed_magnets(&mut self, db: &mut Database) -> Result<ProcessedMagnets> {
|
async fn process_unprocessed_magnets(
|
||||||
|
&mut self,
|
||||||
|
db_service: &mut DatabaseService,
|
||||||
|
) -> Result<ProcessedMagnets> {
|
||||||
let unprocessed_magnets =
|
let unprocessed_magnets =
|
||||||
db.get_unprocessed_magnets_for_table::<TransmissionProcessedTable>()?;
|
db_service.get_unprocessed_magnets::<TransmissionProcessedTable>()?;
|
||||||
let mut processed_magnets = Vec::new();
|
let mut processed_magnets = Vec::new();
|
||||||
let mut failed_magnets = Vec::new();
|
let mut failed_magnets = Vec::new();
|
||||||
|
|
||||||
for magnet in unprocessed_magnets {
|
for magnet in unprocessed_magnets {
|
||||||
let tags = db.get_tags_for_magnet(magnet.id)?;
|
let tags = db_service.get_tags_for_magnet(magnet.id)?;
|
||||||
let tag_refs: Vec<&str> = tags.iter().map(|s| s.as_str()).collect();
|
let tag_refs: Vec<&str> = tags.iter().map(|s| s.as_str()).collect();
|
||||||
|
|
||||||
match self.client.submit_magnet(&magnet.link, tag_refs).await {
|
match self.client.submit_magnet(&magnet.link, tag_refs).await {
|
||||||
|
|
@ -51,7 +55,7 @@ impl Action for TransmissionAction {
|
||||||
if !tags.is_empty() {
|
if !tags.is_empty() {
|
||||||
debug!("Tags: {:?}", tags);
|
debug!("Tags: {:?}", tags);
|
||||||
}
|
}
|
||||||
db.mark_magnet_processed_for_table::<TransmissionProcessedTable>(magnet.id)?;
|
db_service.mark_magnet_processed::<TransmissionProcessedTable>(magnet.id)?;
|
||||||
processed_magnets.push(magnet);
|
processed_magnets.push(magnet);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|
|
||||||
184
src/app.rs
184
src/app.rs
|
|
@ -1,19 +1,18 @@
|
||||||
use crate::actions::action::{Action, ProcessedMagnets};
|
use crate::actions::action::ProcessedMagnets;
|
||||||
use crate::actions::bitmagnet::BitmagnetAction;
|
|
||||||
use crate::actions::factory::init_action;
|
|
||||||
use crate::actions::transmission::TransmissionAction;
|
|
||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
use crate::db::Database;
|
use crate::db::Database;
|
||||||
use crate::notifications::factory::init_notification;
|
use crate::reddit_client::RedditPost;
|
||||||
use crate::notifications::notification::Notification;
|
use crate::services::action::ActionService;
|
||||||
use crate::notifications::ntfy::NtfyNotification;
|
use crate::services::database::DatabaseService;
|
||||||
use crate::reddit_client::{RedditClient, RedditPost};
|
use crate::services::notification::NotificationService;
|
||||||
use crate::report;
|
use crate::services::post_processor::PostProcessorService;
|
||||||
use color_eyre::eyre::{Result, WrapErr};
|
use crate::services::reddit::RedditService;
|
||||||
use log::{debug, info, warn};
|
use crate::services::report::ReportService;
|
||||||
|
use color_eyre::eyre::Result;
|
||||||
use multimap::MultiMap;
|
use multimap::MultiMap;
|
||||||
use regex::Regex;
|
use std::cell::RefCell;
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
|
use std::rc::Rc;
|
||||||
|
|
||||||
/// Trait for configurations that can be enabled or disabled
|
/// Trait for configurations that can be enabled or disabled
|
||||||
pub trait Enableable {
|
pub trait Enableable {
|
||||||
|
|
@ -23,44 +22,37 @@ pub trait Enableable {
|
||||||
|
|
||||||
/// Application state and behavior
|
/// Application state and behavior
|
||||||
pub struct App {
|
pub struct App {
|
||||||
db: Database,
|
|
||||||
config: Config,
|
config: Config,
|
||||||
actions: Vec<Box<dyn Action>>,
|
action_service: ActionService,
|
||||||
notifications: Vec<Box<dyn Notification>>,
|
notification_service: NotificationService,
|
||||||
reddit_client: RedditClient,
|
reddit_service: RedditService,
|
||||||
|
post_processor: PostProcessorService,
|
||||||
|
report_service: ReportService,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl App {
|
impl App {
|
||||||
/// Create a new App instance
|
/// Create a new App instance
|
||||||
pub fn new(db: Database, config: Config) -> Self {
|
pub fn new(db: Database, config: Config) -> Self {
|
||||||
|
let db_service = Rc::new(RefCell::new(DatabaseService::new(db)));
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
db,
|
|
||||||
config,
|
config,
|
||||||
actions: Vec::new(),
|
action_service: ActionService::new(Rc::clone(&db_service)),
|
||||||
notifications: Vec::new(),
|
notification_service: NotificationService::new(),
|
||||||
reddit_client: RedditClient::new(),
|
reddit_service: RedditService::new(),
|
||||||
|
post_processor: PostProcessorService::new(Rc::clone(&db_service)),
|
||||||
|
report_service: ReportService::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Initialize actions based on configuration
|
/// Initialize actions based on configuration
|
||||||
pub fn init_actions(&mut self) -> Result<()> {
|
pub fn init_actions(&mut self) -> Result<()> {
|
||||||
if let Some(action) = init_action(&self.config.bitmagnet, BitmagnetAction::new)? {
|
self.action_service.init_actions(&self.config)
|
||||||
self.actions.push(action);
|
|
||||||
}
|
|
||||||
if let Some(action) = init_action(&self.config.transmission, TransmissionAction::new)? {
|
|
||||||
self.actions.push(action);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Initialize notifications based on configuration
|
/// Initialize notifications based on configuration
|
||||||
pub fn init_notifications(&mut self) -> Result<()> {
|
pub fn init_notifications(&mut self) -> Result<()> {
|
||||||
if let Some(notification) = init_notification(&self.config.ntfy, NtfyNotification::new)? {
|
self.notification_service.init_notifications(&self.config)
|
||||||
self.notifications.push(notification);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetch posts from Reddit
|
/// Fetch posts from Reddit
|
||||||
|
|
@ -70,15 +62,10 @@ impl App {
|
||||||
unique_usernames.insert(source_config.username.clone());
|
unique_usernames.insert(source_config.username.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut user_posts = MultiMap::new();
|
let user_posts = self
|
||||||
for username in unique_usernames {
|
.reddit_service
|
||||||
info!("Fetching posts from user [{}]", username);
|
.fetch_posts_from_users(unique_usernames, post_count)
|
||||||
let submissions = self
|
|
||||||
.reddit_client
|
|
||||||
.fetch_user_submissions(&username, post_count)
|
|
||||||
.await?;
|
.await?;
|
||||||
user_posts.insert_many(username, submissions);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(user_posts)
|
Ok(user_posts)
|
||||||
}
|
}
|
||||||
|
|
@ -88,86 +75,13 @@ impl App {
|
||||||
&mut self,
|
&mut self,
|
||||||
user_posts: &MultiMap<String, RedditPost>,
|
user_posts: &MultiMap<String, RedditPost>,
|
||||||
) -> Result<usize> {
|
) -> Result<usize> {
|
||||||
let mut total_new_links = 0;
|
self.post_processor
|
||||||
|
.process_sources(user_posts, &self.config.sources)
|
||||||
for (source_name, source_config) in &self.config.sources {
|
|
||||||
info!("Processing source [{}]", source_name);
|
|
||||||
|
|
||||||
let username = source_config.username.clone();
|
|
||||||
let title_filter = match &source_config.title_filter {
|
|
||||||
Some(filter) => Some(
|
|
||||||
Regex::new(filter.as_str())
|
|
||||||
.context(format!("Invalid regex pattern: {}", filter))?,
|
|
||||||
),
|
|
||||||
None => None,
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(submissions) = user_posts.get_vec(&username) {
|
|
||||||
for post in submissions
|
|
||||||
.iter()
|
|
||||||
.filter(|s| filter_post(&s.title, title_filter.clone()))
|
|
||||||
{
|
|
||||||
let title = &post.title;
|
|
||||||
let body = &post.body;
|
|
||||||
let subreddit = &post.subreddit;
|
|
||||||
|
|
||||||
let magnet_links = crate::magnet::extract_magnet_links(body);
|
|
||||||
if !magnet_links.is_empty() {
|
|
||||||
let post_info = crate::PostInfo {
|
|
||||||
title: title.to_string(),
|
|
||||||
submitter: username.clone(),
|
|
||||||
subreddit: subreddit.to_string(),
|
|
||||||
magnet_links,
|
|
||||||
timestamp: post.created,
|
|
||||||
imdb_id: source_config.imdb_id.clone(),
|
|
||||||
tags: source_config.tags.clone(),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Store the post info in the database
|
|
||||||
match self.db.store_magnets(&post_info) {
|
|
||||||
Ok(count) => {
|
|
||||||
total_new_links += count;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Failed to store post info in database: {}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(total_new_links)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process magnet links with actions
|
/// Process magnet links with actions
|
||||||
pub async fn process_actions(&mut self) -> Result<HashMap<String, ProcessedMagnets>> {
|
pub async fn process_actions(&mut self) -> Result<HashMap<String, ProcessedMagnets>> {
|
||||||
let mut action_results = HashMap::new();
|
self.action_service.process_actions().await
|
||||||
|
|
||||||
for action in &mut self.actions {
|
|
||||||
debug!("Processing magnet links with {}", action.get_name());
|
|
||||||
|
|
||||||
match action.process_unprocessed_magnets(&mut self.db).await {
|
|
||||||
Ok(processed_magnets) => {
|
|
||||||
let count = processed_magnets.success.len();
|
|
||||||
debug!(
|
|
||||||
"Successfully processed {} magnet links with {}",
|
|
||||||
count,
|
|
||||||
action.get_name()
|
|
||||||
);
|
|
||||||
action_results.insert(action.get_name().to_string(), processed_magnets);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!(
|
|
||||||
"Failed to process magnet links with {}: {}",
|
|
||||||
action.get_name(),
|
|
||||||
e
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(action_results)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send notifications
|
/// Send notifications
|
||||||
|
|
@ -176,28 +90,9 @@ impl App {
|
||||||
action_results: &HashMap<String, ProcessedMagnets>,
|
action_results: &HashMap<String, ProcessedMagnets>,
|
||||||
total_new_links: usize,
|
total_new_links: usize,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
for notification in &self.notifications {
|
self.notification_service
|
||||||
match notification
|
.send_notifications(action_results, total_new_links)
|
||||||
.send_notification(action_results, total_new_links)
|
|
||||||
.await
|
.await
|
||||||
{
|
|
||||||
Ok(_) => {
|
|
||||||
debug!(
|
|
||||||
"Successfully sent notification to {}",
|
|
||||||
notification.get_name()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!(
|
|
||||||
"Failed to send notification to {}: {}",
|
|
||||||
notification.get_name(),
|
|
||||||
e
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Generate and display report
|
/// Generate and display report
|
||||||
|
|
@ -206,9 +101,8 @@ impl App {
|
||||||
action_results: &HashMap<String, ProcessedMagnets>,
|
action_results: &HashMap<String, ProcessedMagnets>,
|
||||||
total_new_links: usize,
|
total_new_links: usize,
|
||||||
) {
|
) {
|
||||||
for line in report::generate_report(action_results, total_new_links, true).lines() {
|
self.report_service
|
||||||
info!("{}", line);
|
.generate_report(action_results, total_new_links);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(&mut self, post_count: u32) -> Result<()> {
|
pub async fn run(&mut self, post_count: u32) -> Result<()> {
|
||||||
|
|
@ -234,11 +128,3 @@ impl App {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Filters posts based on a title filter pattern
|
|
||||||
fn filter_post(title: &str, title_filter: Option<Regex>) -> bool {
|
|
||||||
match title_filter {
|
|
||||||
Some(pattern) => pattern.is_match(title),
|
|
||||||
None => true,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,7 @@ mod notifications;
|
||||||
mod reddit_client;
|
mod reddit_client;
|
||||||
mod report;
|
mod report;
|
||||||
mod schema;
|
mod schema;
|
||||||
|
mod services;
|
||||||
|
|
||||||
/// Post information with magnet links
|
/// Post information with magnet links
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
|
|
||||||
312
src/services/action.rs
Normal file
312
src/services/action.rs
Normal file
|
|
@ -0,0 +1,312 @@
|
||||||
|
use crate::actions::action::{Action, ProcessedMagnets};
|
||||||
|
use crate::actions::bitmagnet::BitmagnetAction;
|
||||||
|
use crate::actions::factory::init_action;
|
||||||
|
use crate::actions::transmission::TransmissionAction;
|
||||||
|
use crate::config::Config;
|
||||||
|
use crate::services::database::DatabaseService;
|
||||||
|
use color_eyre::eyre::Result;
|
||||||
|
use log::{debug, warn};
|
||||||
|
use std::cell::RefCell;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::rc::Rc;
|
||||||
|
|
||||||
|
/// Service for managing actions
|
||||||
|
pub struct ActionService {
|
||||||
|
actions: Vec<Box<dyn Action>>,
|
||||||
|
db_service: Rc<RefCell<DatabaseService>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ActionService {
|
||||||
|
/// Create a new ActionService
|
||||||
|
pub fn new(db_service: Rc<RefCell<DatabaseService>>) -> Self {
|
||||||
|
Self {
|
||||||
|
actions: Vec::new(),
|
||||||
|
db_service,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Initialize actions based on configuration
|
||||||
|
pub fn init_actions(&mut self, config: &Config) -> Result<()> {
|
||||||
|
if let Some(action) = init_action(&config.bitmagnet, BitmagnetAction::new)? {
|
||||||
|
self.actions.push(action);
|
||||||
|
}
|
||||||
|
if let Some(action) = init_action(&config.transmission, TransmissionAction::new)? {
|
||||||
|
self.actions.push(action);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process magnet links with actions
|
||||||
|
#[allow(clippy::await_holding_refcell_ref)] // Not sure how to address this
|
||||||
|
pub async fn process_actions(&mut self) -> Result<HashMap<String, ProcessedMagnets>> {
|
||||||
|
let mut action_results = HashMap::new();
|
||||||
|
|
||||||
|
for action in &mut self.actions {
|
||||||
|
debug!("Processing magnet links with {}", action.get_name());
|
||||||
|
|
||||||
|
match action
|
||||||
|
.process_unprocessed_magnets(&mut self.db_service.borrow_mut())
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(processed_magnets) => {
|
||||||
|
let count = processed_magnets.success.len();
|
||||||
|
debug!(
|
||||||
|
"Successfully processed {} magnet links with {}",
|
||||||
|
count,
|
||||||
|
action.get_name()
|
||||||
|
);
|
||||||
|
action_results.insert(action.get_name().to_string(), processed_magnets);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!(
|
||||||
|
"Failed to process magnet links with {}: {}",
|
||||||
|
action.get_name(),
|
||||||
|
e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(action_results)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::actions::action::ProcessedMagnets;
|
||||||
|
use crate::db::Database;
|
||||||
|
use crate::models::Magnet;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use chrono::NaiveDateTime;
|
||||||
|
use color_eyre::eyre::{eyre, Result};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
use tempfile::tempdir;
|
||||||
|
|
||||||
|
struct MockAction {
|
||||||
|
name: &'static str,
|
||||||
|
process_called: AtomicBool,
|
||||||
|
should_fail: bool,
|
||||||
|
success_count: usize,
|
||||||
|
failed_count: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MockAction {
|
||||||
|
fn new(name: &'static str, should_fail: bool) -> Self {
|
||||||
|
Self {
|
||||||
|
name,
|
||||||
|
process_called: AtomicBool::new(false),
|
||||||
|
should_fail,
|
||||||
|
success_count: 0,
|
||||||
|
failed_count: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn with_success_count(mut self, count: usize) -> Self {
|
||||||
|
self.success_count = count;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
fn with_failed_count(mut self, count: usize) -> Self {
|
||||||
|
self.failed_count = count;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Action for MockAction {
|
||||||
|
fn name() -> &'static str
|
||||||
|
where
|
||||||
|
Self: Sized,
|
||||||
|
{
|
||||||
|
"MockAction"
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_name(&self) -> &'static str {
|
||||||
|
self.name
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn process_unprocessed_magnets(
|
||||||
|
&mut self,
|
||||||
|
_db_service: &mut DatabaseService,
|
||||||
|
) -> Result<ProcessedMagnets> {
|
||||||
|
self.process_called.store(true, Ordering::SeqCst);
|
||||||
|
|
||||||
|
if self.should_fail {
|
||||||
|
return Err(eyre!("Mock action failed"));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut success = Vec::new();
|
||||||
|
for i in 0..self.success_count {
|
||||||
|
success.push(create_test_magnet(
|
||||||
|
i as i32,
|
||||||
|
&format!("Success Magnet {}", i),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut failed = Vec::new();
|
||||||
|
for i in 0..self.failed_count {
|
||||||
|
failed.push(create_test_magnet(
|
||||||
|
i as i32 + 100,
|
||||||
|
&format!("Failed Magnet {}", i),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(ProcessedMagnets { success, failed })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_test_magnet(id: i32, title: &str) -> Magnet {
|
||||||
|
Magnet {
|
||||||
|
id,
|
||||||
|
title: title.to_string(),
|
||||||
|
submitter: "test_user".to_string(),
|
||||||
|
subreddit: "test_subreddit".to_string(),
|
||||||
|
link: format!("magnet:?xt=urn:btih:{}", id),
|
||||||
|
published_at: NaiveDateTime::default(),
|
||||||
|
imdb_id: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn setup_test_db() -> Rc<RefCell<DatabaseService>> {
|
||||||
|
let temp_dir = tempdir().unwrap();
|
||||||
|
let db_path = temp_dir.path().join("test.db");
|
||||||
|
|
||||||
|
let db = Database::new(&db_path).unwrap();
|
||||||
|
let db_service = DatabaseService::new(db);
|
||||||
|
Rc::new(RefCell::new(db_service))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_new_action_service() {
|
||||||
|
let db_service = setup_test_db();
|
||||||
|
|
||||||
|
let action_service = ActionService::new(db_service);
|
||||||
|
|
||||||
|
assert_eq!(action_service.actions.len(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_init_actions_with_empty_config() {
|
||||||
|
let db_service = setup_test_db();
|
||||||
|
let mut action_service = ActionService::new(db_service);
|
||||||
|
|
||||||
|
// Create an empty config
|
||||||
|
let config = Config {
|
||||||
|
bitmagnet: None,
|
||||||
|
transmission: None,
|
||||||
|
ntfy: None,
|
||||||
|
sources: HashMap::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let result = action_service.init_actions(&config);
|
||||||
|
|
||||||
|
assert!(result.is_ok());
|
||||||
|
assert_eq!(action_service.actions.len(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_process_actions_with_no_actions() {
|
||||||
|
let db_service = setup_test_db();
|
||||||
|
let mut action_service = ActionService::new(db_service);
|
||||||
|
|
||||||
|
let result = action_service.process_actions().await;
|
||||||
|
|
||||||
|
assert!(result.is_ok());
|
||||||
|
let action_results = result.unwrap();
|
||||||
|
assert_eq!(action_results.len(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_process_actions_with_successful_action() {
|
||||||
|
let db_service = setup_test_db();
|
||||||
|
let mut action_service = ActionService::new(db_service);
|
||||||
|
|
||||||
|
// Create a mock action that succeeds
|
||||||
|
let mock_action = MockAction::new("SuccessAction", false).with_success_count(2);
|
||||||
|
|
||||||
|
action_service.actions.push(Box::new(mock_action));
|
||||||
|
|
||||||
|
let result = action_service.process_actions().await;
|
||||||
|
|
||||||
|
assert!(result.is_ok());
|
||||||
|
let action_results = result.unwrap();
|
||||||
|
assert_eq!(action_results.len(), 1);
|
||||||
|
assert!(action_results.contains_key("SuccessAction"));
|
||||||
|
|
||||||
|
let processed_magnets = &action_results["SuccessAction"];
|
||||||
|
assert_eq!(processed_magnets.success.len(), 2);
|
||||||
|
assert_eq!(processed_magnets.failed.len(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_process_actions_with_failing_action() {
|
||||||
|
let db_service = setup_test_db();
|
||||||
|
let mut action_service = ActionService::new(db_service);
|
||||||
|
|
||||||
|
// Create a mock action that fails
|
||||||
|
let mock_action = MockAction::new("FailingAction", true);
|
||||||
|
|
||||||
|
action_service.actions.push(Box::new(mock_action));
|
||||||
|
|
||||||
|
let result = action_service.process_actions().await;
|
||||||
|
|
||||||
|
assert!(result.is_ok());
|
||||||
|
let action_results = result.unwrap();
|
||||||
|
assert_eq!(action_results.len(), 0); // No results for failing action
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_process_actions_with_mixed_results() {
|
||||||
|
let db_service = setup_test_db();
|
||||||
|
let mut action_service = ActionService::new(db_service);
|
||||||
|
|
||||||
|
// Create a mock action with both successful and failed magnets
|
||||||
|
let mock_action = MockAction::new("MixedAction", false)
|
||||||
|
.with_success_count(1)
|
||||||
|
.with_failed_count(1);
|
||||||
|
|
||||||
|
action_service.actions.push(Box::new(mock_action));
|
||||||
|
|
||||||
|
let result = action_service.process_actions().await;
|
||||||
|
|
||||||
|
assert!(result.is_ok());
|
||||||
|
let action_results = result.unwrap();
|
||||||
|
assert_eq!(action_results.len(), 1);
|
||||||
|
assert!(action_results.contains_key("MixedAction"));
|
||||||
|
|
||||||
|
let processed_magnets = &action_results["MixedAction"];
|
||||||
|
assert_eq!(processed_magnets.success.len(), 1);
|
||||||
|
assert_eq!(processed_magnets.failed.len(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_process_actions_with_multiple_actions() {
|
||||||
|
let db_service = setup_test_db();
|
||||||
|
let mut action_service = ActionService::new(db_service);
|
||||||
|
|
||||||
|
// Create two mock actions
|
||||||
|
let action1 = MockAction::new("Action1", false).with_success_count(1);
|
||||||
|
|
||||||
|
let action2 = MockAction::new("Action2", false).with_success_count(1);
|
||||||
|
|
||||||
|
action_service.actions.push(Box::new(action1));
|
||||||
|
action_service.actions.push(Box::new(action2));
|
||||||
|
|
||||||
|
let result = action_service.process_actions().await;
|
||||||
|
|
||||||
|
assert!(result.is_ok());
|
||||||
|
let action_results = result.unwrap();
|
||||||
|
assert_eq!(action_results.len(), 2);
|
||||||
|
assert!(action_results.contains_key("Action1"));
|
||||||
|
assert!(action_results.contains_key("Action2"));
|
||||||
|
|
||||||
|
let processed_magnets1 = &action_results["Action1"];
|
||||||
|
assert_eq!(processed_magnets1.success.len(), 1);
|
||||||
|
|
||||||
|
let processed_magnets2 = &action_results["Action2"];
|
||||||
|
assert_eq!(processed_magnets2.success.len(), 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
140
src/services/database.rs
Normal file
140
src/services/database.rs
Normal file
|
|
@ -0,0 +1,140 @@
|
||||||
|
use crate::db::{Database, ProcessedTable};
|
||||||
|
use crate::models;
|
||||||
|
use crate::PostInfo;
|
||||||
|
use color_eyre::eyre::{Result, WrapErr};
|
||||||
|
use log::debug;
|
||||||
|
|
||||||
|
/// Service for database operations
|
||||||
|
pub struct DatabaseService {
|
||||||
|
db: Database,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DatabaseService {
|
||||||
|
/// Create a new DatabaseService
|
||||||
|
pub fn new(db: Database) -> Self {
|
||||||
|
Self { db }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Store magnet links from a post
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `post_info` - Information about the post containing magnet links
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
///
|
||||||
|
/// The number of new magnet links stored
|
||||||
|
pub fn store_magnets(&mut self, post_info: &PostInfo) -> Result<usize> {
|
||||||
|
debug!(
|
||||||
|
"Storing {} magnet links from post: {}",
|
||||||
|
post_info.magnet_links.len(),
|
||||||
|
post_info.title
|
||||||
|
);
|
||||||
|
|
||||||
|
self.db
|
||||||
|
.store_magnets(post_info)
|
||||||
|
.wrap_err_with(|| format!("Failed to store magnets for post: {}", post_info.title))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get all magnet links from the database
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
///
|
||||||
|
/// A vector of all magnet links
|
||||||
|
#[allow(dead_code)] // Used in tests
|
||||||
|
pub fn get_all_magnets(&mut self) -> Result<Vec<models::Magnet>> {
|
||||||
|
debug!("Retrieving all magnet links from database");
|
||||||
|
|
||||||
|
self.db
|
||||||
|
.get_all_magnets()
|
||||||
|
.wrap_err("Failed to retrieve all magnet links from database")
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get tags for a magnet link
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `magnet_id` - The ID of the magnet link
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
///
|
||||||
|
/// A vector of tags associated with the magnet link
|
||||||
|
pub fn get_tags_for_magnet(&mut self, magnet_id: i32) -> Result<Vec<String>> {
|
||||||
|
debug!("Retrieving tags for magnet ID: {}", magnet_id);
|
||||||
|
|
||||||
|
self.db
|
||||||
|
.get_tags_for_magnet(magnet_id)
|
||||||
|
.wrap_err_with(|| format!("Failed to retrieve tags for magnet ID: {}", magnet_id))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get unprocessed magnet links for a specific action
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
///
|
||||||
|
/// A vector of unprocessed magnet links
|
||||||
|
pub fn get_unprocessed_magnets<T: ProcessedTable>(&mut self) -> Result<Vec<models::Magnet>> {
|
||||||
|
debug!("Retrieving unprocessed magnet links");
|
||||||
|
|
||||||
|
self.db
|
||||||
|
.get_unprocessed_magnets_for_table::<T>()
|
||||||
|
.wrap_err("Failed to retrieve unprocessed magnet links")
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mark a magnet link as processed for a specific action
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `magnet_id` - The ID of the magnet link
|
||||||
|
pub fn mark_magnet_processed<T: ProcessedTable>(&mut self, magnet_id: i32) -> Result<()> {
|
||||||
|
debug!("Marking magnet ID {} as processed", magnet_id);
|
||||||
|
|
||||||
|
self.db
|
||||||
|
.mark_magnet_processed_for_table::<T>(magnet_id)
|
||||||
|
.wrap_err_with(|| format!("Failed to mark magnet ID {} as processed", magnet_id))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use chrono::Utc;
|
||||||
|
use tempfile::tempdir;
|
||||||
|
|
||||||
|
fn create_test_post_info() -> PostInfo {
|
||||||
|
PostInfo {
|
||||||
|
title: "Test Post".to_string(),
|
||||||
|
submitter: "test_user".to_string(),
|
||||||
|
subreddit: "test_subreddit".to_string(),
|
||||||
|
magnet_links: vec![
|
||||||
|
"magnet:?xt=urn:btih:test1".to_string(),
|
||||||
|
"magnet:?xt=urn:btih:test2".to_string(),
|
||||||
|
],
|
||||||
|
timestamp: Utc::now(),
|
||||||
|
imdb_id: Some("tt1234567".to_string()),
|
||||||
|
tags: vec!["tag1".to_string(), "tag2".to_string()],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_store_and_retrieve_magnets() -> Result<()> {
|
||||||
|
let temp_dir = tempdir()?;
|
||||||
|
let db_path = temp_dir.path().join("test.db");
|
||||||
|
|
||||||
|
let db = Database::new(&db_path)?;
|
||||||
|
let mut service = DatabaseService::new(db);
|
||||||
|
|
||||||
|
let post_info = create_test_post_info();
|
||||||
|
|
||||||
|
let count = service.store_magnets(&post_info)?;
|
||||||
|
assert_eq!(count, 2, "Should have stored 2 new magnet links");
|
||||||
|
|
||||||
|
let magnets = service.get_all_magnets()?;
|
||||||
|
assert_eq!(magnets.len(), 2, "Should have retrieved 2 magnet links");
|
||||||
|
|
||||||
|
let magnet_links: Vec<String> = magnets.iter().map(|m| m.link.clone()).collect();
|
||||||
|
assert!(magnet_links.contains(&"magnet:?xt=urn:btih:test1".to_string()));
|
||||||
|
assert!(magnet_links.contains(&"magnet:?xt=urn:btih:test2".to_string()));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
10
src/services/mod.rs
Normal file
10
src/services/mod.rs
Normal file
|
|
@ -0,0 +1,10 @@
|
||||||
|
/// Service layer for the application
|
||||||
|
///
|
||||||
|
/// This module contains service implementations for various parts of the application.
|
||||||
|
/// Services encapsulate business logic and provide a clean API for the rest of the application.
|
||||||
|
pub mod action;
|
||||||
|
pub mod database;
|
||||||
|
pub mod notification;
|
||||||
|
pub mod post_processor;
|
||||||
|
pub mod reddit;
|
||||||
|
pub mod report;
|
||||||
305
src/services/notification.rs
Normal file
305
src/services/notification.rs
Normal file
|
|
@ -0,0 +1,305 @@
|
||||||
|
use crate::actions::action::ProcessedMagnets;
|
||||||
|
use crate::config::Config;
|
||||||
|
use crate::notifications::factory::init_notification;
|
||||||
|
use crate::notifications::notification::Notification;
|
||||||
|
use crate::notifications::ntfy::NtfyNotification;
|
||||||
|
use color_eyre::eyre::Result;
|
||||||
|
use log::{debug, warn};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
/// Service for managing notifications
|
||||||
|
pub struct NotificationService {
|
||||||
|
notifications: Vec<Box<dyn Notification>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NotificationService {
|
||||||
|
/// Create a new NotificationService
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
notifications: Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Initialize notifications based on configuration
|
||||||
|
pub fn init_notifications(&mut self, config: &Config) -> Result<()> {
|
||||||
|
if let Some(notification) = init_notification(&config.ntfy, NtfyNotification::new)? {
|
||||||
|
self.notifications.push(notification);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send notifications
|
||||||
|
pub async fn send_notifications(
|
||||||
|
&self,
|
||||||
|
action_results: &HashMap<String, ProcessedMagnets>,
|
||||||
|
total_new_links: usize,
|
||||||
|
) -> Result<()> {
|
||||||
|
for notification in &self.notifications {
|
||||||
|
match notification
|
||||||
|
.send_notification(action_results, total_new_links)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(_) => {
|
||||||
|
debug!(
|
||||||
|
"Successfully sent notification to {}",
|
||||||
|
notification.get_name()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!(
|
||||||
|
"Failed to send notification to {}: {}",
|
||||||
|
notification.get_name(),
|
||||||
|
e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::actions::action::ProcessedMagnets;
|
||||||
|
use crate::models::Magnet;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use chrono::NaiveDateTime;
|
||||||
|
use color_eyre::eyre::{eyre, Result};
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
struct MockNotification {
|
||||||
|
name: &'static str,
|
||||||
|
should_notify_result: bool,
|
||||||
|
send_called: Arc<AtomicBool>,
|
||||||
|
should_fail: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MockNotification {
|
||||||
|
fn new(name: &'static str, should_notify: bool, should_fail: bool) -> Self {
|
||||||
|
Self {
|
||||||
|
name,
|
||||||
|
should_notify_result: should_notify,
|
||||||
|
send_called: Arc::new(AtomicBool::new(false)),
|
||||||
|
should_fail,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Notification for MockNotification {
|
||||||
|
fn name() -> &'static str
|
||||||
|
where
|
||||||
|
Self: Sized,
|
||||||
|
{
|
||||||
|
"MockNotification"
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_name(&self) -> &'static str {
|
||||||
|
self.name
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_notification(
|
||||||
|
&self,
|
||||||
|
_action_results: &HashMap<String, ProcessedMagnets>,
|
||||||
|
_total_new_links: usize,
|
||||||
|
) -> Result<()> {
|
||||||
|
self.send_called.store(true, Ordering::SeqCst);
|
||||||
|
|
||||||
|
if self.should_fail {
|
||||||
|
return Err(eyre!("Mock notification failed"));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn should_notify(
|
||||||
|
&self,
|
||||||
|
_action_results: &HashMap<String, ProcessedMagnets>,
|
||||||
|
_total_new_links: usize,
|
||||||
|
) -> bool {
|
||||||
|
self.should_notify_result
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_test_magnet(id: i32, title: &str) -> Magnet {
|
||||||
|
Magnet {
|
||||||
|
id,
|
||||||
|
title: title.to_string(),
|
||||||
|
submitter: "test_user".to_string(),
|
||||||
|
subreddit: "test_subreddit".to_string(),
|
||||||
|
link: format!("magnet:?xt=urn:btih:{}", id),
|
||||||
|
published_at: NaiveDateTime::default(),
|
||||||
|
imdb_id: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_processed_magnets(success_count: usize, failed_count: usize) -> ProcessedMagnets {
|
||||||
|
let mut success = Vec::new();
|
||||||
|
for i in 0..success_count {
|
||||||
|
success.push(create_test_magnet(
|
||||||
|
i as i32,
|
||||||
|
&format!("Success Magnet {}", i),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut failed = Vec::new();
|
||||||
|
for i in 0..failed_count {
|
||||||
|
failed.push(create_test_magnet(
|
||||||
|
i as i32 + 100,
|
||||||
|
&format!("Failed Magnet {}", i),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
ProcessedMagnets { success, failed }
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_new_notification_service() {
|
||||||
|
let notification_service = NotificationService::new();
|
||||||
|
assert_eq!(notification_service.notifications.len(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_init_notifications_with_empty_config() {
|
||||||
|
let mut notification_service = NotificationService::new();
|
||||||
|
|
||||||
|
// Create an empty config
|
||||||
|
let config = Config {
|
||||||
|
bitmagnet: None,
|
||||||
|
transmission: None,
|
||||||
|
ntfy: None,
|
||||||
|
sources: HashMap::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let result = notification_service.init_notifications(&config);
|
||||||
|
|
||||||
|
assert!(result.is_ok());
|
||||||
|
assert_eq!(notification_service.notifications.len(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_send_notifications_with_no_notifications() {
|
||||||
|
let notification_service = NotificationService::new();
|
||||||
|
let action_results = HashMap::new();
|
||||||
|
|
||||||
|
let result = notification_service
|
||||||
|
.send_notifications(&action_results, 0)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
assert!(result.is_ok());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_send_notifications_with_successful_notification() {
|
||||||
|
let mut notification_service = NotificationService::new();
|
||||||
|
|
||||||
|
// Create a mock notification that succeeds
|
||||||
|
let mock_notification = MockNotification::new("SuccessNotification", true, false);
|
||||||
|
let send_called = mock_notification.send_called.clone();
|
||||||
|
|
||||||
|
notification_service
|
||||||
|
.notifications
|
||||||
|
.push(Box::new(mock_notification));
|
||||||
|
|
||||||
|
// Create some action results
|
||||||
|
let mut action_results = HashMap::new();
|
||||||
|
action_results.insert("TestAction".to_string(), create_processed_magnets(1, 0));
|
||||||
|
|
||||||
|
let result = notification_service
|
||||||
|
.send_notifications(&action_results, 1)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
assert!(result.is_ok());
|
||||||
|
assert!(send_called.load(Ordering::SeqCst));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_send_notifications_with_failing_notification() {
|
||||||
|
let mut notification_service = NotificationService::new();
|
||||||
|
|
||||||
|
// Create a mock notification that fails
|
||||||
|
let mock_notification = MockNotification::new("FailingNotification", true, true);
|
||||||
|
let send_called = mock_notification.send_called.clone();
|
||||||
|
|
||||||
|
notification_service
|
||||||
|
.notifications
|
||||||
|
.push(Box::new(mock_notification));
|
||||||
|
|
||||||
|
// Create some action results
|
||||||
|
let mut action_results = HashMap::new();
|
||||||
|
action_results.insert("TestAction".to_string(), create_processed_magnets(1, 0));
|
||||||
|
|
||||||
|
let result = notification_service
|
||||||
|
.send_notifications(&action_results, 1)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
assert!(result.is_ok()); // The service should still return Ok even if a notification fails
|
||||||
|
assert!(send_called.load(Ordering::SeqCst));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_send_notifications_with_multiple_notifications() {
|
||||||
|
let mut notification_service = NotificationService::new();
|
||||||
|
|
||||||
|
// Create two mock notifications
|
||||||
|
let mock_notification1 = MockNotification::new("Notification1", true, false);
|
||||||
|
let send_called1 = mock_notification1.send_called.clone();
|
||||||
|
|
||||||
|
let mock_notification2 = MockNotification::new("Notification2", true, false);
|
||||||
|
let send_called2 = mock_notification2.send_called.clone();
|
||||||
|
|
||||||
|
notification_service
|
||||||
|
.notifications
|
||||||
|
.push(Box::new(mock_notification1));
|
||||||
|
notification_service
|
||||||
|
.notifications
|
||||||
|
.push(Box::new(mock_notification2));
|
||||||
|
|
||||||
|
// Create some action results
|
||||||
|
let mut action_results = HashMap::new();
|
||||||
|
action_results.insert("TestAction".to_string(), create_processed_magnets(1, 0));
|
||||||
|
|
||||||
|
let result = notification_service
|
||||||
|
.send_notifications(&action_results, 1)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
assert!(result.is_ok());
|
||||||
|
assert!(send_called1.load(Ordering::SeqCst));
|
||||||
|
assert!(send_called2.load(Ordering::SeqCst));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_send_notifications_with_mixed_results() {
|
||||||
|
let mut notification_service = NotificationService::new();
|
||||||
|
|
||||||
|
// Create one successful and one failing notification
|
||||||
|
let mock_notification1 = MockNotification::new("SuccessNotification", true, false);
|
||||||
|
let send_called1 = mock_notification1.send_called.clone();
|
||||||
|
|
||||||
|
let mock_notification2 = MockNotification::new("FailingNotification", true, true);
|
||||||
|
let send_called2 = mock_notification2.send_called.clone();
|
||||||
|
|
||||||
|
notification_service
|
||||||
|
.notifications
|
||||||
|
.push(Box::new(mock_notification1));
|
||||||
|
notification_service
|
||||||
|
.notifications
|
||||||
|
.push(Box::new(mock_notification2));
|
||||||
|
|
||||||
|
// Create some action results with both successful and failed magnets
|
||||||
|
let mut action_results = HashMap::new();
|
||||||
|
action_results.insert("TestAction".to_string(), create_processed_magnets(1, 1));
|
||||||
|
|
||||||
|
let result = notification_service
|
||||||
|
.send_notifications(&action_results, 1)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
assert!(result.is_ok());
|
||||||
|
assert!(send_called1.load(Ordering::SeqCst));
|
||||||
|
assert!(send_called2.load(Ordering::SeqCst));
|
||||||
|
}
|
||||||
|
}
|
||||||
146
src/services/post_processor.rs
Normal file
146
src/services/post_processor.rs
Normal file
|
|
@ -0,0 +1,146 @@
|
||||||
|
use crate::services::database::DatabaseService;
|
||||||
|
use color_eyre::eyre::{Result, WrapErr};
|
||||||
|
use log::{info, warn};
|
||||||
|
use regex::Regex;
|
||||||
|
|
||||||
|
use std::cell::RefCell;
|
||||||
|
use std::rc::Rc;
|
||||||
|
|
||||||
|
/// Service for processing posts and extracting magnet links
|
||||||
|
pub struct PostProcessorService {
|
||||||
|
db_service: Rc<RefCell<DatabaseService>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PostProcessorService {
|
||||||
|
/// Create a new PostProcessorService
|
||||||
|
pub fn new(db_service: Rc<RefCell<DatabaseService>>) -> Self {
|
||||||
|
Self { db_service }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process a collection of posts from various sources
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `user_posts` - A map of usernames to their posts
|
||||||
|
/// * `source_configs` - A map of source names to their configurations
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
///
|
||||||
|
/// The number of new magnet links stored
|
||||||
|
pub fn process_sources(
|
||||||
|
&mut self,
|
||||||
|
user_posts: &multimap::MultiMap<String, crate::reddit_client::RedditPost>,
|
||||||
|
source_configs: &std::collections::HashMap<String, crate::config::SourceConfig>,
|
||||||
|
) -> Result<usize> {
|
||||||
|
let mut total_new_links = 0;
|
||||||
|
|
||||||
|
for (source_name, source_config) in source_configs {
|
||||||
|
info!("Processing source [{}]", source_name);
|
||||||
|
|
||||||
|
let username = source_config.username.clone();
|
||||||
|
let title_filter = match &source_config.title_filter {
|
||||||
|
Some(filter) => Some(
|
||||||
|
Regex::new(filter.as_str())
|
||||||
|
.wrap_err_with(|| format!("Invalid regex pattern: {}", filter))?,
|
||||||
|
),
|
||||||
|
None => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(submissions) = user_posts.get_vec(&username) {
|
||||||
|
for post in submissions
|
||||||
|
.iter()
|
||||||
|
.filter(|s| filter_post(&s.title, title_filter.clone()))
|
||||||
|
{
|
||||||
|
let title = &post.title;
|
||||||
|
let body = &post.body;
|
||||||
|
let subreddit = &post.subreddit;
|
||||||
|
|
||||||
|
let magnet_links = crate::magnet::extract_magnet_links(body);
|
||||||
|
if !magnet_links.is_empty() {
|
||||||
|
let post_info = crate::PostInfo {
|
||||||
|
title: title.to_string(),
|
||||||
|
submitter: username.clone(),
|
||||||
|
subreddit: subreddit.to_string(),
|
||||||
|
magnet_links,
|
||||||
|
timestamp: post.created,
|
||||||
|
imdb_id: source_config.imdb_id.clone(),
|
||||||
|
tags: source_config.tags.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Store the post info in the database
|
||||||
|
match self.db_service.borrow_mut().store_magnets(&post_info) {
|
||||||
|
Ok(count) => {
|
||||||
|
total_new_links += count;
|
||||||
|
info!(
|
||||||
|
"Stored {} new magnet links from post: {}",
|
||||||
|
count, post_info.title
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Failed to store post info in database: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(total_new_links)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Filters posts based on a title filter pattern
|
||||||
|
fn filter_post(title: &str, title_filter: Option<Regex>) -> bool {
|
||||||
|
match title_filter {
|
||||||
|
Some(pattern) => pattern.is_match(title),
|
||||||
|
None => true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::config::SourceConfig;
|
||||||
|
use crate::db::Database;
|
||||||
|
use crate::reddit_client::RedditPost;
|
||||||
|
use chrono::Utc;
|
||||||
|
use multimap::MultiMap;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use tempfile::tempdir;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_process_sources() -> Result<()> {
|
||||||
|
let temp_dir = tempdir()?;
|
||||||
|
let db_path = temp_dir.path().join("test.db");
|
||||||
|
|
||||||
|
let db = Database::new(&db_path)?;
|
||||||
|
let db_service = DatabaseService::new(db);
|
||||||
|
let db_service = Rc::new(RefCell::new(db_service));
|
||||||
|
let mut service = PostProcessorService::new(db_service);
|
||||||
|
|
||||||
|
let mut user_posts = MultiMap::new();
|
||||||
|
let mut source_configs = HashMap::new();
|
||||||
|
|
||||||
|
let post = RedditPost {
|
||||||
|
title: "Test Post".to_string(),
|
||||||
|
body: "Here are some magnet links: magnet:?xt=urn:btih:test1 and magnet:?xt=urn:btih:test2".to_string(),
|
||||||
|
subreddit: "test_subreddit".to_string(),
|
||||||
|
created: Utc::now(),
|
||||||
|
};
|
||||||
|
user_posts.insert("test_user".to_string(), post);
|
||||||
|
|
||||||
|
let source_config = SourceConfig {
|
||||||
|
username: "test_user".to_string(),
|
||||||
|
title_filter: None,
|
||||||
|
imdb_id: Some("tt1234567".to_string()),
|
||||||
|
tags: vec!["tag1".to_string(), "tag2".to_string()],
|
||||||
|
};
|
||||||
|
source_configs.insert("test_source".to_string(), source_config);
|
||||||
|
|
||||||
|
let count = service.process_sources(&mut user_posts, &mut source_configs)?;
|
||||||
|
|
||||||
|
assert_eq!(count, 2, "Should have stored 2 new magnet links");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
134
src/services/reddit.rs
Normal file
134
src/services/reddit.rs
Normal file
|
|
@ -0,0 +1,134 @@
|
||||||
|
use crate::reddit_client::{RedditClient, RedditPost};
|
||||||
|
use color_eyre::eyre::{Result, WrapErr};
|
||||||
|
use log::{debug, error, info, warn};
|
||||||
|
use multimap::MultiMap;
|
||||||
|
use std::collections::HashSet;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::time::sleep;
|
||||||
|
|
||||||
|
/// Configuration for the Reddit service
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct RedditServiceConfig {
|
||||||
|
/// Maximum number of retries for API calls
|
||||||
|
pub max_retries: u32,
|
||||||
|
/// Delay between retries in milliseconds
|
||||||
|
pub retry_delay_ms: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for RedditServiceConfig {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
max_retries: 3,
|
||||||
|
retry_delay_ms: 1000,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Service for interacting with Reddit
|
||||||
|
pub struct RedditService {
|
||||||
|
client: RedditClient,
|
||||||
|
config: RedditServiceConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RedditService {
|
||||||
|
/// Create a new RedditService with default configuration
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self::with_config(RedditServiceConfig::default())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a new RedditService with custom configuration
|
||||||
|
pub fn with_config(config: RedditServiceConfig) -> Self {
|
||||||
|
Self {
|
||||||
|
client: RedditClient::new(),
|
||||||
|
config,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fetch posts from multiple Reddit users
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `usernames` - A set of Reddit usernames to fetch posts from
|
||||||
|
/// * `post_count` - The maximum number of posts to fetch per user
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
///
|
||||||
|
/// A MultiMap mapping usernames to their posts
|
||||||
|
pub async fn fetch_posts_from_users(
|
||||||
|
&self,
|
||||||
|
usernames: HashSet<String>,
|
||||||
|
post_count: u32,
|
||||||
|
) -> Result<MultiMap<String, RedditPost>> {
|
||||||
|
let mut user_posts = MultiMap::new();
|
||||||
|
|
||||||
|
for username in usernames {
|
||||||
|
info!("Fetching posts from user [{}]", username);
|
||||||
|
match self
|
||||||
|
.fetch_user_posts_with_retry(&username, post_count)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(posts) => {
|
||||||
|
debug!("Found {} posts for user [{}]", posts.len(), username);
|
||||||
|
user_posts.insert_many(username, posts);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("Failed to fetch posts for user [{}]: {}", username, e);
|
||||||
|
// Continue with other users even if one fails
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(user_posts)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fetch posts from a single Reddit user with retry logic
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `username` - The Reddit username to fetch posts from
|
||||||
|
/// * `post_count` - The maximum number of posts to fetch
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
///
|
||||||
|
/// A vector of RedditPost objects
|
||||||
|
async fn fetch_user_posts_with_retry(
|
||||||
|
&self,
|
||||||
|
username: &str,
|
||||||
|
post_count: u32,
|
||||||
|
) -> Result<Vec<RedditPost>> {
|
||||||
|
let mut attempts = 0;
|
||||||
|
let max_retries = self.config.max_retries;
|
||||||
|
let retry_delay = Duration::from_millis(self.config.retry_delay_ms);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
attempts += 1;
|
||||||
|
match self
|
||||||
|
.client
|
||||||
|
.fetch_user_submissions(username, post_count)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(posts) => return Ok(posts),
|
||||||
|
Err(e) => {
|
||||||
|
if attempts > max_retries {
|
||||||
|
return Err(e).wrap_err_with(|| {
|
||||||
|
format!(
|
||||||
|
"Failed to fetch posts for user [{}] after {} attempts",
|
||||||
|
username, attempts
|
||||||
|
)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
warn!(
|
||||||
|
"Attempt {}/{} failed for user [{}]: {}. Retrying in {}ms...",
|
||||||
|
attempts,
|
||||||
|
max_retries + 1,
|
||||||
|
username,
|
||||||
|
e,
|
||||||
|
self.config.retry_delay_ms
|
||||||
|
);
|
||||||
|
sleep(retry_delay).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
40
src/services/report.rs
Normal file
40
src/services/report.rs
Normal file
|
|
@ -0,0 +1,40 @@
|
||||||
|
use crate::actions::action::ProcessedMagnets;
|
||||||
|
use crate::report;
|
||||||
|
use log::info;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
/// Service for generating reports
|
||||||
|
pub struct ReportService;
|
||||||
|
|
||||||
|
impl ReportService {
|
||||||
|
/// Create a new ReportService
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Generate and display report
|
||||||
|
pub fn generate_report(
|
||||||
|
&self,
|
||||||
|
action_results: &HashMap<String, ProcessedMagnets>,
|
||||||
|
total_new_links: usize,
|
||||||
|
) {
|
||||||
|
for line in report::generate_report(action_results, total_new_links, true).lines() {
|
||||||
|
info!("{}", line);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::actions::action::ProcessedMagnets;
|
||||||
|
use crate::models::Magnet;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_new_report_service() {
|
||||||
|
let report_service = ReportService::new();
|
||||||
|
// Just verify that we can create the service
|
||||||
|
// The actual report generation is tested in the report module
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue