serve 模式下默认没 timeout

This commit is contained in:
shenjack-5600u 2024-07-27 21:58:56 +08:00
parent f399cbf979
commit e338d31ef3
Signed by: shenjack
GPG Key ID: FDF9864E11C7E79F
2 changed files with 38 additions and 22 deletions

View File

@ -77,6 +77,9 @@ async fn big_worker(
} }
async fn serve_mode(mut stop_receiver: Receiver<()>) -> anyhow::Result<()> { async fn serve_mode(mut stop_receiver: Receiver<()>) -> anyhow::Result<()> {
let span = tracing::span!(Level::INFO, "serve_mode");
let _enter = span.enter();
let conf = config::ConfigFile::try_read()?; let conf = config::ConfigFile::try_read()?;
let db_connect = db_part::connect(&conf).await?; let db_connect = db_part::connect(&conf).await?;
@ -94,7 +97,7 @@ async fn serve_mode(mut stop_receiver: Receiver<()>) -> anyhow::Result<()> {
); );
let serve_wait_time = conf.serve_duration(); let serve_wait_time = conf.serve_duration();
let client = net::Downloader::new(conf.net_timeout()); let client = net::Downloader::new(None);
let mut waited = false; let mut waited = false;
loop { loop {
@ -166,6 +169,9 @@ async fn serve_mode(mut stop_receiver: Receiver<()>) -> anyhow::Result<()> {
} }
async fn fast_mode(mut stop_receiver: Receiver<()>) -> anyhow::Result<()> { async fn fast_mode(mut stop_receiver: Receiver<()>) -> anyhow::Result<()> {
let span = tracing::span!(Level::INFO, "fast_mode");
let _enter = span.enter();
let conf = config::ConfigFile::try_read()?; let conf = config::ConfigFile::try_read()?;
let db_connect = db_part::connect(&conf).await?; let db_connect = db_part::connect(&conf).await?;
@ -192,7 +198,7 @@ async fn fast_mode(mut stop_receiver: Receiver<()>) -> anyhow::Result<()> {
db_connect.close().await?; db_connect.close().await?;
return Ok(()); return Ok(());
} }
let client = net::Downloader::new(conf.net_timeout()); let client = net::Downloader::new(Some(conf.net_timeout()));
let end = current_id + worker_size; let end = current_id + worker_size;
works.push(tokio::spawn(big_worker( works.push(tokio::spawn(big_worker(
db_connect.clone(), db_connect.clone(),
@ -210,7 +216,7 @@ async fn fast_mode(mut stop_receiver: Receiver<()>) -> anyhow::Result<()> {
return Ok(()); return Ok(());
} }
while current_id < end_id && works.len() < max_works { while current_id < end_id && works.len() < max_works {
let client = net::Downloader::new(conf.net_timeout()); let client = net::Downloader::new(Some(conf.net_timeout()));
let end = current_id + worker_size; let end = current_id + worker_size;
works.push(tokio::spawn(big_worker( works.push(tokio::spawn(big_worker(
db_connect.clone(), db_connect.clone(),
@ -230,11 +236,19 @@ async fn fast_mode(mut stop_receiver: Receiver<()>) -> anyhow::Result<()> {
#[tokio::main(flavor = "multi_thread", worker_threads = 10)] #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
// 判断是否有 -f / -s 参数
let args: Vec<String> = std::env::args().collect();
if args.contains(&"-d".to_string()) {
// debug 模式
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.init();
} else {
tracing_subscriber::fmt().with_max_level(Level::INFO).init(); tracing_subscriber::fmt().with_max_level(Level::INFO).init();
}
event!(Level::INFO, "Starting srdownload"); event!(Level::INFO, "Starting srdownload");
// 判断是否有 -f / -s 参数 // 判断是否有 -f / -s 参数
let args: Vec<String> = std::env::args().collect();
let (stop_sender, stop_receiver) = tokio::sync::oneshot::channel::<()>(); let (stop_sender, stop_receiver) = tokio::sync::oneshot::channel::<()>();
let stop_waiter = tokio::spawn(async move { let stop_waiter = tokio::spawn(async move {
tokio::signal::ctrl_c() tokio::signal::ctrl_c()

View File

@ -1,12 +1,13 @@
use reqwest::{Client, ClientBuilder}; use reqwest::{Client, ClientBuilder};
use core::time;
use std::time::Duration; use std::time::Duration;
use tracing::{event, Event, Level};
use crate::{model::sea_orm_active_enums::SaveType, SaveId}; use crate::{model::sea_orm_active_enums::SaveType, SaveId};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Downloader { pub struct Downloader {
pub client: Client, pub client: Client,
timeout: Duration,
} }
/// 使用 any 下载下来的文件 /// 使用 any 下载下来的文件
@ -66,14 +67,15 @@ impl From<&DownloadFile> for SaveType {
} }
impl Downloader { impl Downloader {
pub fn new(timeout: Duration) -> Self { pub fn new(timeout: Option<Duration>) -> Self {
let ua = format!("sr_download/{}", env!("CARGO_PKG_VERSION")); let ua = format!("sr_download/{}", env!("CARGO_PKG_VERSION"));
let client = ClientBuilder::new() let mut client = ClientBuilder::new()
.timeout(timeout) .user_agent(ua);
.user_agent(ua) if let Some(timeout) = timeout {
.build() client = client.timeout(timeout);
.unwrap(); }
Self { client, timeout } let client = client.build().unwrap();
Self { client }
} }
pub fn as_ship_url(id: SaveId) -> String { pub fn as_ship_url(id: SaveId) -> String {
@ -94,17 +96,22 @@ impl Downloader {
/// 如果两个都没下载到,返回 None /// 如果两个都没下载到,返回 None
/// 如果下载到了,返回 Some(文件内容) /// 如果下载到了,返回 Some(文件内容)
pub async fn try_download_as_any(&self, id: SaveId) -> Option<DownloadFile> { pub async fn try_download_as_any(&self, id: SaveId) -> Option<DownloadFile> {
let span = tracing::span!(Level::DEBUG, "try_download_as_any", id);
let _enter = span.enter();
// 先尝试用 ship 的 API 下载 // 先尝试用 ship 的 API 下载
let ship_url = Self::as_ship_url(id); let ship_url = Self::as_ship_url(id);
let ship_try = self let ship_try = self
.client .client
.get(&ship_url) .get(&ship_url)
.timeout(self.timeout)
.send() .send()
.await; .await;
event!(Level::DEBUG, "trying to Download as ship {:?}", ship_try);
if let Ok(ship_try) = ship_try { if let Ok(ship_try) = ship_try {
event!(Level::DEBUG, "Download as ship {:?}", ship_try.status());
if ship_try.status().is_success() { if ship_try.status().is_success() {
event!(Level::DEBUG, "Download as ship {:?}", ship_try);
if let Ok(body) = ship_try.text().await { if let Ok(body) = ship_try.text().await {
event!(Level::DEBUG, "get ship body {:?}", body);
// 再判空 // 再判空
if !(body.is_empty() || body == "0") { if !(body.is_empty() || body == "0") {
return Some(DownloadFile::Ship(body)); return Some(DownloadFile::Ship(body));
@ -117,7 +124,6 @@ impl Downloader {
let save_try = self let save_try = self
.client .client
.get(&save_url) .get(&save_url)
.timeout(self.timeout)
.send() .send()
.await; .await;
if let Ok(save_try) = save_try { if let Ok(save_try) = save_try {
@ -137,7 +143,7 @@ impl Downloader {
/// 尝试用 ship 的 API 下载文件 /// 尝试用 ship 的 API 下载文件
pub async fn download_as_ship(&self, id: SaveId) -> Option<String> { pub async fn download_as_ship(&self, id: SaveId) -> Option<String> {
let url = Self::as_ship_url(id); let url = Self::as_ship_url(id);
let try_res = self.client.get(&url).timeout(self.timeout).send().await; let try_res = self.client.get(&url).send().await;
if let Ok(try_res) = try_res { if let Ok(try_res) = try_res {
if try_res.status().is_success() { if try_res.status().is_success() {
if let Ok(body) = try_res.text().await { if let Ok(body) = try_res.text().await {
@ -154,7 +160,7 @@ impl Downloader {
/// 尝试用 save 的 API 下载文件 /// 尝试用 save 的 API 下载文件
pub async fn download_as_save(&self, id: SaveId) -> Option<String> { pub async fn download_as_save(&self, id: SaveId) -> Option<String> {
let url = Self::as_save_url(id); let url = Self::as_save_url(id);
let try_res = self.client.get(&url).timeout(self.timeout).send().await; let try_res = self.client.get(&url).send().await;
if let Ok(try_res) = try_res { if let Ok(try_res) = try_res {
if try_res.status().is_success() { if try_res.status().is_success() {
if let Ok(body) = try_res.text().await { if let Ok(body) = try_res.text().await {
@ -167,15 +173,11 @@ impl Downloader {
None None
} }
#[allow(unused)]
pub fn set_timeout(&mut self, timeout: Duration) {
self.timeout = timeout;
}
} }
impl Default for Downloader { impl Default for Downloader {
fn default() -> Self { fn default() -> Self {
Self::new(Duration::from_secs(1)) Self::new(Some(Duration::from_secs(1)))
} }
} }