Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion rust/cube/cubestore-cli/src/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub async fn run(client: &Client, mut timing: bool) -> Result<()> {
let history_path = dirs::home_dir().map(|p| p.join(".cubestore_history"));

let mut rl: Editor<(), DefaultHistory> = Editor::new()?;

if let Some(ref p) = history_path {
let _ = rl.load_history(p);
}
Expand All @@ -31,7 +32,15 @@ pub async fn run(client: &Client, mut timing: bool) -> Result<()> {
"cubestore-> "
};

let line = match rl.readline(prompt) {
let (line_res, rl_back) = tokio::task::spawn_blocking(move || {
let res = rl.readline(prompt);
(res, rl)
})
.await?;

rl = rl_back;

let line = match line_res {
Ok(line) => line,
Err(ReadlineError::Interrupted) => {
// Ctrl-C: clear current buffer, continue.
Expand Down
18 changes: 15 additions & 3 deletions rust/cube/cubestore-ws-transport/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub(crate) struct Actor {
inbox: mpsc::UnboundedReceiver<ActorRequest>,
ws: Option<WsStream>,
last_pong: Instant,
last_ping: Instant,
}

impl Actor {
Expand All @@ -61,6 +62,7 @@ impl Actor {
inbox,
ws: Some(ws),
last_pong: Instant::now(),
last_ping: Instant::now(),
}
}

Expand All @@ -72,9 +74,9 @@ impl Actor {

let (mut sink, mut stream) = ws.split();
self.last_pong = Instant::now();
self.last_ping = Instant::now();

// After a reconnect: flush any unanswered buffers with their original message ids.
// Matches WebSocketConnection.ts:128-143.
let resend: Vec<Bytes> = std::mem::take(&mut self.pending_resend);
for buf in resend {
if let Err(e) = sink.send(Message::Binary(buf)).await {
Expand Down Expand Up @@ -164,14 +166,24 @@ impl Actor {
}

_ = ping_interval.tick() => {
if self.last_pong.elapsed() > self.cfg.no_heartbeat_timeout {
log::warn!("heartbeat timeout — reconnecting");
// Only treat a stale pong as a heartbeat loss if we've actually
// sent a ping that the server hasn't answered yet — otherwise
// the timer could fire on a healthy but quiet connection.
if self.last_ping > self.last_pong
&& self.last_ping.elapsed() > self.cfg.no_heartbeat_timeout
{
log::warn!(
"heartbeat timeout: no pong for {:?} since last ping — reconnecting",
self.last_ping.elapsed()
);
break true;
}

if let Err(e) = sink.send(Message::Ping(Bytes::new())).await {
log::warn!("ping send failed: {e}");
break true;
}
self.last_ping = Instant::now();
}
}
};
Expand Down
Loading