From ac7945e4df8ba0418f949e7a48a363a2ee0b8051 Mon Sep 17 00:00:00 2001 From: shenjack <3695888@qq.com> Date: Mon, 24 Jun 2024 02:09:48 +0800 Subject: [PATCH] =?UTF-8?q?4T/d=3F=20=E6=88=91=E4=B8=8D=E4=BF=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- miner/src/cacluate.rs | 66 +++++++++++++++++++++++-------------------- miner/src/main.rs | 11 +++++++- 2 files changed, 46 insertions(+), 31 deletions(-) diff --git a/miner/src/cacluate.rs b/miner/src/cacluate.rs index 25277b2..3b8e605 100644 --- a/miner/src/cacluate.rs +++ b/miner/src/cacluate.rs @@ -53,6 +53,8 @@ pub struct ComputeStatus { pub start: u64, /// 总计算数 pub end: u64, + /// top + pub top_id: u64, /// 当前各个线程的计算速度 pub thread_speed: Vec, /// 当前各个线程是否在运算 @@ -64,6 +66,7 @@ impl ComputeStatus { ComputeStatus { start: config.start, end: config.end, + top_id: config.start, thread_speed: vec![0; config.thread_count as usize], thread_running: vec![false; config.thread_count as usize], } @@ -74,9 +77,9 @@ impl ComputeStatus { pub fn update_speed(&mut self, thread_id: ThreadId, speed: u64) { self.thread_speed[thread_id as usize] = speed; } pub fn update_running(&mut self, thread_id: ThreadId, running: bool) { self.thread_running[thread_id as usize] = running; } pub fn count_speed(&self) -> u64 { self.thread_speed.iter().sum() } - pub fn predict_time(&self, top_i: u64) -> chrono::Duration { + pub fn predict_time(&self) -> chrono::Duration { let speed = self.count_speed(); - let remain = self.end - top_i; + let remain = self.end - self.top_id; chrono::Duration::milliseconds((remain as f64 / speed as f64 * 1000.0) as i64) } } @@ -150,13 +153,12 @@ pub fn schdule_threads(cli_arg: Command, out_path: PathBuf) { info!("线程 {} 开始计算", thread_name); cacl(config, shared_status, work_receiver, work_requester); info!("线程 {} 结束计算", thread_name); - })) + })); } crate::set_process_cores(cores); // 任务分发 // 判断是否所有 work 都分发完了 // 当前分发到的 work 的 最大 index - let mut top_i = cli_arg.start; if cli_arg.batch_in_time() { info!("开始分发任务(动态 batch)"); let mut sended = vec![false; cli_arg.thread_count as usize]; @@ -184,26 +186,27 @@ pub fn schdule_threads(cli_arg: Command, out_path: PathBuf) { if latest_speed.1 == 0 { // 如果速度为 0, 则说明刚刚开始 // 直接发送一个 1w 的 batch - let _ = work_sender.send(Some((latest_speed.0, top_i..top_i + 10000))); - top_i += 10000; + let _ = work_sender.send(Some((latest_speed.0, shared_status.top_id..shared_status.top_id + 10000))); + shared_status.top_id += 10000; } else { // 计算出一个合适的 batch let batch = latest_speed.1 as u64 * cli_arg.report_interval.unwrap(); // 判断是否快完事了 // 如果是, 则发送剩余的 work, 然后直接发送 None - if top_i + batch > cli_arg.end { - let _ = work_sender.send(Some((latest_speed.0, top_i..cli_arg.end))); - for _ in 0..cli_arg.thread_count { - let _ = work_sender.send(None); - if shared_status.all_stoped() { - break; + if shared_status.top_id + batch > cli_arg.end { + let _ = work_sender.send(Some((latest_speed.0, shared_status.top_id..cli_arg.end))); + info!("最后一个 batch({}..{}) 已发送", shared_status.top_id, cli_arg.end); + loop { + let _ = thread_waiter.try_recv(); + let _ = work_sender.try_send(None); + if thread.iter().all(|t| t.is_finished()) { + return; } } - break; } else { // 如果不是, 则发送一个对应线程 id 的消息 - let _ = work_sender.send(Some((latest_speed.0, top_i..top_i + batch))); - top_i += batch; + let _ = work_sender.send(Some((latest_speed.0, shared_status.top_id..shared_status.top_id + batch))); + shared_status.top_id += batch; } } } @@ -222,22 +225,26 @@ pub fn schdule_threads(cli_arg: Command, out_path: PathBuf) { // 这里不确定是不是会有问题, 先用 unwarp 看看 let thread_id = shared_status.get_idle_thread().unwrap(); // 先检测是否快结束了 - if top_i + cli_arg.batch_size.unwrap() as u64 >= cli_arg.end { + if shared_status.top_id + cli_arg.batch_size.unwrap() as u64 >= cli_arg.end { // 如果快结束了, 则发送剩余的 work 然后发送 None - let _ = work_sender.send(Some((thread_id as u32, top_i..cli_arg.end))); - for _ in 0..cli_arg.thread_count { - let _ = work_sender.send(None); - if shared_status.all_stoped() { - break; + let _ = work_sender.send(Some((thread_id as u32, shared_status.top_id..cli_arg.end))); + info!("最后一个 batch({}..{}) 已发送", shared_status.top_id, cli_arg.end); + loop { + let _ = thread_waiter.try_recv(); + let _ = work_sender.try_send(None); + if thread.iter().all(|t| t.is_finished()) { + return; } } - break; } else { // 如果没有结束, 则发送一个 batch - let _ = work_sender.send(Some((thread_id as u32, top_i..top_i + cli_arg.batch_size.unwrap() as u64))); + let _ = work_sender.send(Some(( + thread_id as u32, + shared_status.top_id..shared_status.top_id + cli_arg.batch_size.unwrap() as u64, + ))); } // 更新 top_i - top_i += cli_arg.batch_size.unwrap() as u64; + shared_status.top_id += cli_arg.batch_size.unwrap() as u64; } } } @@ -258,7 +265,7 @@ pub fn cacl( work_sender: Sender<(ThreadId, u32)>, ) { if let Some(core_affinity) = config.core_affinity { - crate::set_thread2core(1 << core_affinity) + crate::set_thread2core(core_affinity); } // 提前准备好 team_namer let team_namer = TeamNamer::new(&config.team).unwrap(); @@ -305,12 +312,12 @@ pub fn cacl( // 先更新自己的状态上去 status.update_speed(config.thread_id, new_run_speed as u64); // 获取一个全局速度预测 - let predict_time = status.predict_time(top); + let predict_time = status.predict_time(); debug!("{:?}", status.thread_speed); // 输出状态 info!( // thread_id, top, 当前线程速度, 当前batch用时, emoji, 全局速度, 全局E/d 速度, 算到几个, 进度, 预计时间 - "|{:>2}|Id:{:>15}|{:6.2}/s {:>5.2}s {}|{:6.2}/s {:>3.3}E/d|{:<3}|{:3.2}% 预计:{}:{}:{}|", + "|{:>2}|Id:{:>15}|{:6.2}/s {:>5.2}s {}{:>3.3}E/d|{:<3}|{:3.3}% {}:{}:{}|", config.thread_id, top, new_run_speed, @@ -323,10 +330,9 @@ pub fn cacl( } else { "→".blue() }, - status.count_speed(), - status.count_speed() as f64 / 86400.0, + status.count_speed() as f64 * 8.64 / 1_0000.0, get_count, - (top - status.start) as f64 / (status.end - status.start) as f64 * 100.0, + (status.top_id - status.start) as f64 / (status.end - status.start) as f64 * 100.0, predict_time.num_hours(), predict_time.num_minutes() % 60, predict_time.num_seconds() % 60 diff --git a/miner/src/main.rs b/miner/src/main.rs index b8fcd7d..7e24ed9 100644 --- a/miner/src/main.rs +++ b/miner/src/main.rs @@ -42,6 +42,9 @@ pub struct Command { /// 单线程模式模式下的核心亲和性核心号 (从 0 开始) #[arg(long = "core-pick")] pub pick_core: Option, + /// 是否为 debug 模式 + #[arg(long, short = 'd')] + pub debug: bool, } impl Command { @@ -119,8 +122,14 @@ pub fn set_process_cores(cores: usize) { } fn main() { - tracing_subscriber::fmt().with_max_level(tracing::Level::DEBUG).init(); let mut cli_arg = Command::parse(); + tracing_subscriber::fmt() + .with_max_level(if cli_arg.debug { + tracing::Level::DEBUG + } else { + tracing::Level::INFO + }) + .init(); // 先验证参数 // batch 至少要是 size 或者 count 之一 if !cli_arg.batch_in_count() && !cli_arg.batch_in_time() {