果然完善的路道阻且长(

This commit is contained in:
shenjack 2024-07-21 01:41:55 +08:00
parent 69af2c8a02
commit dfcce1e132
Signed by: shenjack
GPG Key ID: 7B1134A979775551
3 changed files with 73 additions and 13 deletions

View File

@ -40,19 +40,24 @@ pub async fn find_max_id(db: &DatabaseConnection) -> SaveId {
} }
} }
pub async fn check_have_none_empty_data(db: &DatabaseConnection, save_id: SaveId) -> bool { pub async fn check_data_len(db: &DatabaseConnection, save_id: SaveId) -> Option<i64> {
// SELECT save_id from main_data WHERE save_id = $1 AND len > 0 // SELECT save_id from main_data WHERE save_id = $1 AND len > 0
match model::main_data::Entity::find() match model::main_data::Entity::find()
.filter(model::main_data::Column::SaveId.eq(save_id as i32)) .filter(model::main_data::Column::SaveId.eq(save_id as i32))
.filter(model::main_data::Column::Len.gt(0))
.one(db) .one(db)
.await .await
{ {
Ok(model) => model.is_some(), Ok(model) => {
Err(_) => false, if let Some(model) = model {
return Some(model.len);
}
None
}
Err(_) => None,
} }
} }
#[allow(unused)]
#[derive(Debug, Clone, Copy, Default)] #[derive(Debug, Clone, Copy, Default)]
pub enum CoverStrategy { pub enum CoverStrategy {
#[default] #[default]
@ -108,6 +113,9 @@ where
hasher.update(data.as_bytes()); hasher.update(data.as_bytes());
let hash = hasher.finalize().to_hex().to_string(); let hash = hasher.finalize().to_hex().to_string();
if db.ping().await.is_err() {
return Err(anyhow::anyhow!("Database connection is broken"));
}
let stuf = db.begin().await?; let stuf = db.begin().await?;
// 开个事务 // 开个事务

View File

@ -1,10 +1,12 @@
use colored::Colorize; use colored::Colorize;
use futures::future::select_all; use futures::future::select_all;
use std::{ops::Range, path::Path}; use std::{ops::Range, path::Path};
use tokio::sync::oneshot::Receiver;
use tracing::{event, Level}; use tracing::{event, Level};
mod config; mod config;
mod db_part; mod db_part;
#[allow(unused)]
mod model; mod model;
mod net; mod net;
@ -21,8 +23,13 @@ async fn big_worker(
work_range: Range<SaveId>, work_range: Range<SaveId>,
) { ) {
for work_id in work_range { for work_id in work_range {
if db_part::check_have_none_empty_data(&db, work_id).await { let exist_len = db_part::check_data_len(&db, work_id).await;
event!(Level::INFO, "{}", format!("Skip {}", work_id).blue()); if exist_len.is_some() && exist_len.unwrap() > 0 {
event!(
Level::INFO,
"{}",
format!("Skip download {} with exist data", work_id).blue()
);
continue; continue;
} }
match match client.try_download_as_any(work_id).await { match match client.try_download_as_any(work_id).await {
@ -48,6 +55,14 @@ async fn big_worker(
) )
} }
None => { None => {
if exist_len.is_some() {
event!(
Level::INFO,
"{}",
format!("Skip save {} with no data", work_id).cyan()
);
continue;
}
event!( event!(
Level::INFO, Level::INFO,
"{}", "{}",
@ -64,11 +79,7 @@ async fn big_worker(
} }
} }
#[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main_works(mut stop_receiver: Receiver<()>) -> anyhow::Result<()> {
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt().with_max_level(Level::INFO).init();
event!(Level::INFO, "Starting srdownload");
let conf = match config::ConfigFile::read_from_file(Path::new("config.toml")) { let conf = match config::ConfigFile::read_from_file(Path::new("config.toml")) {
Ok(conf) => conf, Ok(conf) => conf,
Err(_) => { Err(_) => {
@ -80,7 +91,13 @@ async fn main() -> anyhow::Result<()> {
let db_connect = db_part::connect(&conf).await?; let db_connect = db_part::connect(&conf).await?;
let db_max_id = db_part::find_max_id(&db_connect).await; let db_max_id = db_part::find_max_id(&db_connect).await;
event!(Level::INFO, "db max downloaded save_id: {}", db_max_id); event!(
Level::INFO,
"{}",
format!("db max downloaded save_id: {}", db_max_id).green()
);
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
// 1321469 end // 1321469 end
let end_id: SaveId = 1321469; let end_id: SaveId = 1321469;
@ -118,7 +135,39 @@ async fn main() -> anyhow::Result<()> {
let (_result, _index, remain) = select_all(works).await; let (_result, _index, remain) = select_all(works).await;
works = remain; works = remain;
} }
if stop_receiver.try_recv().is_ok() {
event!(Level::INFO, "{}", "Stop download".red());
// 结束 db
db_connect.close().await?;
break;
}
}
Ok(())
}
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt().with_max_level(Level::INFO).init();
event!(Level::INFO, "Starting srdownload");
// 初始化一个 ctrl-c 的监听器
let (ctrl_c_sender, ctrl_c_receiver) = tokio::sync::oneshot::channel::<()>();
// 把 main_works spawn 出去, 这样就可以在主线程检测 ctrl-c 了
let main_works = tokio::spawn(main_works(ctrl_c_receiver));
// ctrl-c 信号处理
let ctrl_c_waiter = tokio::spawn(async move {
tokio::signal::ctrl_c()
.await
.expect("Failed to listen for Ctrl+C event");
event!(Level::INFO, "{}", "Ctrl-C received".red());
ctrl_c_sender.send(()).unwrap();
});
main_works.await??;
if !ctrl_c_waiter.is_finished() {
ctrl_c_waiter.abort()
} }
Ok(()) Ok(())
} }

View File

@ -130,6 +130,7 @@ impl Downloader {
None None
} }
#[allow(unused)]
/// 尝试用 ship 的 API 下载文件 /// 尝试用 ship 的 API 下载文件
pub async fn download_as_ship(&self, id: SaveId) -> Option<String> { pub async fn download_as_ship(&self, id: SaveId) -> Option<String> {
let url = Self::as_ship_url(id); let url = Self::as_ship_url(id);
@ -146,6 +147,7 @@ impl Downloader {
None None
} }
#[allow(unused)]
/// 尝试用 save 的 API 下载文件 /// 尝试用 save 的 API 下载文件
pub async fn download_as_save(&self, id: SaveId) -> Option<String> { pub async fn download_as_save(&self, id: SaveId) -> Option<String> {
let url = Self::as_save_url(id); let url = Self::as_save_url(id);
@ -162,6 +164,7 @@ impl Downloader {
None None
} }
#[allow(unused)]
pub fn set_timeout(&mut self, timeout: Duration) { pub fn set_timeout(&mut self, timeout: Duration) {
self.timeout = timeout; self.timeout = timeout;
} }