4T/d? 我不信

This commit is contained in:
shenjack 2024-06-24 02:09:48 +08:00
parent b2d33bdec5
commit ac7945e4df
Signed by: shenjack
GPG Key ID: 7B1134A979775551
2 changed files with 46 additions and 31 deletions

View File

@ -53,6 +53,8 @@ pub struct ComputeStatus {
pub start: u64,
/// 总计算数
pub end: u64,
/// top
pub top_id: u64,
/// 当前各个线程的计算速度
pub thread_speed: Vec<u64>,
/// 当前各个线程是否在运算
@ -64,6 +66,7 @@ impl ComputeStatus {
ComputeStatus {
start: config.start,
end: config.end,
top_id: config.start,
thread_speed: vec![0; config.thread_count as usize],
thread_running: vec![false; config.thread_count as usize],
}
@ -74,9 +77,9 @@ impl ComputeStatus {
pub fn update_speed(&mut self, thread_id: ThreadId, speed: u64) { self.thread_speed[thread_id as usize] = speed; }
pub fn update_running(&mut self, thread_id: ThreadId, running: bool) { self.thread_running[thread_id as usize] = running; }
pub fn count_speed(&self) -> u64 { self.thread_speed.iter().sum() }
pub fn predict_time(&self, top_i: u64) -> chrono::Duration {
pub fn predict_time(&self) -> chrono::Duration {
let speed = self.count_speed();
let remain = self.end - top_i;
let remain = self.end - self.top_id;
chrono::Duration::milliseconds((remain as f64 / speed as f64 * 1000.0) as i64)
}
}
@ -150,13 +153,12 @@ pub fn schdule_threads(cli_arg: Command, out_path: PathBuf) {
info!("线程 {} 开始计算", thread_name);
cacl(config, shared_status, work_receiver, work_requester);
info!("线程 {} 结束计算", thread_name);
}))
}));
}
crate::set_process_cores(cores);
// 任务分发
// 判断是否所有 work 都分发完了
// 当前分发到的 work 的 最大 index
let mut top_i = cli_arg.start;
if cli_arg.batch_in_time() {
info!("开始分发任务(动态 batch)");
let mut sended = vec![false; cli_arg.thread_count as usize];
@ -184,26 +186,27 @@ pub fn schdule_threads(cli_arg: Command, out_path: PathBuf) {
if latest_speed.1 == 0 {
// 如果速度为 0, 则说明刚刚开始
// 直接发送一个 1w 的 batch
let _ = work_sender.send(Some((latest_speed.0, top_i..top_i + 10000)));
top_i += 10000;
let _ = work_sender.send(Some((latest_speed.0, shared_status.top_id..shared_status.top_id + 10000)));
shared_status.top_id += 10000;
} else {
// 计算出一个合适的 batch
let batch = latest_speed.1 as u64 * cli_arg.report_interval.unwrap();
// 判断是否快完事了
// 如果是, 则发送剩余的 work, 然后直接发送 None
if top_i + batch > cli_arg.end {
let _ = work_sender.send(Some((latest_speed.0, top_i..cli_arg.end)));
for _ in 0..cli_arg.thread_count {
let _ = work_sender.send(None);
if shared_status.all_stoped() {
break;
if shared_status.top_id + batch > cli_arg.end {
let _ = work_sender.send(Some((latest_speed.0, shared_status.top_id..cli_arg.end)));
info!("最后一个 batch({}..{}) 已发送", shared_status.top_id, cli_arg.end);
loop {
let _ = thread_waiter.try_recv();
let _ = work_sender.try_send(None);
if thread.iter().all(|t| t.is_finished()) {
return;
}
}
break;
} else {
// 如果不是, 则发送一个对应线程 id 的消息
let _ = work_sender.send(Some((latest_speed.0, top_i..top_i + batch)));
top_i += batch;
let _ = work_sender.send(Some((latest_speed.0, shared_status.top_id..shared_status.top_id + batch)));
shared_status.top_id += batch;
}
}
}
@ -222,22 +225,26 @@ pub fn schdule_threads(cli_arg: Command, out_path: PathBuf) {
// 这里不确定是不是会有问题, 先用 unwarp 看看
let thread_id = shared_status.get_idle_thread().unwrap();
// 先检测是否快结束了
if top_i + cli_arg.batch_size.unwrap() as u64 >= cli_arg.end {
if shared_status.top_id + cli_arg.batch_size.unwrap() as u64 >= cli_arg.end {
// 如果快结束了, 则发送剩余的 work 然后发送 None
let _ = work_sender.send(Some((thread_id as u32, top_i..cli_arg.end)));
for _ in 0..cli_arg.thread_count {
let _ = work_sender.send(None);
if shared_status.all_stoped() {
break;
let _ = work_sender.send(Some((thread_id as u32, shared_status.top_id..cli_arg.end)));
info!("最后一个 batch({}..{}) 已发送", shared_status.top_id, cli_arg.end);
loop {
let _ = thread_waiter.try_recv();
let _ = work_sender.try_send(None);
if thread.iter().all(|t| t.is_finished()) {
return;
}
}
break;
} else {
// 如果没有结束, 则发送一个 batch
let _ = work_sender.send(Some((thread_id as u32, top_i..top_i + cli_arg.batch_size.unwrap() as u64)));
let _ = work_sender.send(Some((
thread_id as u32,
shared_status.top_id..shared_status.top_id + cli_arg.batch_size.unwrap() as u64,
)));
}
// 更新 top_i
top_i += cli_arg.batch_size.unwrap() as u64;
shared_status.top_id += cli_arg.batch_size.unwrap() as u64;
}
}
}
@ -258,7 +265,7 @@ pub fn cacl(
work_sender: Sender<(ThreadId, u32)>,
) {
if let Some(core_affinity) = config.core_affinity {
crate::set_thread2core(1 << core_affinity)
crate::set_thread2core(core_affinity);
}
// 提前准备好 team_namer
let team_namer = TeamNamer::new(&config.team).unwrap();
@ -305,12 +312,12 @@ pub fn cacl(
// 先更新自己的状态上去
status.update_speed(config.thread_id, new_run_speed as u64);
// 获取一个全局速度预测
let predict_time = status.predict_time(top);
let predict_time = status.predict_time();
debug!("{:?}", status.thread_speed);
// 输出状态
info!(
// thread_id, top, 当前线程速度, 当前batch用时, emoji, 全局速度, 全局E/d 速度, 算到几个, 进度, 预计时间
"|{:>2}|Id:{:>15}|{:6.2}/s {:>5.2}s {}|{:6.2}/s {:>3.3}E/d|{:<3}|{:3.2}% 预计:{}:{}:{}|",
"|{:>2}|Id:{:>15}|{:6.2}/s {:>5.2}s {}{:>3.3}E/d|{:<3}|{:3.3}% {}:{}:{}|",
config.thread_id,
top,
new_run_speed,
@ -323,10 +330,9 @@ pub fn cacl(
} else {
"".blue()
},
status.count_speed(),
status.count_speed() as f64 / 86400.0,
status.count_speed() as f64 * 8.64 / 1_0000.0,
get_count,
(top - status.start) as f64 / (status.end - status.start) as f64 * 100.0,
(status.top_id - status.start) as f64 / (status.end - status.start) as f64 * 100.0,
predict_time.num_hours(),
predict_time.num_minutes() % 60,
predict_time.num_seconds() % 60

View File

@ -42,6 +42,9 @@ pub struct Command {
/// 单线程模式模式下的核心亲和性核心号 (从 0 开始)
#[arg(long = "core-pick")]
pub pick_core: Option<usize>,
/// 是否为 debug 模式
#[arg(long, short = 'd')]
pub debug: bool,
}
impl Command {
@ -119,8 +122,14 @@ pub fn set_process_cores(cores: usize) {
}
fn main() {
tracing_subscriber::fmt().with_max_level(tracing::Level::DEBUG).init();
let mut cli_arg = Command::parse();
tracing_subscriber::fmt()
.with_max_level(if cli_arg.debug {
tracing::Level::DEBUG
} else {
tracing::Level::INFO
})
.init();
// 先验证参数
// batch 至少要是 size 或者 count 之一
if !cli_arg.batch_in_count() && !cli_arg.batch_in_time() {