Add a send to Transmission action

This commit is contained in:
Marc Plano-Lesay 2025-05-01 12:51:07 +10:00
parent 88419cbf97
commit 3f2b002f52
Signed by: kernald
GPG key ID: 66A41B08CC62A6CF
10 changed files with 952 additions and 83 deletions

1
src/actions/mod.rs Normal file
View file

@ -0,0 +1 @@
pub mod transmission;

View file

@ -0,0 +1,99 @@
use crate::db::{Database, TransmissionProcessedTable};
use color_eyre::eyre::{eyre, Result, WrapErr};
use log::{debug, info, warn};
use serde::{Deserialize, Serialize};
use transmission_rpc::{
types::{BasicAuth, TorrentAddArgs},
TransClient,
};
use url::Url;
/// Configuration for the Transmission action
#[derive(Debug, Serialize, Deserialize)]
pub struct TransmissionConfig {
pub enable: bool,
pub host: String,
pub username: String,
pub password: String,
pub port: u16,
pub download_dir: String,
}
/// Action for submitting magnet links to Transmission
pub struct TransmissionAction {
client: TransClient,
download_dir: String,
db: Database,
}
impl TransmissionAction {
pub async fn new(config: &TransmissionConfig, db: Database) -> Result<Self> {
if !config.enable {
return Err(eyre!("Transmission action is disabled"));
}
let url_str = format!("{}:{}/transmission/rpc", config.host, config.port);
let url = Url::parse(&url_str).wrap_err_with(|| format!("Invalid URL: {}", url_str))?;
let auth = BasicAuth {
user: config.username.clone(),
password: config.password.clone(),
};
let client = TransClient::with_auth(url, auth);
Ok(TransmissionAction {
client,
download_dir: config.download_dir.clone(),
db,
})
}
/// Process all unprocessed magnet links
pub async fn process_unprocessed_magnets(&mut self) -> Result<usize> {
let unprocessed_magnets = self
.db
.get_unprocessed_magnets_for_table::<TransmissionProcessedTable>()?;
let mut processed_count = 0;
for magnet in unprocessed_magnets {
if let Some(id) = magnet.id {
match self.submit_magnet(&magnet.link).await {
Ok(_) => {
info!(
"Successfully submitted magnet link to Transmission: {}",
magnet.title
);
debug!("Magnet link: {}", magnet.link);
self.db
.mark_magnet_processed_for_table::<TransmissionProcessedTable>(id)?;
processed_count += 1;
}
Err(e) => {
warn!("Failed to submit magnet link to Transmission: {}", e);
}
}
} else {
warn!("Skipping magnet with null ID: {}", magnet.link);
}
}
Ok(processed_count)
}
/// Submit a magnet link to Transmission
async fn submit_magnet(&mut self, magnet: &str) -> Result<()> {
let args = TorrentAddArgs {
filename: Some(magnet.to_string()),
download_dir: Some(self.download_dir.clone()),
..Default::default()
};
self.client
.torrent_add(args)
.await
.map_err(|e| eyre!("Failed to add torrent to Transmission: {}", e))?;
Ok(())
}
}

182
src/db.rs
View file

