diff --git a/miner/src/cacluate.rs b/miner/src/cacluate.rs index 1887455..a453fa9 100644 --- a/miner/src/cacluate.rs +++ b/miner/src/cacluate.rs @@ -28,8 +28,10 @@ pub struct CacluateConfig { pub start: u64, /// 结束 pub end: u64, - /// 线程数 + /// 线程 id pub thread_id: u32, + /// 线程数 + pub thread_count: u32, /// 八围预期值 pub prop_expect: u32, /// qp 预期值 @@ -44,6 +46,36 @@ pub struct CacluateConfig { pub out_file: PathBuf, } +pub type WorkInfo = Option<(u32, u64)>; + +/// 用于在先成之间共享的运行状态 +/// 正常状态下是会在多个线程之间共享的 +/// 单线程状态下就直接在主线程里面 +/// 用于记录当前各个线程的计算状态 +pub struct ComputeStatus { + /// 总计算数 + pub start: u64, + /// 总计算数 + pub end: u64, + /// 当前各个线程的计算速度 + pub thread_speed: Vec, + /// 当前各个线程是否在运算 + pub thread_running: Vec, +} + +impl ComputeStatus { + pub fn new(config: &Command) -> Self { + ComputeStatus { + start: config.start, + end: config.end, + thread_speed: vec![0; config.thread_count as usize], + thread_running: vec![false; config.thread_count as usize], + } + } + + pub fn get_first_stoped(&self) -> Option { self.thread_running.iter().position(|&x| !x) } +} + pub fn start_main(cli_arg: Command, out_path: PathBuf) { if cli_arg.is_single_thread() { // 单线程运行的时候也是让他放在主线程跑 @@ -56,10 +88,53 @@ pub fn start_main(cli_arg: Command, out_path: PathBuf) { } } +/// 描述一下思路吧 +/// +/// 首先, 本地有几个信息 +/// sended: 一个数组, 用于记录每个线程是否已经发送了消息 +/// shared_status: 一个共享的状态, 用于记录每个线程的状态 +/// threads: 一个线程数组, 用于 hold 住线程 +/// +/// 其中, shared_status 里面记录了每个线程的状态 +/// 里面的 thread_running 用于在分发任务的时候判断是否有线程空闲和哪个线程空闲 +/// +/// 初始化分发的时候的逻辑如下: +/// 1. 初始化一个 0 大小的 bounded channel +/// +/// 分发任务(消息)的时候的逻辑如下: +/// - 如果是 固定大小 的 batch +/// 1. 每次直接发送一个 id 为 -1 (即任意线程都可以接收的) 的消息 +/// - 如果是 动态大小 的 batch +/// 0. 等待回返的 request work 的消息 +/// 1. 遍历 sended 中 true 的部分, 检查对应 thread_running 是否为 true +/// 2. 如果为 true, 则将对应 sended 置为 false +/// 3. 找到 sended 中 第一个为 false 的线程, 根据 thread_speed 计算出一个合适的 batch +/// 4. 发送一个对应线程 id 的消息 +/// +/// 最后结尾的时候的逻辑如下: +/// - 如果是 固定大小 的 batch +/// 1.每次发送之前检测是不是快完事了 ( batch size > 剩余 work size ) +/// 2.如果是, 则发送剩余的 work, 并且把 ended 置为 true +/// 3.ended 为 true 的时候, 再发送消息的时候直接发送 None +/// - 如果是 动态大小 的 batch pub fn schdule_threads(cli_arg: Command, out_path: PathBuf) { + if cli_arg.batch_in_time() { + schdule_count_batch(cli_arg, out_path); + } else { + schdule_time_batch(cli_arg, out_path); + } +} + +/// 简单的部分 +/// +/// 固定大小的 batch 的分发函数 +pub fn schdule_count_batch(cli_arg: Command, out_path: PathBuf) { let mut n = 0; let mut cores = 0; let mut threads = vec![]; + let mut shared_status = ComputeStatus::new(&cli_arg); + let mut sended = vec![false; cli_arg.thread_count as usize]; + let (sender, receiver) = bounded::>>(0); for i in 0..cli_arg.thread_count { n += 1; let mut config = cli_arg.as_cacl_config(&out_path); @@ -69,7 +144,7 @@ pub fn schdule_threads(cli_arg: Command, out_path: PathBuf) { let thread_name = format!("thread_{}", n); threads.push(std::thread::spawn(move || { info!("线程 {} 开始计算", thread_name); - cacl(config, n); + cacl(config, &shared_status, receiver.clone()); info!("线程 {} 结束计算", thread_name); })); } @@ -79,7 +154,92 @@ pub fn schdule_threads(cli_arg: Command, out_path: PathBuf) { } } -pub fn batch_cacl( +/// 麻烦的要死的部分 +/// +/// 动态大小的 batch 的分发函数 +pub fn schdule_time_batch(cli_arg: Command, out_path: PathBuf) {} + +/// 所有的状态输出都在子线程, 也就是这里 +/// +/// 1. 通过 `Receiver` 获取到主线程的数据 +/// 获取到数据后, 开始计算 +/// 计算完一个 batch 后, 输出一次状态 +/// 这里的状态是在所有运算线程中共享的一个状态 +/// 每一个线程运算完一个 batch 后, 都会更新这个状态 +/// 输出的时候顺带输出其他线程的状态 +#[inline(always)] +pub fn cacl(config: CacluateConfig, status: &ComputeStatus, receiver: Receiver, work_sender: Sender) { + // 初始猜测的时间间隔 + // 设置线程亲和性 + if let Some(core_affinity) = config.core_affinity { + crate::set_thread2core(1 << core_affinity) + } + + // 提前准备好 team_namer + let team_namer = TeamNamer::new(&config.team).unwrap(); + let mut main_namer = Namer::new_from_team_namer_unchecked(&team_namer, "dummy"); + + let mut start_time = std::time::Instant::now(); + + let mut report_interval = 100000; // 第一次猜测测 10w 次, 获取初始数据 + let mut run_speed = 0.0; + let mut k: u64 = 0; + // k += 1; + // if k >= report_interval { + // let now = std::time::Instant::now(); + // let d_t: std::time::Duration = now.duration_since(start_time); + // let new_run_speed = k as f64 / d_t.as_secs_f64(); + // // 预估剩余时间 + // let wait_time = (range.end - i) / new_run_speed as u64; + // let wait_time = chrono::Duration::seconds(wait_time as i64); + // // 转换成 时:分:秒 + // // 根据实际运行速率来调整 report_interval + // report_interval = config.report_interval * new_run_speed as u64; + // info!( + // "|{:>2}|Id:{:>15}|{:6.2}/s {:>3.3}E/d {:>5.2}{}|{:<3}|预计:{}:{}:{}|", + // config.thread_id, + // i, + // new_run_speed, + // new_run_speed * 8.64 / 1_0000.0, + // d_t.as_secs_f64(), + // // 根据对比上一段运行速度 输出 emoji + // // ⬆️ ➡️ ⬇️ + // // 两个值 相差 0.1 之内都是 ➡️ + // if new_run_speed > run_speed + 0.1 { + // "⬆️".green() + // } else if new_run_speed < run_speed - 0.1 { + // // 橙色 + // "⬇️".red() + // } else { + // "➡️".blue() + // }, + // get_count, + // wait_time.num_hours(), + // wait_time.num_minutes() % 60, + // wait_time.num_seconds() % 60 + // ); + // run_speed = new_run_speed; + // range_time = std::time::Instant::now(); + // k = 0; + // } +} + +/// 固定 batch 的计算函数 +pub fn count_batch_cacl(config: CacluateConfig, status: &ComputeStatus, receiver: Receiver>) {} + +/// 动态 batch 的计算函数 +pub fn time_batch_cacl( + config: CacluateConfig, + status: &ComputeStatus, + receiver: Receiver>, + work_sender: Sender, +) { +} + +/// 每一个 batch 的具体运算 +/// 不负责状态统计 +/// 状态统计的最小颗粒度是整个 batch +pub fn inner_cacl( config: &CacluateConfig, range: Range, main_namer: &mut Namer, @@ -136,61 +296,3 @@ pub fn batch_cacl( } } } - -#[inline(always)] -pub fn cacl(config: CacluateConfig, receiver: Receiver>>) { - // 初始猜测的时间间隔 - // 设置线程亲和性 - if let Some(core_affinity) = config.core_affinity { - crate::set_thread2core(1 << core_affinity) - } - - // 提前准备好 team_namer - let team_namer = TeamNamer::new(&config.team).unwrap(); - - let mut main_namer = Namer::new_from_team_namer_unchecked(&team_namer, "dummy"); - - let mut start_time = std::time::Instant::now(); - - let mut report_interval = 100000; // 第一次猜测测 10w 次, 获取初始数据 - let mut run_speed = 0.0; - let mut k: u64 = 0; - k += 1; - if k >= report_interval { - let now = std::time::Instant::now(); - let d_t: std::time::Duration = now.duration_since(start_time); - let new_run_speed = k as f64 / d_t.as_secs_f64(); - // 预估剩余时间 - let wait_time = (range.end - i) / new_run_speed as u64; - let wait_time = chrono::Duration::seconds(wait_time as i64); - // 转换成 时:分:秒 - // 根据实际运行速率来调整 report_interval - report_interval = config.report_interval * new_run_speed as u64; - info!( - "|{:>2}|Id:{:>15}|{:6.2}/s {:>3.3}E/d {:>5.2}{}|{:<3}|预计:{}:{}:{}|", - config.thread_id, - i, - new_run_speed, - new_run_speed * 8.64 / 1_0000.0, - d_t.as_secs_f64(), - // 根据对比上一段运行速度 输出 emoji - // ⬆️ ➡️ ⬇️ - // 两个值 相差 0.1 之内都是 ➡️ - if new_run_speed > run_speed + 0.1 { - "⬆️".green() - } else if new_run_speed < run_speed - 0.1 { - // 橙色 - "⬇️".red() - } else { - "➡️".blue() - }, - get_count, - wait_time.num_hours(), - wait_time.num_minutes() % 60, - wait_time.num_seconds() % 60 - ); - run_speed = new_run_speed; - range_time = std::time::Instant::now(); - k = 0; - } -} diff --git a/miner/src/main.rs b/miner/src/main.rs index 027b2c8..67bfb18 100644 --- a/miner/src/main.rs +++ b/miner/src/main.rs @@ -33,12 +33,12 @@ pub struct Command { /// 队伍名称 #[arg(long)] pub team: String, - /// 预期状态输出时间间隔 (秒) - #[arg(long, short = 'r', default_value_t = 10)] - pub report_interval: u64, - /// 一个 batch 多大 单线程下无效 - #[arg(long, short = 'b', default_value_t = 10_0000)] - pub batch_size: u64, + /// 如果指定, 则根据线程的实时速度*时间为单位作为 batch 大小 + #[arg(long, short = 'r')] + pub report_interval: Option, + /// 如果指定, 则使用固定的数量作为 batch 大小 + #[arg(long, short = 'b')] + pub batch_size: Option, /// 单线程模式模式下的核心亲和性核心号 (从 0 开始) #[arg(long = "core-pick")] pub pick_core: Option, @@ -50,6 +50,7 @@ impl Command { start: self.start, end: self.end, thread_id: 0, + thread_count: self.thread_count, prop_expect: self.prop_expect, xp_expect: self.xp_expect, team: self.team.clone(), @@ -61,20 +62,23 @@ impl Command { pub fn is_single_thread(&self) -> bool { self.thread_count == 1 } + pub fn batch_in_time(&self) -> bool { self.report_interval.is_some() } + + pub fn batch_in_count(&self) -> bool { self.batch_size.is_some() } + pub fn display_info(&self) -> String { format!( - "开始: {} 结尾: {}\n线程数: {}\n八围预期: {}\n强评/强单最低值: {}\n队伍名: {}\n预期状态输出时间间隔: {} 秒\n{}", + "开始: {} 结尾: {}\n线程数: {}\n八围预期: {}\n强评/强单最低值: {}\n队伍名: {}\n{}", self.start, self.end, self.thread_count, self.prop_expect, self.xp_expect, self.team, - self.report_interval, - if self.is_single_thread() { - "".to_string() + if self.batch_in_count() { + format!("固定 batch 大小: {}", self.batch_size.unwrap()) } else { - format!("batch 大小: {}", self.batch_size) + format!("时间 batch 大小: {}s", self.report_interval.unwrap()) } ) } @@ -118,6 +122,16 @@ pub fn set_process_cores(cores: usize) { fn main() { tracing_subscriber::fmt().with_max_level(tracing::Level::DEBUG).init(); let mut cli_arg = Command::parse(); + // 先验证参数 + // batch 至少要是 size 或者 count 之一 + if !cli_arg.batch_in_count() && !cli_arg.batch_in_time() { + warn!("必须指定 batch 大小, 请使用 -r 或者 -b 选项"); + return; + } + // 如果俩都指定了, 则使用时间为准 + if cli_arg.batch_in_count() && cli_arg.batch_in_time() { + warn!("两个 batch 选项都指定了, 将使用时间为准"); + } // 将数据量处理成可被 thread_count 整除 let left = cli_arg.start % cli_arg.thread_count as u64;