diff --git a/miner/src/cacluate.rs b/miner/src/cacluate.rs index 6250699..d41af50 100644 --- a/miner/src/cacluate.rs +++ b/miner/src/cacluate.rs @@ -4,7 +4,12 @@ use crate::{ Command, }; -use std::{io::Write, ops::Range, path::PathBuf, time::Instant}; +use std::{ + io::Write, + ops::Range, + path::PathBuf, + time::{Duration, Instant}, +}; use base16384::Base16384Utf8; use colored::Colorize; @@ -75,9 +80,14 @@ impl ComputeStatus { pub fn get_idle_thread(&self) -> Option { self.thread_running.iter().position(|&x| !x) } pub fn all_stoped(&self) -> bool { self.thread_running.iter().all(|&x| !x) } - pub fn update_speed(&mut self, thread_id: usize, speed: u64) { self.thread_speed[thread_id] = speed; } - pub fn update_running(&mut self, thread_id: usize, running: bool) { self.thread_running[thread_id] = running; } + pub fn update_speed(&mut self, thread_id: ThreadId, speed: u64) { self.thread_speed[thread_id as usize] = speed; } + pub fn update_running(&mut self, thread_id: ThreadId, running: bool) { self.thread_running[thread_id as usize] = running; } pub fn count_speed(&self) -> u64 { self.thread_speed.iter().sum() } + pub fn predict_time(&self, top_i: u64) -> chrono::Duration { + let speed = self.count_speed(); + let remain = self.end - top_i; + chrono::Duration::milliseconds((remain as f64 / speed as f64 * 1000.0) as i64) + } } pub fn start_main(cli_arg: Command, out_path: PathBuf) { @@ -124,6 +134,7 @@ pub type ThreadId = u32; /// 3.ended 为 true 的时候, 再发送消息的时候直接发送 None /// - 如果是 动态大小 的 batch pub fn schdule_threads(cli_arg: Command, out_path: PathBuf) { + let mut cores = 0; let mut thread = vec![]; let mut shared_status = ComputeStatus::new(&cli_arg); let (work_sender, work_receiver) = bounded::(0); @@ -149,6 +160,7 @@ pub fn schdule_threads(cli_arg: Command, out_path: PathBuf) { info!("线程 {} 结束计算", thread_name); })) } + crate::set_process_cores(cores); // 任务分发 // 判断是否所有 work 都分发完了 // 当前分发到的 work 的 最大 index @@ -251,88 +263,88 @@ pub fn cacl( receiver: Receiver, work_sender: Sender<(ThreadId, u32)>, ) { - // k += 1; - // if k >= report_interval { - // let now = std::time::Instant::now(); - // let d_t: std::time::Duration = now.duration_since(start_time); - // let new_run_speed = k as f64 / d_t.as_secs_f64(); - // // 预估剩余时间 - // let wait_time = (range.end - i) / new_run_speed as u64; - // let wait_time = chrono::Duration::seconds(wait_time as i64); - // // 转换成 时:分:秒 - // // 根据实际运行速率来调整 report_interval - // report_interval = config.report_interval * new_run_speed as u64; - // info!( - // "|{:>2}|Id:{:>15}|{:6.2}/s {:>3.3}E/d {:>5.2}{}|{:<3}|预计:{}:{}:{}|", - // config.thread_id, - // i, - // new_run_speed, - // new_run_speed * 8.64 / 1_0000.0, - // d_t.as_secs_f64(), - // // 根据对比上一段运行速度 输出 emoji - // // ⬆️ ➡️ ⬇️ - // // 两个值 相差 0.1 之内都是 ➡️ - // if new_run_speed > run_speed + 0.1 { - // "⬆️".green() - // } else if new_run_speed < run_speed - 0.1 { - // // 橙色 - // "⬇️".red() - // } else { - // "➡️".blue() - // }, - // get_count, - // wait_time.num_hours(), - // wait_time.num_minutes() % 60, - // wait_time.num_seconds() % 60 - // ); - // run_speed = new_run_speed; - // range_time = std::time::Instant::now(); - // k = 0; - // } -} - -/// 固定 batch 的计算函数 -pub fn count_batch_cacl(config: CacluateConfig, status: &ComputeStatus, receiver: Receiver) { - // 初始猜测的时间间隔 - // 设置线程亲和性 if let Some(core_affinity) = config.core_affinity { crate::set_thread2core(1 << core_affinity) } // 提前准备好 team_namer let team_namer = TeamNamer::new(&config.team).unwrap(); let mut main_namer = Namer::new_from_team_namer_unchecked(&team_namer, "dummy"); - - let mut start_time = std::time::Instant::now(); + let mut get_count = 0; let mut run_speed = 0.0; - let mut k: u64 = 0; -} - -/// 动态 batch 的计算函数 -pub fn time_batch_cacl(config: CacluateConfig, status: &ComputeStatus, receiver: Receiver, work_sender: Sender) { - // 初始猜测的时间间隔 - // 设置线程亲和性 - if let Some(core_affinity) = config.core_affinity { - crate::set_thread2core(1 << core_affinity) + loop { + // 先 request 一个 work + let work = match receiver.recv() { + Ok(work) => match work { + Some(work) => { + if work.0 == config.thread_id { + work.1 + } else { + // 如果不是自己的 work, 则再次发送一个 request + let _ = work_sender.send((config.thread_id, if run_speed == 0.0 { 0 } else { run_speed as u32 })); + // 然后进入下一次循环 + continue; + } + } + None => { + // 如果接收到了 None, 则说明活都干完了, 退出 + return; + } + }, + Err(_) => { + // 如果接收到了错误, 则说明主线程已经结束了, 退出 + return; + } + }; + // 开始计算 + let count = work.end - work.start; + let top = work.end; + let start_time = std::time::Instant::now(); + // 计算 + get_count += inner_cacl(&config, work, &mut main_namer, &team_namer); + // 完事, 统计 + let now = std::time::Instant::now(); + let d_t: std::time::Duration = now.duration_since(start_time); + let new_run_speed = count as f64 / d_t.as_secs_f64(); + // 预估剩余时间 + // 先更新自己的状态上去 + status.update_speed(config.thread_id, new_run_speed as u64); + // 获取一个全局速度预测 + let predict_time = status.predict_time(top); + // 输出状态 + info!( + // thread_id, top, 当前线程速度, 当前batch用时, emoji, 全局速度, 全局E/d 速度, 算到几个, 预计时间 + "|{:>2}|Id:{:>15}|{:6.2}/s {:>5.2}|{:6.2}/s {:>3.3}E/d {}|{:<3}|预计:{}:{}:{}|", + config.thread_id, + top, + new_run_speed, + d_t.as_secs_f64(), + // 如果速度差 1k 以上, 则输出emoji + if new_run_speed > run_speed + 1000.0 { + "⬆️".green() + } else if new_run_speed < run_speed - 1000.0 { + "⬇️".red() + } else { + "➡️".blue() + }, + status.count_speed(), + status.count_speed() as f64 / 86400.0, + get_count, + predict_time.num_hours(), + predict_time.num_minutes() % 60, + predict_time.num_seconds() % 60 + ); + run_speed = new_run_speed; + // 然后是调度相关 + status.update_running(config.thread_id, false); + // 直接下一次循环 } - // 提前准备好 team_namer - let team_namer = TeamNamer::new(&config.team).unwrap(); - let mut main_namer = Namer::new_from_team_namer_unchecked(&team_namer, "dummy"); - - let mut start_time = std::time::Instant::now(); - let mut run_speed = 0.0; - let mut k: u64 = 0; } /// 每一个 batch 的具体运算 /// 不负责状态统计 /// 状态统计的最小颗粒度是整个 batch -pub fn inner_cacl( - config: &CacluateConfig, - range: Range, - main_namer: &mut Namer, - team_namer: &TeamNamer, - get_count: &mut u64, -) { +pub fn inner_cacl(config: &CacluateConfig, range: Range, main_namer: &mut Namer, team_namer: &TeamNamer) -> u64 { + let mut get_count = 0; for i in range { // 这堆操作放在这边了, 保证统计没问题 let name = gen_name(i); @@ -355,7 +367,7 @@ pub fn inner_cacl( continue; } - *get_count += 1; + get_count += 1; info!("Id:{:>15}|{}|{:.4}|{:.4}|{}", i, full_name, xu, xu_qd, main_namer.get_info()); // 写入文件 let write_in = format!( @@ -382,4 +394,5 @@ pub fn inner_cacl( } } } + get_count }