@ -1,5 +1,5 @@
use crate::models::{Magnet, NewMagnet};
use crate::schema::magnets;
use crate::models::{Magnet, NewMagnet, NewTransmissionProcessed, TransmissionProcessed};
use crate::schema::{magnets, transmission_processed};
use crate::PostInfo;
use color_eyre::eyre::{eyre, Result, WrapErr};
use diesel::prelude::*;
@ -8,6 +8,40 @@ use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
use std::fs::create_dir_all;
use std::path::Path;
pub trait ProcessedTable {
/// Get all processed magnet IDs from this table
fn get_processed_ids(conn: &mut SqliteConnection) -> Result<Vec<i32>>;
/// Mark a magnet as processed in this table
fn mark_processed(conn: &mut SqliteConnection, magnet_id: i32) -> Result<()>;
}
pub struct TransmissionProcessedTable;
impl ProcessedTable for TransmissionProcessedTable {
fn get_processed_ids(conn: &mut SqliteConnection) -> Result<Vec<i32>> {
transmission_processed::table
.select(transmission_processed::magnet_id)
.load(conn)
.wrap_err("Failed to load processed magnet IDs for Transmission")
}
fn mark_processed(conn: &mut SqliteConnection, magnet_id: i32) -> Result<()> {
let now = chrono::Utc::now().naive_utc();
let new_processed = NewTransmissionProcessed {
magnet_id,
processed_at: &now,
};
diesel::insert_into(transmission_processed::table)
.values(&new_processed)
.execute(conn)
.wrap_err("Failed to mark magnet as processed by Transmission")?;
Ok(())
}
}
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");
/// Database for storing magnet links and associated information
@ -15,6 +49,90 @@ pub struct Database {
conn: SqliteConnection,
}
impl Database {
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
let database_url = path
.as_ref()
.to_str()
.ok_or_else(|| eyre!("Database path is not valid UTF-8"))?;
if let Some(parent) = path.as_ref().parent() {
create_dir_all(parent)
.wrap_err_with(|| format!("Failed to create directory: {:?}", parent))?;
}
let mut conn = SqliteConnection::establish(database_url)
.wrap_err("Failed to open database connection")?;
conn.run_pending_migrations(MIGRATIONS)
.expect("Failed to apply database migrations");
Ok(Database { conn })
}
pub fn get_all_magnets(&mut self) -> Result<Vec<Magnet>> {
let results = magnets::table
.select(Magnet::as_select())
.load(&mut self.conn)
.wrap_err("Failed to load magnets from database")?;
Ok(results)
}
pub fn store_magnets(&mut self, post: &PostInfo) -> Result<usize> {
let published_at = post.timestamp.naive_utc();
// Filter out magnet links that already exist in the database
let existing_links: Vec<String> = magnets::table
.select(magnets::link)
.filter(magnets::link.eq_any(&post.magnet_links))
.load(&mut self.conn)
.wrap_err("Failed to query existing magnets")?;
let links = post
.magnet_links
.iter()
.filter(|link| !existing_links.contains(link))
.map(|m| NewMagnet {
title: post.title.as_str(),
submitter: post.submitter.as_str(),
subreddit: post.subreddit.as_str(),
link: m,
published_at: &published_at,
})
.collect::<Vec<NewMagnet>>();
diesel::insert_into(magnets::table)
.values(&links)
.execute(&mut self.conn)
.wrap_err("Failed to save new magnet")
}
/// Get all magnets that have not been processed by a specific table
pub fn get_unprocessed_magnets_for_table<T: ProcessedTable>(&mut self) -> Result<Vec<Magnet>> {
// Get all magnet IDs that have been processed by the specified table
let processed_ids = T::get_processed_ids(&mut self.conn)?;
// Get all magnets that are not in the processed list
let results = magnets::table
.select(Magnet::as_select())
.filter(magnets::id.is_not_null())
.filter(magnets::id.ne_all(processed_ids))
.load(&mut self.conn)
.wrap_err("Failed to load unprocessed magnets from database")?;
Ok(results)
}
/// Mark a magnet as processed by a specific table
pub fn mark_magnet_processed_for_table<T: ProcessedTable>(
&mut self,
magnet_id: i32,
) -> Result<()> {
T::mark_processed(&mut self.conn, magnet_id)
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -137,63 +255,3 @@ mod tests {
assert_eq!(magnets.len(), 3);
}
}
impl Database {
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
let database_url = path
.as_ref()
.to_str()
.ok_or_else(|| eyre!("Database path is not valid UTF-8"))?;
if let Some(parent) = path.as_ref().parent() {
create_dir_all(parent)
.wrap_err_with(|| format!("Failed to create directory: {:?}", parent))?;
}
let mut conn = SqliteConnection::establish(database_url)
.wrap_err("Failed to open database connection")?;
conn.run_pending_migrations(MIGRATIONS)
.expect("Failed to apply database migrations");
Ok(Database { conn })
}
pub fn get_all_magnets(&mut self) -> Result<Vec<Magnet>> {
let results = magnets::table
.select(Magnet::as_select())
.load(&mut self.conn)
.wrap_err("Failed to load magnets from database")?;
Ok(results)
}
pub fn store_magnets(&mut self, post: &PostInfo) -> Result<usize> {
let published_at = post.timestamp.naive_utc();
// Filter out magnet links that already exist in the database
let existing_links: Vec<String> = magnets::table
.select(magnets::link)
.filter(magnets::link.eq_any(&post.magnet_links))
.load(&mut self.conn)
.wrap_err("Failed to query existing magnets")?;
let links = post
.magnet_links
.iter()
.filter(|link| !existing_links.contains(link))
.map(|m| NewMagnet {
title: post.title.as_str(),
submitter: post.submitter.as_str(),
subreddit: post.subreddit.as_str(),
link: m,
published_at: &published_at,
})
.collect::<Vec<NewMagnet>>();
diesel::insert_into(magnets::table)
.values(&links)
.execute(&mut self.conn)
.wrap_err("Failed to save new magnet")
}
}

