好家伙

This commit is contained in:
shenjack 2024-07-21 00:42:06 +08:00
parent 7e63b1b7a6
commit 69af2c8a02
Signed by: shenjack
GPG Key ID: 7B1134A979775551
4 changed files with 41 additions and 22 deletions

View File

@ -13,6 +13,7 @@ pub struct ConfigFile {
pub worker_count: u32,
pub worker_size: u32,
pub start_id: SaveId,
pub max_timeout: f32,
}
impl Default for ConfigFile {
@ -21,10 +22,11 @@ impl Default for ConfigFile {
db_url: "postgres://srdown:srdown@192.168.3.22:10001/srdown".to_string(),
db_schema: "public".to_string(),
max_connections: 10,
sqlx_logging: true,
sqlx_logging: false,
worker_count: 10,
worker_size: 10,
start_id: 1,
start_id: 173860,
max_timeout: 1.0,
}
}
}
@ -42,4 +44,8 @@ impl ConfigFile {
std::fs::write(file_path, toml)?;
Ok(())
}
pub fn timeout_as_duration(&self) -> std::time::Duration {
std::time::Duration::from_secs_f32(self.max_timeout)
}
}

View File

@ -1,7 +1,7 @@
use blake3::Hasher;
use sea_orm::{
ActiveModelTrait, ColumnTrait, ConnectOptions, Database, DatabaseConnection, EntityTrait,
IntoActiveModel, ModelTrait, QueryFilter, QueryOrder, QuerySelect, TransactionTrait,
IntoActiveModel, ModelTrait, QueryFilter, QueryOrder, TransactionTrait,
};
use tracing::{event, Level};
@ -29,8 +29,6 @@ pub async fn find_max_id(db: &DatabaseConnection) -> SaveId {
// 我丢你老母, 有这时间写这个, 我都写完 sql 语句了
match model::main_data::Entity::find()
.order_by_desc(model::main_data::Column::SaveId)
// .select_only()
// .column(model::main_data::Column::SaveId)
.one(db)
.await
{
@ -42,6 +40,19 @@ pub async fn find_max_id(db: &DatabaseConnection) -> SaveId {
}
}
pub async fn check_have_none_empty_data(db: &DatabaseConnection, save_id: SaveId) -> bool {
// SELECT save_id from main_data WHERE save_id = $1 AND len > 0
match model::main_data::Entity::find()
.filter(model::main_data::Column::SaveId.eq(save_id as i32))
.filter(model::main_data::Column::Len.gt(0))
.one(db)
.await
{
Ok(model) => model.is_some(),
Err(_) => false,
}
}
#[derive(Debug, Clone, Copy, Default)]
pub enum CoverStrategy {
#[default]
@ -85,7 +96,7 @@ where
match cover_strategy {
CoverStrategy::Error => return Err(anyhow::anyhow!("Data already exists")),
CoverStrategy::Skip => return Ok(false),
_ => {}
_ => (),
}
}

View File

@ -15,9 +15,16 @@ use model::sea_orm_active_enums::SaveType;
use crate::db_part::CoverStrategy;
async fn big_worker(db: sea_orm::DatabaseConnection, work_range: Range<SaveId>) {
let client = net::Downloader::default();
async fn big_worker(
db: sea_orm::DatabaseConnection,
client: net::Downloader,
work_range: Range<SaveId>,
) {
for work_id in work_range {
if db_part::check_have_none_empty_data(&db, work_id).await {
event!(Level::INFO, "{}", format!("Skip {}", work_id).blue());
continue;
}
match match client.try_download_as_any(work_id).await {
Some(file) => {
event!(
@ -85,9 +92,11 @@ async fn main() -> anyhow::Result<()> {
let mut works = Vec::with_capacity(conf.worker_count as usize);
let max_works = conf.worker_count as usize;
for _ in 0..works.len() {
let client = net::Downloader::new(conf.timeout_as_duration());
let end = current_id + batch_size;
works.push(tokio::spawn(big_worker(
db_connect.clone(),
client,
current_id..end,
)));
current_id = end;
@ -95,9 +104,11 @@ async fn main() -> anyhow::Result<()> {
while current_id < end_id || !works.is_empty() {
while current_id < end_id && works.len() < max_works {
let client = net::Downloader::new(conf.timeout_as_duration());
let end = current_id + batch_size;
works.push(tokio::spawn(big_worker(
db_connect.clone(),
client,
current_id..end,
)));
current_id = end;

View File

@ -3,6 +3,7 @@ use std::time::Duration;
use crate::{model::sea_orm_active_enums::SaveType, SaveId};
#[derive(Debug, Clone)]
pub struct Downloader {
pub client: Client,
timeout: Duration,
@ -95,7 +96,7 @@ impl Downloader {
let ship_try = self
.client
.get(&ship_url)
.timeout(self.timeout.clone())
.timeout(self.timeout)
.send()
.await;
if let Ok(ship_try) = ship_try {
@ -113,7 +114,7 @@ impl Downloader {
let save_try = self
.client
.get(&save_url)
.timeout(self.timeout.clone())
.timeout(self.timeout)
.send()
.await;
if let Ok(save_try) = save_try {
@ -132,12 +133,7 @@ impl Downloader {
/// 尝试用 ship 的 API 下载文件
pub async fn download_as_ship(&self, id: SaveId) -> Option<String> {
let url = Self::as_ship_url(id);
let try_res = self
.client
.get(&url)
.timeout(self.timeout.clone())
.send()
.await;
let try_res = self.client.get(&url).timeout(self.timeout).send().await;
if let Ok(try_res) = try_res {
if try_res.status().is_success() {
if let Ok(body) = try_res.text().await {
@ -153,12 +149,7 @@ impl Downloader {
/// 尝试用 save 的 API 下载文件
pub async fn download_as_save(&self, id: SaveId) -> Option<String> {
let url = Self::as_save_url(id);
let try_res = self
.client
.get(&url)
.timeout(self.timeout.clone())
.send()
.await;
let try_res = self.client.get(&url).timeout(self.timeout).send().await;
if let Ok(try_res) = try_res {
if try_res.status().is_success() {
if let Ok(body) = try_res.text().await {