diff --git a/rust/cube/cubestore-cli/src/repl.rs b/rust/cube/cubestore-cli/src/repl.rs index 2f578a23e9b8f..7093e2a321868 100644 --- a/rust/cube/cubestore-cli/src/repl.rs +++ b/rust/cube/cubestore-cli/src/repl.rs @@ -18,6 +18,7 @@ pub async fn run(client: &Client, mut timing: bool) -> Result<()> { let history_path = dirs::home_dir().map(|p| p.join(".cubestore_history")); let mut rl: Editor<(), DefaultHistory> = Editor::new()?; + if let Some(ref p) = history_path { let _ = rl.load_history(p); } @@ -31,7 +32,15 @@ pub async fn run(client: &Client, mut timing: bool) -> Result<()> { "cubestore-> " }; - let line = match rl.readline(prompt) { + let (line_res, rl_back) = tokio::task::spawn_blocking(move || { + let res = rl.readline(prompt); + (res, rl) + }) + .await?; + + rl = rl_back; + + let line = match line_res { Ok(line) => line, Err(ReadlineError::Interrupted) => { // Ctrl-C: clear current buffer, continue. diff --git a/rust/cube/cubestore-ws-transport/src/actor.rs b/rust/cube/cubestore-ws-transport/src/actor.rs index 5dd0c025af2da..bfd4ed2f6f55b 100644 --- a/rust/cube/cubestore-ws-transport/src/actor.rs +++ b/rust/cube/cubestore-ws-transport/src/actor.rs @@ -41,6 +41,7 @@ pub(crate) struct Actor { inbox: mpsc::UnboundedReceiver, ws: Option, last_pong: Instant, + last_ping: Instant, } impl Actor { @@ -61,6 +62,7 @@ impl Actor { inbox, ws: Some(ws), last_pong: Instant::now(), + last_ping: Instant::now(), } } @@ -72,9 +74,9 @@ impl Actor { let (mut sink, mut stream) = ws.split(); self.last_pong = Instant::now(); + self.last_ping = Instant::now(); // After a reconnect: flush any unanswered buffers with their original message ids. - // Matches WebSocketConnection.ts:128-143. let resend: Vec = std::mem::take(&mut self.pending_resend); for buf in resend { if let Err(e) = sink.send(Message::Binary(buf)).await { @@ -164,14 +166,24 @@ impl Actor { } _ = ping_interval.tick() => { - if self.last_pong.elapsed() > self.cfg.no_heartbeat_timeout { - log::warn!("heartbeat timeout — reconnecting"); + // Only treat a stale pong as a heartbeat loss if we've actually + // sent a ping that the server hasn't answered yet — otherwise + // the timer could fire on a healthy but quiet connection. + if self.last_ping > self.last_pong + && self.last_ping.elapsed() > self.cfg.no_heartbeat_timeout + { + log::warn!( + "heartbeat timeout: no pong for {:?} since last ping — reconnecting", + self.last_ping.elapsed() + ); break true; } + if let Err(e) = sink.send(Message::Ping(Bytes::new())).await { log::warn!("ping send failed: {e}"); break true; } + self.last_ping = Instant::now(); } } };