diff --git a/Cargo.lock b/Cargo.lock index e20a4e9..cabc463 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -566,10 +566,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "dashboard" -version = "0.1.0" - [[package]] name = "der" version = "0.7.9" diff --git a/Cargo.toml b/Cargo.toml index e255bff..c7cf00e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,4 +1,4 @@ [workspace] resolver = "2" -members = ["sr_download", "migration", "dashboard"] +members = ["sr_download", "migration"] default-members = ["sr_download"] diff --git a/dashboard/Cargo.toml b/dashboard/Cargo.toml deleted file mode 100644 index fa2da78..0000000 --- a/dashboard/Cargo.toml +++ /dev/null @@ -1,6 +0,0 @@ -[package] -name = "dashboard" -version = "0.1.0" -edition = "2021" - -[dependencies] diff --git a/dashboard/src/main.rs b/dashboard/src/main.rs deleted file mode 100644 index e7a11a9..0000000 --- a/dashboard/src/main.rs +++ /dev/null @@ -1,3 +0,0 @@ -fn main() { - println!("Hello, world!"); -} diff --git a/sr_download/src/config.rs b/sr_download/src/config.rs index 0026c3c..fe161b4 100644 --- a/sr_download/src/config.rs +++ b/sr_download/src/config.rs @@ -152,7 +152,7 @@ impl ConfigFile { event!( Level::ERROR, "template file like this: {}", - toml::to_string(&Self::default()).unwrap() + toml::to_string(&Self::default())? ); } Err(e) diff --git a/sr_download/src/db_part.rs b/sr_download/src/db_part.rs index 1c5b71e..e761375 100644 --- a/sr_download/src/db_part.rs +++ b/sr_download/src/db_part.rs @@ -1,13 +1,12 @@ use blake3::Hasher; use sea_orm::{ ActiveModelTrait, ColumnTrait, ConnectionTrait, DatabaseConnection, EntityTrait, - IntoActiveModel, ModelTrait, QueryFilter, QueryResult, QuerySelect, Statement, - TransactionTrait, + IntoActiveModel, ModelTrait, QueryFilter, QuerySelect, Statement, TransactionTrait, }; // use tracing::{event, Level}; use crate::model; -use crate::model::sea_orm_active_enums::SaveType; +pub use crate::model::sea_orm_active_enums::SaveType; use migration::{SaveId, FULL_DATA_VIEW, TEXT_DATA_MAX_LEN}; pub mod defines; diff --git a/sr_download/src/db_part/utils.rs b/sr_download/src/db_part/utils.rs index c171d1e..737460b 100644 --- a/sr_download/src/db_part/utils.rs +++ b/sr_download/src/db_part/utils.rs @@ -6,7 +6,7 @@ use tracing::{event, Level}; use crate::{ config::ConfigFile, db_part::{ - defines::{self, db_names, SaveId}, + defines::{db_names, SaveId}, save_data_to_db, CoverStrategy, }, model::sea_orm_active_enums::SaveType, @@ -64,7 +64,7 @@ pub async fn update_xml_tested(db: &DatabaseConnection) -> Option<()> { return Some(()); } event!(Level::INFO, "正在检查 {} 条数据的xml状态", count); - let sql = format!("SELECT {}()", defines::db_names::UPDATE_XML_TESTED); + let sql = format!("SELECT {}()", db_names::UPDATE_XML_TESTED); let stmt = Statement::from_string(sea_orm::DatabaseBackend::Postgres, sql); event!(Level::INFO, "正在更新数据库内所有 xml_tested = null 的数据"); let _ = db.execute(stmt).await; @@ -77,7 +77,7 @@ pub async fn update_xml_tested(db: &DatabaseConnection) -> Option<()> { pub async fn check_null_data(db: &DatabaseConnection) -> Option<()> { let sql = format!( "SELECT count(1) from {} where data is NULL", - defines::db_names::FULL_DATA_TABLE + db_names::FULL_DATA_TABLE ); let data = db .query_one(Statement::from_string( @@ -98,7 +98,7 @@ pub async fn check_null_data(db: &DatabaseConnection) -> Option<()> { ); let sql = format!( "SELECT save_id from {} where data is NULL", - defines::db_names::FULL_DATA_TABLE + db_names::FULL_DATA_TABLE ); let stmt = Statement::from_string(sea_orm::DatabaseBackend::Postgres, sql); let quert_results = db.query_all(stmt).await.ok()?; diff --git a/sr_download/src/fast_mode.rs b/sr_download/src/fast_mode.rs new file mode 100644 index 0000000..d3eeb69 --- /dev/null +++ b/sr_download/src/fast_mode.rs @@ -0,0 +1,141 @@ +use std::ops::Range; + +use migration::SaveId; + +use colored::Colorize; +use futures::future::select_all; +use tokio::sync::oneshot::Receiver; +use tracing::{event, Level}; + +use crate::db_part::{CoverStrategy, SaveType}; +use crate::{config, db_part, Downloader}; + +async fn big_worker( + db: sea_orm::DatabaseConnection, + client: Downloader, + work_range: Range, +) { + for work_id in work_range { + let exist_len = db_part::check_data_len(&db, work_id).await; + if exist_len.is_some() && exist_len.unwrap() > 0 { + event!( + Level::INFO, + "{}", + format!("Skip download {} with exist data", work_id).blue() + ); + continue; + } + match match client.try_download_as_any(work_id).await { + Some(file) => { + event!( + Level::INFO, + "{}", + format!( + "Download {} with {} data len: {}", + work_id, + file.type_name(), + file.len() + ) + .green() + ); + let save_type = (&file).into(); + db_part::save_data_to_db( + work_id, + save_type, + file.take_data(), + Some(CoverStrategy::CoverIfDifferent), + &db, + ) + } + None => { + if exist_len.is_some() { + event!( + Level::INFO, + "{}", + format!("Skip save {} with no data", work_id).cyan() + ); + continue; + } + event!( + Level::INFO, + "{}", + format!("Download {} with no data", work_id).yellow() + ); + db_part::save_data_to_db(work_id, SaveType::None, "".to_string(), None, &db) + } + } + .await + { + Ok(_) => (), + Err(e) => event!(Level::WARN, "Save data {} failed: {:?}", work_id, e), + } + } +} + +pub async fn main(mut stop_receiver: Receiver<()>) -> anyhow::Result<()> { + let span = tracing::span!(Level::INFO, "fast_mode"); + let _enter = span.enter(); + + let conf = config::ConfigFile::try_read()?; + + let db_connect = db_part::connect(&conf).await?; + db_part::migrate(&db_connect).await?; + db_part::utils::check_null_data(&db_connect).await; + db_part::utils::update_xml_tested(&db_connect).await; + + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + if stop_receiver.try_recv().is_ok() { + event!(Level::INFO, "{}", "Stop download".red()); + // 结束 db + db_connect.close().await?; + return Ok(()); + } + + let end_id: SaveId = conf.sync.fast.end_id; + let worker_size = conf.sync.fast.worker_size; + let mut current_id = conf.sync.fast.start_id; + let mut works = Vec::with_capacity(conf.sync.fast.worker_count as usize); + let max_works = conf.sync.fast.worker_count as usize; + for _ in 0..works.len() { + if stop_receiver.try_recv().is_ok() { + event!(Level::INFO, "{}", "Stop download".red()); + // 结束 db + db_connect.close().await?; + return Ok(()); + } + let client = Downloader::new(Some(conf.net_timeout())); + let end = current_id + worker_size; + works.push(tokio::spawn(big_worker( + db_connect.clone(), + client, + current_id..end, + ))); + current_id = end; + } + + while current_id < end_id || !works.is_empty() { + if stop_receiver.try_recv().is_ok() { + event!(Level::INFO, "{}", "Stop download".red()); + // 结束 db + db_connect.close().await?; + return Ok(()); + } + while current_id < end_id && works.len() < max_works { + let client = Downloader::new(Some(conf.net_timeout())); + let end = current_id + worker_size; + works.push(tokio::spawn(big_worker( + db_connect.clone(), + client, + current_id..end, + ))); + current_id = end; + } + + if !works.is_empty() { + let (_result, _index, remain) = select_all(works).await; + works = remain; + } + } + Ok(()) +} diff --git a/sr_download/src/info.html b/sr_download/src/info.html index 95218b0..c5c1f11 100644 --- a/sr_download/src/info.html +++ b/sr_download/src/info.html @@ -172,7 +172,7 @@ resultTitle.innerText = '请求结果'; resultDisplay.appendChild(resultTitle); // 先判断数据拿没拿到 - if (data["code"] != 200) { + if (data["code"] !== 200) { // 没拿到 const resultContent = document.createElement('div'); resultContent.innerText = data["msg"]; diff --git a/sr_download/src/main.rs b/sr_download/src/main.rs index 140c6a6..ea0762e 100644 --- a/sr_download/src/main.rs +++ b/sr_download/src/main.rs @@ -1,285 +1,90 @@ use colored::Colorize; -use futures::future::select_all; -use std::{io::Write, ops::Range}; -use tokio::sync::oneshot::Receiver; use tracing::{event, Level}; pub mod config; pub mod db_part; +/// 快速同步 +pub mod fast_mode; #[allow(unused)] pub mod model; pub mod net; +/// 服务模式 +pub mod serve_mode; pub mod web_part; -use crate::db_part::CoverStrategy; use migration::SaveId; -use model::sea_orm_active_enums::SaveType; pub use net::Downloader; -async fn big_worker( - db: sea_orm::DatabaseConnection, - client: Downloader, - work_range: Range, -) { - for work_id in work_range { - let exist_len = db_part::check_data_len(&db, work_id).await; - if exist_len.is_some() && exist_len.unwrap() > 0 { - event!( - Level::INFO, - "{}", - format!("Skip download {} with exist data", work_id).blue() - ); - continue; - } - match match client.try_download_as_any(work_id).await { - Some(file) => { - event!( - Level::INFO, - "{}", - format!( - "Download {} with {} data len: {}", - work_id, - file.type_name(), - file.len() - ) - .green() - ); - let save_type = (&file).into(); - db_part::save_data_to_db( - work_id, - save_type, - file.take_data(), - Some(CoverStrategy::CoverIfDifferent), - &db, - ) - } - None => { - if exist_len.is_some() { - event!( - Level::INFO, - "{}", - format!("Skip save {} with no data", work_id).cyan() - ); - continue; - } - event!( - Level::INFO, - "{}", - format!("Download {} with no data", work_id).yellow() - ); - db_part::save_data_to_db(work_id, SaveType::None, "".to_string(), None, &db) - } - } - .await - { - Ok(_) => (), - Err(e) => event!(Level::WARN, "Save data {} failed: {:?}", work_id, e), - } - } +enum RunMode { + /// 服务模式 + Serve, + /// 快速模式 + Fast, } -async fn serve_mode(mut stop_receiver: Receiver<()>) -> anyhow::Result<()> { - let span = tracing::span!(Level::INFO, "serve_mode"); - let _enter = span.enter(); +const HELP_MSG: &str = r#"Usage: srdownload [options] -s/f +Options: + -d Debug mode + -t=xx 运行线程数 (默认 10) + -s 服务模式 + -f 快速同步模式(用于从零开始)"#; - let conf = config::ConfigFile::try_read()?; +fn main() -> anyhow::Result<()> { + // 检查 CLI 参数 - let db_connect = db_part::connect(&conf).await?; - db_part::migrate(&db_connect).await?; - db_part::utils::check_null_data(&db_connect).await; - db_part::utils::update_xml_tested(&db_connect).await; - let mut db_max_id = db_part::search::max_id(&db_connect).await; - - let mut web_waiter = None; - if conf.serve.enable { - web_waiter = Some(tokio::spawn(web_part::web_main())); - } - - event!( - Level::INFO, - "{}", - format!( - "数据库中最大的现有数据 id 为: {} 将从这里开始下载", - db_max_id - ) - .green() - ); - - let serve_wait_time = conf.serve_duration(); - let client = Downloader::new(None); - - let mut waited = false; - // 开始等待的时间 - let mut start_wait_time = tokio::time::Instant::now(); - - loop { - if stop_receiver.try_recv().is_ok() { - event!(Level::INFO, "{}", "结束下载!".yellow()); - // 结束 db - db_connect.close().await?; - if conf.serve.enable { - if let Some(web_waiter) = web_waiter { - web_waiter.abort(); - } - } - return Ok(()); - } - - tokio::select! { - _ = tokio::time::sleep(serve_wait_time) => { - let work_id = db_max_id + 1; - match client.try_download_as_any(work_id).await { - Some(file) => { - if waited { - println!(); - waited = false; - } - let wait_time = start_wait_time.elapsed(); - start_wait_time = tokio::time::Instant::now(); - event!( - Level::INFO, - "{}", - format!( - "下载到了新的 {}!(懒得做中文了) ID为: {} 长度: {}, 等了 {}", - file.type_name(), - work_id, - file.len(), - format!("{:?}", wait_time).blue() - ) - .green() - ); - let save_type: SaveType = (&file).into(); - match db_part::save_data_to_db( - work_id, - save_type, - file.take_data(), - Some(CoverStrategy::CoverIfDifferent), - &db_connect, - ) - .await - { - Ok(_) => { - { - db_max_id = work_id; - event!( - Level::INFO, - "{}", - format!( - "保存好啦! (下一排的每一个 . 代表一个 {:?})", - serve_wait_time - ) - .green() - ); - continue; // 保存好之后立即尝试下一次, 保证连续上传的时候的效率 - }; - } - Err(e) => { - event!(Level::ERROR, "呜呜呜, 数据保存失败了: {:?}\n我不玩了!", e); - return Err(e); - } - } - } - None => { - print!("."); - waited = true; - let _ = std::io::stdout().flush(); - } - } - } - _ = &mut stop_receiver => { - event!(Level::INFO, "{}", "结束下载!".yellow()); - // 结束 db - db_connect.close().await?; - return Ok(()); - } - } - } -} - -async fn fast_mode(mut stop_receiver: Receiver<()>) -> anyhow::Result<()> { - let span = tracing::span!(Level::INFO, "fast_mode"); - let _enter = span.enter(); - - let conf = config::ConfigFile::try_read()?; - - let db_connect = db_part::connect(&conf).await?; - db_part::migrate(&db_connect).await?; - db_part::utils::check_null_data(&db_connect).await; - db_part::utils::update_xml_tested(&db_connect).await; - - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - - if stop_receiver.try_recv().is_ok() { - event!(Level::INFO, "{}", "Stop download".red()); - // 结束 db - db_connect.close().await?; + let args: Vec = std::env::args().collect(); + if args.contains(&"-h".to_string()) { + println!("{}", HELP_MSG); return Ok(()); } - let end_id: SaveId = conf.sync.fast.end_id; - let worker_size = conf.sync.fast.worker_size; - let mut current_id = conf.sync.fast.start_id; - let mut works = Vec::with_capacity(conf.sync.fast.worker_count as usize); - let max_works = conf.sync.fast.worker_count as usize; - for _ in 0..works.len() { - if stop_receiver.try_recv().is_ok() { - event!(Level::INFO, "{}", "Stop download".red()); - // 结束 db - db_connect.close().await?; - return Ok(()); - } - let client = net::Downloader::new(Some(conf.net_timeout())); - let end = current_id + worker_size; - works.push(tokio::spawn(big_worker( - db_connect.clone(), - client, - current_id..end, - ))); - current_id = end; + let mut thread_count = 10; + if args.iter().any(|x| x.starts_with("-t=")) { + thread_count = args + .iter() + .find(|x| x.starts_with("-t=")) + .unwrap() + .split('=') + .last() + .unwrap() + .parse::()?; } - - while current_id < end_id || !works.is_empty() { - if stop_receiver.try_recv().is_ok() { - event!(Level::INFO, "{}", "Stop download".red()); - // 结束 db - db_connect.close().await?; - return Ok(()); - } - while current_id < end_id && works.len() < max_works { - let client = net::Downloader::new(Some(conf.net_timeout())); - let end = current_id + worker_size; - works.push(tokio::spawn(big_worker( - db_connect.clone(), - client, - current_id..end, - ))); - current_id = end; - } - - if !works.is_empty() { - let (_result, _index, remain) = select_all(works).await; - works = remain; - } - } - Ok(()) -} - -#[tokio::main(flavor = "multi_thread", worker_threads = 10)] -async fn main() -> anyhow::Result<()> { - // 判断是否有 -f / -s 参数 - let args: Vec = std::env::args().collect(); if args.contains(&"-d".to_string()) { - // debug 模式 tracing_subscriber::fmt() .with_max_level(Level::DEBUG) .init(); } else { tracing_subscriber::fmt().with_max_level(Level::INFO).init(); } - event!(Level::INFO, "Starting srdownload"); + let mode = { + if args.contains(&"-s".to_string()) { + RunMode::Serve + } else if args.contains(&"-f".to_string()) { + RunMode::Fast + } else { + event!( + Level::ERROR, + "{}", + "Please use -s or -f to start the program".red() + ); + event!(Level::ERROR, "{}", "Use -s to start serve mode".red()); + event!(Level::ERROR, "{}", "Use -f to start fast mode".red()); + return Ok(()); + } + }; - // 判断是否有 -f / -s 参数 + event!(Level::INFO, "Starting sr download"); + + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(thread_count) + .enable_all() + .build()?; + rt.block_on(async_main(mode)) +} + +async fn async_main(run_mode: RunMode) -> anyhow::Result<()> { let (stop_sender, stop_receiver) = tokio::sync::oneshot::channel::<()>(); + let stop_waiter = tokio::spawn(async move { tokio::signal::ctrl_c() .await @@ -287,21 +92,15 @@ async fn main() -> anyhow::Result<()> { event!(Level::INFO, "{}", "Ctrl-C received".red()); stop_sender.send(()).unwrap(); }); - let job_waiter; - if args.contains(&"-s".to_string()) { - job_waiter = tokio::spawn(serve_mode(stop_receiver)); - } else if args.contains(&"-f".to_string()) { - job_waiter = tokio::spawn(fast_mode(stop_receiver)); - } else { - event!( - Level::ERROR, - "{}", - "Please use -s or -f to start the program".red() - ); - event!(Level::ERROR, "{}", "Use -s to start serve mode".red()); - event!(Level::ERROR, "{}", "Use -f to start fast mode".red()); - return Ok(()); + let job_waiter; + match run_mode { + RunMode::Serve => { + job_waiter = tokio::spawn(serve_mode::main(stop_receiver)); + } + RunMode::Fast => { + job_waiter = tokio::spawn(fast_mode::main(stop_receiver)); + } } job_waiter.await??; let _ = stop_waiter.await; diff --git a/sr_download/src/net.rs b/sr_download/src/net.rs index 26428e1..b6ed96f 100644 --- a/sr_download/src/net.rs +++ b/sr_download/src/net.rs @@ -94,14 +94,14 @@ impl Downloader { Self { client } } - pub fn as_ship_url(id: SaveId) -> String { + pub fn fmt_ship_url(id: SaveId) -> String { format!( "http://jundroo.com/service/SimpleRockets/DownloadRocket?id={}", id ) } - pub fn as_save_url(id: SaveId) -> String { + pub fn fmt_save_url(id: SaveId) -> String { format!( "http://jundroo.com/service/SimpleRockets/DownloadSandBox?id={}", id @@ -115,7 +115,7 @@ impl Downloader { let span = tracing::span!(Level::DEBUG, "try_download_as_any", id); let _enter = span.enter(); // 先尝试用 ship 的 API 下载 - let ship_url = Self::as_ship_url(id); + let ship_url = Self::fmt_ship_url(id); let ship_try = self.client.get(&ship_url).send().await; event!(Level::DEBUG, "trying to Download as ship {:?}", ship_try); if let Ok(ship_try) = ship_try { @@ -132,7 +132,7 @@ impl Downloader { } } // 否则尝试用 save 的 API 下载 - let save_url = Self::as_save_url(id); + let save_url = Self::fmt_save_url(id); let save_try = self.client.get(&save_url).send().await; if let Ok(save_try) = save_try { if save_try.status().is_success() { @@ -150,7 +150,7 @@ impl Downloader { #[allow(unused)] /// 尝试用 ship 的 API 下载文件 pub async fn download_as_ship(&self, id: SaveId) -> Option { - let url = Self::as_ship_url(id); + let url = Self::fmt_ship_url(id); let try_res = self.client.get(&url).send().await; if let Ok(try_res) = try_res { if try_res.status().is_success() { @@ -167,7 +167,7 @@ impl Downloader { #[allow(unused)] /// 尝试用 save 的 API 下载文件 pub async fn download_as_save(&self, id: SaveId) -> Option { - let url = Self::as_save_url(id); + let url = Self::fmt_save_url(id); let try_res = self.client.get(&url).send().await; if let Ok(try_res) = try_res { if try_res.status().is_success() { diff --git a/sr_download/src/serve_mode.rs b/sr_download/src/serve_mode.rs new file mode 100644 index 0000000..1efeab8 --- /dev/null +++ b/sr_download/src/serve_mode.rs @@ -0,0 +1,124 @@ +use std::io::Write; + +use colored::Colorize; +use tokio::sync::oneshot::Receiver; +use tracing::{event, Level}; + +use crate::db_part::{CoverStrategy, SaveType}; +use crate::{config, db_part, web_part, Downloader}; + +pub async fn main(mut stop_receiver: Receiver<()>) -> anyhow::Result<()> { + let span = tracing::span!(Level::INFO, "serve_mode"); + let _enter = span.enter(); + + let conf = config::ConfigFile::try_read()?; + + let db_connect = db_part::connect(&conf).await?; + db_part::migrate(&db_connect).await?; + db_part::utils::check_null_data(&db_connect).await; + db_part::utils::update_xml_tested(&db_connect).await; + let mut db_max_id = db_part::search::max_id(&db_connect).await; + + let mut web_waiter = None; + if conf.serve.enable { + web_waiter = Some(tokio::spawn(web_part::web_main())); + } + + event!( + Level::INFO, + "{}", + format!( + "数据库中最大的现有数据 id 为: {} 将从这里开始下载", + db_max_id + ) + .green() + ); + + let serve_wait_time = conf.serve_duration(); + let client = Downloader::new(None); + + let mut waited = false; + // 开始等待的时间 + let mut start_wait_time = tokio::time::Instant::now(); + + loop { + if stop_receiver.try_recv().is_ok() { + event!(Level::INFO, "{}", "结束下载!".yellow()); + // 结束 db + db_connect.close().await?; + if conf.serve.enable { + if let Some(web_waiter) = web_waiter { + web_waiter.abort(); + } + } + return Ok(()); + } + + tokio::select! { + _ = tokio::time::sleep(serve_wait_time) => { + let work_id = db_max_id + 1; + match client.try_download_as_any(work_id).await { + Some(file) => { + if waited { + println!(); + waited = false; + } + let wait_time = start_wait_time.elapsed(); + start_wait_time = tokio::time::Instant::now(); + event!( + Level::INFO, + "{}", + format!( + "下载到了新的 {}!(懒得做中文了) ID为: {} 长度: {}, 等了 {}", + file.type_name(), + work_id, + file.len(), + format!("{:?}", wait_time).blue() + ) + .green() + ); + let save_type: SaveType = (&file).into(); + match db_part::save_data_to_db( + work_id, + save_type, + file.take_data(), + Some(CoverStrategy::CoverIfDifferent), + &db_connect, + ) + .await + { + Ok(_) => { + db_max_id = work_id; + event!( + Level::INFO, + "{}", + format!( + "保存好啦! (下一排的每一个 . 代表一个 {:?})", + serve_wait_time + ) + .green() + ); + continue; // 保存好之后立即尝试下一次, 保证连续上传的时候的效率 + } + Err(e) => { + event!(Level::ERROR, "呜呜呜, 数据保存失败了: {:?}\n我不玩了!", e); + return Err(e); + } + } + } + None => { + print!("."); + waited = true; + let _ = std::io::stdout().flush(); + } + } + } + _ = &mut stop_receiver => { + event!(Level::INFO, "{}", "结束下载!".yellow()); + // 结束 db + db_connect.close().await?; + return Ok(()); + } + } + } +}