reddit-magnet/src/db.rs

287 lines
9.7 KiB
Rust

use crate::models::{Magnet, NewBitmagnetProcessed, NewMagnet, NewTransmissionProcessed};
use crate::schema::{bitmagnet_processed, magnets, transmission_processed};
use crate::PostInfo;
use color_eyre::eyre::{eyre, Result, WrapErr};
use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
use std::fs::create_dir_all;
use std::path::Path;
/// Bookkeeping interface for a table that records which magnets have already
/// been handled by some downstream consumer.
///
/// Both functions are associated functions without a `self` receiver, so an
/// implementing type only selects the table at the type level (see the
/// `T: ProcessedTable` parameters on `Database`'s methods).
pub trait ProcessedTable {
/// Get all processed magnet IDs from this table.
///
/// The implementations in this file select the `magnet_id` column of their
/// table and return an error if the query fails.
fn get_processed_ids(conn: &mut SqliteConnection) -> Result<Vec<i32>>;
/// Mark a magnet as processed in this table.
///
/// `magnet_id` is the ID of the magnet to record. The implementations in this
/// file insert a new row stamped with the current UTC time.
fn mark_processed(conn: &mut SqliteConnection, magnet_id: i32) -> Result<()>;
}
/// Marker type whose `ProcessedTable` implementation reads from and writes to
/// the `bitmagnet_processed` table.
pub struct BitmagnetProcessedTable;
/// Marker type whose `ProcessedTable` implementation reads from and writes to
/// the `transmission_processed` table.
pub struct TransmissionProcessedTable;
impl ProcessedTable for BitmagnetProcessedTable {
    /// Loads the `magnet_id` of every row in `bitmagnet_processed`.
    ///
    /// # Errors
    ///
    /// Returns an error if the query cannot be executed.
    fn get_processed_ids(conn: &mut SqliteConnection) -> Result<Vec<i32>> {
        let query = bitmagnet_processed::table.select(bitmagnet_processed::magnet_id);
        let ids = query
            .load::<i32>(conn)
            .wrap_err("Failed to load processed magnet IDs for Bitmagnet")?;
        Ok(ids)
    }

    /// Inserts a `bitmagnet_processed` row for `magnet_id`, stamped with the
    /// current UTC time.
    ///
    /// # Errors
    ///
    /// Returns an error if the insert fails.
    fn mark_processed(conn: &mut SqliteConnection, magnet_id: i32) -> Result<()> {
        let processed_at = chrono::Utc::now().naive_utc();
        let row = NewBitmagnetProcessed {
            magnet_id,
            processed_at: &processed_at,
        };

        // The affected-row count is not part of the contract; discard it.
        diesel::insert_into(bitmagnet_processed::table)
            .values(&row)
            .execute(conn)
            .map(|_| ())
            .wrap_err("Failed to mark magnet as processed by Bitmagnet")
    }
}
impl ProcessedTable for TransmissionProcessedTable {
    /// Loads the `magnet_id` of every row in `transmission_processed`.
    ///
    /// # Errors
    ///
    /// Returns an error if the query cannot be executed.
    fn get_processed_ids(conn: &mut SqliteConnection) -> Result<Vec<i32>> {
        let query = transmission_processed::table.select(transmission_processed::magnet_id);
        let ids = query
            .load::<i32>(conn)
            .wrap_err("Failed to load processed magnet IDs for Transmission")?;
        Ok(ids)
    }

