From 7108b337f924eccf6fe3465e13a881c75f467d22 Mon Sep 17 00:00:00 2001 From: shenjack-5600u <3695888@qq.com> Date: Wed, 7 Feb 2024 12:06:23 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A4=A7=E5=88=80=E9=98=94=E6=96=A7=E8=BF=9B?= =?UTF-8?q?=E8=A1=8C=E4=B8=80=E6=B3=A2=20async=20=E6=94=B9=E5=8A=A8?= =?UTF-8?q?=EF=BC=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.toml | 2 ++ src/cluster.rs | 39 ++++++++++++++++++++++++++++++++------- src/serve.rs | 1 + src/utils.rs | 2 +- 4 files changed, 36 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 58594c8..37afa63 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,8 @@ local_test = [] reqwest = { version = "0.11.23", features = ["json"] } axum = "0.7.4" tokio = { version = "1.35.1", features = ["full"] } +futures-util = "0.3.30" +rust_socketio = { path = "D:/githubs/rust-socketio/socketio", features = ["async"]} serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.112" diff --git a/src/cluster.rs b/src/cluster.rs index 45d587d..fea9b92 100644 --- a/src/cluster.rs +++ b/src/cluster.rs @@ -1,10 +1,16 @@ use crate::config::Config; +use crate::fatal; use crate::utils::avro_data_to_file_list; use crate::PROTOCOL_VERSION; -use reqwest::{Client, StatusCode}; +use futures_util::FutureExt; +use reqwest::{Client as reqClient, StatusCode}; +use rust_socketio::{ + asynchronous::{Client, ClientBuilder}, + Payload, TransportType, +}; use serde::Deserialize; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use zstd::stream::decode_all; #[derive(Deserialize, Debug, Clone)] @@ -14,16 +20,35 @@ pub struct SyncFile { pub size: i64, } -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct Cluster { pub config: Config, pub ua: String, + pub socket: Client, } impl Cluster { - pub fn new(config: Config) -> Self { + pub async fn new(config: Config) -> Self { + let disconnect = |reason: Payload, _: Client| { + async move { + fatal!("socket disconnect: {:?}", reason); + } + .boxed() + }; let ua = format!("openbmclapi-cluster/{}", PROTOCOL_VERSION); - Self { config, ua } + let socket = ClientBuilder::new(config.center_url.clone()) + .transport_type(TransportType::Websocket) + .on("error", |err, _| { + fatal!("socket error {:?}", err); + }) + .on("message", |msg, _| { + async move { debug!("socket message: {:?}", msg) }.boxed() + }) + .on("disconnect", disconnect) + .connect() + .await + .expect("Failed to connect to center"); + Self { config, ua, socket } } /// ```typescript @@ -67,7 +92,7 @@ impl Cluster { let url = self.config.join_center_url("/openbmclapi/files"); let password = self.config.cluster_secret.clone(); let username = self.config.cluster_id.clone(); - let client = Client::builder() + let client = reqClient::builder() .user_agent(self.ua.clone()) .build() .unwrap(); @@ -137,7 +162,7 @@ mod tests { async fn test_get_file_list() { crate::log::init_log_with_cli(); let config = gen_config(); - let cluster = Cluster::new(config); + let cluster = Cluster::new(config).await; cluster.get_file_list().await.unwrap(); } } diff --git a/src/serve.rs b/src/serve.rs index f5da78b..72b5775 100644 --- a/src/serve.rs +++ b/src/serve.rs @@ -44,6 +44,7 @@ impl IntoResponse for MeasureRes { /// for (let i = 0; i < size; i++) { /// res.write(buffer) /// } +/// res.end() /// }) /// /// export default MeasureRoute diff --git a/src/utils.rs b/src/utils.rs index 52e99ec..8347842 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,8 +1,8 @@ use crate::cluster::SyncFile; +use std::collections::HashMap; use std::io::Cursor; use std::path::PathBuf; -use std::collections::HashMap; use apache_avro::{from_avro_datum, from_value, types::Value}; use base64::Engine;