This commit is contained in:
shenjack 2024-10-12 23:05:39 +08:00
parent 8c3cd9e7b3
commit bc4e426b02
Signed by: shenjack
GPG Key ID: 7B1134A979775551
12 changed files with 345 additions and 295 deletions

4
Cargo.lock generated
View File

@ -566,10 +566,6 @@ dependencies = [
"typenum",
]
[[package]]
name = "dashboard"
version = "0.1.0"
[[package]]
name = "der"
version = "0.7.9"

View File

@ -1,4 +1,4 @@
[workspace]
resolver = "2"
members = ["sr_download", "migration", "dashboard"]
members = ["sr_download", "migration"]
default-members = ["sr_download"]

View File

@ -1,6 +0,0 @@
[package]
name = "dashboard"
version = "0.1.0"
edition = "2021"
[dependencies]

View File

@ -1,3 +0,0 @@
fn main() {
println!("Hello, world!");
}

View File

@ -152,7 +152,7 @@ impl ConfigFile {
event!(
Level::ERROR,
"template file like this: {}",
toml::to_string(&Self::default()).unwrap()
toml::to_string(&Self::default())?
);
}
Err(e)

View File

@ -1,13 +1,12 @@
use blake3::Hasher;
use sea_orm::{
ActiveModelTrait, ColumnTrait, ConnectionTrait, DatabaseConnection, EntityTrait,
IntoActiveModel, ModelTrait, QueryFilter, QueryResult, QuerySelect, Statement,
TransactionTrait,
IntoActiveModel, ModelTrait, QueryFilter, QuerySelect, Statement, TransactionTrait,
};
// use tracing::{event, Level};
use crate::model;
use crate::model::sea_orm_active_enums::SaveType;
pub use crate::model::sea_orm_active_enums::SaveType;
use migration::{SaveId, FULL_DATA_VIEW, TEXT_DATA_MAX_LEN};
pub mod defines;

View File

