大刀阔斧进行一波 async 改动(

This commit is contained in:
shenjack-5600u 2024-02-07 12:06:23 +08:00
parent d4020d3864
commit 7108b337f9
Signed by: shenjack
GPG Key ID: FDF9864E11C7E79F
4 changed files with 36 additions and 8 deletions

View File

@ -15,6 +15,8 @@ local_test = []
reqwest = { version = "0.11.23", features = ["json"] } reqwest = { version = "0.11.23", features = ["json"] }
axum = "0.7.4" axum = "0.7.4"
tokio = { version = "1.35.1", features = ["full"] } tokio = { version = "1.35.1", features = ["full"] }
futures-util = "0.3.30"
rust_socketio = { path = "D:/githubs/rust-socketio/socketio", features = ["async"]}
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.112" serde_json = "1.0.112"

View File

@ -1,10 +1,16 @@
use crate::config::Config; use crate::config::Config;
use crate::fatal;
use crate::utils::avro_data_to_file_list; use crate::utils::avro_data_to_file_list;
use crate::PROTOCOL_VERSION; use crate::PROTOCOL_VERSION;
use reqwest::{Client, StatusCode}; use futures_util::FutureExt;
use reqwest::{Client as reqClient, StatusCode};
use rust_socketio::{
asynchronous::{Client, ClientBuilder},
Payload, TransportType,
};
use serde::Deserialize; use serde::Deserialize;
use tracing::{info, warn}; use tracing::{debug, info, warn};
use zstd::stream::decode_all; use zstd::stream::decode_all;
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
@ -14,16 +20,35 @@ pub struct SyncFile {
pub size: i64, pub size: i64,
} }
#[derive(Debug, Clone)] #[derive(Clone)]
pub struct Cluster { pub struct Cluster {
pub config: Config, pub config: Config,
pub ua: String, pub ua: String,
pub socket: Client,
} }
impl Cluster { impl Cluster {
pub fn new(config: Config) -> Self { pub async fn new(config: Config) -> Self {
let disconnect = |reason: Payload, _: Client| {
async move {
fatal!("socket disconnect: {:?}", reason);
}
.boxed()
};
let ua = format!("openbmclapi-cluster/{}", PROTOCOL_VERSION); let ua = format!("openbmclapi-cluster/{}", PROTOCOL_VERSION);
Self { config, ua } let socket = ClientBuilder::new(config.center_url.clone())
.transport_type(TransportType::Websocket)
.on("error", |err, _| {
fatal!("socket error {:?}", err);
})
.on("message", |msg, _| {
async move { debug!("socket message: {:?}", msg) }.boxed()
})
.on("disconnect", disconnect)
.connect()
.await
.expect("Failed to connect to center");
Self { config, ua, socket }
} }
/// ```typescript /// ```typescript
@ -67,7 +92,7 @@ impl Cluster {
let url = self.config.join_center_url("/openbmclapi/files"); let url = self.config.join_center_url("/openbmclapi/files");
let password = self.config.cluster_secret.clone(); let password = self.config.cluster_secret.clone();
let username = self.config.cluster_id.clone(); let username = self.config.cluster_id.clone();
let client = Client::builder() let client = reqClient::builder()
.user_agent(self.ua.clone()) .user_agent(self.ua.clone())
.build() .build()
.unwrap(); .unwrap();
@ -137,7 +162,7 @@ mod tests {
async fn test_get_file_list() { async fn test_get_file_list() {
crate::log::init_log_with_cli(); crate::log::init_log_with_cli();
let config = gen_config(); let config = gen_config();
let cluster = Cluster::new(config); let cluster = Cluster::new(config).await;
cluster.get_file_list().await.unwrap(); cluster.get_file_list().await.unwrap();
} }
} }

View File

@ -44,6 +44,7 @@ impl IntoResponse for MeasureRes {
/// for (let i = 0; i < size; i++) { /// for (let i = 0; i < size; i++) {
/// res.write(buffer) /// res.write(buffer)
/// } /// }
/// res.end()
/// }) /// })
/// ///
/// export default MeasureRoute /// export default MeasureRoute

View File

@ -1,8 +1,8 @@
use crate::cluster::SyncFile; use crate::cluster::SyncFile;
use std::collections::HashMap;
use std::io::Cursor; use std::io::Cursor;
use std::path::PathBuf; use std::path::PathBuf;
use std::collections::HashMap;
use apache_avro::{from_avro_datum, from_value, types::Value}; use apache_avro::{from_avro_datum, from_value, types::Value};
use base64::Engine; use base64::Engine;