    /// Inserts a `transmission_processed` row for `magnet_id`, stamped with the
    /// current UTC time.
    ///
    /// # Errors
    ///
    /// Returns an error if the insert fails.
    fn mark_processed(conn: &mut SqliteConnection, magnet_id: i32) -> Result<()> {
        let processed_at = chrono::Utc::now().naive_utc();
        let row = NewTransmissionProcessed {
            magnet_id,
            processed_at: &processed_at,
        };

        // The affected-row count is not part of the contract; discard it.
        diesel::insert_into(transmission_processed::table)
            .values(&row)
            .execute(conn)
            .map(|_| ())
            .wrap_err("Failed to mark magnet as processed by Transmission")
    }
}
/// Schema migrations embedded at compile time by `embed_migrations!` from the
/// `migrations` directory. `Database::new` applies any that are still pending.
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");
/// Database for storing magnet links and associated information.
///
/// Owns a single SQLite connection. Every method takes `&mut self` because each
/// query borrows the connection mutably.
pub struct Database {
// Private: all access goes through the methods on `Database`.
conn: SqliteConnection,
}
impl Database {
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
let database_url = path
.as_ref()
.to_str()
.ok_or_else(|| eyre!("Database path is not valid UTF-8"))?;
if let Some(parent) = path.as_ref().parent() {
create_dir_all(parent)
.wrap_err_with(|| format!("Failed to create directory: {:?}", parent))?;
}
let mut conn = SqliteConnection::establish(database_url)
.wrap_err("Failed to open database connection")?;
conn.run_pending_migrations(MIGRATIONS)
.expect("Failed to apply database migrations");
Ok(Database { conn })
}
pub fn get_all_magnets(&mut self) -> Result<Vec<Magnet>> {
let results = magnets::table
.select(Magnet::as_select())
.load(&mut self.conn)
.wrap_err("Failed to load magnets from database")?;
Ok(results)
}
pub fn store_magnets(&mut self, post: &PostInfo) -> Result<usize> {
let published_at = post.timestamp.naive_utc();
// Filter out magnet links that already exist in the database
let existing_links: Vec<String> = magnets::table
.select(magnets::link)
.filter(magnets::link.eq_any(&post.magnet_links))
.load(&mut self.conn)
.wrap_err("Failed to query existing magnets")?;
let links = post
.magnet_links
.iter()
.filter(|link| !existing_links.contains(link))
.map(|m| NewMagnet {
title: post.title.as_str(),
submitter: post.submitter.as_str(),
subreddit: post.subreddit.as_str(),
link: m,
published_at: &published_at,
imdb_id: post.imdb_id.as_deref(),
})
.collect::<Vec<NewMagnet>>();
diesel::insert_into(magnets::table)
.values(&links)
.execute(&mut self.conn)
.wrap_err("Failed to save new magnet")
}
/// Get all magnets that have not been processed by a specific table
pub fn get_unprocessed_magnets_for_table<T: ProcessedTable>(&mut self) -> Result<Vec<Magnet>> {
// Get all magnet IDs that have been processed by the specified table
let processed_ids = T::get_processed_ids(&mut self.conn)?;
// Get all magnets that are not in the processed list
let results = magnets::table
.select(Magnet::as_select())
.filter(magnets::id.is_not_null())
.filter(magnets::id.ne_all(processed_ids))
.load(&mut self.conn)
.wrap_err("Failed to load unprocessed magnets from database")?;
Ok(results)
}
/// Mark a magnet as processed by a specific table
pub fn mark_magnet_processed_for_table<T: ProcessedTable>(
&mut self,
magnet_id: i32,
) -> Result<()> {
T::mark_processed(&mut self.conn, magnet_id)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::PostInfo;
    use chrono::Utc;
    use tempfile::tempdir;

    const LINK_1: &str = "magnet:?xt=urn:btih:test1";
    const LINK_2: &str = "magnet:?xt=urn:btih:test2";
    const LINK_3: &str = "magnet:?xt=urn:btih:test3";

    /// Creates a database in a fresh temporary directory.
    ///
    /// The directory guard is returned so the caller keeps it alive; dropping
    /// it would delete the database file while the test is still running.
    fn temp_db() -> (tempfile::TempDir, Database) {
        let dir = tempdir().unwrap();
        let db = Database::new(dir.path().join("test.db")).unwrap();
        (dir, db)
    }

    /// Builds a post stamped with the current time.
    fn make_post(
        title: &str,
        submitter: &str,
        subreddit: &str,
        links: &[&str],
        imdb_id: Option<&str>,
    ) -> PostInfo {
        PostInfo {
            title: title.to_string(),
            submitter: submitter.to_string(),
            subreddit: subreddit.to_string(),
            magnet_links: links.iter().map(|link| (*link).to_string()).collect(),
            timestamp: Utc::now(),
            imdb_id: imdb_id.map(str::to_string),
        }
    }

    #[test]
    fn test_database_initialization() {
        let dir = tempdir().unwrap();
        let db_file = dir.path().join("test.db");

        let opened = Database::new(&db_file);

        assert!(opened.is_ok());
        assert!(db_file.exists());
    }

    #[test]
    fn test_store_and_retrieve_magnet_links() {
        let (_dir, mut db) = temp_db();
        let post = make_post(
            "Test Title",
            "test_user",
            "test_subreddit",
            &[LINK_1, LINK_2],
            None,
        );
        let expected_timestamp = post.timestamp.naive_utc();

        assert!(db.store_magnets(&post).is_ok());

        let stored = db.get_all_magnets().unwrap();
        assert_eq!(stored.len(), 2);
        for magnet in &stored {
            assert!([LINK_1, LINK_2].contains(&magnet.link.as_str()));
            assert_eq!(magnet.title, "Test Title");
            assert_eq!(magnet.submitter, "test_user");
            assert_eq!(magnet.subreddit, "test_subreddit");
            assert_eq!(magnet.published_at, expected_timestamp);
        }
    }

    #[test]
    fn test_prevent_duplicate_magnets() {
        let (_dir, mut db) = temp_db();

        // A first post with two fresh links stores both of them.
        let first = make_post(
            "Test Title",
            "test_user",
            "test_subreddit",
            &[LINK_1, LINK_2],
            None,
        );
        assert_eq!(db.store_magnets(&first).unwrap(), 2);

        // A second post mixing one known and one new link stores only the new one.
        let second = make_post(
            "Test Title 2",
            "test_user2",
            "test_subreddit2",
            &[LINK_1, LINK_3],
            Some("tt1234567"),
        );
        assert_eq!(db.store_magnets(&second).unwrap(), 1);
        assert_eq!(db.get_all_magnets().unwrap().len(), 3);

        // A third post containing only known links stores nothing.
        let third = make_post(
            "Test Title 3",
            "test_user3",
            "test_subreddit3",
            &[LINK_1, LINK_2],
            None,
        );
        assert_eq!(db.store_magnets(&third).unwrap(), 0);
        assert_eq!(db.get_all_magnets().unwrap().len(), 3);
    }
}