Separate code out of `main.rs` and reformat to improve readability
This commit is contained in:
parent
5327c578cd
commit
7311645b45
|
@ -1,4 +1,5 @@
|
|||
use serde::{Deserialize,Serialize};
|
||||
use tracing::metadata::LevelFilter;
|
||||
use std::path::Path;
|
||||
use std::env;
|
||||
use std::collections::HashMap;
|
||||
|
@ -6,6 +7,7 @@ use figment::{Figment, providers::{Format, Toml, Env}};
|
|||
use figment::value::Value as FigmentValue;
|
||||
|
||||
use crate::torznab::TorznabClient;
|
||||
use crate::indexer::Indexer;
|
||||
|
||||
use super::CliProvider;
|
||||
|
||||
|
@ -37,6 +39,9 @@ pub struct Config {
|
|||
#[serde(default)]
|
||||
pub strip_public_trackers: bool,
|
||||
|
||||
#[serde(default)]
|
||||
pub log_level: LogLevel,
|
||||
|
||||
/// The category of added cross-seed torrents.
|
||||
torrent_category: Option<String>,
|
||||
|
||||
|
@ -91,28 +96,42 @@ impl Default for TorrentMode {
|
|||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct Indexer {
|
||||
#[serde(skip_deserializing)]
|
||||
/// Name of the indexer
|
||||
pub name: String,
|
||||
/// Whether the indexer is enabled or not for searching
|
||||
pub enabled: Option<bool>,
|
||||
/// URL to query for searches
|
||||
pub url: String,
|
||||
/// API key to pass to prowlarr/jackett
|
||||
pub api_key: String,
|
||||
|
||||
#[serde(skip)]
|
||||
pub client: Option<TorznabClient>,
|
||||
pub enum LogLevel {
|
||||
#[serde(alias = "error")]
|
||||
Error,
|
||||
|
||||
#[serde(alias = "warn")]
|
||||
Warn,
|
||||
|
||||
#[serde(alias = "info")]
|
||||
Info,
|
||||
|
||||
#[serde(alias = "debug")]
|
||||
Debug,
|
||||
|
||||
#[serde(alias = "trace")]
|
||||
Trace,
|
||||
|
||||
#[serde(alias = "off", alias = "disabled")]
|
||||
Off,
|
||||
}
|
||||
|
||||
impl Indexer {
|
||||
pub async fn create_client(&mut self) -> Result<&TorznabClient, crate::torznab::ClientError> {
|
||||
if self.client.is_none() {
|
||||
self.client = Some(TorznabClient::new(self.name.clone(), &self.url, &self.api_key).await?);
|
||||
}
|
||||
impl Default for LogLevel {
|
||||
fn default() -> Self {
|
||||
Self::Info
|
||||
}
|
||||
}
|
||||
|
||||
Ok(self.client.as_ref().unwrap())
|
||||
impl Into<LevelFilter> for LogLevel {
|
||||
fn into(self) -> LevelFilter {
|
||||
match self {
|
||||
LogLevel::Error => LevelFilter::ERROR,
|
||||
LogLevel::Warn => LevelFilter::WARN,
|
||||
LogLevel::Info => LevelFilter::INFO,
|
||||
LogLevel::Debug => LevelFilter::DEBUG,
|
||||
LogLevel::Trace => LevelFilter::TRACE,
|
||||
LogLevel::Off => LevelFilter::OFF,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,257 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use lava_torrent::torrent::v1::Torrent;
|
||||
use lava_torrent::bencode::BencodeElem;
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
use crate::{config::{Config, TorrentMode}, indexer::Indexer};
|
||||
|
||||
use abstracttorrent::torrent::{TorrentUpload, TorrentState, TorrentInfo};
|
||||
|
||||
pub struct CrossSeed {
|
||||
config: Arc<Config>,
|
||||
indexers: Arc<Vec<Indexer>>,
|
||||
torrent_client: Arc<crate::torrent_client::TorrentClient>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl CrossSeed {
|
||||
pub fn new(config: Config, indexers: Vec<Indexer>, torrent_client: crate::torrent_client::TorrentClient) -> Self {
|
||||
Self {
|
||||
config: Arc::new(config),
|
||||
indexers: Arc::new(indexers),
|
||||
torrent_client: Arc::new(torrent_client),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_arcs(config: Arc<Config>, indexers: Arc<Vec<Indexer>>, torrent_client: Arc<crate::torrent_client::TorrentClient>) -> Self {
|
||||
Self {
|
||||
config,
|
||||
indexers,
|
||||
torrent_client,
|
||||
}
|
||||
}
|
||||
|
||||
/// Start searching for all torrents, this searches for torrents in sequential order.
|
||||
pub async fn start_searching(&self, torrents: Vec<Torrent>) -> Result<(), CrossSeedError> {
|
||||
for torrent in torrents.iter() {
|
||||
self.search_for_torrent(torrent).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Search for a specific torrent in the indexers.
|
||||
pub async fn search_for_torrent(&self, torrent: &Torrent) -> Result<(), CrossSeedError> {
|
||||
// TODO: Add a `tracing` log scope.
|
||||
|
||||
for indexer in self.indexers.iter() {
|
||||
match self.torrent_client.get_torrent_info(&torrent).await? {
|
||||
Some(info) => match self.search_for_cross_torrent(indexer, torrent, info.clone()).await? {
|
||||
Some(found_torrent) => self.add_cross_seed_torrent(&torrent, found_torrent, info).await?,
|
||||
/* {
|
||||
match self.torrent_client.get_torrent_info(&torrent).await? {
|
||||
Some(info) => self.add_cross_seed_torrent(&torrent, found_torrent, info).await?,
|
||||
None => error!("Failed to find torrent in the client!"),
|
||||
}
|
||||
}, */
|
||||
None => {}, // TODO
|
||||
},
|
||||
None => error!("Failed to find torrent in the client!"), // TODO
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn add_cross_seed_torrent(&self, torrent: &Torrent, found_torrent: Torrent, info: TorrentInfo) -> Result<(), CrossSeedError> {
|
||||
match info.state {
|
||||
TorrentState::Uploading | TorrentState::QueuedUploading => {
|
||||
match self.config.torrent_mode {
|
||||
TorrentMode::InjectTrackers => {
|
||||
if found_torrent.is_private() {
|
||||
debug!("The found torrent is private, so we must remove the torrent and re-add it with the new trackers...");
|
||||
|
||||
// We have to merge the announce urls before we remove the torrent since we retrieve the
|
||||
// urls from the torrent client.
|
||||
let torrent = self.merge_torrent_announces(&torrent, &found_torrent).await?;
|
||||
|
||||
self.torrent_client.remove_torrent(&info, false).await?;
|
||||
|
||||
debug!("Re-uploading torrent to client...");
|
||||
|
||||
// Clone some fields from the torrent due to ownership issues with
|
||||
// torrent.encode()
|
||||
let name = torrent.name.clone();
|
||||
let hash = torrent.info_hash().clone();
|
||||
|
||||
match torrent.encode() {
|
||||
Ok(bytes) => {
|
||||
let upload = TorrentUpload::builder()
|
||||
.category(self.config.torrent_category())
|
||||
.tags(info.tags)
|
||||
.torrent_data(format!("{}.torrent", hash), bytes)
|
||||
//.paused()
|
||||
.build();
|
||||
|
||||
match self.torrent_client.add_torrent(&upload).await {
|
||||
Ok(()) => info!("Added cross-seed torrent {}!", name),
|
||||
Err(err) => error!("Error adding cross-seed torrent: {} (error: {:?}", name, err),
|
||||
}
|
||||
},
|
||||
Err(e) => error!("Error encoding torrent for upload: {}", e),
|
||||
}
|
||||
} else {
|
||||
debug!("Adding trackers to torrent since they aren't private...");
|
||||
// Flatten the announce list
|
||||
let found_announces: Vec<String> = found_torrent.announce_list.as_ref()
|
||||
.unwrap().iter()
|
||||
.flat_map(|array| array.iter())
|
||||
.into_iter().cloned()
|
||||
.collect();
|
||||
|
||||
if let Err(err) = self.torrent_client.add_torrent_trackers(&info, found_announces).await {
|
||||
error!("Error adding torrent trackers to torrent: {} (err: {:?})", torrent.name, err);
|
||||
}
|
||||
}
|
||||
},
|
||||
TorrentMode::InjectFile => {
|
||||
debug!("Cannot add trackers, uploading new torrent...");
|
||||
|
||||
// Clone some fields from the torrent due to ownership issues with
|
||||
// found_torrent.encode()
|
||||
let name = found_torrent.name.clone();
|
||||
let hash = found_torrent.info_hash().clone();
|
||||
|
||||
match found_torrent.encode() {
|
||||
Ok(bytes) => {
|
||||
let upload = TorrentUpload::builder()
|
||||
.torrent_data(format!("{}.torrent", hash), bytes)
|
||||
.category(self.config.torrent_category())
|
||||
//.paused() // TODO: don't pause new uploads
|
||||
.build();
|
||||
|
||||
match self.torrent_client.add_torrent(&upload).await {
|
||||
Ok(()) => info!("Added cross-seed torrent {}!", name),
|
||||
Err(err) => error!("Failure to add cross-seed torrent: {} (Error {:?})", name, err),
|
||||
}
|
||||
},
|
||||
Err(e) => error!("Failure to encode ({}) {}", e, name),
|
||||
}
|
||||
},
|
||||
TorrentMode::Filesystem => {
|
||||
todo!(); // TODO: implement
|
||||
}
|
||||
}
|
||||
},
|
||||
_ => debug!("Torrent is not done downloading, skipping..."),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Merge two torrent's announce urls into one torrent.
|
||||
pub async fn merge_torrent_announces(&self, torrent: &Torrent, found_torrent: &Torrent) -> Result<Torrent, abstracttorrent::error::ClientError> {
|
||||
// Get announce urls of both torrents.
|
||||
let request_info = TorrentInfo::from_hash(torrent.info_hash());
|
||||
let torrent_announces = self.torrent_client.get_torrent_trackers(&request_info).await?;
|
||||
let torrent_announces: Vec<&String> = torrent_announces.iter().map(|t| &t.url).collect();
|
||||
|
||||
// Flatten the announce list
|
||||
let found_announces: Vec<&String> = found_torrent.announce_list.as_ref()
|
||||
.unwrap().iter()
|
||||
.flat_map(|array| array.iter())
|
||||
.collect();
|
||||
|
||||
// Combine both announces and deref the Strings by cloning them.
|
||||
let mut torrent_announces: Vec<String> = torrent_announces.into_iter()
|
||||
.chain(found_announces)
|
||||
.cloned()
|
||||
.collect();
|
||||
// Remove the [DHT], [PeX] and [LSD] announces from the list.
|
||||
// The client should handle those.
|
||||
torrent_announces.retain(|announce| !(announce.starts_with("** [") && announce.ends_with("] **")));
|
||||
|
||||
// Copy the torrent file and add the announces to it.
|
||||
// Additionally, add the private field to the torrent.
|
||||
let mut torrent = torrent.clone();
|
||||
torrent.announce_list = Some(vec![torrent_announces]);
|
||||
if let Some(extra) = torrent.extra_info_fields.as_mut() {
|
||||
extra.insert(String::from("private"), BencodeElem::Integer(1));
|
||||
} else {
|
||||
let mut extra = std::collections::HashMap::new();
|
||||
extra.insert(String::from("private"), BencodeElem::Integer(1));
|
||||
torrent.extra_info_fields = Some(extra);
|
||||
}
|
||||
|
||||
Ok(torrent)
|
||||
}
|
||||
|
||||
/// Searches for a torrent in another indexer. Will return the found torrent.
|
||||
pub async fn search_for_cross_torrent(&self, indexer: &Indexer, torrent: &Torrent, info: TorrentInfo) -> Result<Option<Torrent>, CrossSeedError> {
|
||||
if let Some(found_torrent) = indexer.search_indexer(&torrent).await? {
|
||||
|
||||
// Check if we found the same torrent in its own indexer
|
||||
if found_torrent.info_hash() == torrent.info_hash() {
|
||||
debug!("Found same torrent in its own indexer, skipping...");
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// Check if we're already seeding this specific torrent file.
|
||||
if self.torrent_client.has_exact_torrent(&found_torrent).await? {
|
||||
info!("Already cross-seeding to this tracker (with a separate torrent file), skipping...");
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if let Some(found_announces) = &found_torrent.announce_list {
|
||||
// Some urls can be encoded so we need to decode to compare them.
|
||||
let found_announces: Vec<Vec<String>> = found_announces.iter()
|
||||
.map(|a_list|
|
||||
a_list.iter().map(|a| urlencoding::decode(a)
|
||||
.unwrap().to_string())
|
||||
.collect::<Vec<String>>())
|
||||
.collect();
|
||||
|
||||
// Get the trackers of the torrent from the download client.
|
||||
let torrent_announces = self.torrent_client.get_torrent_trackers(&info).await.unwrap(); // TODO: Remove
|
||||
let torrent_announces: Vec<&String> = torrent_announces.iter().map(|t| &t.url).collect();
|
||||
|
||||
// Flatten the announce list to make them easier to search.
|
||||
let found_announces: Vec<&String> = found_announces.iter()
|
||||
.flat_map(|array| array.iter())
|
||||
.collect();
|
||||
|
||||
// Check if the client has the trackers of the torrent already.
|
||||
let client_has_trackers = found_announces.iter()
|
||||
.all(|tracker| torrent_announces.contains(tracker));
|
||||
|
||||
if !client_has_trackers {
|
||||
return Ok(Some(found_torrent));
|
||||
} else {
|
||||
info!("Already cross seeding to this tracker, skipping...");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum CrossSeedError {
|
||||
TorznabClient(crate::torznab::ClientError),
|
||||
TorrentClient(abstracttorrent::error::ClientError),
|
||||
}
|
||||
|
||||
impl From<crate::torznab::ClientError> for CrossSeedError {
|
||||
fn from(err: crate::torznab::ClientError) -> Self {
|
||||
Self::TorznabClient(err)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<abstracttorrent::error::ClientError> for CrossSeedError {
|
||||
fn from(err: abstracttorrent::error::ClientError) -> Self {
|
||||
Self::TorrentClient(err)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use lava_torrent::torrent::v1::Torrent;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::torznab::{TorznabClient, GenericSearchParameters, SearchFunction};
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct Indexer {
|
||||
#[serde(skip_deserializing)]
|
||||
/// Name of the indexer
|
||||
pub name: String,
|
||||
/// Whether the indexer is enabled or not for searching
|
||||
pub enabled: Option<bool>,
|
||||
/// URL to query for searches
|
||||
pub url: String,
|
||||
/// API key to pass to prowlarr/jackett
|
||||
pub api_key: String,
|
||||
|
||||
#[serde(skip)]
|
||||
pub client: Option<Arc<RwLock<TorznabClient>>>, // TODO: Create a client pool.
|
||||
}
|
||||
|
||||
impl Indexer {
|
||||
pub async fn create_client(&mut self) -> Result<&Arc<RwLock<TorznabClient>>, crate::torznab::ClientError> {
|
||||
if self.client.is_none() {
|
||||
self.client = Some(Arc::new(RwLock::new(TorznabClient::new(self.name.clone(), &self.url, &self.api_key).await?)));
|
||||
}
|
||||
|
||||
Ok(self.client.as_ref().unwrap())
|
||||
}
|
||||
|
||||
/// Search an indexer for a torrent with its name, and return the found torrent.
|
||||
pub async fn search_indexer(&self, torrent: &Torrent) -> Result<Option<Torrent>, crate::torznab::ClientError> {
|
||||
// The client should be set to something already
|
||||
let client = self.client.as_ref().unwrap().read().await;
|
||||
|
||||
let generic = GenericSearchParameters::builder()
|
||||
.query(torrent.name.clone())
|
||||
.build();
|
||||
let results = client.search(SearchFunction::Search, generic).await.unwrap();
|
||||
|
||||
// Drop the indexer client asap for other torrent searches.
|
||||
drop(client);
|
||||
|
||||
// The first result should be the correct one.
|
||||
if let Some(result) = results.first() {
|
||||
let found_torrent = result.download_torrent().await?;
|
||||
|
||||
Ok(Some(found_torrent))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum IndexerSearchError {
|
||||
TorznabError(crate::torznab::ClientError),
|
||||
}
|
||||
|
||||
impl From<crate::torznab::ClientError> for IndexerSearchError {
|
||||
fn from(err: crate::torznab::ClientError) -> Self {
|
||||
Self::TorznabError(err)
|
||||
}
|
||||
}
|
324
src/main.rs
324
src/main.rs
|
@ -1,28 +1,65 @@
|
|||
mod config;
|
||||
mod torznab;
|
||||
mod torrent_client;
|
||||
mod indexer;
|
||||
mod cross_seed;
|
||||
|
||||
use config::{Config, TorrentMode};
|
||||
use config::Config;
|
||||
|
||||
use abstracttorrent::common::GetTorrentListParams;
|
||||
use abstracttorrent::torrent::{TorrentUpload, TorrentState, TorrentInfo};
|
||||
use lava_torrent::bencode::BencodeElem;
|
||||
use tracing::{info, Level, debug, warn, error};
|
||||
use indexer::Indexer;
|
||||
use torrent_client::TorrentClient;
|
||||
use tracing::metadata::LevelFilter;
|
||||
use tracing::{info};
|
||||
|
||||
use std::ops::Deref;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::error::Error;
|
||||
use std::vec;
|
||||
|
||||
use lava_torrent::torrent::v1::{Torrent, AnnounceList};
|
||||
use lava_torrent::torrent::v1::Torrent;
|
||||
|
||||
use crate::torznab::{GenericSearchParameters, SearchFunction};
|
||||
use crate::torznab::search_parameters::{GenericSearchParametersBuilder, MovieSearchParametersBuilder};
|
||||
|
||||
use tokio::sync::RwLock;
|
||||
use crate::cross_seed::CrossSeed;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
// Get config and debug the torrents
|
||||
let config = Arc::new(Config::new());
|
||||
|
||||
let subscriber = tracing_subscriber::fmt()
|
||||
.with_max_level(Into::<LevelFilter>::into(config.log_level.clone()))
|
||||
.finish();
|
||||
tracing::subscriber::set_global_default(subscriber)
|
||||
.expect("Failed to set global default log subscriber");
|
||||
|
||||
info!("Searching for torrents in: {}", config.torrents_path_str());
|
||||
|
||||
// Get torrent client
|
||||
let torrent_client = get_torrent_client(&config).await;
|
||||
|
||||
// Get indexers
|
||||
let indexers = get_indexers(&config).await;
|
||||
info!("Searching {} trackers: ", indexers.len());
|
||||
|
||||
// Parse torrents from filesystem
|
||||
let torrents = parse_torrents(&config, Arc::clone(&torrent_client)).await;
|
||||
info!("Found {} torrents possibly eligible for cross-seeding.", torrents.len());
|
||||
|
||||
// Store async tasks to wait for them to finish
|
||||
let mut indexer_handles = vec![];
|
||||
|
||||
let seed = Arc::new(CrossSeed::new_arcs(config, indexers, torrent_client));
|
||||
for torrent in torrents {
|
||||
let seed = Arc::clone(&seed);
|
||||
|
||||
indexer_handles.push(tokio::spawn(async move {
|
||||
seed.search_for_torrent(&torrent).await.unwrap();
|
||||
}));
|
||||
}
|
||||
|
||||
futures::future::join_all(indexer_handles).await;
|
||||
}
|
||||
|
||||
fn read_torrents(path: &Path) -> Result<Vec<PathBuf>, Box<dyn Error>> {
|
||||
let mut torrents = Vec::new();
|
||||
for entry in path.read_dir()? {
|
||||
|
@ -42,25 +79,7 @@ fn read_torrents(path: &Path) -> Result<Vec<PathBuf>, Box<dyn Error>> {
|
|||
return Ok(torrents);
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let subscriber = tracing_subscriber::fmt()
|
||||
.with_max_level(Level::DEBUG)
|
||||
.finish();
|
||||
|
||||
tracing::subscriber::set_global_default(subscriber).expect("Failed to set global default log subscriber");
|
||||
|
||||
// Get config and debug the torrents
|
||||
let config = Arc::new(Config::new());
|
||||
info!("Searching for torrents in: {}", config.torrents_path_str());
|
||||
|
||||
// Get a torrent client from the config.
|
||||
let mut torrent_client = torrent_client::TorrentClient::from_config(&config);
|
||||
torrent_client.login(&config).await.unwrap();
|
||||
|
||||
// Torrent client no longer needs to mut, so we can just create an `Arc` without a mutex.
|
||||
let torrent_client = Arc::new(torrent_client);
|
||||
|
||||
async fn get_indexers(config: &Config) -> Arc<Vec<Indexer>> {
|
||||
let mut indexers = config.indexers.clone();
|
||||
|
||||
// Create torznab clients for each indexer.
|
||||
|
@ -68,245 +87,66 @@ async fn main() {
|
|||
indexer.create_client().await.unwrap();
|
||||
}
|
||||
|
||||
// Log the trackers
|
||||
info!("Searching {} trackers: ", indexers.len());
|
||||
for indexer in indexers.iter() {
|
||||
info!(" {}: {}", indexer.name, indexer.url);
|
||||
debug!(" Can Search: {:?}", indexer.client.as_ref().unwrap().capabilities.searching_capabilities);
|
||||
}
|
||||
// Create arc of indexers
|
||||
Arc::new(indexers)
|
||||
}
|
||||
|
||||
// Log the amount of torrents.
|
||||
async fn get_torrent_client(config: &Config) -> Arc<TorrentClient> {
|
||||
// Get a torrent client from the config.
|
||||
let mut torrent_client = torrent_client::TorrentClient::from_config(&config);
|
||||
torrent_client.login(&config).await.unwrap();
|
||||
|
||||
// Torrent client no longer needs to mut, so we can just create an `Arc` without a mutex.
|
||||
Arc::new(torrent_client)
|
||||
}
|
||||
|
||||
async fn parse_torrents(config: &Config, torrent_client: Arc<TorrentClient>) -> Vec<Torrent> {
|
||||
// Read the torrents from the config as `PathBuf`s
|
||||
let torrent_files = read_torrents(config.torrents_path()).unwrap();
|
||||
info!("Found {} torrent files...", torrent_files.len());
|
||||
|
||||
// Convert the indexers to be async friendly.
|
||||
let mut indexers = indexers.iter()
|
||||
.map(|indexer| Arc::new(RwLock::new(indexer.clone())))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Store async tasks to wait for them to finish
|
||||
let mut indexer_handles = vec![];
|
||||
|
||||
// Parse the torrent files as `Torrent` structs.
|
||||
info!("Parsing all torrent files...");
|
||||
|
||||
let mut stop = stopwatch::Stopwatch::start_new();
|
||||
|
||||
// Get the torrents and from the paths
|
||||
let mut torrents: Vec<Result<Torrent, lava_torrent::LavaTorrentError>> = torrent_files.iter()
|
||||
.map(|path| Torrent::read_from_file(path))
|
||||
.collect();
|
||||
stop.stop();
|
||||
|
||||
info!("Took {} seconds to parse all torrents", stop.elapsed().as_secs());
|
||||
drop(stop);
|
||||
drop(stop); // Drop for memory
|
||||
|
||||
// Remove the torrents that failed to be read from the file, and
|
||||
// are not in the download client.
|
||||
|
||||
//
|
||||
// NOTE: It might be better to get all torrents on the client and check that the torrents are on the
|
||||
// client locally.
|
||||
|
||||
/* let torrents = torrents.iter()
|
||||
.map(|res| res.map(|torrent| {
|
||||
let info = futures::executor::block_on(torrent_client.get_torrent_info(&torrent))
|
||||
.unwrap_or(None);
|
||||
|
||||
torrent.ha
|
||||
//(torrent, info)
|
||||
})).collect(); */
|
||||
torrents.retain(|torrent| {
|
||||
if let Ok(torrent) = torrent {
|
||||
let info = futures::executor::block_on(torrent_client.get_torrent_info(&torrent)).unwrap();
|
||||
let info = futures::executor::block_on(torrent_client.get_torrent_info(&torrent))
|
||||
.unwrap_or(None);
|
||||
|
||||
info.is_some()
|
||||
} else {
|
||||
false
|
||||
}
|
||||
});
|
||||
|
||||
// Unwrap the results, all errored ones were removed from the `.retain`
|
||||
let torrents: Vec<Torrent> = torrents.iter()
|
||||
.map(|res| res.as_ref().unwrap().clone())
|
||||
.collect();
|
||||
|
||||
info!("Found {} torrents that are in the client and on the filesystem", torrents.len());
|
||||
|
||||
for torrent in torrents {
|
||||
let torrent = Arc::new(torrent);
|
||||
|
||||
for indexer in indexers.iter() {
|
||||
info!("Checking for \"{}\"", torrent.name);
|
||||
|
||||
// Clone some `Arc`s for the new async task.
|
||||
let mut indexer = Arc::clone(indexer);
|
||||
let torrent = Arc::clone(&torrent);
|
||||
let torrent_client = Arc::clone(&torrent_client);
|
||||
let config = Arc::clone(&config);
|
||||
|
||||
indexer_handles.push(tokio::spawn(async move {
|
||||
let lock = indexer.read().await;
|
||||
match &lock.client {
|
||||
Some(client) => {
|
||||
let generic = GenericSearchParametersBuilder::new()
|
||||
.query(torrent.name.clone())
|
||||
.build();
|
||||
let results = client.search(SearchFunction::Search, generic).await.unwrap();
|
||||
|
||||
// The first result should be the correct one.
|
||||
if let Some(result) = results.first() {
|
||||
let found_torrent = result.download_torrent().await.unwrap();
|
||||
|
||||
// Check if we found the same torrent in its own indexer
|
||||
if found_torrent.info_hash() == torrent.info_hash() {
|
||||
debug!("Found same torrent in its own indexer, skipping...");
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(found_announces) = &found_torrent.announce_list {
|
||||
// Some urls can be encoded so we need to decode to compare them.
|
||||
let found_announces: Vec<Vec<String>> = found_announces.iter()
|
||||
.map(|a_list|
|
||||
a_list.iter().map(|a| urlencoding::decode(a)
|
||||
.unwrap().to_string())
|
||||
.collect::<Vec<String>>())
|
||||
.collect();
|
||||
|
||||
// Get the trackers of the torrent from the download client.
|
||||
let request_info = TorrentInfo::from_hash(torrent.info_hash());
|
||||
let torrent_announces = torrent_client.get_torrent_trackers(&request_info).await.unwrap();
|
||||
let torrent_announces: Vec<&String> = torrent_announces.iter().map(|t| &t.url).collect();
|
||||
|
||||
// Flatten the announce list to make them easier to search.
|
||||
let found_announces: Vec<&String> = found_announces.iter()
|
||||
.flat_map(|array| array.iter())
|
||||
.collect();
|
||||
|
||||
// Check if the client has the trackers of the torrent already.
|
||||
let mut client_has_trackers = found_announces.iter()
|
||||
.all(|tracker| torrent_announces.contains(tracker));
|
||||
|
||||
if !client_has_trackers {
|
||||
info!("Found a cross-seedable torrent for {}", found_torrent.name);
|
||||
|
||||
match torrent_client.get_torrent_info(&torrent).await.unwrap() {
|
||||
Some(info) => {
|
||||
info!("Got info: {:?}", info);
|
||||
|
||||
match info.state {
|
||||
TorrentState::Uploading | TorrentState::QueuedUploading => {
|
||||
debug!("The torrent is being uploaded on the client");
|
||||
|
||||
//if config.add_trackers {
|
||||
match config.torrent_mode {
|
||||
TorrentMode::InjectTrackers => {
|
||||
debug!("Can add trackers to the torrent");
|
||||
|
||||
if found_torrent.is_private() {
|
||||
debug!("The found torrent is private, so we must remove the torrent and re-add it with the new trackers...");
|
||||
|
||||
match torrent_client.remove_torrent(&info, false).await {
|
||||
Ok(()) => {
|
||||
debug!("Re-uploading torrent to client...");
|
||||
|
||||
info!("Found announces: {:?}", found_announces);
|
||||
|
||||
// Combine both announces and deref the Strings by cloning them.
|
||||
let mut torrent_announces: Vec<String> = torrent_announces.into_iter()
|
||||
.chain(found_announces)
|
||||
.cloned()
|
||||
.collect();
|
||||
// Remove the [DHT], [PeX] and [LSD] announces from the list.
|
||||
// The client should handle those.
|
||||
torrent_announces.retain(|announce| !(announce.starts_with("** [") && announce.ends_with("] **")));
|
||||
|
||||
info!("Old torrent: {:?}", torrent.announce_list);
|
||||
|
||||
let mut torrent = (*torrent).clone();
|
||||
torrent.announce_list = Some(vec![torrent_announces]);
|
||||
if let Some(extra) = torrent.extra_info_fields.as_mut() {
|
||||
extra.insert(String::from("private"), BencodeElem::Integer(1));
|
||||
} else {
|
||||
let mut extra = std::collections::HashMap::new();
|
||||
extra.insert(String::from("private"), BencodeElem::Integer(1));
|
||||
torrent.extra_info_fields = Some(extra);
|
||||
}
|
||||
/* torrent.extra_info_fields.as_mut()
|
||||
.unwrap_or(&mut std::collections::HashMap::new())
|
||||
.insert(String::from("private"), BencodeElem::Integer(1)); */
|
||||
|
||||
|
||||
info!("Torrent that will be uploaded: {:?}, private: {}", torrent.announce_list, torrent.is_private());
|
||||
|
||||
// Clone some fields from the torrent due to ownership issues with
|
||||
// torrent.encode()
|
||||
let name = torrent.name.clone();
|
||||
let hash = torrent.info_hash().clone();
|
||||
|
||||
match torrent.encode() {
|
||||
Ok(bytes) => {
|
||||
let upload = TorrentUpload::builder()
|
||||
.category(config.torrent_category())
|
||||
.tags(info.tags)
|
||||
.torrent_data(format!("{}.torrent", hash), bytes)
|
||||
//.paused()
|
||||
.build();
|
||||
|
||||
match torrent_client.add_torrent(&upload).await {
|
||||
Ok(()) => info!("Added cross-seed torrent {}!", name),
|
||||
Err(err) => error!("Error adding cross-seed torrent: {} (error: {:?}", name, err),
|
||||
}
|
||||
},
|
||||
Err(e) => error!("Error encoding torrent for upload: {}", e),
|
||||
}
|
||||
},
|
||||
Err(err) => error!("Error removing torrent from client: {} (error: {:?})", torrent.name, err),
|
||||
}
|
||||
} else {
|
||||
debug!("Adding trackers to torrent since they aren't private...");
|
||||
let torrent_announces = torrent_announces.iter()
|
||||
.map(|u| u.to_owned().to_owned())
|
||||
.collect();
|
||||
|
||||
if let Err(err) = torrent_client.add_torrent_trackers(&info, torrent_announces).await {
|
||||
error!("Error adding torrent trackers to torrent: {} (err: {:?})", torrent.name, err);
|
||||
}
|
||||
}
|
||||
},
|
||||
TorrentMode::InjectFile => {
|
||||
debug!("Cannot add trackers, uploading new torrent...");
|
||||
|
||||
// Clone some fields from the torrent due to ownership issues with
|
||||
// found_torrent.encode()
|
||||
let name = found_torrent.name.clone();
|
||||
let hash = found_torrent.info_hash().clone();
|
||||
|
||||
match found_torrent.encode() {
|
||||
Ok(bytes) => {
|
||||
let upload = TorrentUpload::builder()
|
||||
.torrent_data(format!("{}.torrent", hash), bytes)
|
||||
.category(config.torrent_category())
|
||||
//.paused() // TODO: don't pause new uploads
|
||||
.build();
|
||||
|
||||
match torrent_client.add_torrent(&upload).await {
|
||||
Ok(()) => info!("Added cross-seed torrent {}!", name),
|
||||
Err(err) => error!("Failure to add cross-seed torrent: {} (Error {:?})", name, err),
|
||||
}
|
||||
},
|
||||
Err(e) => warn!("Failure to encode ({}) {}", e, name),
|
||||
}
|
||||
},
|
||||
TorrentMode::Filesystem => {
|
||||
todo!(); // TODO: implement
|
||||
}
|
||||
}
|
||||
},
|
||||
_ => debug!("Torrent is not done downloading, skipping..."),
|
||||
}
|
||||
|
||||
},
|
||||
None => info!("Torrent file {} was not found in the client, skipping...", torrent.name),
|
||||
}
|
||||
} else {
|
||||
debug!("Found the torrent in its original indexer, skipping...");
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
None => {
|
||||
panic!("idfk");
|
||||
}
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
futures::future::join_all(indexer_handles).await;
|
||||
torrents
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
use std::ops::{Deref, DerefMut};
|
||||
|
||||
use abstracttorrent::{client::qbittorrent, torrent::TorrentInfo, common::GetTorrentListParams};
|
||||
use lava_torrent::torrent::v1::Torrent;
|
||||
|
||||
use crate::config::Config;
|
||||
|
||||
|
@ -34,11 +35,8 @@ impl TorrentClient {
|
|||
self.client.login(&url, username, password).await
|
||||
}
|
||||
|
||||
/* pub fn login_with_config(&self, config: &Config) -> abstracttorrent::client::ClientResult<()> {
|
||||
self.login(url, username, password)
|
||||
} */
|
||||
|
||||
pub async fn get_torrent_info(&self, torrent: &lava_torrent::torrent::v1::Torrent) -> abstracttorrent::client::ClientResult<Option<TorrentInfo>> {
|
||||
/// Gets a torrent's info from the client.
|
||||
pub async fn get_torrent_info(&self, torrent: &Torrent) -> abstracttorrent::client::ClientResult<Option<TorrentInfo>> {
|
||||
let params = GetTorrentListParams::builder()
|
||||
.hash(&torrent.info_hash())
|
||||
.build();
|
||||
|
@ -46,6 +44,17 @@ impl TorrentClient {
|
|||
let results = self.client.get_torrent_list(Some(params)).await?;
|
||||
Ok(results.first().cloned())
|
||||
}
|
||||
|
||||
/// Checks if the client has the torrent with the exact hash, no like torrents.
|
||||
pub async fn has_exact_torrent(&self, torrent: &Torrent) -> abstracttorrent::client::ClientResult<bool> {
|
||||
let params = GetTorrentListParams::builder()
|
||||
.hash(&torrent.info_hash())
|
||||
.build();
|
||||
|
||||
let results = self.client.get_torrent_list(Some(params)).await?;
|
||||
|
||||
Ok(results.iter().any(|info| info.hash == torrent.info_hash()))
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for TorrentClient {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
#[derive(Debug)]
|
||||
#[derive(Debug, Default)]
|
||||
pub struct GenericSearchParameters {
|
||||
/// The string search query.
|
||||
pub query: Option<String>,
|
||||
|
@ -15,6 +15,10 @@ pub struct GenericSearchParameters {
|
|||
}
|
||||
|
||||
impl GenericSearchParameters {
|
||||
pub fn builder() -> GenericSearchParametersBuilder {
|
||||
GenericSearchParametersBuilder::default()
|
||||
}
|
||||
|
||||
/// Convert the search parameters to a query string.
|
||||
/// This will be prefixed with "&"
|
||||
pub fn to_params(&self) -> String {
|
||||
|
@ -56,24 +60,12 @@ impl GenericSearchParameters {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct GenericSearchParametersBuilder {
|
||||
params: GenericSearchParameters,
|
||||
}
|
||||
|
||||
impl GenericSearchParametersBuilder {
|
||||
pub fn new() -> GenericSearchParametersBuilder {
|
||||
GenericSearchParametersBuilder {
|
||||
params: GenericSearchParameters {
|
||||
query: None,
|
||||
categories: Vec::new(),
|
||||
attributes: Vec::new(),
|
||||
extended: None,
|
||||
offset: None,
|
||||
limit: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn query(mut self, query: String) -> GenericSearchParametersBuilder {
|
||||
self.params.query = Some(query);
|
||||
self
|
||||
|
|
Loading…
Reference in New Issue