From 6a28c4483c4d82d856f129d8f9a362c0a4fbc494 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Mon, 18 May 2026 18:47:40 +0200 Subject: [PATCH] feat: Introduce cubestore-cli (#10898) --- rust/cube/Cargo.lock | 557 +++++++++++++++++- rust/cube/Cargo.toml | 2 + rust/cube/cubestore-cli/Cargo.toml | 24 + rust/cube/cubestore-cli/src/args.rs | 32 + rust/cube/cubestore-cli/src/bin/csql.rs | 86 +++ rust/cube/cubestore-cli/src/exec.rs | 117 ++++ rust/cube/cubestore-cli/src/format.rs | 150 +++++ rust/cube/cubestore-cli/src/lib.rs | 4 + rust/cube/cubestore-cli/src/repl.rs | 134 +++++ rust/cube/cubestore-ws-transport/Cargo.toml | 24 + rust/cube/cubestore-ws-transport/src/actor.rs | 336 +++++++++++ .../cube/cubestore-ws-transport/src/client.rs | 153 +++++ rust/cube/cubestore-ws-transport/src/codec.rs | 116 ++++ rust/cube/cubestore-ws-transport/src/error.rs | 28 + rust/cube/cubestore-ws-transport/src/lib.rs | 17 + .../cube/cubestore-ws-transport/src/result.rs | 15 + .../tests/mock_server.rs | 303 ++++++++++ .../cubestore-ws-transport/tests/reconnect.rs | 126 ++++ .../tests/wire_roundtrip.rs | 172 ++++++ 19 files changed, 2387 insertions(+), 9 deletions(-) create mode 100644 rust/cube/cubestore-cli/Cargo.toml create mode 100644 rust/cube/cubestore-cli/src/args.rs create mode 100644 rust/cube/cubestore-cli/src/bin/csql.rs create mode 100644 rust/cube/cubestore-cli/src/exec.rs create mode 100644 rust/cube/cubestore-cli/src/format.rs create mode 100644 rust/cube/cubestore-cli/src/lib.rs create mode 100644 rust/cube/cubestore-cli/src/repl.rs create mode 100644 rust/cube/cubestore-ws-transport/Cargo.toml create mode 100644 rust/cube/cubestore-ws-transport/src/actor.rs create mode 100644 rust/cube/cubestore-ws-transport/src/client.rs create mode 100644 rust/cube/cubestore-ws-transport/src/codec.rs create mode 100644 rust/cube/cubestore-ws-transport/src/error.rs create mode 100644 rust/cube/cubestore-ws-transport/src/lib.rs create mode 100644 rust/cube/cubestore-ws-transport/src/result.rs create mode 100644 rust/cube/cubestore-ws-transport/tests/mock_server.rs create mode 100644 rust/cube/cubestore-ws-transport/tests/reconnect.rs create mode 100644 rust/cube/cubestore-ws-transport/tests/wire_roundtrip.rs diff --git a/rust/cube/Cargo.lock b/rust/cube/Cargo.lock index bb6f02a56e8cb..0d3a298152a91 100644 --- a/rust/cube/Cargo.lock +++ b/rust/cube/Cargo.lock @@ -45,12 +45,56 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" +[[package]] +name = "anstream" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + [[package]] name = "anstyle" version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" +[[package]] +name = "anstyle-parse" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52ce7f38b242319f7cabaa6813055467063ecdc9d355bbb4ce0c68908cd8130e" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys 0.61.2", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys 0.61.2", +] + [[package]] name = "anyhow" version = "1.0.102" @@ -116,6 +160,15 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "block-buffer" version = "0.12.0" @@ -215,6 +268,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "cfg_aliases" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" + [[package]] name = "cfg_aliases" version = "0.2.1" @@ -228,7 +287,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.3.0", "rand_core 0.10.1", ] @@ -302,6 +361,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ddb117e43bbf7dacf0a4190fef4d345b9bad68dfc649cb349e7d17d28428e51" dependencies = [ "clap_builder", + "clap_derive", ] [[package]] @@ -310,8 +370,22 @@ version = "4.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" dependencies = [ + "anstream", "anstyle", "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2ce8604710f6733aa641a2b3731eaa1e8b3d9973d5e3565da11800813f997a9" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -320,12 +394,27 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" +[[package]] +name = "clipboard-win" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bde03770d3df201d4fb868f2c9c59e66a3e4e2bd06692a0fe701e7103c7e84d4" +dependencies = [ + "error-code", +] + [[package]] name = "cmov" version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f88a43d011fc4a6876cb7344703e297c71dda42494fee094d5f7c76bf13f746" +[[package]] +name = "colorchoice" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -386,6 +475,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + [[package]] name = "cpufeatures" version = "0.3.0" @@ -440,6 +538,16 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" +[[package]] +name = "crypto-common" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "crypto-common" version = "0.2.1" @@ -546,6 +654,42 @@ dependencies = [ "typed-builder", ] +[[package]] +name = "cubestore-cli" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "cubestore-ws-transport", + "dirs", + "env_logger", + "log", + "rpassword", + "rustyline", + "tokio", + "url", +] + +[[package]] +name = "cubestore-ws-transport" +version = "0.1.0" +dependencies = [ + "base64 0.22.1", + "bytes", + "cubeshared", + "env_logger", + "flatbuffers", + "futures-util", + "http", + "log", + "percent-encoding", + "thiserror 1.0.69", + "tokio", + "tokio-tungstenite", + "url", + "uuid 1.23.1", +] + [[package]] name = "darling" version = "0.23.0" @@ -580,6 +724,12 @@ dependencies = [ "syn", ] +[[package]] +name = "data-encoding" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4ae5f15dda3c708c0ade84bfee31ccab44a3da4f88015ed22f63732abe300c8" + [[package]] name = "deranged" version = "0.5.8" @@ -590,18 +740,49 @@ dependencies = [ "serde_core", ] +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer 0.10.4", + "crypto-common 0.1.7", +] + [[package]] name = "digest" version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4850db49bf08e663084f7fb5c87d202ef91a3907271aff24a94eb97ff039153c" dependencies = [ - "block-buffer", + "block-buffer 0.12.0", "const-oid", - "crypto-common", + "crypto-common 0.2.1", "ctutils", ] +[[package]] +name = "dirs" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys 0.48.0", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -642,6 +823,35 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0" +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + +[[package]] +name = "env_filter" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32e90c2accc4b07a8456ea0debdc2e7587bdd890680d71173a15d4ae604f6eef" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0621c04f2196ac3f488dd583365b9c09be011a4ab8b9f37248ffcc8f6198b56a" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "jiff", + "log", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -658,6 +868,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "error-code" +version = "3.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dea2df4cf52843e0452895c455a1a2cfbb842a1e7329671acf418fdc53ed4c59" + [[package]] name = "etcetera" version = "0.8.0" @@ -702,6 +918,17 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" +[[package]] +name = "fd-lock" +version = "4.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce92ff622d6dadf7349484f42c93271a0d49b7cc4d466a936405bacbe10aa78" +dependencies = [ + "cfg-if", + "rustix", + "windows-sys 0.59.0", +] + [[package]] name = "filetime" version = "0.2.27" @@ -741,6 +968,21 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -838,6 +1080,16 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.17" @@ -929,7 +1181,7 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6303bc9732ae41b04cb554b844a762b4115a61bfaa81e3e83050991eeb56863f" dependencies = [ - "digest", + "digest 0.11.2", ] [[package]] @@ -1284,6 +1536,12 @@ dependencies = [ "serde", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + [[package]] name = "itertools" version = "0.10.5" @@ -1308,6 +1566,30 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" +[[package]] +name = "jiff" +version = "0.2.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f00b5dbd620d61dfdcb6007c9c1f6054ebd75319f163d886a9055cec1155073d" +dependencies = [ + "jiff-static", + "log", + "portable-atomic", + "portable-atomic-util", + "serde_core", +] + +[[package]] +name = "jiff-static" +version = "0.2.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e000de030ff8022ea1da3f466fbb0f3a809f5e51ed31f6dd931c35181ad8e6d7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "js-sys" version = "0.3.97" @@ -1420,7 +1702,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69b6441f590336821bb897fb28fc622898ccceb1d6cea3fde5ea86b090c4de98" dependencies = [ "cfg-if", - "digest", + "digest 0.11.2", ] [[package]] @@ -1474,6 +1756,23 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "native-tls" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "465500e14ea162429d264d44189adc38b199b62b1c21eea9f69e4b73cb03bbf2" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nativebridge" version = "0.1.0" @@ -1515,6 +1814,27 @@ dependencies = [ "syn", ] +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + +[[package]] +name = "nix" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" +dependencies = [ + "bitflags 2.11.1", + "cfg-if", + "cfg_aliases 0.1.1", + "libc", +] + [[package]] name = "num-conv" version = "0.2.1" @@ -1554,18 +1874,67 @@ version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + [[package]] name = "oorandom" version = "11.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" +[[package]] +name = "openssl" +version = "0.10.80" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a45fa2aa886c42762255da344f0a0d313e254066c46aad76f300c3d3da62d967" +dependencies = [ + "bitflags 2.11.1", + "cfg-if", + "foreign-types", + "libc", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "openssl-probe" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" +[[package]] +name = "openssl-sys" +version = "0.9.116" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28a22dc7140cda5f096e5e7724a6962ca81a7f8bfd2979f9b18c11af56318c4" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + [[package]] name = "page_size" version = "0.6.0" @@ -1718,12 +2087,33 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" +[[package]] +name = "pkg-config" +version = "0.3.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" + [[package]] name = "plain" version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" +[[package]] +name = "portable-atomic" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" + +[[package]] +name = "portable-atomic-util" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a106d1259c23fac8e543272398ae0e3c0b8d33c88ed73d0cc71b0f1d902618" +dependencies = [ + "portable-atomic", +] + [[package]] name = "postgres-protocol" version = "0.6.11" @@ -1803,7 +2193,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" dependencies = [ "bytes", - "cfg_aliases", + "cfg_aliases 0.2.1", "pin-project-lite", "quinn-proto", "quinn-udp", @@ -1843,7 +2233,7 @@ version = "0.5.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" dependencies = [ - "cfg_aliases", + "cfg_aliases 0.2.1", "libc", "once_cell", "socket2", @@ -1872,6 +2262,16 @@ version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.8.6" @@ -1960,6 +2360,17 @@ dependencies = [ "bitflags 2.11.1", ] +[[package]] +name = "redox_users" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" +dependencies = [ + "getrandom 0.2.17", + "libredox", + "thiserror 1.0.69", +] + [[package]] name = "ref-cast" version = "1.0.25" @@ -2078,6 +2489,27 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rpassword" +version = "7.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ac5b223d9738ef56e0b98305410be40fa0941bf6036c56f1506751e43552d64" +dependencies = [ + "libc", + "rtoolbox", + "windows-sys 0.61.2", +] + +[[package]] +name = "rtoolbox" +version = "0.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50a0e551c1e27e1731aba276dbeaeac73f53c7cd34d1bda485d02bd1e0f36844" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "rustc-hash" version = "2.1.2" @@ -2168,6 +2600,28 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +[[package]] +name = "rustyline" +version = "14.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7803e8936da37efd9b6d4478277f4b2b9bb5cdb37a113e8d63222e58da647e63" +dependencies = [ + "bitflags 2.11.1", + "cfg-if", + "clipboard-win", + "fd-lock", + "home", + "libc", + "log", + "memchr", + "nix", + "radix_trie", + "unicode-segmentation", + "unicode-width", + "utf8parse", + "windows-sys 0.52.0", +] + [[package]] name = "ryu" version = "1.0.23" @@ -2373,6 +2827,17 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures 0.2.17", + "digest 0.10.7", +] + [[package]] name = "sha2" version = "0.11.0" @@ -2380,8 +2845,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "446ba717509524cb3f22f17ecc096f10f4822d76ab5c0b9822c5f9c284e825f4" dependencies = [ "cfg-if", - "cpufeatures", - "digest", + "cpufeatures 0.3.0", + "digest 0.11.2", ] [[package]] @@ -2703,6 +3168,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-postgres" version = "0.7.17" @@ -2765,6 +3240,20 @@ dependencies = [ "xattr", ] +[[package]] +name = "tokio-tungstenite" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f72a05e828585856dacd553fba484c242c46e391fb0e58917c942ee9202915c" +dependencies = [ + "futures-util", + "log", + "native-tls", + "tokio", + "tokio-native-tls", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -2848,6 +3337,23 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c01152af293afb9c7c2a57e4b559c5620b421f6d133261c60dd2d0cdb38e6b8" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "native-tls", + "rand 0.9.4", + "sha1", + "thiserror 2.0.18", +] + [[package]] name = "typed-builder" version = "0.21.2" @@ -2913,6 +3419,12 @@ version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9629274872b2bfaf8d66f5f15725007f635594914870f65218920345aa11aa8c" +[[package]] +name = "unicode-width" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" + [[package]] name = "unicode-xid" version = "0.2.6" @@ -2950,6 +3462,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "uuid" version = "0.8.2" @@ -2970,6 +3488,18 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "walkdir" version = "2.5.0" @@ -3270,6 +3800,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.60.2" diff --git a/rust/cube/Cargo.toml b/rust/cube/Cargo.toml index b80ee191b6b79..255d66ba7dbc5 100644 --- a/rust/cube/Cargo.toml +++ b/rust/cube/Cargo.toml @@ -6,4 +6,6 @@ members = [ "cubeorchestrator", "cubesqlplanner/cubesqlplanner", "cubesqlplanner/nativebridge", + "cubestore-ws-transport", + "cubestore-cli", ] diff --git a/rust/cube/cubestore-cli/Cargo.toml b/rust/cube/cubestore-cli/Cargo.toml new file mode 100644 index 0000000000000..c1013b03935e5 --- /dev/null +++ b/rust/cube/cubestore-cli/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "cubestore-cli" +version = "0.1.0" +edition = "2021" +description = "Interactive CLI for CubeStore over its native WebSocket protocol." + +[lib] +path = "src/lib.rs" + +[[bin]] +name = "csql" +path = "src/bin/csql.rs" + +[dependencies] +cubestore-ws-transport = { path = "../cubestore-ws-transport" } +clap = { version = "4", features = ["derive", "env"] } +tokio = { version = "1", features = ["full"] } +rustyline = "14" +anyhow = "1" +url = "2" +rpassword = "7" +dirs = "5" +env_logger = "0.11" +log = "0.4" diff --git a/rust/cube/cubestore-cli/src/args.rs b/rust/cube/cubestore-cli/src/args.rs new file mode 100644 index 0000000000000..93995e8ce458c --- /dev/null +++ b/rust/cube/cubestore-cli/src/args.rs @@ -0,0 +1,32 @@ +use std::path::PathBuf; + +use clap::Parser; + +#[derive(Debug, Parser)] +#[command(name = "cubestore-cli", version, about, long_about = None)] +pub struct Cli { + /// Full WebSocket URL (ws://host:port or wss://host:port). Takes precedence over --host/--port. + #[arg(long, env = "CUBESTORE_URL")] + pub url: Option, + + #[arg(long, env = "CUBESTORE_HOST", default_value = "127.0.0.1")] + pub host: String, + + #[arg(long, env = "CUBESTORE_PORT", default_value_t = 3030)] + pub port: u16, + + #[arg(long, env = "CUBESTORE_USER")] + pub user: Option, + + /// Pass `-` to read from a TTY prompt. + #[arg(long, env = "CUBESTORE_PASSWORD")] + pub password: Option, + + /// Execute a single SQL statement and exit. + #[arg(short = 'c', long = "command")] + pub command: Option, + + /// Execute SQL statements from a file and exit. + #[arg(short = 'f', long = "file")] + pub file: Option, +} diff --git a/rust/cube/cubestore-cli/src/bin/csql.rs b/rust/cube/cubestore-cli/src/bin/csql.rs new file mode 100644 index 0000000000000..b7280bf7ee034 --- /dev/null +++ b/rust/cube/cubestore-cli/src/bin/csql.rs @@ -0,0 +1,86 @@ +use std::io::{IsTerminal, Read}; + +use anyhow::{Context, Result}; +use clap::Parser; +use cubestore_cli::args::Cli; +use cubestore_cli::{exec, repl}; +use cubestore_ws_transport::{Client, ClientConfig}; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<()> { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn")).init(); + let cli = Cli::parse(); + + let url = build_url(&cli)?; + let mut cfg = ClientConfig::new(url); + // CLI flags override credentials embedded in the URL, but only when they + // are actually provided — otherwise we'd clobber the URL-side creds with + // None. + if cli.user.is_some() { + cfg.username = cli.user.clone(); + } + if let Some(pass) = resolve_password(&cli)? { + cfg.password = Some(pass); + } + + let client = Client::connect(cfg) + .await + .context("failed to connect to cubestore")?; + + // Dispatch: + // -c → run single statement, exit + // -f → run file as script, exit + // stdin not a TTY → read all stdin as script, exit + // otherwise → REPL + let result = if let Some(sql) = cli.command.as_deref() { + let ok = exec::run_script(&client, sql, true).await?; + Ok::(ok) + } else if let Some(path) = cli.file.as_deref() { + let script = std::fs::read_to_string(path) + .with_context(|| format!("failed to read {}", path.display()))?; + let ok = exec::run_script(&client, &script, true).await?; + Ok(ok) + } else if !std::io::stdin().is_terminal() { + let mut script = String::new(); + std::io::stdin().read_to_string(&mut script)?; + let ok = exec::run_script(&client, &script, true).await?; + Ok(ok) + } else { + print_banner(&client); + repl::run(&client, true).await?; + Ok(true) + }; + + client.close(); + if !result? { + std::process::exit(1); + } + + Ok(()) +} + +fn build_url(cli: &Cli) -> Result { + if let Some(raw) = cli.url.as_deref() { + return url::Url::parse(raw).with_context(|| format!("invalid --url: {raw}")); + } + + let raw = format!("ws://{}:{}", cli.host, cli.port); + url::Url::parse(&raw).with_context(|| format!("invalid host/port: {raw}")) +} + +fn resolve_password(cli: &Cli) -> Result> { + match cli.password.as_deref() { + Some("-") => { + let pw = rpassword::prompt_password("Password: ")?; + Ok(Some(pw)) + } + Some(p) => Ok(Some(p.to_string())), + None => Ok(None), + } +} + +fn print_banner(client: &Client) { + let ver = client.server_version().unwrap_or("unknown"); + println!("csql {} (server: {})", env!("CARGO_PKG_VERSION"), ver); + println!("Type \"\\h\" for help, \"\\q\" to quit."); +} diff --git a/rust/cube/cubestore-cli/src/exec.rs b/rust/cube/cubestore-cli/src/exec.rs new file mode 100644 index 0000000000000..63c21d634f66f --- /dev/null +++ b/rust/cube/cubestore-cli/src/exec.rs @@ -0,0 +1,117 @@ +use std::time::Instant; + +use anyhow::Result; +use cubestore_ws_transport::Client; + +use crate::format; + +/// Run a single statement and print the result. Returns Ok even on a query error, +/// matching `psql` behavior in script mode (errors print but don't kill the process) +/// — caller decides whether to bail. +pub async fn run_one(client: &Client, sql: &str, show_timing: bool) -> Result { + let started = Instant::now(); + match client.query(sql).await { + Ok(result) => { + let elapsed_ms = started.elapsed().as_millis(); + let footer = make_footer(&result, show_timing, elapsed_ms); + // `\x1b[?7h` = DECAWM on. rustyline may turn autowrap off for its own + // cursor accounting and not restore it; without this, long table rows + // get clipped at the right edge of the terminal instead of wrapping. + print!("\x1b[?7h"); + match format::render_table(&result) { + Some(table) => println!("{table}\n{footer}"), + None => println!("{footer}"), + } + Ok(true) + } + Err(e) => { + print!("\x1b[?7h"); + eprintln!("ERROR: {e}"); + Ok(false) + } + } +} + +fn make_footer( + result: &cubestore_ws_transport::QueryResult, + show_timing: bool, + elapsed_ms: u128, +) -> String { + let has_table = !result.columns.is_empty(); + let n = result.rows.len(); + let rows_part = if has_table { + format!("{n} {}", if n == 1 { "row" } else { "rows" }) + } else { + "OK".to_string() + }; + if show_timing { + format!("({rows_part}, Time: {elapsed_ms} ms)") + } else { + format!("({rows_part})") + } +} + +/// Split a script into SQL statements on `;` boundaries that are outside string +/// literals. Empty statements are skipped. Same level of rigor as `psql -f`. +pub fn split_statements(input: &str) -> Vec { + let mut out: Vec = Vec::new(); + let mut buf = String::new(); + let mut in_single = false; + let mut in_double = false; + let mut chars = input.chars().peekable(); + + while let Some(c) = chars.next() { + match c { + // Inside a single-quoted string, '' is an escaped quote — stay inside. + '\'' if in_single => { + buf.push(c); + if chars.peek() == Some(&'\'') { + buf.push(chars.next().unwrap()); + } else { + in_single = false; + } + } + '\'' if !in_double => { + buf.push(c); + in_single = true; + } + // Same rule for double-quoted identifiers: "" is an escaped ". + '"' if in_double => { + buf.push(c); + if chars.peek() == Some(&'"') { + buf.push(chars.next().unwrap()); + } else { + in_double = false; + } + } + '"' if !in_single => { + buf.push(c); + in_double = true; + } + ';' if !in_single && !in_double => { + let trimmed = buf.trim().to_string(); + if !trimmed.is_empty() { + out.push(trimmed); + } + buf.clear(); + } + _ => buf.push(c), + } + } + + let trailing = buf.trim(); + if !trailing.is_empty() { + out.push(trailing.to_string()); + } + out +} + +pub async fn run_script(client: &Client, script: &str, show_timing: bool) -> Result { + let stmts = split_statements(script); + let mut all_ok = true; + for stmt in stmts { + let ok = run_one(client, &stmt, show_timing).await?; + all_ok &= ok; + } + Ok(all_ok) +} diff --git a/rust/cube/cubestore-cli/src/format.rs b/rust/cube/cubestore-cli/src/format.rs new file mode 100644 index 0000000000000..ac97a88f018ed --- /dev/null +++ b/rust/cube/cubestore-cli/src/format.rs @@ -0,0 +1,150 @@ +use cubestore_ws_transport::QueryResult; + +const NULL_RENDER: &str = "NULL"; + +/// psql-style aligned text table. Returns `None` for empty (DDL/INSERT) results. +pub fn render_table(result: &QueryResult) -> Option { + if result.columns.is_empty() { + return None; + } + + let ncols = result.columns.len(); + let mut widths: Vec = result.columns.iter().map(|c| c.chars().count()).collect(); + for row in &result.rows { + for (i, cell) in row.iter().enumerate() { + if i >= ncols { + break; + } + let len = match cell.as_deref() { + Some(s) => s.chars().count(), + None => NULL_RENDER.chars().count(), + }; + if len > widths[i] { + widths[i] = len; + } + } + } + + let mut out = String::new(); + + // Header + for (i, name) in result.columns.iter().enumerate() { + if i > 0 { + out.push('|'); + } + out.push(' '); + push_center(&mut out, name, widths[i]); + out.push(' '); + } + out.push('\n'); + + // Separator + for (i, w) in widths.iter().enumerate() { + if i > 0 { + out.push('+'); + } + for _ in 0..(w + 2) { + out.push('-'); + } + } + + // Data rows + for row in &result.rows { + out.push('\n'); + for (i, &width) in widths.iter().enumerate() { + if i > 0 { + out.push('|'); + } + out.push(' '); + let cell = match row.get(i) { + Some(Some(s)) => s.as_str(), + Some(None) | None => NULL_RENDER, + }; + push_left(&mut out, cell, width); + out.push(' '); + } + } + + Some(out) +} + +fn push_left(out: &mut String, s: &str, width: usize) { + out.push_str(s); + let pad = width.saturating_sub(s.chars().count()); + for _ in 0..pad { + out.push(' '); + } +} + +fn push_center(out: &mut String, s: &str, width: usize) { + let len = s.chars().count(); + if len >= width { + out.push_str(s); + return; + } + let total = width - len; + let left = total / 2; + let right = total - left; + for _ in 0..left { + out.push(' '); + } + out.push_str(s); + for _ in 0..right { + out.push(' '); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn renders_all_rows_without_truncation() { + let result = QueryResult { + columns: vec!["table_catalog".to_string(), "table_schema".to_string()], + rows: vec![ + vec![ + Some("ovr".to_string()), + Some("information_schema".to_string()), + ], + vec![ + Some("ovr".to_string()), + Some("information_schema".to_string()), + ], + vec![ + Some("ovr".to_string()), + Some("information_schema".to_string()), + ], + vec![Some("ovr".to_string()), Some("public".to_string())], + vec![Some("ovr".to_string()), Some("public".to_string())], + ], + }; + let out = render_table(&result).expect("table"); + + // header line + separator + 5 data rows = 7 lines + assert_eq!(out.lines().count(), 7); + + // every data cell present + assert_eq!(out.matches("information_schema").count(), 3); + assert_eq!(out.matches("public").count(), 2); + assert_eq!(out.matches(" ovr").count(), 5); + + // long-value column width is honored (header centered to data width 18) + let header = out.lines().next().unwrap(); + assert!( + header.contains(" table_schema "), + "header centered to data width: {header:?}" + ); + } + + #[test] + fn preserves_long_cell_values() { + let long = "x".repeat(500); + let result = QueryResult { + columns: vec!["v".to_string()], + rows: vec![vec![Some(long.clone())]], + }; + let out = render_table(&result).expect("table"); + assert!(out.contains(&long), "long value preserved in output"); + } +} diff --git a/rust/cube/cubestore-cli/src/lib.rs b/rust/cube/cubestore-cli/src/lib.rs new file mode 100644 index 0000000000000..40ee4ca02a594 --- /dev/null +++ b/rust/cube/cubestore-cli/src/lib.rs @@ -0,0 +1,4 @@ +pub mod args; +pub mod exec; +pub mod format; +pub mod repl; diff --git a/rust/cube/cubestore-cli/src/repl.rs b/rust/cube/cubestore-cli/src/repl.rs new file mode 100644 index 0000000000000..2f578a23e9b8f --- /dev/null +++ b/rust/cube/cubestore-cli/src/repl.rs @@ -0,0 +1,134 @@ +use anyhow::Result; +use cubestore_ws_transport::Client; +use rustyline::error::ReadlineError; +use rustyline::history::DefaultHistory; +use rustyline::Editor; + +use crate::exec; + +const HELP: &str = "\ +Meta-commands: + \\q, \\quit quit the REPL + \\h, \\? this help + \\timing toggle elapsed-time display + +Statements are terminated with `;`."; + +pub async fn run(client: &Client, mut timing: bool) -> Result<()> { + let history_path = dirs::home_dir().map(|p| p.join(".cubestore_history")); + + let mut rl: Editor<(), DefaultHistory> = Editor::new()?; + if let Some(ref p) = history_path { + let _ = rl.load_history(p); + } + + let mut buffer = String::new(); + + loop { + let prompt = if buffer.is_empty() { + "cubestore=> " + } else { + "cubestore-> " + }; + + let line = match rl.readline(prompt) { + Ok(line) => line, + Err(ReadlineError::Interrupted) => { + // Ctrl-C: clear current buffer, continue. + buffer.clear(); + continue; + } + Err(ReadlineError::Eof) => { + // Ctrl-D at empty buffer: exit. + if buffer.is_empty() { + break; + } + buffer.clear(); + continue; + } + Err(e) => return Err(e.into()), + }; + + let trimmed = line.trim(); + + // Meta commands only apply on an empty buffer. + if buffer.is_empty() && trimmed.starts_with('\\') { + let _ = rl.add_history_entry(line.as_str()); + match trimmed { + "\\q" | "\\quit" => break, + "\\h" | "\\?" => { + println!("{HELP}"); + } + "\\timing" => { + timing = !timing; + println!("Timing is {}", if timing { "on" } else { "off" }); + } + other => { + eprintln!("Unknown meta-command: {other}"); + } + } + continue; + } + + if !buffer.is_empty() { + buffer.push('\n'); + } + buffer.push_str(&line); + + // Submit when the accumulated buffer ends with `;` outside any string literal. + if statement_complete(&buffer) { + let stmt = buffer.trim().trim_end_matches(';').trim().to_string(); + let _ = rl.add_history_entry(buffer.trim_end().to_string()); + buffer.clear(); + if stmt.is_empty() { + continue; + } + exec::run_one(client, &stmt, timing).await?; + } + } + + if let Some(ref p) = history_path { + let _ = rl.save_history(p); + } + + // Make sure autowrap is on when we hand the terminal back. + print!("\x1b[?7h"); + + Ok(()) +} + +fn statement_complete(s: &str) -> bool { + let mut in_single = false; + let mut in_double = false; + let mut last_non_ws: Option = None; + let mut chars = s.chars().peekable(); + + while let Some(c) = chars.next() { + match c { + // Inside a single-quoted string, '' is an escaped quote — stay inside. + '\'' if in_single => { + if chars.peek() == Some(&'\'') { + chars.next(); + } else { + in_single = false; + } + } + '\'' if !in_double => in_single = true, + // Same rule for double-quoted identifiers: "" is an escaped ". + '"' if in_double => { + if chars.peek() == Some(&'"') { + chars.next(); + } else { + in_double = false; + } + } + '"' if !in_single => in_double = true, + _ => {} + } + if !c.is_whitespace() { + last_non_ws = Some(c); + } + } + + !in_single && !in_double && last_non_ws == Some(';') +} diff --git a/rust/cube/cubestore-ws-transport/Cargo.toml b/rust/cube/cubestore-ws-transport/Cargo.toml new file mode 100644 index 0000000000000..9cae852b514e9 --- /dev/null +++ b/rust/cube/cubestore-ws-transport/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "cubestore-ws-transport" +version = "0.1.0" +edition = "2021" +description = "Async WebSocket client for CubeStore's binary FlatBuffers protocol." + +[dependencies] +cubeshared = { path = "../cubeshared" } +flatbuffers = "25.12.19" +tokio = { version = "1", features = ["full"] } +tokio-tungstenite = { version = "0.29", features = ["native-tls"] } +futures-util = "0.3" +bytes = "1" +url = "2" +percent-encoding = "2" +base64 = "0.22" +thiserror = "1" +log = "0.4" +uuid = { version = "1", features = ["v4"] } +http = "1" + +[dev-dependencies] +tokio = { version = "1", features = ["full", "test-util"] } +env_logger = "0.11" diff --git a/rust/cube/cubestore-ws-transport/src/actor.rs b/rust/cube/cubestore-ws-transport/src/actor.rs new file mode 100644 index 0000000000000..5dd0c025af2da --- /dev/null +++ b/rust/cube/cubestore-ws-transport/src/actor.rs @@ -0,0 +1,336 @@ +use std::collections::HashMap; +use std::time::{Duration, Instant}; + +use base64::Engine as _; +use bytes::Bytes; +use futures_util::{SinkExt, StreamExt}; +use http::{HeaderValue, Request}; +use tokio::net::TcpStream; +use tokio::sync::{mpsc, oneshot}; +use tokio::time::MissedTickBehavior; +use tokio_tungstenite::tungstenite::protocol::{Message, WebSocketConfig}; +use tokio_tungstenite::{connect_async_with_config, MaybeTlsStream, WebSocketStream}; + +use crate::client::ClientConfig; +use crate::codec::{decode_frame, encode_query, DecodedResponse}; +use crate::error::TransportError; +use crate::result::QueryResult; + +pub(crate) type WsStream = WebSocketStream>; + +pub(crate) enum ActorRequest { + Query { + sql: String, + reply: oneshot::Sender>, + }, + Close, +} + +struct PendingQuery { + reply: oneshot::Sender>, + buffer: Bytes, +} + +pub(crate) struct Actor { + cfg: ClientConfig, + connection_id: String, + process_id: String, + next_msg_id: u32, + pending: HashMap, + pending_resend: Vec, + inbox: mpsc::UnboundedReceiver, + ws: Option, + last_pong: Instant, +} + +impl Actor { + pub(crate) fn new( + cfg: ClientConfig, + connection_id: String, + process_id: String, + inbox: mpsc::UnboundedReceiver, + ws: WsStream, + ) -> Self { + Self { + cfg, + connection_id, + process_id, + next_msg_id: 1, + pending: HashMap::new(), + pending_resend: Vec::new(), + inbox, + ws: Some(ws), + last_pong: Instant::now(), + } + } + + pub(crate) async fn run(mut self) { + 'outer: loop { + let Some(ws) = self.ws.take() else { + break; + }; + + let (mut sink, mut stream) = ws.split(); + self.last_pong = Instant::now(); + + // After a reconnect: flush any unanswered buffers with their original message ids. + // Matches WebSocketConnection.ts:128-143. + let resend: Vec = std::mem::take(&mut self.pending_resend); + for buf in resend { + if let Err(e) = sink.send(Message::Binary(buf)).await { + log::warn!("resend after reconnect failed: {e}"); + self.requeue_pending_for_resend(); + if !self.attempt_reconnect().await { + break 'outer; + } + continue 'outer; + } + } + + let mut ping_interval = tokio::time::interval(self.cfg.ping_interval); + ping_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + // Burn the immediate first tick so we don't ping the instant we connect. + ping_interval.tick().await; + + let disconnected = loop { + tokio::select! { + biased; + + maybe_req = self.inbox.recv() => { + match maybe_req { + None | Some(ActorRequest::Close) => { + let _ = sink.send(Message::Close(None)).await; + self.fail_all_pending(TransportError::Closed); + return; + } + Some(ActorRequest::Query { sql, reply }) => { + let msg_id = self.next_msg_id; + self.next_msg_id = self.next_msg_id.wrapping_add(1).max(1); + let buf = encode_query(msg_id, &self.connection_id, &sql); + self.pending.insert(msg_id, PendingQuery { reply, buffer: buf.clone() }); + if let Err(e) = sink.send(Message::Binary(buf)).await { + log::warn!("send failed, will reconnect: {e}"); + break true; + } + } + } + } + + maybe_msg = stream.next() => { + match maybe_msg { + None => { + log::warn!("websocket stream ended"); + break true; + } + Some(Err(e)) => { + log::warn!("websocket error: {e}"); + break true; + } + Some(Ok(Message::Binary(bytes))) => { + match decode_frame(&bytes) { + Ok(frame) => { + if let Some(pending) = self.pending.remove(&frame.message_id) { + let result = match frame.response { + DecodedResponse::Ok(qr) => Ok(qr), + DecodedResponse::Error(msg) => Err(TransportError::Query(msg)), + }; + let _ = pending.reply.send(result); + } else { + log::warn!("unsolicited message id {}", frame.message_id); + } + } + Err(e) => { + log::warn!("frame decode error: {e}"); + } + } + } + Some(Ok(Message::Pong(_))) => { + self.last_pong = Instant::now(); + } + Some(Ok(Message::Ping(payload))) => { + if let Err(e) = sink.send(Message::Pong(payload)).await { + log::warn!("pong send failed: {e}"); + break true; + } + } + Some(Ok(Message::Close(_))) => { + log::info!("websocket closed by server"); + break true; + } + Some(Ok(_)) => { + // Frame / text — ignore. + } + } + } + + _ = ping_interval.tick() => { + if self.last_pong.elapsed() > self.cfg.no_heartbeat_timeout { + log::warn!("heartbeat timeout — reconnecting"); + break true; + } + if let Err(e) = sink.send(Message::Ping(Bytes::new())).await { + log::warn!("ping send failed: {e}"); + break true; + } + } + } + }; + + drop(sink); + drop(stream); + + if !disconnected { + break; + } + + self.requeue_pending_for_resend(); + if !self.attempt_reconnect().await { + break; + } + } + + self.fail_all_pending(TransportError::Disconnected); + } + + fn requeue_pending_for_resend(&mut self) { + self.pending_resend.clear(); + self.pending_resend + .extend(self.pending.values().map(|p| p.buffer.clone())); + } + + fn fail_all_pending(&mut self, err: TransportError) { + let pending = std::mem::take(&mut self.pending); + for (_, p) in pending { + // err is not Clone — synthesize a fresh equivalent for each. + let e = match &err { + TransportError::Disconnected => TransportError::Disconnected, + TransportError::Closed => TransportError::Closed, + other => TransportError::Protocol(other.to_string()), + }; + let _ = p.reply.send(Err(e)); + } + } + + async fn attempt_reconnect(&mut self) -> bool { + for attempt in 0..self.cfg.max_connect_retries { + let wait = Duration::from_millis((attempt as u64 + 1) * 1000); + tokio::time::sleep(wait).await; + match connect_ws(&self.cfg, &self.process_id).await { + Ok((ws, _version)) => { + self.ws = Some(ws); + return true; + } + Err(e) => { + log::warn!( + "reconnect attempt {}/{} failed: {}", + attempt + 1, + self.cfg.max_connect_retries, + e + ); + } + } + } + false + } +} + +/// Build an HTTP upgrade request, perform a WebSocket handshake to cubestore, and +/// return both the stream and the server's `X-CubeStore-Version` header. +pub(crate) async fn connect_ws( + cfg: &ClientConfig, + process_id: &str, +) -> Result<(WsStream, Option), TransportError> { + let ws_url = build_ws_url(&cfg.url)?; + + let mut builder = Request::builder().method("GET").uri(ws_url.as_str()); + + // Required WebSocket headers (tokio-tungstenite injects most but be explicit for clarity). + builder = builder + .header( + "Host", + host_header(&cfg.url).ok_or_else(|| { + TransportError::InvalidUrl(format!("missing host in url: {}", cfg.url)) + })?, + ) + .header("Connection", "Upgrade") + .header("Upgrade", "websocket") + .header("Sec-WebSocket-Version", "13") + .header( + "Sec-WebSocket-Key", + tokio_tungstenite::tungstenite::handshake::client::generate_key(), + ); + + // RFC 7617 allows either side of `user:pass` to be empty (token-style auth + // is commonly sent as `:`), so emit the header whenever any credential + // is configured rather than silently dropping it when one side is missing. + if cfg.username.is_some() || cfg.password.is_some() { + let user = cfg.username.as_deref().unwrap_or(""); + let pass = cfg.password.as_deref().unwrap_or(""); + + let token = base64::engine::general_purpose::STANDARD.encode(format!("{user}:{pass}")); + let value = HeaderValue::from_str(&format!("Basic {token}")) + .map_err(|e| TransportError::Auth(e.to_string()))?; + builder = builder.header("Authorization", value); + } + + let truncated: String = process_id.chars().take(64).collect(); + let value = HeaderValue::from_str(&truncated) + .map_err(|e| TransportError::Auth(format!("x-process-id: {e}")))?; + builder = builder.header("x-process-id", value); + + let request = builder + .body(()) + .map_err(|e| TransportError::InvalidUrl(e.to_string()))?; + + // Match cubestore's transport caps (default 64MiB message / 32MiB frame; the + // server can be configured up to 256MiB). The tungstenite default of 16MiB + // per frame is too tight for large query results. + let ws_config = WebSocketConfig::default() + .max_message_size(Some(256 << 20)) + .max_frame_size(Some(256 << 20)); + let connect_future = connect_async_with_config(request, Some(ws_config), false); + let (ws, response) = tokio::time::timeout(cfg.connect_timeout, connect_future) + .await + .map_err(|_| { + TransportError::Connect(tokio_tungstenite::tungstenite::Error::Io( + std::io::Error::new(std::io::ErrorKind::TimedOut, "connect timeout"), + )) + })??; + + let version = response + .headers() + .get("X-CubeStore-Version") + .or_else(|| response.headers().get("x-cubestore-version")) + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); + + Ok((ws, version)) +} + +fn build_ws_url(base: &url::Url) -> Result { + let mut u = base.clone(); + let path = u.path(); + + // If the user already pointed at /ws (or another explicit path), keep it. + // Otherwise append /ws. + if path == "/" || path.is_empty() { + u.set_path("/ws"); + } + + let scheme = u.scheme(); + if scheme != "ws" && scheme != "wss" { + return Err(TransportError::InvalidUrl(format!( + "expected ws:// or wss://, got {scheme}://" + ))); + } + + Ok(u) +} + +fn host_header(url: &url::Url) -> Option { + let host = url.host_str()?; + match url.port() { + Some(p) => Some(format!("{host}:{p}")), + None => Some(host.to_string()), + } +} diff --git a/rust/cube/cubestore-ws-transport/src/client.rs b/rust/cube/cubestore-ws-transport/src/client.rs new file mode 100644 index 0000000000000..e31be02d595d5 --- /dev/null +++ b/rust/cube/cubestore-ws-transport/src/client.rs @@ -0,0 +1,153 @@ +use std::time::Duration; + +use tokio::sync::{mpsc, oneshot}; +use uuid::Uuid; + +use crate::actor::{connect_ws, Actor, ActorRequest}; +use crate::error::TransportError; +use crate::result::QueryResult; + +#[derive(Debug, Clone)] +pub struct ClientConfig { + pub url: url::Url, + pub username: Option, + pub password: Option, + pub ping_interval: Duration, + pub no_heartbeat_timeout: Duration, + pub max_connect_retries: u32, + pub connect_timeout: Duration, +} + +impl ClientConfig { + pub fn new(url: url::Url) -> Self { + // Peel credentials out of the URL (e.g. `wss://user:pass@host/path`) so + // we can put them in the Authorization header and keep the request-URI + // clean — most WS proxies reject userinfo in the request-line. + let (url, username, password) = extract_userinfo(url); + + Self { + url, + username, + password, + ping_interval: Duration::from_secs(5), + no_heartbeat_timeout: Duration::from_secs(30), + max_connect_retries: 20, + connect_timeout: Duration::from_secs(10), + } + } + + pub fn with_credentials(mut self, user: impl Into, pass: impl Into) -> Self { + self.username = Some(user.into()); + self.password = Some(pass.into()); + self + } +} + +fn extract_userinfo(mut url: url::Url) -> (url::Url, Option, Option) { + if url.username().is_empty() && url.password().is_none() { + return (url, None, None); + } + + let user = percent_decode(url.username()); + let pass = url.password().map(percent_decode); + + let _ = url.set_username(""); + let _ = url.set_password(None); + + (url, Some(user), pass) +} + +fn percent_decode(s: &str) -> String { + percent_encoding::percent_decode_str(s) + .decode_utf8_lossy() + .into_owned() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn extracts_userinfo_from_url() { + let url = url::Url::parse( + "wss://production-6321-9-4:fe84ac54de19e0509b27ba7cab63fc55f7156ddd3707eb5d1c53b6bbb3c2819c@cube-store-aws-eu-west-1.cubecloudapp.dev/staging/3/ws", + ) + .unwrap(); + let cfg = ClientConfig::new(url); + assert_eq!(cfg.username.as_deref(), Some("production-6321-9-4")); + assert_eq!( + cfg.password.as_deref(), + Some("fe84ac54de19e0509b27ba7cab63fc55f7156ddd3707eb5d1c53b6bbb3c2819c"), + ); + // Userinfo stripped, path preserved, scheme intact. + assert_eq!(cfg.url.username(), ""); + assert!(cfg.url.password().is_none()); + assert_eq!(cfg.url.scheme(), "wss"); + assert_eq!(cfg.url.path(), "/staging/3/ws"); + assert_eq!( + cfg.url.host_str(), + Some("cube-store-aws-eu-west-1.cubecloudapp.dev"), + ); + } + + #[test] + fn no_userinfo_leaves_url_untouched() { + let url = url::Url::parse("ws://127.0.0.1:3030/").unwrap(); + let cfg = ClientConfig::new(url.clone()); + assert!(cfg.username.is_none()); + assert!(cfg.password.is_none()); + assert_eq!(cfg.url, url); + } + + #[test] + fn percent_encoded_password_is_decoded() { + let url = url::Url::parse("wss://user:p%40ss@host/path").unwrap(); + let cfg = ClientConfig::new(url); + assert_eq!(cfg.password.as_deref(), Some("p@ss")); + } +} + +#[derive(Clone)] +pub struct Client { + tx: mpsc::UnboundedSender, + server_version: Option, +} + +impl Client { + pub async fn connect(cfg: ClientConfig) -> Result { + // Process id is auto-generated per Client and stays stable across reconnects. + // Matches the JS driver's `getProcessUid()` semantics. + let process_id = Uuid::new_v4().to_string(); + let (ws, version) = connect_ws(&cfg, &process_id).await?; + let connection_id = Uuid::new_v4().to_string(); + let (tx, rx) = mpsc::unbounded_channel(); + let actor = Actor::new(cfg, connection_id, process_id, rx, ws); + tokio::spawn(actor.run()); + Ok(Self { + tx, + server_version: version, + }) + } + + /// Server version captured from the `X-CubeStore-Version` upgrade response header. + pub fn server_version(&self) -> Option<&str> { + self.server_version.as_deref() + } + + /// Execute a SQL statement against cubestore and return the (Legacy-format) result. + pub async fn query(&self, sql: impl Into) -> Result { + let (reply_tx, reply_rx) = oneshot::channel(); + self.tx + .send(ActorRequest::Query { + sql: sql.into(), + reply: reply_tx, + }) + .map_err(|_| TransportError::Closed)?; + reply_rx.await.map_err(|_| TransportError::Closed)? + } + + /// Initiate a graceful shutdown of the background actor. + pub fn close(&self) { + let _ = self.tx.send(ActorRequest::Close); + } +} diff --git a/rust/cube/cubestore-ws-transport/src/codec.rs b/rust/cube/cubestore-ws-transport/src/codec.rs new file mode 100644 index 0000000000000..5ef4f12d82108 --- /dev/null +++ b/rust/cube/cubestore-ws-transport/src/codec.rs @@ -0,0 +1,116 @@ +use bytes::Bytes; +use cubeshared::codegen::{ + root_as_http_message, HttpCommand, HttpMessage, HttpMessageArgs, HttpQuery, HttpQueryArgs, + QueryResultFormat, +}; +use flatbuffers::FlatBufferBuilder; + +use crate::error::TransportError; +use crate::result::QueryResult; + +/// Build a binary FlatBuffer payload carrying an `HttpQuery` command. +pub fn encode_query(message_id: u32, connection_id: &str, sql: &str) -> Bytes { + let mut builder = FlatBufferBuilder::with_capacity(1024); + let query_offset = builder.create_string(sql); + + let http_query = HttpQuery::create( + &mut builder, + &HttpQueryArgs { + query: Some(query_offset), + trace_obj: None, + inline_tables: None, + parameters: None, + response_format: QueryResultFormat::Legacy, + }, + ); + + let connection_id_offset = builder.create_string(connection_id); + + let message = HttpMessage::create( + &mut builder, + &HttpMessageArgs { + message_id, + command_type: HttpCommand::HttpQuery, + command: Some(http_query.as_union_value()), + connection_id: Some(connection_id_offset), + }, + ); + builder.finish(message, None); + Bytes::copy_from_slice(builder.finished_data()) +} + +/// Decoded response from the server. +pub enum DecodedResponse { + Ok(QueryResult), + Error(String), +} + +pub struct DecodedFrame { + pub message_id: u32, + pub response: DecodedResponse, +} + +/// Parse an incoming binary frame, extracting message id and result/error. +pub fn decode_frame(bytes: &[u8]) -> Result { + let msg = root_as_http_message(bytes) + .map_err(|e| TransportError::Protocol(format!("flatbuffer decode: {e}")))?; + let message_id = msg.message_id(); + + let response = match msg.command_type() { + HttpCommand::HttpError => { + let err = msg.command_as_http_error().ok_or_else(|| { + TransportError::Protocol("HttpError union variant missing".into()) + })?; + DecodedResponse::Error(err.error().unwrap_or("unknown error").to_string()) + } + HttpCommand::HttpResultSet => { + let rs = msg.command_as_http_result_set().ok_or_else(|| { + TransportError::Protocol("HttpResultSet union variant missing".into()) + })?; + + let columns: Vec = rs + .columns() + .map(|cols| cols.iter().map(|s| s.to_string()).collect()) + .unwrap_or_default(); + + let mut rows: Vec>> = Vec::new(); + if let Some(row_vec) = rs.rows() { + rows.reserve(row_vec.len()); + for row in row_vec.iter() { + let mut out: Vec> = Vec::with_capacity(columns.len()); + if let Some(values) = row.values() { + for v in values.iter() { + out.push(v.string_value().map(|s| s.to_string())); + } + } + rows.push(out); + } + } + + log::debug!( + "decoded HttpResultSet: {} columns, {} rows", + columns.len(), + rows.len() + ); + DecodedResponse::Ok(QueryResult { columns, rows }) + } + HttpCommand::HttpQueryResult => { + // Arrow format is not supported in this transport version. + return Err(TransportError::Protocol( + "Arrow result format is not supported by this client (request Legacy format)" + .into(), + )); + } + other => { + return Err(TransportError::Protocol(format!( + "unexpected command variant: {:?}", + other.variant_name() + ))); + } + }; + + Ok(DecodedFrame { + message_id, + response, + }) +} diff --git a/rust/cube/cubestore-ws-transport/src/error.rs b/rust/cube/cubestore-ws-transport/src/error.rs new file mode 100644 index 0000000000000..0493253094a8c --- /dev/null +++ b/rust/cube/cubestore-ws-transport/src/error.rs @@ -0,0 +1,28 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum TransportError { + #[error("invalid url: {0}")] + InvalidUrl(String), + + #[error("auth header build error: {0}")] + Auth(String), + + #[error("connect failed: {0}")] + Connect(#[from] tokio_tungstenite::tungstenite::Error), + + #[error("query error: {0}")] + Query(String), + + #[error("protocol error: {0}")] + Protocol(String), + + #[error("disconnected")] + Disconnected, + + #[error("client closed")] + Closed, + + #[error("io error: {0}")] + Io(#[from] std::io::Error), +} diff --git a/rust/cube/cubestore-ws-transport/src/lib.rs b/rust/cube/cubestore-ws-transport/src/lib.rs new file mode 100644 index 0000000000000..5ab9057dea152 --- /dev/null +++ b/rust/cube/cubestore-ws-transport/src/lib.rs @@ -0,0 +1,17 @@ +//! Async Rust client for CubeStore's WebSocket protocol. +//! +//! The wire format is binary FlatBuffers (see `cubeshared::codegen::HttpMessage`). +//! This crate is a Rust port of the JavaScript driver in +//! `packages/cubejs-cubestore-driver/src/WebSocketConnection.ts` — request/response +//! correlation by `message_id`, 5-second heartbeats, exponential-backoff reconnect +//! with in-flight resend. + +mod actor; +mod client; +pub mod codec; +mod error; +mod result; + +pub use client::{Client, ClientConfig}; +pub use error::TransportError; +pub use result::QueryResult; diff --git a/rust/cube/cubestore-ws-transport/src/result.rs b/rust/cube/cubestore-ws-transport/src/result.rs new file mode 100644 index 0000000000000..43c2bf2779e56 --- /dev/null +++ b/rust/cube/cubestore-ws-transport/src/result.rs @@ -0,0 +1,15 @@ +#[derive(Debug, Clone)] +pub struct QueryResult { + pub columns: Vec, + pub rows: Vec>>, +} + +impl QueryResult { + pub fn is_empty(&self) -> bool { + self.rows.is_empty() + } + + pub fn row_count(&self) -> usize { + self.rows.len() + } +} diff --git a/rust/cube/cubestore-ws-transport/tests/mock_server.rs b/rust/cube/cubestore-ws-transport/tests/mock_server.rs new file mode 100644 index 0000000000000..b0e6f5f64249b --- /dev/null +++ b/rust/cube/cubestore-ws-transport/tests/mock_server.rs @@ -0,0 +1,303 @@ +//! Spin up a minimal tokio-tungstenite server that mimics cubestore's /ws endpoint +//! and verify the client's happy path: handshake, message_id correlation, +//! HttpResultSet decoding, and HttpError surfacing as `TransportError::Query`. + +use std::time::Duration; + +use cubeshared::codegen::{ + root_as_http_message, HttpColumnValueArgs, HttpCommand as FbCommand, HttpError as FbError, + HttpErrorArgs, HttpMessage, HttpMessageArgs, HttpResultSet as FbResultSet, HttpResultSetArgs, + HttpRow as FbRow, HttpRowArgs, +}; +use cubestore_ws_transport::{Client, ClientConfig, TransportError}; +use flatbuffers::FlatBufferBuilder; +use futures_util::{SinkExt, StreamExt}; +use tokio::net::TcpListener; +use tokio_tungstenite::tungstenite::protocol::Message; + +fn build_result_set(message_id: u32, connection_id: &str) -> bytes::Bytes { + let mut b = FlatBufferBuilder::with_capacity(1024); + + // columns: ["id", "name"] + let id_col = b.create_string("id"); + let name_col = b.create_string("name"); + let cols_vec = b.create_vector(&[id_col, name_col]); + + use cubeshared::codegen::HttpColumnValue as Cv; + + // row 1: ["1", "alice"] + let s1 = b.create_string("1"); + let v1_1 = Cv::create( + &mut b, + &HttpColumnValueArgs { + string_value: Some(s1), + }, + ); + let s2 = b.create_string("alice"); + let v1_2 = Cv::create( + &mut b, + &HttpColumnValueArgs { + string_value: Some(s2), + }, + ); + let r1_vals = b.create_vector(&[v1_1, v1_2]); + let r1 = FbRow::create( + &mut b, + &HttpRowArgs { + values: Some(r1_vals), + }, + ); + + // row 2: ["2", "bob"] + let s3 = b.create_string("2"); + let v2_1 = Cv::create( + &mut b, + &HttpColumnValueArgs { + string_value: Some(s3), + }, + ); + let s4 = b.create_string("bob"); + let v2_2 = Cv::create( + &mut b, + &HttpColumnValueArgs { + string_value: Some(s4), + }, + ); + let r2_vals = b.create_vector(&[v2_1, v2_2]); + let r2 = FbRow::create( + &mut b, + &HttpRowArgs { + values: Some(r2_vals), + }, + ); + + let rows_vec = b.create_vector(&[r1, r2]); + + let rs = FbResultSet::create( + &mut b, + &HttpResultSetArgs { + columns: Some(cols_vec), + rows: Some(rows_vec), + }, + ); + + let conn_off = b.create_string(connection_id); + let msg = HttpMessage::create( + &mut b, + &HttpMessageArgs { + message_id, + command_type: FbCommand::HttpResultSet, + command: Some(rs.as_union_value()), + connection_id: Some(conn_off), + }, + ); + b.finish(msg, None); + bytes::Bytes::copy_from_slice(b.finished_data()) +} + +fn build_error(message_id: u32, connection_id: &str, error: &str) -> bytes::Bytes { + let mut b = FlatBufferBuilder::with_capacity(256); + let err_off = b.create_string(error); + let err = FbError::create( + &mut b, + &HttpErrorArgs { + error: Some(err_off), + }, + ); + let conn_off = b.create_string(connection_id); + let msg = HttpMessage::create( + &mut b, + &HttpMessageArgs { + message_id, + command_type: FbCommand::HttpError, + command: Some(err.as_union_value()), + connection_id: Some(conn_off), + }, + ); + b.finish(msg, None); + bytes::Bytes::copy_from_slice(b.finished_data()) +} + +/// Boot a one-shot mock server. Returns the bound port. The server replies to every +/// HttpQuery with either a canned result set or, when the SQL starts with `ERR `, +/// an HttpError carrying the rest of the SQL as the message. +async fn boot_mock_server() -> u16 { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + tokio::spawn(async move { + while let Ok((stream, _)) = listener.accept().await { + tokio::spawn(async move { + let ws_stream = tokio_tungstenite::accept_async(stream).await.unwrap(); + let (mut sink, mut src) = ws_stream.split(); + while let Some(msg) = src.next().await { + let Ok(msg) = msg else { break }; + if !msg.is_binary() { + continue; + } + let bytes = msg.into_data(); + let parsed = root_as_http_message(&bytes).expect("parse client message"); + let msg_id = parsed.message_id(); + let conn_id = parsed.connection_id().unwrap_or("").to_string(); + let q = parsed + .command_as_http_query() + .expect("expected HttpQuery from client"); + let sql = q.query().unwrap_or("").to_string(); + + let reply = if let Some(rest) = sql.strip_prefix("ERR ") { + build_error(msg_id, &conn_id, rest) + } else { + build_result_set(msg_id, &conn_id) + }; + if sink.send(Message::Binary(reply)).await.is_err() { + break; + } + } + }); + } + }); + port +} + +#[tokio::test] +async fn happy_path_query_returns_rows() { + let port = boot_mock_server().await; + let url = url::Url::parse(&format!("ws://127.0.0.1:{port}/")).unwrap(); + let mut cfg = ClientConfig::new(url); + cfg.connect_timeout = Duration::from_secs(2); + let client = Client::connect(cfg).await.expect("connect"); + + let result = client.query("SELECT * FROM whatever").await.expect("query"); + + assert_eq!(result.columns, vec!["id".to_string(), "name".to_string()]); + assert_eq!(result.rows.len(), 2); + assert_eq!(result.rows[0][0].as_deref(), Some("1")); + assert_eq!(result.rows[0][1].as_deref(), Some("alice")); + assert_eq!(result.rows[1][0].as_deref(), Some("2")); + assert_eq!(result.rows[1][1].as_deref(), Some("bob")); +} + +/// Full WS round-trip with a 12-column result — guards against any layer in the +/// stack accidentally dropping columns past the first few. +#[tokio::test] +async fn wide_result_full_round_trip() { + use cubeshared::codegen::{HttpColumnValue as Cv, HttpMessage as FbMsg, HttpMessageArgs}; + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + tokio::spawn(async move { + while let Ok((stream, _)) = listener.accept().await { + tokio::spawn(async move { + let ws = tokio_tungstenite::accept_async(stream).await.unwrap(); + let (mut sink, mut src) = ws.split(); + while let Some(msg) = src.next().await { + let Ok(msg) = msg else { break }; + if !msg.is_binary() { + continue; + } + let bytes = msg.into_data(); + let parsed = cubeshared::codegen::root_as_http_message(&bytes).unwrap(); + let msg_id = parsed.message_id(); + let conn_id = parsed.connection_id().unwrap_or("").to_string(); + + let mut b = FlatBufferBuilder::with_capacity(2048); + let names = [ + "c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", + ]; + let col_offs: Vec<_> = names.iter().map(|n| b.create_string(n)).collect(); + let cols = b.create_vector(&col_offs); + + let cells: Vec<_> = (0..names.len()) + .map(|i| { + let s = b.create_string(&format!("v{i}")); + Cv::create( + &mut b, + &HttpColumnValueArgs { + string_value: Some(s), + }, + ) + }) + .collect(); + let vals = b.create_vector(&cells); + let row = FbRow::create(&mut b, &HttpRowArgs { values: Some(vals) }); + let rows = b.create_vector(&[row]); + let rs = FbResultSet::create( + &mut b, + &HttpResultSetArgs { + columns: Some(cols), + rows: Some(rows), + }, + ); + let conn = b.create_string(&conn_id); + let msg = FbMsg::create( + &mut b, + &HttpMessageArgs { + message_id: msg_id, + command_type: FbCommand::HttpResultSet, + command: Some(rs.as_union_value()), + connection_id: Some(conn), + }, + ); + b.finish(msg, None); + let _ = sink + .send(Message::Binary(bytes::Bytes::copy_from_slice( + b.finished_data(), + ))) + .await; + } + }); + } + }); + + let url = url::Url::parse(&format!("ws://127.0.0.1:{port}/")).unwrap(); + let mut cfg = ClientConfig::new(url); + cfg.connect_timeout = Duration::from_secs(2); + let client = Client::connect(cfg).await.expect("connect"); + + let result = client.query("SELECT *").await.expect("query"); + assert_eq!( + result.columns, + ["c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11"] + .iter() + .map(|s| s.to_string()) + .collect::>(), + "all 12 columns should survive WS round-trip" + ); + assert_eq!(result.rows.len(), 1); + assert_eq!(result.rows[0].len(), 12); +} + +#[tokio::test] +async fn server_error_surfaces_as_query_error() { + let port = boot_mock_server().await; + let url = url::Url::parse(&format!("ws://127.0.0.1:{port}/")).unwrap(); + let mut cfg = ClientConfig::new(url); + cfg.connect_timeout = Duration::from_secs(2); + let client = Client::connect(cfg).await.expect("connect"); + + let err = client.query("ERR boom: bad sql").await.unwrap_err(); + match err { + TransportError::Query(msg) => assert_eq!(msg, "boom: bad sql"), + other => panic!("expected Query error, got {other:?}"), + } +} + +#[tokio::test] +async fn many_queries_correlate_by_message_id() { + let port = boot_mock_server().await; + let url = url::Url::parse(&format!("ws://127.0.0.1:{port}/")).unwrap(); + let mut cfg = ClientConfig::new(url); + cfg.connect_timeout = Duration::from_secs(2); + let client = Client::connect(cfg).await.expect("connect"); + + let mut handles = Vec::new(); + for i in 0..10 { + let c = client.clone(); + handles.push(tokio::spawn( + async move { c.query(format!("SELECT {i}")).await }, + )); + } + for h in handles { + let r = h.await.unwrap().expect("query ok"); + assert_eq!(r.rows.len(), 2); + } +} diff --git a/rust/cube/cubestore-ws-transport/tests/reconnect.rs b/rust/cube/cubestore-ws-transport/tests/reconnect.rs new file mode 100644 index 0000000000000..ae63bac96c152 --- /dev/null +++ b/rust/cube/cubestore-ws-transport/tests/reconnect.rs @@ -0,0 +1,126 @@ +//! Verify reconnect + in-flight resend: the first connection drops mid-query, +//! and the actor re-establishes the WS, resubmits the original buffer (preserving +//! its message_id), and resolves the pending oneshot from the second connection. + +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use cubeshared::codegen::{ + root_as_http_message, HttpColumnValueArgs, HttpCommand as FbCommand, HttpMessage, + HttpMessageArgs, HttpResultSet as FbResultSet, HttpResultSetArgs, HttpRow as FbRow, + HttpRowArgs, +}; +use cubestore_ws_transport::{Client, ClientConfig}; +use flatbuffers::FlatBufferBuilder; +use futures_util::{SinkExt, StreamExt}; +use tokio::net::TcpListener; +use tokio_tungstenite::tungstenite::protocol::Message; + +fn build_result_set(message_id: u32, connection_id: &str) -> bytes::Bytes { + let mut b = FlatBufferBuilder::with_capacity(256); + let col = b.create_string("value"); + let cols = b.create_vector(&[col]); + let cell = b.create_string("ok"); + use cubeshared::codegen::HttpColumnValue as Cv; + let v = Cv::create( + &mut b, + &HttpColumnValueArgs { + string_value: Some(cell), + }, + ); + let row_vals = b.create_vector(&[v]); + let row = FbRow::create( + &mut b, + &HttpRowArgs { + values: Some(row_vals), + }, + ); + let rows = b.create_vector(&[row]); + let rs = FbResultSet::create( + &mut b, + &HttpResultSetArgs { + columns: Some(cols), + rows: Some(rows), + }, + ); + let conn = b.create_string(connection_id); + let msg = HttpMessage::create( + &mut b, + &HttpMessageArgs { + message_id, + command_type: FbCommand::HttpResultSet, + command: Some(rs.as_union_value()), + connection_id: Some(conn), + }, + ); + b.finish(msg, None); + bytes::Bytes::copy_from_slice(b.finished_data()) +} + +#[tokio::test] +async fn resends_in_flight_query_after_reconnect() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + + // Track which connection this is for the running server task. + let connection_count = Arc::new(AtomicU32::new(0)); + // Capture the message_id observed across connections — must match. + let observed_msg_ids = Arc::new(tokio::sync::Mutex::new(Vec::::new())); + + let observed_msg_ids_clone = observed_msg_ids.clone(); + let connection_count_clone = connection_count.clone(); + tokio::spawn(async move { + while let Ok((stream, _)) = listener.accept().await { + let nth = connection_count_clone.fetch_add(1, Ordering::SeqCst); + let observed = observed_msg_ids_clone.clone(); + tokio::spawn(async move { + let ws = tokio_tungstenite::accept_async(stream).await.unwrap(); + let (mut sink, mut src) = ws.split(); + while let Some(msg) = src.next().await { + let Ok(msg) = msg else { break }; + if !msg.is_binary() { + continue; + } + let bytes = msg.into_data(); + let parsed = root_as_http_message(&bytes).expect("parse"); + let msg_id = parsed.message_id(); + let conn_id = parsed.connection_id().unwrap_or("").to_string(); + observed.lock().await.push(msg_id); + + if nth == 0 { + // First connection: receive the query, then close abruptly without + // replying. The client should reconnect and resend. + let _ = sink.send(Message::Close(None)).await; + break; + } + + // Subsequent connections: echo back the result set. + let reply = build_result_set(msg_id, &conn_id); + let _ = sink.send(Message::Binary(reply)).await; + } + }); + } + }); + + let url = url::Url::parse(&format!("ws://127.0.0.1:{port}/")).unwrap(); + let mut cfg = ClientConfig::new(url); + cfg.connect_timeout = Duration::from_secs(2); + cfg.max_connect_retries = 5; + let client = Client::connect(cfg).await.expect("initial connect"); + + // The reconnect backoff is (attempt+1)*1000ms; allow generous headroom. + let result = tokio::time::timeout(Duration::from_secs(5), client.query("SELECT 1")) + .await + .expect("query did not complete within timeout") + .expect("query result"); + + assert_eq!(result.columns, vec!["value".to_string()]); + assert_eq!(result.rows.len(), 1); + assert_eq!(result.rows[0][0].as_deref(), Some("ok")); + + // Both connections should have seen the *same* message id — the resend preserves it. + let ids = observed_msg_ids.lock().await.clone(); + assert!(ids.len() >= 2, "expected resend, got ids: {ids:?}"); + assert_eq!(ids[0], ids[1], "resent message_id must match the original"); +} diff --git a/rust/cube/cubestore-ws-transport/tests/wire_roundtrip.rs b/rust/cube/cubestore-ws-transport/tests/wire_roundtrip.rs new file mode 100644 index 0000000000000..976bd6d7aec09 --- /dev/null +++ b/rust/cube/cubestore-ws-transport/tests/wire_roundtrip.rs @@ -0,0 +1,172 @@ +//! Round-trip the on-wire FlatBuffer: encode an HttpQuery, parse it back, and verify shape. + +use cubeshared::codegen::{ + root_as_http_message, HttpColumnValue, HttpColumnValueArgs, HttpCommand, HttpMessage, + HttpMessageArgs, HttpResultSet, HttpResultSetArgs, HttpRow, HttpRowArgs, QueryResultFormat, +}; +use cubestore_ws_transport::codec::{decode_frame, encode_query, DecodedResponse}; +use flatbuffers::FlatBufferBuilder; + +#[test] +fn query_round_trip() { + let bytes = encode_query(7, "conn-xyz", "SELECT 42"); + let msg = root_as_http_message(&bytes).expect("parse encoded message"); + + assert_eq!(msg.message_id(), 7); + assert_eq!(msg.connection_id(), Some("conn-xyz")); + assert_eq!(msg.command_type(), HttpCommand::HttpQuery); + + let q = msg.command_as_http_query().expect("HttpQuery variant"); + assert_eq!(q.query(), Some("SELECT 42")); + assert_eq!(q.response_format(), QueryResultFormat::Legacy); + assert!(q.trace_obj().is_none()); + assert!(q.inline_tables().is_none()); + assert!(q.parameters().is_none()); +} + +#[test] +fn decode_preserves_all_columns_for_wide_table() { + let mut b = FlatBufferBuilder::with_capacity(8 * 1024); + + let col_names = [ + "c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", + ]; + let col_offsets: Vec<_> = col_names.iter().map(|n| b.create_string(n)).collect(); + let cols_vec = b.create_vector(&col_offsets); + + let cell_offsets: Vec<_> = (0..col_names.len()) + .map(|i| { + let s = b.create_string(&format!("v{i}")); + HttpColumnValue::create( + &mut b, + &HttpColumnValueArgs { + string_value: Some(s), + }, + ) + }) + .collect(); + let values = b.create_vector(&cell_offsets); + let row = HttpRow::create( + &mut b, + &HttpRowArgs { + values: Some(values), + }, + ); + let rows = b.create_vector(&[row]); + + let rs = HttpResultSet::create( + &mut b, + &HttpResultSetArgs { + columns: Some(cols_vec), + rows: Some(rows), + }, + ); + let conn = b.create_string("c"); + let msg = HttpMessage::create( + &mut b, + &HttpMessageArgs { + message_id: 9, + command_type: HttpCommand::HttpResultSet, + command: Some(rs.as_union_value()), + connection_id: Some(conn), + }, + ); + b.finish(msg, None); + let bytes = b.finished_data().to_vec(); + + let decoded = decode_frame(&bytes).expect("decode"); + let r = match decoded.response { + DecodedResponse::Ok(r) => r, + DecodedResponse::Error(e) => panic!("err: {e}"), + }; + assert_eq!( + r.columns, + col_names.iter().map(|s| s.to_string()).collect::>(), + "all column names should decode" + ); + assert_eq!(r.rows.len(), 1); + assert_eq!(r.rows[0].len(), col_names.len(), "all cells should decode"); + for (i, cell) in r.rows[0].iter().enumerate() { + assert_eq!(cell.as_deref(), Some(format!("v{i}").as_str())); + } +} + +#[test] +fn decode_preserves_all_rows_and_long_strings() { + // Build a synthetic 500-row HttpResultSet with progressively longer string values. + let mut b = FlatBufferBuilder::with_capacity(64 * 1024); + let col_a = b.create_string("id"); + let col_b = b.create_string("payload"); + let cols = b.create_vector(&[col_a, col_b]); + + let mut row_offsets = Vec::with_capacity(500); + for i in 0..500u32 { + let id_str = b.create_string(&i.to_string()); + let id_cell = HttpColumnValue::create( + &mut b, + &HttpColumnValueArgs { + string_value: Some(id_str), + }, + ); + // Long-ish payload to make sure nothing is truncated. + let payload = "x".repeat((i as usize % 256) + 1); + let payload_str = b.create_string(&payload); + let payload_cell = HttpColumnValue::create( + &mut b, + &HttpColumnValueArgs { + string_value: Some(payload_str), + }, + ); + let values = b.create_vector(&[id_cell, payload_cell]); + let row = HttpRow::create( + &mut b, + &HttpRowArgs { + values: Some(values), + }, + ); + row_offsets.push(row); + } + let rows = b.create_vector(&row_offsets); + let rs = HttpResultSet::create( + &mut b, + &HttpResultSetArgs { + columns: Some(cols), + rows: Some(rows), + }, + ); + let conn = b.create_string("cx"); + let msg = HttpMessage::create( + &mut b, + &HttpMessageArgs { + message_id: 1, + command_type: HttpCommand::HttpResultSet, + command: Some(rs.as_union_value()), + connection_id: Some(conn), + }, + ); + b.finish(msg, None); + let bytes = b.finished_data().to_vec(); + + let decoded = decode_frame(&bytes).expect("decode"); + assert_eq!(decoded.message_id, 1); + let result = match decoded.response { + DecodedResponse::Ok(r) => r, + DecodedResponse::Error(e) => panic!("unexpected error: {e}"), + }; + + assert_eq!( + result.columns, + vec!["id".to_string(), "payload".to_string()] + ); + assert_eq!(result.rows.len(), 500, "expected all 500 rows decoded"); + for (i, row) in result.rows.iter().enumerate() { + assert_eq!(row.len(), 2); + assert_eq!(row[0].as_deref(), Some(i.to_string().as_str())); + let expected_payload_len = (i % 256) + 1; + assert_eq!( + row[1].as_deref().map(|s| s.len()), + Some(expected_payload_len), + "payload truncated at row {i}" + ); + } +}