View file

@ -1,5 +1,9 @@
use crate::actions::transmission::{TransmissionAction, TransmissionConfig};
use crate::db::Database;
use crate::magnet::{extract_magnet_links, Magnet};
use chrono::{DateTime, Utc};
use clap::Parser;
use clap_verbosity_flag::{InfoLevel, Verbosity};
use color_eyre::eyre::{eyre, Result, WrapErr};
use directories::ProjectDirs;
use figment::providers::Env;
@ -8,18 +12,16 @@ use figment::{
Figment,
};
use figment_file_provider_adapter::FileAdapter;
use log::{debug, warn};
use log::{debug, info, warn};
use multimap::MultiMap;
use reddit_client::RedditClient;
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fs::create_dir_all;
use std::path::{Path, PathBuf};
use crate::db::Database;
use crate::magnet::{extract_magnet_links, Magnet};
use reddit_client::RedditClient;
mod actions;
mod db;
mod magnet;
mod models;
@ -34,6 +36,9 @@ struct SourceConfig {
#[derive(Debug, Serialize, Deserialize)]
struct Config {
#[serde(default)]
transmission: Option<TransmissionConfig>,
#[serde(default)]
sources: HashMap<String, SourceConfig>,
}
@ -57,6 +62,9 @@ struct Args {
/// Path to the database file
#[arg(short, long)]
db: Option<String>,
#[command(flatten)]
verbose: Verbosity<InfoLevel>,
}
/// Filters posts based on a title filter pattern
@ -110,6 +118,10 @@ async fn main() -> Result<()> {
let args = Args::parse();
pretty_env_logger::formatted_timed_builder()
.filter_level(args.verbose.log_level_filter())
.init();
// Initialize database
let db_path = match args.db {
Some(path) => PathBuf::from(path),
@ -168,6 +180,7 @@ async fn main() -> Result<()> {
user_posts.insert_many(username, submissions);
}
// Process sources and store magnet links
for (source_name, source_config) in conf.sources {
println!("\nProcessing source [{}]", source_name);
@ -214,5 +227,34 @@ async fn main() -> Result<()> {
}
}
// Process magnet links with Transmission if enabled
if let Some(transmission_config) = conf.transmission {
if transmission_config.enable {
info!("Processing magnet links with Transmission");
match TransmissionAction::new(&transmission_config, db).await {
Ok(mut transmission_action) => {
match transmission_action.process_unprocessed_magnets().await {
Ok(count) => {
info!(
"Successfully processed {} magnet links with Transmission",
count
);
}
Err(e) => {
warn!("Failed to process magnet links with Transmission: {}", e);
}
}
}
Err(e) => {
warn!("Failed to initialize Transmission action: {}", e);
}
}
} else {
debug!("Transmission action is disabled");
}
} else {
debug!("No Transmission configuration found");
}
Ok(())
}

View file

@ -1,4 +1,4 @@
use crate::schema::magnets;
use crate::schema::{magnets, transmission_processed};
use chrono::NaiveDateTime;
use diesel::prelude::*;
@ -24,3 +24,20 @@ pub struct NewMagnet<'a> {
pub link: &'a str,
pub published_at: &'a NaiveDateTime,
}
#[derive(Queryable, Selectable)]
#[diesel(table_name = transmission_processed)]
#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
pub struct TransmissionProcessed {
pub id: Option<i32>,
pub magnet_id: i32,
pub processed_at: NaiveDateTime,
}
#[derive(Insertable)]
#[diesel(table_name = transmission_processed)]
#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
pub struct NewTransmissionProcessed<'a> {
pub magnet_id: i32,
pub processed_at: &'a NaiveDateTime,
}

View file

@ -10,3 +10,15 @@ diesel::table! {
published_at -> Timestamp,
}
}
diesel::table! {
transmission_processed (id) {
id -> Nullable<Integer>,
magnet_id -> Integer,
processed_at -> Timestamp,
}
}
diesel::joinable!(transmission_processed -> magnets (magnet_id));
diesel::allow_tables_to_appear_in_same_query!(magnets, transmission_processed,);