甚至也许行了

This commit is contained in:
shenjack 2024-06-24 01:23:56 +08:00
parent 542aba050f
commit 69c84f5e30
Signed by: shenjack
GPG Key ID: 7B1134A979775551

View File

@ -4,7 +4,12 @@ use crate::{
Command, Command,
}; };
use std::{io::Write, ops::Range, path::PathBuf, time::Instant}; use std::{
io::Write,
ops::Range,
path::PathBuf,
time::{Duration, Instant},
};
use base16384::Base16384Utf8; use base16384::Base16384Utf8;
use colored::Colorize; use colored::Colorize;
@ -75,9 +80,14 @@ impl ComputeStatus {
pub fn get_idle_thread(&self) -> Option<usize> { self.thread_running.iter().position(|&x| !x) } pub fn get_idle_thread(&self) -> Option<usize> { self.thread_running.iter().position(|&x| !x) }
pub fn all_stoped(&self) -> bool { self.thread_running.iter().all(|&x| !x) } pub fn all_stoped(&self) -> bool { self.thread_running.iter().all(|&x| !x) }
pub fn update_speed(&mut self, thread_id: usize, speed: u64) { self.thread_speed[thread_id] = speed; } pub fn update_speed(&mut self, thread_id: ThreadId, speed: u64) { self.thread_speed[thread_id as usize] = speed; }
pub fn update_running(&mut self, thread_id: usize, running: bool) { self.thread_running[thread_id] = running; } pub fn update_running(&mut self, thread_id: ThreadId, running: bool) { self.thread_running[thread_id as usize] = running; }
pub fn count_speed(&self) -> u64 { self.thread_speed.iter().sum() } pub fn count_speed(&self) -> u64 { self.thread_speed.iter().sum() }
pub fn predict_time(&self, top_i: u64) -> chrono::Duration {
let speed = self.count_speed();
let remain = self.end - top_i;
chrono::Duration::milliseconds((remain as f64 / speed as f64 * 1000.0) as i64)
}
} }
pub fn start_main(cli_arg: Command, out_path: PathBuf) { pub fn start_main(cli_arg: Command, out_path: PathBuf) {
@ -124,6 +134,7 @@ pub type ThreadId = u32;
/// 3.ended 为 true 的时候, 再发送消息的时候直接发送 None /// 3.ended 为 true 的时候, 再发送消息的时候直接发送 None
/// - 如果是 动态大小 的 batch /// - 如果是 动态大小 的 batch
pub fn schdule_threads(cli_arg: Command, out_path: PathBuf) { pub fn schdule_threads(cli_arg: Command, out_path: PathBuf) {
let mut cores = 0;
let mut thread = vec![]; let mut thread = vec![];
let mut shared_status = ComputeStatus::new(&cli_arg); let mut shared_status = ComputeStatus::new(&cli_arg);
let (work_sender, work_receiver) = bounded::<WorkInfo>(0); let (work_sender, work_receiver) = bounded::<WorkInfo>(0);
@ -149,6 +160,7 @@ pub fn schdule_threads(cli_arg: Command, out_path: PathBuf) {
info!("线程 {} 结束计算", thread_name); info!("线程 {} 结束计算", thread_name);
})) }))
} }
crate::set_process_cores(cores);
// 任务分发 // 任务分发
// 判断是否所有 work 都分发完了 // 判断是否所有 work 都分发完了
// 当前分发到的 work 的 最大 index // 当前分发到的 work 的 最大 index
@ -251,88 +263,88 @@ pub fn cacl(
receiver: Receiver<WorkInfo>, receiver: Receiver<WorkInfo>,
work_sender: Sender<(ThreadId, u32)>, work_sender: Sender<(ThreadId, u32)>,
) { ) {
// k += 1;
// if k >= report_interval {
// let now = std::time::Instant::now();
// let d_t: std::time::Duration = now.duration_since(start_time);
// let new_run_speed = k as f64 / d_t.as_secs_f64();
// // 预估剩余时间
// let wait_time = (range.end - i) / new_run_speed as u64;
// let wait_time = chrono::Duration::seconds(wait_time as i64);
// // 转换成 时:分:秒
// // 根据实际运行速率来调整 report_interval
// report_interval = config.report_interval * new_run_speed as u64;
// info!(
// "|{:>2}|Id:{:>15}|{:6.2}/s {:>3.3}E/d {:>5.2}{}|{:<3}|预计:{}:{}:{}|",
// config.thread_id,
// i,
// new_run_speed,
// new_run_speed * 8.64 / 1_0000.0,
// d_t.as_secs_f64(),
// // 根据对比上一段运行速度 输出 emoji
// // ⬆️ ➡️ ⬇️
// // 两个值 相差 0.1 之内都是 ➡️
// if new_run_speed > run_speed + 0.1 {
// "⬆️".green()
// } else if new_run_speed < run_speed - 0.1 {
// // 橙色
// "⬇️".red()
// } else {
// "➡️".blue()
// },
// get_count,
// wait_time.num_hours(),
// wait_time.num_minutes() % 60,
// wait_time.num_seconds() % 60
// );
// run_speed = new_run_speed;
// range_time = std::time::Instant::now();
// k = 0;
// }
}
/// 固定 batch 的计算函数
pub fn count_batch_cacl(config: CacluateConfig, status: &ComputeStatus, receiver: Receiver<WorkInfo>) {
// 初始猜测的时间间隔
// 设置线程亲和性
if let Some(core_affinity) = config.core_affinity { if let Some(core_affinity) = config.core_affinity {
crate::set_thread2core(1 << core_affinity) crate::set_thread2core(1 << core_affinity)
} }
// 提前准备好 team_namer // 提前准备好 team_namer
let team_namer = TeamNamer::new(&config.team).unwrap(); let team_namer = TeamNamer::new(&config.team).unwrap();
let mut main_namer = Namer::new_from_team_namer_unchecked(&team_namer, "dummy"); let mut main_namer = Namer::new_from_team_namer_unchecked(&team_namer, "dummy");
let mut get_count = 0;
let mut start_time = std::time::Instant::now();
let mut run_speed = 0.0; let mut run_speed = 0.0;
let mut k: u64 = 0; loop {
} // 先 request 一个 work
let work = match receiver.recv() {
/// 动态 batch 的计算函数 Ok(work) => match work {
pub fn time_batch_cacl(config: CacluateConfig, status: &ComputeStatus, receiver: Receiver<WorkInfo>, work_sender: Sender<u32>) { Some(work) => {
// 初始猜测的时间间隔 if work.0 == config.thread_id {
// 设置线程亲和性 work.1
if let Some(core_affinity) = config.core_affinity { } else {
crate::set_thread2core(1 << core_affinity) // 如果不是自己的 work, 则再次发送一个 request
let _ = work_sender.send((config.thread_id, if run_speed == 0.0 { 0 } else { run_speed as u32 }));
// 然后进入下一次循环
continue;
}
}
None => {
// 如果接收到了 None, 则说明活都干完了, 退出
return;
}
},
Err(_) => {
// 如果接收到了错误, 则说明主线程已经结束了, 退出
return;
}
};
// 开始计算
let count = work.end - work.start;
let top = work.end;
let start_time = std::time::Instant::now();
// 计算
get_count += inner_cacl(&config, work, &mut main_namer, &team_namer);
// 完事, 统计
let now = std::time::Instant::now();
let d_t: std::time::Duration = now.duration_since(start_time);
let new_run_speed = count as f64 / d_t.as_secs_f64();
// 预估剩余时间
// 先更新自己的状态上去
status.update_speed(config.thread_id, new_run_speed as u64);
// 获取一个全局速度预测
let predict_time = status.predict_time(top);
// 输出状态
info!(
// thread_id, top, 当前线程速度, 当前batch用时, emoji, 全局速度, 全局E/d 速度, 算到几个, 预计时间
"|{:>2}|Id:{:>15}|{:6.2}/s {:>5.2}|{:6.2}/s {:>3.3}E/d {}|{:<3}|预计:{}:{}:{}|",
config.thread_id,
top,
new_run_speed,
d_t.as_secs_f64(),
// 如果速度差 1k 以上, 则输出emoji
if new_run_speed > run_speed + 1000.0 {
"⬆️".green()
} else if new_run_speed < run_speed - 1000.0 {
"⬇️".red()
} else {
"➡️".blue()
},
status.count_speed(),
status.count_speed() as f64 / 86400.0,
get_count,
predict_time.num_hours(),
predict_time.num_minutes() % 60,
predict_time.num_seconds() % 60
);
run_speed = new_run_speed;
// 然后是调度相关
status.update_running(config.thread_id, false);
// 直接下一次循环
} }
// 提前准备好 team_namer
let team_namer = TeamNamer::new(&config.team).unwrap();
let mut main_namer = Namer::new_from_team_namer_unchecked(&team_namer, "dummy");
let mut start_time = std::time::Instant::now();
let mut run_speed = 0.0;
let mut k: u64 = 0;
} }
/// 每一个 batch 的具体运算 /// 每一个 batch 的具体运算
/// 不负责状态统计 /// 不负责状态统计
/// 状态统计的最小颗粒度是整个 batch /// 状态统计的最小颗粒度是整个 batch
pub fn inner_cacl( pub fn inner_cacl(config: &CacluateConfig, range: Range<u64>, main_namer: &mut Namer, team_namer: &TeamNamer) -> u64 {
config: &CacluateConfig, let mut get_count = 0;
range: Range<u64>,
main_namer: &mut Namer,
team_namer: &TeamNamer,
get_count: &mut u64,
) {
for i in range { for i in range {
// 这堆操作放在这边了, 保证统计没问题 // 这堆操作放在这边了, 保证统计没问题
let name = gen_name(i); let name = gen_name(i);
@ -355,7 +367,7 @@ pub fn inner_cacl(
continue; continue;
} }
*get_count += 1; get_count += 1;
info!("Id:{:>15}|{}|{:.4}|{:.4}|{}", i, full_name, xu, xu_qd, main_namer.get_info()); info!("Id:{:>15}|{}|{:.4}|{:.4}|{}", i, full_name, xu, xu_qd, main_namer.get_info());
// 写入文件 // 写入文件
let write_in = format!( let write_in = format!(
@ -382,4 +394,5 @@ pub fn inner_cacl(
} }
} }
} }
get_count
} }