@ -6,7 +6,7 @@ use tracing::{event, Level};
use crate::{
config::ConfigFile,
db_part::{
defines::{self, db_names, SaveId},
defines::{db_names, SaveId},
save_data_to_db, CoverStrategy,
},
model::sea_orm_active_enums::SaveType,
@ -64,7 +64,7 @@ pub async fn update_xml_tested(db: &DatabaseConnection) -> Option<()> {
return Some(());
}
event!(Level::INFO, "正在检查 {} 条数据的xml状态", count);
let sql = format!("SELECT {}()", defines::db_names::UPDATE_XML_TESTED);
let sql = format!("SELECT {}()", db_names::UPDATE_XML_TESTED);
let stmt = Statement::from_string(sea_orm::DatabaseBackend::Postgres, sql);
event!(Level::INFO, "正在更新数据库内所有 xml_tested = null 的数据");
let _ = db.execute(stmt).await;
@ -77,7 +77,7 @@ pub async fn update_xml_tested(db: &DatabaseConnection) -> Option<()> {
pub async fn check_null_data(db: &DatabaseConnection) -> Option<()> {
let sql = format!(
"SELECT count(1) from {} where data is NULL",
defines::db_names::FULL_DATA_TABLE
db_names::FULL_DATA_TABLE
);
let data = db
.query_one(Statement::from_string(
@ -98,7 +98,7 @@ pub async fn check_null_data(db: &DatabaseConnection) -> Option<()> {
);
let sql = format!(
"SELECT save_id from {} where data is NULL",
defines::db_names::FULL_DATA_TABLE
db_names::FULL_DATA_TABLE
);
let stmt = Statement::from_string(sea_orm::DatabaseBackend::Postgres, sql);
let quert_results = db.query_all(stmt).await.ok()?;

View File

@ -0,0 +1,141 @@
use std::ops::Range;
use migration::SaveId;
use colored::Colorize;
use futures::future::select_all;
use tokio::sync::oneshot::Receiver;
use tracing::{event, Level};
use crate::db_part::{CoverStrategy, SaveType};
use crate::{config, db_part, Downloader};
async fn big_worker(
db: sea_orm::DatabaseConnection,
client: Downloader,
work_range: Range<SaveId>,
) {
for work_id in work_range {
let exist_len = db_part::check_data_len(&db, work_id).await;
if exist_len.is_some() && exist_len.unwrap() > 0 {
event!(
Level::INFO,
"{}",
format!("Skip download {} with exist data", work_id).blue()
);
continue;
}
match match client.try_download_as_any(work_id).await {
Some(file) => {
event!(
Level::INFO,
"{}",
format!(
"Download {} with {} data len: {}",
work_id,
file.type_name(),
file.len()
)
.green()
);
let save_type = (&file).into();
db_part::save_data_to_db(
work_id,
save_type,
file.take_data(),
Some(CoverStrategy::CoverIfDifferent),
&db,
)
}
None => {
if exist_len.is_some() {
event!(
Level::INFO,
"{}",
format!("Skip save {} with no data", work_id).cyan()
);
continue;
}
event!(
Level::INFO,
"{}",
format!("Download {} with no data", work_id).yellow()
);
db_part::save_data_to_db(work_id, SaveType::None, "".to_string(), None, &db)
}
}
.await
{
Ok(_) => (),
Err(e) => event!(Level::WARN, "Save data {} failed: {:?}", work_id, e),
}
}
}
pub async fn main(mut stop_receiver: Receiver<()>) -> anyhow::Result<()> {
let span = tracing::span!(Level::INFO, "fast_mode");
let _enter = span.enter();
let conf = config::ConfigFile::try_read()?;
let db_connect = db_part::connect(&conf).await?;
db_part::migrate(&db_connect).await?;
db_part::utils::check_null_data(&db_connect).await;
db_part::utils::update_xml_tested(&db_connect).await;
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
if stop_receiver.try_recv().is_ok() {
event!(Level::INFO, "{}", "Stop download".red());
// 结束 db
db_connect.close().await?;
return Ok(());
}
let end_id: SaveId = conf.sync.fast.end_id;
let worker_size = conf.sync.fast.worker_size;
let mut current_id = conf.sync.fast.start_id;
let mut works = Vec::with_capacity(conf.sync.fast.worker_count as usize);
let max_works = conf.sync.fast.worker_count as usize;
for _ in 0..works.len() {
if stop_receiver.try_recv().is_ok() {
event!(Level::INFO, "{}", "Stop download".red());
// 结束 db
db_connect.close().await?;
return Ok(());
}
let client = Downloader::new(Some(conf.net_timeout()));
let end = current_id + worker_size;
works.push(tokio::spawn(big_worker(
db_connect.clone(),
client,
current_id..end,
)));
current_id = end;
}
while current_id < end_id || !works.is_empty() {
if stop_receiver.try_recv().is_ok() {
event!(Level::INFO, "{}", "Stop download".red());
// 结束 db
db_connect.close().await?;
return Ok(());
}
while current_id < end_id && works.len() < max_works {
let client = Downloader::new(Some(conf.net_timeout()));
let end = current_id + worker_size;
works.push(tokio::spawn(big_worker(
db_connect.clone(),
client,
current_id..end,
)));
current_id = end;
}
if !works.is_empty() {
let (_result, _index, remain) = select_all(works).await;
works = remain;
}
}
Ok(())
}

View File

@ -172,7 +172,7 @@
resultTitle.innerText = '请求结果';
resultDisplay.appendChild(resultTitle);
// 先判断数据拿没拿到
if (data["code"] != 200) {
if (data["code"] !== 200) {
// 没拿到
const resultContent = document.createElement('div');
resultContent.innerText = data["msg"];

View File

@ -1,285 +1,90 @@
use colored::Colorize;
use futures::future::select_all;
use std::{io::Write, ops::Range};
use tokio::sync::oneshot::Receiver;
use tracing::{event, Level};
pub mod config;
pub mod db_part;
/// 快速同步
pub mod fast_mode;
#[allow(unused)]
pub mod model;
pub mod net;
/// 服务模式
pub mod serve_mode;
pub mod web_part;
use crate::db_part::CoverStrategy;
use migration::SaveId;
use model::sea_orm_active_enums::SaveType;
pub use net::Downloader;
async fn big_worker(
db: sea_orm::DatabaseConnection,
client: Downloader,
work_range: Range<SaveId>,
) {
for work_id in work_range {
let exist_len = db_part::check_data_len(&db, work_id).await;
if exist_len.is_some() && exist_len.unwrap() > 0 {
event!(
Level::INFO,
"{}",
format!("Skip download {} with exist data", work_id).blue()
);
continue;
}
match match client.try_download_as_any(work_id).await {
Some(file) => {
event!(
Level::INFO,
"{}",
format!(
"Download {} with {} data len: {}",
work_id,
file.type_name(),
file.len()
)
.green()
);
let save_type = (&file).into();
db_part::save_data_to_db(
work_id,
save_type,
file.take_data(),
Some(CoverStrategy::CoverIfDifferent),
&db,
)
}
None => {
if exist_len.is_some() {
event!(
Level::INFO,
"{}",
format!("Skip save {} with no data", work_id).cyan()
);
continue;
}
event!(
Level::INFO,
"{}",
format!("Download {} with no data", work_id).yellow()
);
db_part::save_data_to_db(work_id, SaveType::None, "".to_string(), None, &db)
}
}
.await
{
Ok(_) => (),
Err(e) => event!(Level::WARN, "Save data {} failed: {:?}", work_id, e),
}
}
enum RunMode {
/// 服务模式
Serve,
/// 快速模式
Fast,
}
async fn serve_mode(mut stop_receiver: Receiver<()>) -> anyhow::Result<()> {
let span = tracing::span!(Level::INFO, "serve_mode");
let _enter = span.enter();
const HELP_MSG: &str = r#"Usage: srdownload [options] -s/f
Options:
-d Debug mode
-t=xx 线 ( 10)
-s
-f ()"#;
let conf = config::ConfigFile::try_read()?;
fn main() -> anyhow::Result<()> {
// 检查 CLI 参数
let db_connect = db_part::connect(&conf).await?;
db_part::migrate(&db_connect).await?;
db_part::utils::check_null_data(&db_connect).await;
db_part::utils::update_xml_tested(&db_connect).await;
let mut db_max_id = db_part::search::max_id(&db_connect).await;
let mut web_waiter = None;
if conf.serve.enable {
web_waiter = Some(tokio::spawn(web_part::web_main()));
}
event!(
Level::INFO,
"{}",
format!(
"数据库中最大的现有数据 id 为: {} 将从这里开始下载",
db_max_id
)
.green()
);
let serve_wait_time = conf.serve_duration();
let client = Downloader::new(None);
let mut waited = false;
// 开始等待的时间
let mut start_wait_time = tokio::time::Instant::now();
loop {
if stop_receiver.try_recv().is_ok() {
event!(Level::INFO, "{}", "结束下载!".yellow());
// 结束 db
db_connect.close().await?;
if conf.serve.enable {
if let Some(web_waiter) = web_waiter {
web_waiter.abort();
}
}
return Ok(());
}
tokio::select! {
_ = tokio::time::sleep(serve_wait_time) => {
let work_id = db_max_id + 1;
match client.try_download_as_any(work_id).await {
Some(file) => {
if waited {
println!();
waited = false;
}
let wait_time = start_wait_time.elapsed();
start_wait_time = tokio::time::Instant::now();
event!(
Level::INFO,
"{}",
format!(
"下载到了新的 {}!(懒得做中文了) ID为: {} 长度: {}, 等了 {}",
file.type_name(),
work_id,
file.len(),
format!("{:?}", wait_time).blue()
)
.green()
);
let save_type: SaveType = (&file).into();
match db_part::save_data_to_db(
work_id,
save_type,
file.take_data(),
Some(CoverStrategy::CoverIfDifferent),
&db_connect,
)
.await
{
Ok(_) => {
{
db_max_id = work_id;
event!(
Level::INFO,
"{}",
format!(
"保存好啦! (下一排的每一个 . 代表一个 {:?})",
serve_wait_time
)
.green()
);
continue; // 保存好之后立即尝试下一次, 保证连续上传的时候的效率
};
}
Err(e) => {
event!(Level::ERROR, "呜呜呜, 数据保存失败了: {:?}\n我不玩了!", e);
return Err(e);
}
}
}
None => {
print!(".");
waited = true;
let _ = std::io::stdout().flush();
}
}
}
_ = &mut stop_receiver => {
event!(Level::INFO, "{}", "结束下载!".yellow());
// 结束 db
db_connect.close().await?;
return Ok(());
}
}
}
}
async fn fast_mode(mut stop_receiver: Receiver<()>) -> anyhow::Result<()> {
let span = tracing::span!(Level::INFO, "fast_mode");
let _enter = span.enter();
let conf = config::ConfigFile::try_read()?;
let db_connect = db_part::connect(&conf).await?;
db_part::migrate(&db_connect).await?;
db_part::utils::check_null_data(&db_connect).await;
db_part::utils::update_xml_tested(&db_connect).await;
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
if stop_receiver.try_recv().is_ok() {
event!(Level::INFO, "{}", "Stop download".red());
// 结束 db
db_connect.close().await?;
let args: Vec<String> = std::env::args().collect();
if args.contains(&"-h".to_string()) {
println!("{}", HELP_MSG);
return Ok(());
}
let end_id: SaveId = conf.sync.fast.end_id;
let worker_size = conf.sync.fast.worker_size;
let mut current_id = conf.sync.fast.start_id;
let mut works = Vec::with_capacity(conf.sync.fast.worker_count as usize);
let max_works = conf.sync.fast.worker_count as usize;
for _ in 0..works.len() {
if stop_receiver.try_recv().is_ok() {
event!(Level::INFO, "{}", "Stop download".red());
// 结束 db
db_connect.close().await?;
return Ok(());
}
let client = net::Downloader::new(Some(conf.net_timeout()));
let end = current_id + worker_size;
works.push(tokio::spawn(big_worker(
db_connect.clone(),
client,
current_id..end,
)));
current_id = end;
let mut thread_count = 10;
if args.iter().any(|x| x.starts_with("-t=")) {
thread_count = args
.iter()
.find(|x| x.starts_with("-t="))
.unwrap()
.split('=')
.last()
.unwrap()
.parse::<usize>()?;
}
while current_id < end_id || !works.is_empty() {
if stop_receiver.try_recv().is_ok() {
event!(Level::INFO, "{}", "Stop download".red());
// 结束 db
db_connect.close().await?;
return Ok(());
}
while current_id < end_id && works.len() < max_works {
let client = net::Downloader::new(Some(conf.net_timeout()));
let end = current_id + worker_size;
works.push(tokio::spawn(big_worker(
db_connect.clone(),
client,
current_id..end,
)));
current_id = end;
}
if !works.is_empty() {
let (_result, _index, remain) = select_all(works).await;
works = remain;
}
}
Ok(())
}
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> anyhow::Result<()> {
// 判断是否有 -f / -s 参数
let args: Vec<String> = std::env::args().collect();
if args.contains(&"-d".to_string()) {
// debug 模式
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.init();
} else {
tracing_subscriber::fmt().with_max_level(Level::INFO).init();
}
event!(Level::INFO, "Starting srdownload");
let mode = {
if args.contains(&"-s".to_string()) {
RunMode::Serve
} else if args.contains(&"-f".to_string()) {
RunMode::Fast
} else {
event!(
Level::ERROR,
"{}",
"Please use -s or -f to start the program".red()
);
event!(Level::ERROR, "{}", "Use -s to start serve mode".red());
event!(Level::ERROR, "{}", "Use -f to start fast mode".red());
return Ok(());
}
};
// 判断是否有 -f / -s 参数
event!(Level::INFO, "Starting sr download");
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(thread_count)
.enable_all()
.build()?;
rt.block_on(async_main(mode))
}
async fn async_main(run_mode: RunMode) -> anyhow::Result<()> {
let (stop_sender, stop_receiver) = tokio::sync::oneshot::channel::<()>();
let stop_waiter = tokio::spawn(async move {
tokio::signal::ctrl_c()
.await
@ -287,21 +92,15 @@ async fn main() -> anyhow::Result<()> {
event!(Level::INFO, "{}", "Ctrl-C received".red());
stop_sender.send(()).unwrap();
});
let job_waiter;
if args.contains(&"-s".to_string()) {
job_waiter = tokio::spawn(serve_mode(stop_receiver));
} else if args.contains(&"-f".to_string()) {
job_waiter = tokio::spawn(fast_mode(stop_receiver));
} else {
event!(
Level::ERROR,
"{}",
"Please use -s or -f to start the program".red()
);
event!(Level::ERROR, "{}", "Use -s to start serve mode".red());
event!(Level::ERROR, "{}", "Use -f to start fast mode".red());
return Ok(());
let job_waiter;
match run_mode {
RunMode::Serve => {
job_waiter = tokio::spawn(serve_mode::main(stop_receiver));
}
RunMode::Fast => {
job_waiter = tokio::spawn(fast_mode::main(stop_receiver));
}
}
job_waiter.await??;
let _ = stop_waiter.await;

View File

@ -94,14 +94,14 @@ impl Downloader {
Self { client }
}
pub fn as_ship_url(id: SaveId) -> String {
pub fn fmt_ship_url(id: SaveId) -> String {
format!(
"http://jundroo.com/service/SimpleRockets/DownloadRocket?id={}",
id
)
}
pub fn as_save_url(id: SaveId) -> String {
pub fn fmt_save_url(id: SaveId) -> String {
format!(
"http://jundroo.com/service/SimpleRockets/DownloadSandBox?id={}",
id
@ -115,7 +115,7 @@ impl Downloader {
let span = tracing::span!(Level::DEBUG, "try_download_as_any", id);
let _enter = span.enter();
// 先尝试用 ship 的 API 下载
let ship_url = Self::as_ship_url(id);
let ship_url = Self::fmt_ship_url(id);
let ship_try = self.client.get(&ship_url).send().await;
event!(Level::DEBUG, "trying to Download as ship {:?}", ship_try);
if let Ok(ship_try) = ship_try {
@ -132,7 +132,7 @@ impl Downloader {
}
}
// 否则尝试用 save 的 API 下载
let save_url = Self::as_save_url(id);
let save_url = Self::fmt_save_url(id);
let save_try = self.client.get(&save_url).send().await;
if let Ok(save_try) = save_try {
if save_try.status().is_success() {
@ -150,7 +150,7 @@ impl Downloader {
#[allow(unused)]
/// 尝试用 ship 的 API 下载文件
pub async fn download_as_ship(&self, id: SaveId) -> Option<String> {
let url = Self::as_ship_url(id);
let url = Self::fmt_ship_url(id);
let try_res = self.client.get(&url).send().await;
if let Ok(try_res) = try_res {
if try_res.status().is_success() {
@ -167,7 +167,7 @@ impl Downloader {
#[allow(unused)]
/// 尝试用 save 的 API 下载文件
pub async fn download_as_save(&self, id: SaveId) -> Option<String> {
let url = Self::as_save_url(id);
let url = Self::fmt_save_url(id);
let try_res = self.client.get(&url).send().await;
if let Ok(try_res) = try_res {
if try_res.status().is_success() {

View File

@ -0,0 +1,124 @@
use std::io::Write;
use colored::Colorize;
use tokio::sync::oneshot::Receiver;
use tracing::{event, Level};
use crate::db_part::{CoverStrategy, SaveType};
use crate::{config, db_part, web_part, Downloader};
pub async fn main(mut stop_receiver: Receiver<()>) -> anyhow::Result<()> {
let span = tracing::span!(Level::INFO, "serve_mode");
let _enter = span.enter();
let conf = config::ConfigFile::try_read()?;
let db_connect = db_part::connect(&conf).await?;
db_part::migrate(&db_connect).await?;
db_part::utils::check_null_data(&db_connect).await;
db_part::utils::update_xml_tested(&db_connect).await;
let mut db_max_id = db_part::search::max_id(&db_connect).await;
let mut web_waiter = None;
if conf.serve.enable {
web_waiter = Some(tokio::spawn(web_part::web_main()));
}
event!(
Level::INFO,
"{}",
format!(
"数据库中最大的现有数据 id 为: {} 将从这里开始下载",
db_max_id
)
.green()
);
let serve_wait_time = conf.serve_duration();
let client = Downloader::new(None);
let mut waited = false;
// 开始等待的时间
let mut start_wait_time = tokio::time::Instant::now();
loop {
if stop_receiver.try_recv().is_ok() {
event!(Level::INFO, "{}", "结束下载!".yellow());
// 结束 db
db_connect.close().await?;
if conf.serve.enable {
if let Some(web_waiter) = web_waiter {
web_waiter.abort();
}
}
return Ok(());
}
tokio::select! {
_ = tokio::time::sleep(serve_wait_time) => {
let work_id = db_max_id + 1;
match client.try_download_as_any(work_id).await {
Some(file) => {
if waited {
println!();
waited = false;
}
let wait_time = start_wait_time.elapsed();
start_wait_time = tokio::time::Instant::now();
event!(
Level::INFO,
"{}",
format!(
"下载到了新的 {}!(懒得做中文了) ID为: {} 长度: {}, 等了 {}",
file.type_name(),
work_id,
file.len(),
format!("{:?}", wait_time).blue()
)
.green()
);
let save_type: SaveType = (&file).into();
match db_part::save_data_to_db(
work_id,
save_type,
file.take_data(),
Some(CoverStrategy::CoverIfDifferent),
&db_connect,
)
.await
{
Ok(_) => {
db_max_id = work_id;
event!(
Level::INFO,
"{}",
format!(
"保存好啦! (下一排的每一个 . 代表一个 {:?})",
serve_wait_time
)
.green()
);
continue; // 保存好之后立即尝试下一次, 保证连续上传的时候的效率
}
Err(e) => {
event!(Level::ERROR, "呜呜呜, 数据保存失败了: {:?}\n我不玩了!", e);
return Err(e);
}
}
}
None => {
print!(".");
waited = true;
let _ = std::io::stdout().flush();
}
}
}
_ = &mut stop_receiver => {
event!(Level::INFO, "{}", "结束下载!".yellow());
// 结束 db
db_connect.close().await?;
return Ok(());
}
}
}
}