From e4c8c43d2e060acfddd4288fa356b25f0df150bd Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 20 May 2026 15:55:26 +0200 Subject: [PATCH 1/5] feat(cubestore-cli): Support Arrow format for responses (#10915) --- rust/cube/.gitignore | 3 +- rust/cube/Cargo.lock | 321 ++++++++++++++++++ rust/cube/cubestore-cli/Cargo.toml | 1 + rust/cube/cubestore-cli/src/error.rs | 8 + rust/cube/cubestore-cli/src/exec.rs | 24 +- rust/cube/cubestore-cli/src/format.rs | 215 ++++++++---- rust/cube/cubestore-cli/src/lib.rs | 3 + rust/cube/cubestore-ws-transport/Cargo.toml | 3 +- rust/cube/cubestore-ws-transport/src/codec.rs | 49 ++- rust/cube/cubestore-ws-transport/src/lib.rs | 3 +- .../cube/cubestore-ws-transport/src/result.rs | 71 +++- .../tests/mock_server.rs | 64 ++-- .../cubestore-ws-transport/tests/reconnect.rs | 21 +- .../tests/wire_roundtrip.rs | 140 +++++++- 14 files changed, 800 insertions(+), 126 deletions(-) create mode 100644 rust/cube/cubestore-cli/src/error.rs diff --git a/rust/cube/.gitignore b/rust/cube/.gitignore index 1de565933b05f..6f3ca5d9527bf 100644 --- a/rust/cube/.gitignore +++ b/rust/cube/.gitignore @@ -1 +1,2 @@ -target \ No newline at end of file +target +.zed diff --git a/rust/cube/Cargo.lock b/rust/cube/Cargo.lock index 0d3a298152a91..42f0fbffd43f7 100644 --- a/rust/cube/Cargo.lock +++ b/rust/cube/Cargo.lock @@ -12,6 +12,20 @@ dependencies = [ "regex", ] +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "const-random", + "getrandom 0.3.4", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.4" @@ -101,6 +115,180 @@ version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" +[[package]] +name = "arrow" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "378530e55cd479eda3c14eb345310799717e6f76d0c332041e8487022166b471" +dependencies = [ + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "arrow-string", +] + +[[package]] +name = "arrow-arith" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0ab212d2c1886e802f51c5212d78ebbcbb0bec980fff9dadc1eb8d45cd0b738" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "num-traits", +] + +[[package]] +name = "arrow-array" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfd33d3e92f207444098c75b42de99d329562be0cf686b307b097cc52b4e999e" +dependencies = [ + "ahash", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half", + "hashbrown 0.17.0", + "num-complex", + "num-integer", + "num-traits", +] + +[[package]] +name = "arrow-buffer" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6cd424c2693bcdbc150d843dc9d4d137dd2de4782ce6df491ad11a3a0416c0" +dependencies = [ + "bytes", + "half", + "num-bigint", + "num-traits", +] + +[[package]] +name = "arrow-cast" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c5aefb56a2c02e9e2b30746241058b85f8983f0fcff2ba0c6d09006e1cded7f" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-ord", + "arrow-schema", + "arrow-select", + "atoi", + "base64 0.22.1", + "chrono", + "half", + "lexical-core", + "num-traits", + "ryu", +] + +[[package]] +name = "arrow-data" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c88210023a2bfee1896af366309a3028fc3bcbd6515fa29a7990ee1baa08ee0" +dependencies = [ + "arrow-buffer", + "arrow-schema", + "half", + "num-integer", + "num-traits", +] + +[[package]] +name = "arrow-ipc" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "238438f0834483703d88896db6fe5a7138b2230debc31b34c0336c2996e3c64f" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "flatbuffers", +] + +[[package]] +name = "arrow-ord" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bffd8fd2579286a5d63bac898159873e5094a79009940bcb42bbfce4f19f1d0" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", +] + +[[package]] +name = "arrow-row" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bab5994731204603c73ba69267616c50f80780774c6bb0476f1f830625115e0c" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "half", +] + +[[package]] +name = "arrow-schema" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f633dbfdf39c039ada1bf9e34c694816eb71fbb7dc78f613993b7245e078a1ed" + +[[package]] +name = "arrow-select" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cd065c54172ac787cf3f2f8d4107e0d3fdc26edba76fdf4f4cc170258942222" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "num-traits", +] + +[[package]] +name = "arrow-string" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29dd7cda3ab9692f43a2e4acc444d760cc17b12bb6d8232ddf64e9bab7c06b42" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "memchr", + "num-traits", + "regex", + "regex-syntax", +] + [[package]] name = "async-channel" version = "2.5.0" @@ -124,6 +312,15 @@ dependencies = [ "syn", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -441,6 +638,26 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6ef517f0926dd24a1582492c791b6a4818a4d94e789a334894aa15b0d12f55c" +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom 0.2.17", + "once_cell", + "tiny-keccak", +] + [[package]] name = "convert_case" version = "0.6.0" @@ -666,6 +883,7 @@ dependencies = [ "log", "rpassword", "rustyline", + "thiserror 1.0.69", "tokio", "url", ] @@ -674,6 +892,7 @@ dependencies = [ name = "cubestore-ws-transport" version = "0.1.0" dependencies = [ + "arrow", "base64 0.22.1", "bytes", "cubeshared", @@ -1139,6 +1358,7 @@ checksum = "6ea2d84b969582b4b1864a92dc5d27cd2b77b622a8d79306834f1be5ba20d84b" dependencies = [ "cfg-if", "crunchy", + "num-traits", "zerocopy", ] @@ -1614,6 +1834,63 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" +[[package]] +name = "lexical-core" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d8d125a277f807e55a77304455eb7b1cb52f2b18c143b60e766c120bd64a594" +dependencies = [ + "lexical-parse-float", + "lexical-parse-integer", + "lexical-util", + "lexical-write-float", + "lexical-write-integer", +] + +[[package]] +name = "lexical-parse-float" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52a9f232fbd6f550bc0137dcb5f99ab674071ac2d690ac69704593cb4abbea56" +dependencies = [ + "lexical-parse-integer", + "lexical-util", +] + +[[package]] +name = "lexical-parse-integer" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a7a039f8fb9c19c996cd7b2fcce303c1b2874fe1aca544edc85c4a5f8489b34" +dependencies = [ + "lexical-util", +] + +[[package]] +name = "lexical-util" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2604dd126bb14f13fb5d1bd6a66155079cb9fa655b37f875b3a742c705dbed17" + +[[package]] +name = "lexical-write-float" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50c438c87c013188d415fbabbb1dceb44249ab81664efbd31b14ae55dabb6361" +dependencies = [ + "lexical-util", + "lexical-write-integer", +] + +[[package]] +name = "lexical-write-integer" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "409851a618475d2d5796377cad353802345cba92c867d9fbcde9cf4eac4e14df" +dependencies = [ + "lexical-util", +] + [[package]] name = "libc" version = "0.2.186" @@ -1630,6 +1907,12 @@ dependencies = [ "windows-link", ] +[[package]] +name = "libm" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" + [[package]] name = "libredox" version = "0.1.16" @@ -1835,12 +2118,40 @@ dependencies = [ "libc", ] +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + +[[package]] +name = "num-complex" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" +dependencies = [ + "num-traits", +] + [[package]] name = "num-conv" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967" +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -1848,6 +2159,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ "autocfg", + "libm", ] [[package]] @@ -3104,6 +3416,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinystr" version = "0.8.3" diff --git a/rust/cube/cubestore-cli/Cargo.toml b/rust/cube/cubestore-cli/Cargo.toml index c1013b03935e5..e434d29f96cdd 100644 --- a/rust/cube/cubestore-cli/Cargo.toml +++ b/rust/cube/cubestore-cli/Cargo.toml @@ -22,3 +22,4 @@ rpassword = "7" dirs = "5" env_logger = "0.11" log = "0.4" +thiserror = "1" diff --git a/rust/cube/cubestore-cli/src/error.rs b/rust/cube/cubestore-cli/src/error.rs new file mode 100644 index 0000000000000..1cbe1bc289fd3 --- /dev/null +++ b/rust/cube/cubestore-cli/src/error.rs @@ -0,0 +1,8 @@ +use cubestore_ws_transport::arrow::error::ArrowError; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum CliError { + #[error("Arrow error: {0}")] + Arrow(#[from] ArrowError), +} diff --git a/rust/cube/cubestore-cli/src/exec.rs b/rust/cube/cubestore-cli/src/exec.rs index 63c21d634f66f..36f881fcec241 100644 --- a/rust/cube/cubestore-cli/src/exec.rs +++ b/rust/cube/cubestore-cli/src/exec.rs @@ -19,10 +19,19 @@ pub async fn run_one(client: &Client, sql: &str, show_timing: bool) -> Result println!("{table}\n{footer}"), - None => println!("{footer}"), + Ok(Some(table)) => { + println!("{table}\n{footer}"); + Ok(true) + } + Ok(None) => { + println!("{footer}"); + Ok(true) + } + Err(e) => { + eprintln!("ERROR: failed to render result: {e}"); + Ok(false) + } } - Ok(true) } Err(e) => { print!("\x1b[?7h"); @@ -37,15 +46,18 @@ fn make_footer( show_timing: bool, elapsed_ms: u128, ) -> String { - let has_table = !result.columns.is_empty(); - let n = result.rows.len(); + let has_table = !result.get_columns().is_empty(); + let n = result.row_count(); let rows_part = if has_table { format!("{n} {}", if n == 1 { "row" } else { "rows" }) } else { "OK".to_string() }; if show_timing { - format!("({rows_part}, Time: {elapsed_ms} ms)") + format!( + "({rows_part}, Time: {elapsed_ms} ms, Format: {})", + result.get_format() + ) } else { format!("({rows_part})") } diff --git a/rust/cube/cubestore-cli/src/format.rs b/rust/cube/cubestore-cli/src/format.rs index ac97a88f018ed..10ac588ced31e 100644 --- a/rust/cube/cubestore-cli/src/format.rs +++ b/rust/cube/cubestore-cli/src/format.rs @@ -1,44 +1,139 @@ -use cubestore_ws_transport::QueryResult; +use cubestore_ws_transport::arrow::array::RecordBatch; +use cubestore_ws_transport::arrow::error::ArrowError; +use cubestore_ws_transport::arrow::util::display::{ArrayFormatter, FormatOptions}; +use cubestore_ws_transport::{QueryResult, ResultData}; + +use crate::CliError; const NULL_RENDER: &str = "NULL"; -/// psql-style aligned text table. Returns `None` for empty (DDL/INSERT) results. -pub fn render_table(result: &QueryResult) -> Option { - if result.columns.is_empty() { - return None; +/// psql-style aligned text table. Returns `Ok(None)` for empty (DDL/INSERT) results. +pub fn render_table(result: &QueryResult) -> Result, CliError> { + let columns = result.get_columns(); + if columns.is_empty() { + return Ok(None); } - let ncols = result.columns.len(); - let mut widths: Vec = result.columns.iter().map(|c| c.chars().count()).collect(); - for row in &result.rows { - for (i, cell) in row.iter().enumerate() { - if i >= ncols { - break; - } - let len = match cell.as_deref() { - Some(s) => s.chars().count(), - None => NULL_RENDER.chars().count(), - }; - if len > widths[i] { - widths[i] = len; + let table = match &result.data { + ResultData::Legacy { rows, .. } => render_legacy_rows(&columns, rows), + ResultData::Arrow { batches, .. } => render_arrow_batches(&columns, batches)?, + }; + Ok(Some(table)) +} + +fn render_legacy_rows(columns: &[String], rows: &[Vec>]) -> String { + let ncols = columns.len(); + let mut widths = header_widths(columns); + for row in rows { + for (i, cell) in row.iter().enumerate().take(ncols) { + observe_width(&mut widths, i, cell_len(cell.as_deref())); + } + } + + let mut out = String::new(); + write_header(&mut out, columns, &widths); + write_separator(&mut out, &widths); + for row in rows { + out.push('\n'); + for (i, &width) in widths.iter().enumerate() { + let cell = row.get(i).and_then(|c| c.as_deref()); + write_cell(&mut out, i, cell, width); + } + } + out +} + +/// Render Arrow batches directly. Each cell is formatted twice — once to size +/// the columns, once to emit the row — which trades a bit of CPU for not +/// materializing the entire result into row based format +fn render_arrow_batches(columns: &[String], batches: &[RecordBatch]) -> Result { + let ncols = columns.len(); + let fmt_options = FormatOptions::default().with_display_error(true); + let mut widths = header_widths(columns); + + for batch in batches { + let formatters = batch_formatters(batch, &fmt_options)?; + let batch_cols = formatters.len().min(ncols); + for row_idx in 0..batch.num_rows() { + for col_idx in 0..batch_cols { + let cell = arrow_cell(batch, &formatters, col_idx, row_idx); + observe_width(&mut widths, col_idx, cell_len(cell.as_deref())); } } } let mut out = String::new(); + write_header(&mut out, columns, &widths); + write_separator(&mut out, &widths); + for batch in batches { + let formatters = batch_formatters(batch, &fmt_options)?; + let batch_cols = formatters.len().min(ncols); + for row_idx in 0..batch.num_rows() { + out.push('\n'); + for (i, &width) in widths.iter().enumerate() { + let cell = if i < batch_cols { + arrow_cell(batch, &formatters, i, row_idx) + } else { + None + }; + write_cell(&mut out, i, cell.as_deref(), width); + } + } + } + Ok(out) +} + +fn arrow_cell( + batch: &RecordBatch, + formatters: &[ArrayFormatter], + col_idx: usize, + row_idx: usize, +) -> Option { + if batch.column(col_idx).is_null(row_idx) { + None + } else { + Some(formatters[col_idx].value(row_idx).to_string()) + } +} - // Header - for (i, name) in result.columns.iter().enumerate() { +fn batch_formatters<'a>( + batch: &'a RecordBatch, + options: &'a FormatOptions, +) -> Result>, ArrowError> { + batch + .columns() + .iter() + .map(|col| ArrayFormatter::try_new(col.as_ref(), options)) + .collect() +} + +fn header_widths(columns: &[String]) -> Vec { + columns.iter().map(|c| c.chars().count()).collect() +} + +fn cell_len(cell: Option<&str>) -> usize { + cell.unwrap_or(NULL_RENDER).chars().count() +} + +fn observe_width(widths: &mut [usize], col: usize, len: usize) { + if len > widths[col] { + widths[col] = len; + } +} + +fn write_header(out: &mut String, columns: &[String], widths: &[usize]) { + for (i, name) in columns.iter().enumerate() { if i > 0 { out.push('|'); } out.push(' '); - push_center(&mut out, name, widths[i]); + push_center(out, name, widths[i]); out.push(' '); } out.push('\n'); +} - // Separator +fn write_separator(out: &mut String, widths: &[usize]) { for (i, w) in widths.iter().enumerate() { if i > 0 { out.push('+'); @@ -47,25 +142,15 @@ pub fn render_table(result: &QueryResult) -> Option { out.push('-'); } } +} - // Data rows - for row in &result.rows { - out.push('\n'); - for (i, &width) in widths.iter().enumerate() { - if i > 0 { - out.push('|'); - } - out.push(' '); - let cell = match row.get(i) { - Some(Some(s)) => s.as_str(), - Some(None) | None => NULL_RENDER, - }; - push_left(&mut out, cell, width); - out.push(' '); - } +fn write_cell(out: &mut String, col: usize, cell: Option<&str>, width: usize) { + if col > 0 { + out.push('|'); } - - Some(out) + out.push(' '); + push_left(out, cell.unwrap_or(NULL_RENDER), width); + out.push(' '); } fn push_left(out: &mut String, s: &str, width: usize) { @@ -99,27 +184,29 @@ mod tests { use super::*; #[test] - fn renders_all_rows_without_truncation() { + fn renders_all_rows_without_truncation() -> Result<(), CliError> { let result = QueryResult { - columns: vec!["table_catalog".to_string(), "table_schema".to_string()], - rows: vec![ - vec![ - Some("ovr".to_string()), - Some("information_schema".to_string()), - ], - vec![ - Some("ovr".to_string()), - Some("information_schema".to_string()), - ], - vec![ - Some("ovr".to_string()), - Some("information_schema".to_string()), + data: ResultData::Legacy { + columns: vec!["table_catalog".to_string(), "table_schema".to_string()], + rows: vec![ + vec![ + Some("ovr".to_string()), + Some("information_schema".to_string()), + ], + vec![ + Some("ovr".to_string()), + Some("information_schema".to_string()), + ], + vec![ + Some("ovr".to_string()), + Some("information_schema".to_string()), + ], + vec![Some("ovr".to_string()), Some("public".to_string())], + vec![Some("ovr".to_string()), Some("public".to_string())], ], - vec![Some("ovr".to_string()), Some("public".to_string())], - vec![Some("ovr".to_string()), Some("public".to_string())], - ], + }, }; - let out = render_table(&result).expect("table"); + let out = render_table(&result)?.expect("table"); // header line + separator + 5 data rows = 7 lines assert_eq!(out.lines().count(), 7); @@ -135,16 +222,22 @@ mod tests { header.contains(" table_schema "), "header centered to data width: {header:?}" ); + + Ok(()) } #[test] - fn preserves_long_cell_values() { + fn preserves_long_cell_values() -> Result<(), CliError> { let long = "x".repeat(500); let result = QueryResult { - columns: vec!["v".to_string()], - rows: vec![vec![Some(long.clone())]], + data: ResultData::Legacy { + columns: vec!["v".to_string()], + rows: vec![vec![Some(long.clone())]], + }, }; - let out = render_table(&result).expect("table"); + let out = render_table(&result)?.expect("table"); assert!(out.contains(&long), "long value preserved in output"); + + Ok(()) } } diff --git a/rust/cube/cubestore-cli/src/lib.rs b/rust/cube/cubestore-cli/src/lib.rs index 40ee4ca02a594..f7fb81b650286 100644 --- a/rust/cube/cubestore-cli/src/lib.rs +++ b/rust/cube/cubestore-cli/src/lib.rs @@ -1,4 +1,7 @@ pub mod args; +pub mod error; pub mod exec; pub mod format; pub mod repl; + +pub use error::CliError; diff --git a/rust/cube/cubestore-ws-transport/Cargo.toml b/rust/cube/cubestore-ws-transport/Cargo.toml index 9cae852b514e9..fe00b0d596d9a 100644 --- a/rust/cube/cubestore-ws-transport/Cargo.toml +++ b/rust/cube/cubestore-ws-transport/Cargo.toml @@ -2,7 +2,7 @@ name = "cubestore-ws-transport" version = "0.1.0" edition = "2021" -description = "Async WebSocket client for CubeStore's binary FlatBuffers protocol." +description = "Async WebSocket client for CubeStore's binary protocol." [dependencies] cubeshared = { path = "../cubeshared" } @@ -18,6 +18,7 @@ thiserror = "1" log = "0.4" uuid = { version = "1", features = ["v4"] } http = "1" +arrow = { version = "58", default-features = false, features = ["ipc"] } [dev-dependencies] tokio = { version = "1", features = ["full", "test-util"] } diff --git a/rust/cube/cubestore-ws-transport/src/codec.rs b/rust/cube/cubestore-ws-transport/src/codec.rs index 5ef4f12d82108..43eccb96bb8b4 100644 --- a/rust/cube/cubestore-ws-transport/src/codec.rs +++ b/rust/cube/cubestore-ws-transport/src/codec.rs @@ -1,3 +1,7 @@ +use std::io::Cursor; + +use arrow::array::RecordBatch; +use arrow::ipc::reader::StreamReader; use bytes::Bytes; use cubeshared::codegen::{ root_as_http_message, HttpCommand, HttpMessage, HttpMessageArgs, HttpQuery, HttpQueryArgs, @@ -6,7 +10,7 @@ use cubeshared::codegen::{ use flatbuffers::FlatBufferBuilder; use crate::error::TransportError; -use crate::result::QueryResult; +use crate::result::{QueryResult, ResultData}; /// Build a binary FlatBuffer payload carrying an `HttpQuery` command. pub fn encode_query(message_id: u32, connection_id: &str, sql: &str) -> Bytes { @@ -20,7 +24,7 @@ pub fn encode_query(message_id: u32, connection_id: &str, sql: &str) -> Bytes { trace_obj: None, inline_tables: None, parameters: None, - response_format: QueryResultFormat::Legacy, + response_format: QueryResultFormat::Arrow, }, ); @@ -92,14 +96,29 @@ pub fn decode_frame(bytes: &[u8]) -> Result { columns.len(), rows.len() ); - DecodedResponse::Ok(QueryResult { columns, rows }) + + DecodedResponse::Ok(QueryResult { + data: ResultData::Legacy { columns, rows }, + }) } HttpCommand::HttpQueryResult => { - // Arrow format is not supported in this transport version. - return Err(TransportError::Protocol( - "Arrow result format is not supported by this client (request Legacy format)" - .into(), - )); + let qr = msg.command_as_http_query_result().ok_or_else(|| { + TransportError::Protocol("HttpQueryResult union variant missing".into()) + })?; + let arrow = qr.data_as_http_query_result_arrow().ok_or_else(|| { + TransportError::Protocol( + "HttpQueryResult.data variant is not HttpQueryResultArrow".into(), + ) + })?; + + let result = decode_arrow_ipc(arrow.data().bytes())?; + log::debug!( + "decoded HttpQueryResult (Arrow IPC): {} columns, {} rows", + result.get_columns().len(), + result.row_count() + ); + + DecodedResponse::Ok(result) } other => { return Err(TransportError::Protocol(format!( @@ -114,3 +133,17 @@ pub fn decode_frame(bytes: &[u8]) -> Result { response, }) } + +fn decode_arrow_ipc(bytes: &[u8]) -> Result { + let reader = StreamReader::try_new(Cursor::new(bytes), None) + .map_err(|e| TransportError::Protocol(format!("arrow IPC open: {e}")))?; + + let schema = reader.schema(); + let batches: Vec = reader + .collect::>() + .map_err(|e| TransportError::Protocol(format!("arrow IPC read batch: {e}")))?; + + Ok(QueryResult { + data: ResultData::Arrow { schema, batches }, + }) +} diff --git a/rust/cube/cubestore-ws-transport/src/lib.rs b/rust/cube/cubestore-ws-transport/src/lib.rs index 5ab9057dea152..d0d4ba0e126b0 100644 --- a/rust/cube/cubestore-ws-transport/src/lib.rs +++ b/rust/cube/cubestore-ws-transport/src/lib.rs @@ -12,6 +12,7 @@ pub mod codec; mod error; mod result; +pub use arrow; pub use client::{Client, ClientConfig}; pub use error::TransportError; -pub use result::QueryResult; +pub use result::{QueryResult, ResponseFormat, ResultData}; diff --git a/rust/cube/cubestore-ws-transport/src/result.rs b/rust/cube/cubestore-ws-transport/src/result.rs index 43c2bf2779e56..caa0539911be1 100644 --- a/rust/cube/cubestore-ws-transport/src/result.rs +++ b/rust/cube/cubestore-ws-transport/src/result.rs @@ -1,15 +1,78 @@ +use arrow::array::RecordBatch; +use arrow::datatypes::SchemaRef; + +/// Wire encoding the server actually used for this result set. The client may +/// request `Arrow` but an older server can still answer with the legacy +/// row-format envelope, so this reflects what was decoded, not what was asked. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum ResponseFormat { + #[default] + Legacy, + Arrow, +} + +impl std::fmt::Display for ResponseFormat { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ResponseFormat::Legacy => f.write_str("Legacy"), + ResponseFormat::Arrow => f.write_str("Arrow"), + } + } +} + +/// Decoded payload, preserving the original on-wire shape. Renderers can +/// format-aware-render (e.g. iterate arrow arrays directly) or fall through +/// to the legacy string rows. Column names live inside the variant so they can +/// be derived from the data — there's no separate "header" on the wire. +#[derive(Debug, Clone)] +pub enum ResultData { + /// Per-cell stringified rows, as carried in the legacy FlatBuffers envelope. + Legacy { + columns: Vec, + rows: Vec>>, + }, + /// Raw Arrow record batches decoded from the IPC stream. The schema is + /// always present — the IPC stream writes it before any data batch — so + /// column names survive even when `batches` is empty. + Arrow { + schema: SchemaRef, + batches: Vec, + }, +} + #[derive(Debug, Clone)] pub struct QueryResult { - pub columns: Vec, - pub rows: Vec>>, + pub data: ResultData, } impl QueryResult { pub fn is_empty(&self) -> bool { - self.rows.is_empty() + self.row_count() == 0 } pub fn row_count(&self) -> usize { - self.rows.len() + match &self.data { + ResultData::Legacy { rows, .. } => rows.len(), + ResultData::Arrow { batches, .. } => batches.iter().map(|b| b.num_rows()).sum(), + } + } + + /// Wire encoding the server used, derived from the decoded payload variant. + pub fn get_format(&self) -> ResponseFormat { + match &self.data { + ResultData::Legacy { .. } => ResponseFormat::Legacy, + ResultData::Arrow { .. } => ResponseFormat::Arrow, + } + } + + /// Column names derived from the payload — the legacy variant stores them + /// alongside the row vector, the arrow variant pulls them from the schema. + pub fn get_columns(&self) -> Vec { + match &self.data { + ResultData::Legacy { columns, .. } => columns.clone(), + ResultData::Arrow { schema, .. } => { + schema.fields().iter().map(|f| f.name().clone()).collect() + } + } } } diff --git a/rust/cube/cubestore-ws-transport/tests/mock_server.rs b/rust/cube/cubestore-ws-transport/tests/mock_server.rs index b0e6f5f64249b..f06511cf96c04 100644 --- a/rust/cube/cubestore-ws-transport/tests/mock_server.rs +++ b/rust/cube/cubestore-ws-transport/tests/mock_server.rs @@ -9,12 +9,19 @@ use cubeshared::codegen::{ HttpErrorArgs, HttpMessage, HttpMessageArgs, HttpResultSet as FbResultSet, HttpResultSetArgs, HttpRow as FbRow, HttpRowArgs, }; -use cubestore_ws_transport::{Client, ClientConfig, TransportError}; +use cubestore_ws_transport::{Client, ClientConfig, QueryResult, ResultData, TransportError}; use flatbuffers::FlatBufferBuilder; use futures_util::{SinkExt, StreamExt}; use tokio::net::TcpListener; use tokio_tungstenite::tungstenite::protocol::Message; +fn legacy_rows(r: &QueryResult) -> &Vec>> { + match &r.data { + ResultData::Legacy { rows, .. } => rows, + ResultData::Arrow { .. } => panic!("expected ResultData::Legacy, got Arrow"), + } +} + fn build_result_set(message_id: u32, connection_id: &str) -> bytes::Bytes { let mut b = FlatBufferBuilder::with_capacity(1024); @@ -159,27 +166,33 @@ async fn boot_mock_server() -> u16 { } #[tokio::test] -async fn happy_path_query_returns_rows() { +async fn happy_path_query_returns_rows() -> Result<(), TransportError> { let port = boot_mock_server().await; let url = url::Url::parse(&format!("ws://127.0.0.1:{port}/")).unwrap(); let mut cfg = ClientConfig::new(url); cfg.connect_timeout = Duration::from_secs(2); - let client = Client::connect(cfg).await.expect("connect"); + let client = Client::connect(cfg).await?; - let result = client.query("SELECT * FROM whatever").await.expect("query"); + let result = client.query("SELECT * FROM whatever").await?; - assert_eq!(result.columns, vec!["id".to_string(), "name".to_string()]); - assert_eq!(result.rows.len(), 2); - assert_eq!(result.rows[0][0].as_deref(), Some("1")); - assert_eq!(result.rows[0][1].as_deref(), Some("alice")); - assert_eq!(result.rows[1][0].as_deref(), Some("2")); - assert_eq!(result.rows[1][1].as_deref(), Some("bob")); + assert_eq!( + result.get_columns(), + vec!["id".to_string(), "name".to_string()] + ); + let rows = legacy_rows(&result); + assert_eq!(rows.len(), 2); + assert_eq!(rows[0][0].as_deref(), Some("1")); + assert_eq!(rows[0][1].as_deref(), Some("alice")); + assert_eq!(rows[1][0].as_deref(), Some("2")); + assert_eq!(rows[1][1].as_deref(), Some("bob")); + + Ok(()) } /// Full WS round-trip with a 12-column result — guards against any layer in the /// stack accidentally dropping columns past the first few. #[tokio::test] -async fn wide_result_full_round_trip() { +async fn wide_result_full_round_trip() -> Result<(), TransportError> { use cubeshared::codegen::{HttpColumnValue as Cv, HttpMessage as FbMsg, HttpMessageArgs}; let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); @@ -251,43 +264,48 @@ async fn wide_result_full_round_trip() { let url = url::Url::parse(&format!("ws://127.0.0.1:{port}/")).unwrap(); let mut cfg = ClientConfig::new(url); cfg.connect_timeout = Duration::from_secs(2); - let client = Client::connect(cfg).await.expect("connect"); + let client = Client::connect(cfg).await?; - let result = client.query("SELECT *").await.expect("query"); + let result = client.query("SELECT *").await?; assert_eq!( - result.columns, + result.get_columns(), ["c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11"] .iter() .map(|s| s.to_string()) .collect::>(), "all 12 columns should survive WS round-trip" ); - assert_eq!(result.rows.len(), 1); - assert_eq!(result.rows[0].len(), 12); + let rows = legacy_rows(&result); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].len(), 12); + + Ok(()) } #[tokio::test] -async fn server_error_surfaces_as_query_error() { +async fn server_error_surfaces_as_query_error() -> Result<(), TransportError> { let port = boot_mock_server().await; let url = url::Url::parse(&format!("ws://127.0.0.1:{port}/")).unwrap(); let mut cfg = ClientConfig::new(url); cfg.connect_timeout = Duration::from_secs(2); - let client = Client::connect(cfg).await.expect("connect"); + let client = Client::connect(cfg).await?; let err = client.query("ERR boom: bad sql").await.unwrap_err(); match err { TransportError::Query(msg) => assert_eq!(msg, "boom: bad sql"), other => panic!("expected Query error, got {other:?}"), } + + Ok(()) } #[tokio::test] -async fn many_queries_correlate_by_message_id() { +async fn many_queries_correlate_by_message_id() -> Result<(), TransportError> { let port = boot_mock_server().await; let url = url::Url::parse(&format!("ws://127.0.0.1:{port}/")).unwrap(); let mut cfg = ClientConfig::new(url); cfg.connect_timeout = Duration::from_secs(2); - let client = Client::connect(cfg).await.expect("connect"); + let client = Client::connect(cfg).await?; let mut handles = Vec::new(); for i in 0..10 { @@ -297,7 +315,9 @@ async fn many_queries_correlate_by_message_id() { )); } for h in handles { - let r = h.await.unwrap().expect("query ok"); - assert_eq!(r.rows.len(), 2); + let r = h.await.unwrap()?; + assert_eq!(legacy_rows(&r).len(), 2); } + + Ok(()) } diff --git a/rust/cube/cubestore-ws-transport/tests/reconnect.rs b/rust/cube/cubestore-ws-transport/tests/reconnect.rs index ae63bac96c152..b6c70da50ddc2 100644 --- a/rust/cube/cubestore-ws-transport/tests/reconnect.rs +++ b/rust/cube/cubestore-ws-transport/tests/reconnect.rs @@ -11,7 +11,7 @@ use cubeshared::codegen::{ HttpMessageArgs, HttpResultSet as FbResultSet, HttpResultSetArgs, HttpRow as FbRow, HttpRowArgs, }; -use cubestore_ws_transport::{Client, ClientConfig}; +use cubestore_ws_transport::{Client, ClientConfig, ResultData, TransportError}; use flatbuffers::FlatBufferBuilder; use futures_util::{SinkExt, StreamExt}; use tokio::net::TcpListener; @@ -59,7 +59,7 @@ fn build_result_set(message_id: u32, connection_id: &str) -> bytes::Bytes { } #[tokio::test] -async fn resends_in_flight_query_after_reconnect() { +async fn resends_in_flight_query_after_reconnect() -> Result<(), TransportError> { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let port = listener.local_addr().unwrap().port(); @@ -107,20 +107,25 @@ async fn resends_in_flight_query_after_reconnect() { let mut cfg = ClientConfig::new(url); cfg.connect_timeout = Duration::from_secs(2); cfg.max_connect_retries = 5; - let client = Client::connect(cfg).await.expect("initial connect"); + let client = Client::connect(cfg).await?; // The reconnect backoff is (attempt+1)*1000ms; allow generous headroom. let result = tokio::time::timeout(Duration::from_secs(5), client.query("SELECT 1")) .await - .expect("query did not complete within timeout") - .expect("query result"); + .expect("query did not complete within timeout")?; - assert_eq!(result.columns, vec!["value".to_string()]); - assert_eq!(result.rows.len(), 1); - assert_eq!(result.rows[0][0].as_deref(), Some("ok")); + assert_eq!(result.get_columns(), vec!["value".to_string()]); + let rows = match &result.data { + ResultData::Legacy { rows, .. } => rows, + _ => panic!("expected ResultData::Legacy"), + }; + assert_eq!(rows.len(), 1); + assert_eq!(rows[0][0].as_deref(), Some("ok")); // Both connections should have seen the *same* message id — the resend preserves it. let ids = observed_msg_ids.lock().await.clone(); assert!(ids.len() >= 2, "expected resend, got ids: {ids:?}"); assert_eq!(ids[0], ids[1], "resent message_id must match the original"); + + Ok(()) } diff --git a/rust/cube/cubestore-ws-transport/tests/wire_roundtrip.rs b/rust/cube/cubestore-ws-transport/tests/wire_roundtrip.rs index 976bd6d7aec09..4c5caba415f1a 100644 --- a/rust/cube/cubestore-ws-transport/tests/wire_roundtrip.rs +++ b/rust/cube/cubestore-ws-transport/tests/wire_roundtrip.rs @@ -1,14 +1,22 @@ //! Round-trip the on-wire FlatBuffer: encode an HttpQuery, parse it back, and verify shape. +use std::sync::Arc; + +use arrow::array::{Array, Int32Array, RecordBatch, StringArray}; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::ipc::writer::StreamWriter; use cubeshared::codegen::{ root_as_http_message, HttpColumnValue, HttpColumnValueArgs, HttpCommand, HttpMessage, - HttpMessageArgs, HttpResultSet, HttpResultSetArgs, HttpRow, HttpRowArgs, QueryResultFormat, + HttpMessageArgs, HttpQueryResult, HttpQueryResultArgs, HttpQueryResultArrow, + HttpQueryResultArrowArgs, HttpQueryResultData, HttpResultSet, HttpResultSetArgs, HttpRow, + HttpRowArgs, QueryResultFormat, }; use cubestore_ws_transport::codec::{decode_frame, encode_query, DecodedResponse}; +use cubestore_ws_transport::{ResponseFormat, ResultData, TransportError}; use flatbuffers::FlatBufferBuilder; #[test] -fn query_round_trip() { +fn query_round_trip() -> Result<(), TransportError> { let bytes = encode_query(7, "conn-xyz", "SELECT 42"); let msg = root_as_http_message(&bytes).expect("parse encoded message"); @@ -18,14 +26,16 @@ fn query_round_trip() { let q = msg.command_as_http_query().expect("HttpQuery variant"); assert_eq!(q.query(), Some("SELECT 42")); - assert_eq!(q.response_format(), QueryResultFormat::Legacy); + assert_eq!(q.response_format(), QueryResultFormat::Arrow); assert!(q.trace_obj().is_none()); assert!(q.inline_tables().is_none()); assert!(q.parameters().is_none()); + + Ok(()) } #[test] -fn decode_preserves_all_columns_for_wide_table() { +fn decode_preserves_all_columns_for_wide_table() -> Result<(), TransportError> { let mut b = FlatBufferBuilder::with_capacity(8 * 1024); let col_names = [ @@ -74,25 +84,31 @@ fn decode_preserves_all_columns_for_wide_table() { b.finish(msg, None); let bytes = b.finished_data().to_vec(); - let decoded = decode_frame(&bytes).expect("decode"); + let decoded = decode_frame(&bytes)?; let r = match decoded.response { DecodedResponse::Ok(r) => r, DecodedResponse::Error(e) => panic!("err: {e}"), }; assert_eq!( - r.columns, + r.get_columns(), col_names.iter().map(|s| s.to_string()).collect::>(), "all column names should decode" ); - assert_eq!(r.rows.len(), 1); - assert_eq!(r.rows[0].len(), col_names.len(), "all cells should decode"); - for (i, cell) in r.rows[0].iter().enumerate() { + let rows = match &r.data { + ResultData::Legacy { rows, .. } => rows, + _ => panic!("expected ResultData::Legacy"), + }; + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].len(), col_names.len(), "all cells should decode"); + for (i, cell) in rows[0].iter().enumerate() { assert_eq!(cell.as_deref(), Some(format!("v{i}").as_str())); } + + Ok(()) } #[test] -fn decode_preserves_all_rows_and_long_strings() { +fn decode_preserves_all_rows_and_long_strings() -> Result<(), TransportError> { // Build a synthetic 500-row HttpResultSet with progressively longer string values. let mut b = FlatBufferBuilder::with_capacity(64 * 1024); let col_a = b.create_string("id"); @@ -147,7 +163,7 @@ fn decode_preserves_all_rows_and_long_strings() { b.finish(msg, None); let bytes = b.finished_data().to_vec(); - let decoded = decode_frame(&bytes).expect("decode"); + let decoded = decode_frame(&bytes)?; assert_eq!(decoded.message_id, 1); let result = match decoded.response { DecodedResponse::Ok(r) => r, @@ -155,11 +171,15 @@ fn decode_preserves_all_rows_and_long_strings() { }; assert_eq!( - result.columns, + result.get_columns(), vec!["id".to_string(), "payload".to_string()] ); - assert_eq!(result.rows.len(), 500, "expected all 500 rows decoded"); - for (i, row) in result.rows.iter().enumerate() { + let rows = match &result.data { + ResultData::Legacy { rows, .. } => rows, + _ => panic!("expected ResultData::Legacy"), + }; + assert_eq!(rows.len(), 500, "expected all 500 rows decoded"); + for (i, row) in rows.iter().enumerate() { assert_eq!(row.len(), 2); assert_eq!(row[0].as_deref(), Some(i.to_string().as_str())); let expected_payload_len = (i % 256) + 1; @@ -169,4 +189,96 @@ fn decode_preserves_all_rows_and_long_strings() { "payload truncated at row {i}" ); } + + Ok(()) +} + +#[test] +fn decode_arrow_ipc_result_with_nulls() -> Result<(), TransportError> { + // Build a small RecordBatch mirroring what the server would write via + // datafusion::arrow::ipc::writer::StreamWriter, wrap the IPC bytes into + // the HttpQueryResult / HttpQueryResultArrow flatbuffer, and decode. + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ])); + let ids = Int32Array::from(vec![1, 2, 3]); + let names = StringArray::from(vec![Some("alice"), None, Some("carol")]); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(names)]) + .expect("record batch"); + + let mut ipc_bytes: Vec = Vec::new(); + { + let mut w = StreamWriter::try_new(&mut ipc_bytes, schema.as_ref()).expect("ipc writer"); + w.write(&batch).expect("write batch"); + w.finish().expect("finish ipc"); + } + + let mut b = FlatBufferBuilder::with_capacity(ipc_bytes.len() + 1024); + let ipc_vec = b.create_vector(&ipc_bytes); + let arrow_payload = HttpQueryResultArrow::create( + &mut b, + &HttpQueryResultArrowArgs { + data: Some(ipc_vec), + is_last: true, + }, + ); + let qr = HttpQueryResult::create( + &mut b, + &HttpQueryResultArgs { + data_type: HttpQueryResultData::HttpQueryResultArrow, + data: Some(arrow_payload.as_union_value()), + }, + ); + let conn = b.create_string("c"); + let msg = HttpMessage::create( + &mut b, + &HttpMessageArgs { + message_id: 42, + command_type: HttpCommand::HttpQueryResult, + command: Some(qr.as_union_value()), + connection_id: Some(conn), + }, + ); + b.finish(msg, None); + let bytes = b.finished_data().to_vec(); + + let decoded = decode_frame(&bytes)?; + assert_eq!(decoded.message_id, 42); + let result = match decoded.response { + DecodedResponse::Ok(r) => r, + DecodedResponse::Error(e) => panic!("unexpected error: {e}"), + }; + + assert_eq!( + result.get_columns(), + vec!["id".to_string(), "name".to_string()] + ); + assert_eq!(result.get_format(), ResponseFormat::Arrow); + assert_eq!(result.row_count(), 3); + + let batches = match result.data { + ResultData::Arrow { batches, .. } => batches, + _ => panic!("expected ResultData::Arrow"), + }; + assert_eq!(batches.len(), 1, "single record batch expected"); + let batch = &batches[0]; + assert_eq!(batch.num_rows(), 3); + assert_eq!(batch.num_columns(), 2); + let id_col = batch + .column(0) + .as_any() + .downcast_ref::() + .expect("Int32 id column"); + assert_eq!(id_col.values(), &[1, 2, 3]); + let name_col = batch + .column(1) + .as_any() + .downcast_ref::() + .expect("Utf8 name column"); + assert_eq!(name_col.value(0), "alice"); + assert!(name_col.is_null(1), "row 1 name should be NULL"); + assert_eq!(name_col.value(2), "carol"); + + Ok(()) } From 346eb567808363efb39c623f25f056018ce5a981 Mon Sep 17 00:00:00 2001 From: Alex Qyoun-ae <4062971+MazterQyou@users.noreply.github.com> Date: Wed, 20 May 2026 19:13:36 +0400 Subject: [PATCH 2/5] feat(cubesql): Support parsing extra timezone formats (#10913) --- packages/cubejs-backend-native/Cargo.lock | 16 ++++----- rust/cubesql/Cargo.lock | 16 ++++----- rust/cubesql/cubesql/Cargo.toml | 2 +- rust/cubesql/cubesql/src/compile/mod.rs | 34 +++++++++++++++++++ ...t_cast_to_timestamp_with_tz_offset_hh.snap | 9 +++++ ...ast_to_timestamp_with_tz_offset_hh_mm.snap | 9 +++++ ..._to_timestamp_with_tz_offset_hh_mm_ss.snap | 9 +++++ 7 files changed, 78 insertions(+), 17 deletions(-) create mode 100644 rust/cubesql/cubesql/src/compile/snapshots/cubesql__compile__tests__test_cast_to_timestamp_with_tz_offset_hh.snap create mode 100644 rust/cubesql/cubesql/src/compile/snapshots/cubesql__compile__tests__test_cast_to_timestamp_with_tz_offset_hh_mm.snap create mode 100644 rust/cubesql/cubesql/src/compile/snapshots/cubesql__compile__tests__test_cast_to_timestamp_with_tz_offset_hh_mm_ss.snap diff --git a/packages/cubejs-backend-native/Cargo.lock b/packages/cubejs-backend-native/Cargo.lock index 516fa404fb777..1488ed863e946 100644 --- a/packages/cubejs-backend-native/Cargo.lock +++ b/packages/cubejs-backend-native/Cargo.lock @@ -107,7 +107,7 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "arrow" version = "13.0.0" -source = "git+https://github.com/cube-js/arrow-rs.git?rev=8277f9a051c90482ff9557a91c5dc3926d9ba19e#8277f9a051c90482ff9557a91c5dc3926d9ba19e" +source = "git+https://github.com/cube-js/arrow-rs.git?rev=64899e4bbf07a111a94ac2084a6b70ae2374e421#64899e4bbf07a111a94ac2084a6b70ae2374e421" dependencies = [ "bitflags 1.3.2", "chrono", @@ -702,7 +702,7 @@ dependencies = [ [[package]] name = "cube-ext" version = "1.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=fc9fa63d78671bf38f31c3319302579f7c3961de#fc9fa63d78671bf38f31c3319302579f7c3961de" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=9f49caadcd0d023b2e68ded973ff725ddc5809a5#9f49caadcd0d023b2e68ded973ff725ddc5809a5" dependencies = [ "arrow", "chrono", @@ -878,7 +878,7 @@ checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" [[package]] name = "datafusion" version = "7.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=fc9fa63d78671bf38f31c3319302579f7c3961de#fc9fa63d78671bf38f31c3319302579f7c3961de" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=9f49caadcd0d023b2e68ded973ff725ddc5809a5#9f49caadcd0d023b2e68ded973ff725ddc5809a5" dependencies = [ "ahash 0.7.8", "arrow", @@ -911,7 +911,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "7.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=fc9fa63d78671bf38f31c3319302579f7c3961de#fc9fa63d78671bf38f31c3319302579f7c3961de" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=9f49caadcd0d023b2e68ded973ff725ddc5809a5#9f49caadcd0d023b2e68ded973ff725ddc5809a5" dependencies = [ "arrow", "ordered-float 2.10.1", @@ -922,7 +922,7 @@ dependencies = [ [[package]] name = "datafusion-data-access" version = "1.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=fc9fa63d78671bf38f31c3319302579f7c3961de#fc9fa63d78671bf38f31c3319302579f7c3961de" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=9f49caadcd0d023b2e68ded973ff725ddc5809a5#9f49caadcd0d023b2e68ded973ff725ddc5809a5" dependencies = [ "async-trait", "chrono", @@ -935,7 +935,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "7.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=fc9fa63d78671bf38f31c3319302579f7c3961de#fc9fa63d78671bf38f31c3319302579f7c3961de" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=9f49caadcd0d023b2e68ded973ff725ddc5809a5#9f49caadcd0d023b2e68ded973ff725ddc5809a5" dependencies = [ "ahash 0.7.8", "arrow", @@ -946,7 +946,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "7.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=fc9fa63d78671bf38f31c3319302579f7c3961de#fc9fa63d78671bf38f31c3319302579f7c3961de" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=9f49caadcd0d023b2e68ded973ff725ddc5809a5#9f49caadcd0d023b2e68ded973ff725ddc5809a5" dependencies = [ "ahash 0.7.8", "arrow", @@ -2297,7 +2297,7 @@ dependencies = [ [[package]] name = "parquet" version = "13.0.0" -source = "git+https://github.com/cube-js/arrow-rs.git?rev=8277f9a051c90482ff9557a91c5dc3926d9ba19e#8277f9a051c90482ff9557a91c5dc3926d9ba19e" +source = "git+https://github.com/cube-js/arrow-rs.git?rev=64899e4bbf07a111a94ac2084a6b70ae2374e421#64899e4bbf07a111a94ac2084a6b70ae2374e421" dependencies = [ "arrow", "base64 0.13.1", diff --git a/rust/cubesql/Cargo.lock b/rust/cubesql/Cargo.lock index 3c9068f01d873..4d4b8c27f904f 100644 --- a/rust/cubesql/Cargo.lock +++ b/rust/cubesql/Cargo.lock @@ -112,7 +112,7 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "arrow" version = "13.0.0" -source = "git+https://github.com/cube-js/arrow-rs.git?rev=8277f9a051c90482ff9557a91c5dc3926d9ba19e#8277f9a051c90482ff9557a91c5dc3926d9ba19e" +source = "git+https://github.com/cube-js/arrow-rs.git?rev=64899e4bbf07a111a94ac2084a6b70ae2374e421#64899e4bbf07a111a94ac2084a6b70ae2374e421" dependencies = [ "bitflags 1.3.2", "chrono", @@ -707,7 +707,7 @@ dependencies = [ [[package]] name = "cube-ext" version = "1.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=fc9fa63d78671bf38f31c3319302579f7c3961de#fc9fa63d78671bf38f31c3319302579f7c3961de" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=9f49caadcd0d023b2e68ded973ff725ddc5809a5#9f49caadcd0d023b2e68ded973ff725ddc5809a5" dependencies = [ "arrow", "chrono", @@ -832,7 +832,7 @@ dependencies = [ [[package]] name = "datafusion" version = "7.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=fc9fa63d78671bf38f31c3319302579f7c3961de#fc9fa63d78671bf38f31c3319302579f7c3961de" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=9f49caadcd0d023b2e68ded973ff725ddc5809a5#9f49caadcd0d023b2e68ded973ff725ddc5809a5" dependencies = [ "ahash 0.7.8", "arrow", @@ -865,7 +865,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "7.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=fc9fa63d78671bf38f31c3319302579f7c3961de#fc9fa63d78671bf38f31c3319302579f7c3961de" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=9f49caadcd0d023b2e68ded973ff725ddc5809a5#9f49caadcd0d023b2e68ded973ff725ddc5809a5" dependencies = [ "arrow", "ordered-float 2.10.0", @@ -876,7 +876,7 @@ dependencies = [ [[package]] name = "datafusion-data-access" version = "1.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=fc9fa63d78671bf38f31c3319302579f7c3961de#fc9fa63d78671bf38f31c3319302579f7c3961de" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=9f49caadcd0d023b2e68ded973ff725ddc5809a5#9f49caadcd0d023b2e68ded973ff725ddc5809a5" dependencies = [ "async-trait", "chrono", @@ -889,7 +889,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "7.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=fc9fa63d78671bf38f31c3319302579f7c3961de#fc9fa63d78671bf38f31c3319302579f7c3961de" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=9f49caadcd0d023b2e68ded973ff725ddc5809a5#9f49caadcd0d023b2e68ded973ff725ddc5809a5" dependencies = [ "ahash 0.7.8", "arrow", @@ -900,7 +900,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "7.0.0" -source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=fc9fa63d78671bf38f31c3319302579f7c3961de#fc9fa63d78671bf38f31c3319302579f7c3961de" +source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=9f49caadcd0d023b2e68ded973ff725ddc5809a5#9f49caadcd0d023b2e68ded973ff725ddc5809a5" dependencies = [ "ahash 0.7.8", "arrow", @@ -2156,7 +2156,7 @@ dependencies = [ [[package]] name = "parquet" version = "13.0.0" -source = "git+https://github.com/cube-js/arrow-rs.git?rev=8277f9a051c90482ff9557a91c5dc3926d9ba19e#8277f9a051c90482ff9557a91c5dc3926d9ba19e" +source = "git+https://github.com/cube-js/arrow-rs.git?rev=64899e4bbf07a111a94ac2084a6b70ae2374e421#64899e4bbf07a111a94ac2084a6b70ae2374e421" dependencies = [ "arrow", "base64 0.13.0", diff --git a/rust/cubesql/cubesql/Cargo.toml b/rust/cubesql/cubesql/Cargo.toml index a1557fe6cfb44..f5788affedbdd 100644 --- a/rust/cubesql/cubesql/Cargo.toml +++ b/rust/cubesql/cubesql/Cargo.toml @@ -10,7 +10,7 @@ homepage = "https://cube.dev" [dependencies] arc-swap = "1" -datafusion = { git = 'https://github.com/cube-js/arrow-datafusion.git', rev = "fc9fa63d78671bf38f31c3319302579f7c3961de", default-features = false, features = [ +datafusion = { git = 'https://github.com/cube-js/arrow-datafusion.git', rev = "9f49caadcd0d023b2e68ded973ff725ddc5809a5", default-features = false, features = [ "regex_expressions", "unicode_expressions", ] } diff --git a/rust/cubesql/cubesql/src/compile/mod.rs b/rust/cubesql/cubesql/src/compile/mod.rs index 64f261c2f8e7b..2cd5f8ede53f7 100644 --- a/rust/cubesql/cubesql/src/compile/mod.rs +++ b/rust/cubesql/cubesql/src/compile/mod.rs @@ -10198,6 +10198,40 @@ ORDER BY "source"."str0" ASC Ok(()) } + #[tokio::test] + async fn test_cast_to_timestamp_with_tz_offset() -> Result<(), CubeError> { + init_testing_logger(); + + insta::assert_snapshot!( + "test_cast_to_timestamp_with_tz_offset_hh", + execute_query( + "SELECT CAST('2026-01-26 00:00:00+00' AS timestamp);".to_string(), + DatabaseProtocol::PostgreSQL + ) + .await? + ); + + insta::assert_snapshot!( + "test_cast_to_timestamp_with_tz_offset_hh_mm", + execute_query( + "SELECT CAST('2026-01-26 00:00:00+00:00' AS timestamp);".to_string(), + DatabaseProtocol::PostgreSQL + ) + .await? + ); + + insta::assert_snapshot!( + "test_cast_to_timestamp_with_tz_offset_hh_mm_ss", + execute_query( + "SELECT CAST('2026-01-26 00:00:00+00:00:00' AS timestamp);".to_string(), + DatabaseProtocol::PostgreSQL + ) + .await? + ); + + Ok(()) + } + #[tokio::test] async fn test_join_with_distinct() -> Result<(), CubeError> { insta::assert_snapshot!( diff --git a/rust/cubesql/cubesql/src/compile/snapshots/cubesql__compile__tests__test_cast_to_timestamp_with_tz_offset_hh.snap b/rust/cubesql/cubesql/src/compile/snapshots/cubesql__compile__tests__test_cast_to_timestamp_with_tz_offset_hh.snap new file mode 100644 index 0000000000000..36139410d28cc --- /dev/null +++ b/rust/cubesql/cubesql/src/compile/snapshots/cubesql__compile__tests__test_cast_to_timestamp_with_tz_offset_hh.snap @@ -0,0 +1,9 @@ +--- +source: cubesql/src/compile/mod.rs +expression: "execute_query(\"SELECT CAST('2026-01-26 00:00:00+00' AS timestamp);\".to_string(),\nDatabaseProtocol::PostgreSQL).await?" +--- ++---------------------------------------------------------------------+ +| CAST(Utf8("2026-01-26 00:00:00+00") AS Timestamp(Nanosecond, None)) | ++---------------------------------------------------------------------+ +| 2026-01-26T00:00:00.000 | ++---------------------------------------------------------------------+ diff --git a/rust/cubesql/cubesql/src/compile/snapshots/cubesql__compile__tests__test_cast_to_timestamp_with_tz_offset_hh_mm.snap b/rust/cubesql/cubesql/src/compile/snapshots/cubesql__compile__tests__test_cast_to_timestamp_with_tz_offset_hh_mm.snap new file mode 100644 index 0000000000000..94c532c6afc03 --- /dev/null +++ b/rust/cubesql/cubesql/src/compile/snapshots/cubesql__compile__tests__test_cast_to_timestamp_with_tz_offset_hh_mm.snap @@ -0,0 +1,9 @@ +--- +source: cubesql/src/compile/mod.rs +expression: "execute_query(\"SELECT CAST('2026-01-26 00:00:00+00:00' AS timestamp);\".to_string(),\nDatabaseProtocol::PostgreSQL).await?" +--- ++------------------------------------------------------------------------+ +| CAST(Utf8("2026-01-26 00:00:00+00:00") AS Timestamp(Nanosecond, None)) | ++------------------------------------------------------------------------+ +| 2026-01-26T00:00:00.000 | ++------------------------------------------------------------------------+ diff --git a/rust/cubesql/cubesql/src/compile/snapshots/cubesql__compile__tests__test_cast_to_timestamp_with_tz_offset_hh_mm_ss.snap b/rust/cubesql/cubesql/src/compile/snapshots/cubesql__compile__tests__test_cast_to_timestamp_with_tz_offset_hh_mm_ss.snap new file mode 100644 index 0000000000000..97622dfedbcb5 --- /dev/null +++ b/rust/cubesql/cubesql/src/compile/snapshots/cubesql__compile__tests__test_cast_to_timestamp_with_tz_offset_hh_mm_ss.snap @@ -0,0 +1,9 @@ +--- +source: cubesql/src/compile/mod.rs +expression: "execute_query(\"SELECT CAST('2026-01-26 00:00:00+00:00:00' AS timestamp);\".to_string(),\nDatabaseProtocol::PostgreSQL).await?" +--- ++---------------------------------------------------------------------------+ +| CAST(Utf8("2026-01-26 00:00:00+00:00:00") AS Timestamp(Nanosecond, None)) | ++---------------------------------------------------------------------------+ +| 2026-01-26T00:00:00.000 | ++---------------------------------------------------------------------------+ From 1ac59dda995de7aa6731b5f55d185c5bb786b6e7 Mon Sep 17 00:00:00 2001 From: "mintlify[bot]" <109931778+mintlify[bot]@users.noreply.github.com> Date: Wed, 20 May 2026 10:21:57 -0700 Subject: [PATCH 3/5] docs: warn that ksqlDB stream and Kafka topic names must match (case-sensitive) (#10889) * docs: emphasize ksqlDB stream and Kafka topic name must match (case-sensitive) * fix: replace unsupported NOW()/date_trunc with UNIX_TIMESTAMP() in ksqlDB build ranges * fix: use CURRENT_TIMESTAMP with INTERVAL in ksqlDB build ranges --------- Co-authored-by: mintlify[bot] <109931778+mintlify[bot]@users.noreply.github.com> --- .../connect-to-data/data-sources/ksqldb.mdx | 36 ++++++++++++++----- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/docs-mintlify/admin/connect-to-data/data-sources/ksqldb.mdx b/docs-mintlify/admin/connect-to-data/data-sources/ksqldb.mdx index cb482f0f2429f..8bb6ee4095ff1 100644 --- a/docs-mintlify/admin/connect-to-data/data-sources/ksqldb.mdx +++ b/docs-mintlify/admin/connect-to-data/data-sources/ksqldb.mdx @@ -338,10 +338,11 @@ cubes: time_dimension: CUBE.created_at granularity: second partition_granularity: day + # ksqlDB does not support NOW(); use CURRENT_TIMESTAMP with INTERVAL arithmetic instead. build_range_start: - sql: "SELECT date_trunc('day', DATE_SUB(NOW(), INTERVAL '5 hour'))" + sql: "SELECT date_trunc('day', CURRENT_TIMESTAMP - INTERVAL '5 hour')" build_range_end: - sql: "SELECT DATE_ADD(NOW(), INTERVAL '15 minute')" + sql: "SELECT CURRENT_TIMESTAMP + INTERVAL '15 minute'" refresh_key: every: 1 minute update_window: 1 hour @@ -521,11 +522,12 @@ cube("order_events_stream", { time_dimension: CUBE.created_at, granularity: `second`, partition_granularity: `day`, + // ksqlDB does not support NOW(); use CURRENT_TIMESTAMP with INTERVAL arithmetic instead. build_range_start: { - sql: `SELECT date_trunc('day', DATE_SUB(NOW(), INTERVAL '5 hour'))`, + sql: `SELECT date_trunc('day', CURRENT_TIMESTAMP - INTERVAL '5 hour')`, }, build_range_end: { - sql: `SELECT DATE_ADD(NOW(), INTERVAL '15 minute')`, + sql: `SELECT CURRENT_TIMESTAMP + INTERVAL '15 minute'`, }, refresh_key: { every: `1 minute`, @@ -615,6 +617,22 @@ with: +**The ksqlDB stream (or table) name and the backing Kafka topic name +MUST be identical, including case.** Kafka streams mode will fail if +they differ in any way. + +Take this into account when creating the stream — explicitly set the +topic name to match the stream name (and vice versa). For example: + +```sql +CREATE STREAM ORDER_EVENTS_STREAM (...) + WITH (KAFKA_TOPIC='ORDER_EVENTS_STREAM', VALUE_FORMAT='JSON', ...); +``` + +The match is **case-sensitive**: `OrderEvents` and `ORDEREVENTS` are +treated as different names and will cause the build to fail with +`Topic table ... is not found`. + This is a known limitation of Kafka streams mode. It does not occur when the ksqlDB object name and the Kafka topic name are the same, which is the default behavior when ksqlDB creates a stream or table @@ -707,10 +725,11 @@ pre_aggregations: time_dimension: CUBE.created_at granularity: second partition_granularity: day + # ksqlDB does not support NOW(); use CURRENT_TIMESTAMP with INTERVAL arithmetic instead. build_range_start: - sql: "SELECT date_trunc('day', DATE_SUB(NOW(), INTERVAL '5 hour'))" + sql: "SELECT date_trunc('day', CURRENT_TIMESTAMP - INTERVAL '5 hour')" build_range_end: - sql: "SELECT DATE_ADD(NOW(), INTERVAL '15 minute')" + sql: "SELECT CURRENT_TIMESTAMP + INTERVAL '15 minute'" refresh_key: every: 1 minute update_window: 1 hour @@ -744,11 +763,12 @@ pre_aggregations: { time_dimension: CUBE.created_at, granularity: `second`, partition_granularity: `day`, + // ksqlDB does not support NOW(); use CURRENT_TIMESTAMP with INTERVAL arithmetic instead. build_range_start: { - sql: `SELECT date_trunc('day', DATE_SUB(NOW(), INTERVAL '5 hour'))`, + sql: `SELECT date_trunc('day', CURRENT_TIMESTAMP - INTERVAL '5 hour')`, }, build_range_end: { - sql: `SELECT DATE_ADD(NOW(), INTERVAL '15 minute')`, + sql: `SELECT CURRENT_TIMESTAMP + INTERVAL '15 minute'`, }, refresh_key: { every: `1 minute`, From 8b5aba5bb07590572f53836b5b3795d6d850b4ec Mon Sep 17 00:00:00 2001 From: "mintlify[bot]" <109931778+mintlify[bot]@users.noreply.github.com> Date: Wed, 20 May 2026 11:27:51 -0700 Subject: [PATCH 4/5] docs: note AI traffic flows over private API connectivity on AWS (#10920) * docs: note AI traffic flows over private API connectivity on AWS * docs: shorten AI traffic mention in private API connectivity intro --------- Co-authored-by: mintlify[bot] <109931778+mintlify[bot]@users.noreply.github.com> --- .../dedicated/aws/private-api-connectivity.mdx | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/docs-mintlify/admin/deployment/dedicated/aws/private-api-connectivity.mdx b/docs-mintlify/admin/deployment/dedicated/aws/private-api-connectivity.mdx index 2aa3e567cce27..d710f222909b8 100644 --- a/docs-mintlify/admin/deployment/dedicated/aws/private-api-connectivity.mdx +++ b/docs-mintlify/admin/deployment/dedicated/aws/private-api-connectivity.mdx @@ -18,9 +18,9 @@ auth providers, BI APIs targeted by SLS, and other upstream services), see With Dedicated Infrastructure and Bring Your Own Cloud on AWS, Cube supports establishing **AWS PrivateLink** connections from your AWS accounts to the Cube API endpoints. This lets your applications, internal BI tools, and end-user -browsers reach the Cube HTTP and SQL APIs entirely over private AWS networking -— never touching the public internet. When private connectivity is in place, -the public API endpoints can be disabled completely on request. +browsers reach the Cube HTTP, SQL, and AI APIs entirely over private AWS +networking — never touching the public internet. When private connectivity is +in place, the public API endpoints can be disabled completely on request. @@ -41,10 +41,12 @@ Cube runs as two cooperating planes: private connectivity — it is a SaaS UI like any other. - The **data plane** runs your Cube deployments and serves all Cube HTTP and SQL **data API** traffic — REST/GraphQL queries from your applications, - live data calls issued by the Cube UI while rendering charts, and SQL - connections from BI tools. The data plane is the part that talks to your - databases and returns query results, and the part this document teaches you - how to expose privately. + live data calls issued by the Cube UI while rendering charts, SQL + connections from BI tools, and AI traffic to the [Chat API][chat-api] and + other AI endpoints (including data-plane-hosted AI Engineer agents and + external agentic clients [calling the Chat API as a tool][agent-to-agent]). + The data plane is the part that talks to your databases and returns query + results, and the part this document teaches you how to expose privately. [AWS PrivateLink][aws-privatelink] lets a service running in one VPC be consumed from another VPC over the AWS internal network, without VPC peering @@ -330,6 +332,8 @@ the VPC's CIDR. [cube-region]: /admin/deployment/infrastructure#understanding-cube-cloud-region [aws-private-link]: /admin/deployment/dedicated/aws/private-link [aws-vpc-peering]: /admin/deployment/dedicated/aws/vpc-peering +[chat-api]: /reference/embed-apis/chat-api +[agent-to-agent]: /recipes/ai/agent-to-agent [aws-privatelink]: https://docs.aws.amazon.com/vpc/latest/privatelink/what-is-privatelink.html [aws-endpoint-service]: https://docs.aws.amazon.com/vpc/latest/privatelink/configure-endpoint-service.html [aws-interface-endpoint]: https://docs.aws.amazon.com/vpc/latest/privatelink/create-interface-endpoint.html From 5cac268f23c21a3dc85cf0a4b919a80e358ca15c Mon Sep 17 00:00:00 2001 From: "mintlify[bot]" <109931778+mintlify[bot]@users.noreply.github.com> Date: Wed, 20 May 2026 21:20:41 +0200 Subject: [PATCH 5/5] docs: document custom headers for Trino and Presto drivers (#10905) Co-authored-by: mintlify[bot] <109931778+mintlify[bot]@users.noreply.github.com> --- .../connect-to-data/data-sources/presto.mdx | 76 ++++++++++++++++++ .../connect-to-data/data-sources/trino.mdx | 80 +++++++++++++++++++ 2 files changed, 156 insertions(+) diff --git a/docs-mintlify/admin/connect-to-data/data-sources/presto.mdx b/docs-mintlify/admin/connect-to-data/data-sources/presto.mdx index 3297343236e04..fefc3ffb6e8a3 100644 --- a/docs-mintlify/admin/connect-to-data/data-sources/presto.mdx +++ b/docs-mintlify/admin/connect-to-data/data-sources/presto.mdx @@ -121,6 +121,82 @@ To enable SSL-encrypted connections between Cube and Presto, set the configure custom certificates, please check out [Enable SSL Connections to the Database][ref-recipe-enable-ssl]. +## Custom headers + +The Presto driver supports forwarding custom HTTP headers on every request to +the Presto coordinator. This is useful for setting headers like +`X-Presto-Source`, `X-Presto-Client-Tags`, or any other custom header your +Presto coordinator expects. + +Custom headers can't be configured via environment variables. Instead, use the +[`driver_factory`](/reference/configuration/config#driver_factory) configuration +option to pass a `headers` object to the driver: + + + +```python title="Python" +from cube import config + +@config('driver_factory') +def driver_factory(ctx: dict) -> dict: + return { + 'type': 'prestodb', + 'headers': { + 'X-Presto-Source': 'cube', + 'X-Presto-Client-Tags': 'user=alice@example.com' + } + } +``` + +```javascript title="JavaScript" +module.exports = { + driverFactory: ({ dataSource }) => ({ + type: "prestodb", + headers: { + "X-Presto-Source": "cube", + "X-Presto-Client-Tags": "user=alice@example.com" + } + }) +}; +``` + + + +In multitenant deployments, you can use the [security +context](/embedding/authentication/security-context) to pass per-tenant headers, +for example to forward a user token from the API request down to Presto: + + + +```python title="Python" +from cube import config + +@config('driver_factory') +def driver_factory(ctx: dict) -> dict: + security_context = ctx['securityContext'] + return { + 'type': 'prestodb', + 'headers': { + 'X-Presto-Client-Tags': f"user={security_context['user_id']}", + 'X-Custom-User-Token': security_context['token'] + } + } +``` + +```javascript title="JavaScript" +module.exports = { + driverFactory: ({ securityContext }) => ({ + type: "prestodb", + headers: { + "X-Presto-Client-Tags": `user=${securityContext.user_id}`, + "X-Custom-User-Token": securityContext.token + } + }) +}; +``` + + + [aws-s3]: https://aws.amazon.com/s3/ [google-cloud-storage]: https://cloud.google.com/storage [presto]: https://prestodb.io/ diff --git a/docs-mintlify/admin/connect-to-data/data-sources/trino.mdx b/docs-mintlify/admin/connect-to-data/data-sources/trino.mdx index 3c5cb4c481039..ad64e666b671a 100644 --- a/docs-mintlify/admin/connect-to-data/data-sources/trino.mdx +++ b/docs-mintlify/admin/connect-to-data/data-sources/trino.mdx @@ -121,6 +121,86 @@ To enable SSL-encrypted connections between Cube and Trino, set the configure custom certificates, please check out [Enable SSL Connections to the Database][ref-recipe-enable-ssl]. +## Custom headers + +The Trino driver supports forwarding custom HTTP headers (e.g., `X-Trino-Source`, +`X-Trino-Routing-Group`, `X-Trino-Client-Tags`, or any other custom header) on +every request to the Trino coordinator. See the [Trino client +protocol][trino-docs-client-protocol] for the list of headers accepted by Trino. + +Custom headers can't be configured via environment variables. Instead, use the +[`driver_factory`](/reference/configuration/config#driver_factory) configuration +option to pass a `headers` object to the driver: + + + +```python title="Python" +from cube import config + +@config('driver_factory') +def driver_factory(ctx: dict) -> dict: + return { + 'type': 'trino', + 'headers': { + 'X-Trino-Source': 'cube', + 'X-Trino-Routing-Group': 'etl', + 'X-Trino-Client-Tags': 'user=alice@example.com' + } + } +``` + +```javascript title="JavaScript" +module.exports = { + driverFactory: ({ dataSource }) => ({ + type: "trino", + headers: { + "X-Trino-Source": "cube", + "X-Trino-Routing-Group": "etl", + "X-Trino-Client-Tags": "user=alice@example.com" + } + }) +}; +``` + + + +In multitenant deployments, you can use the [security +context](/embedding/authentication/security-context) to pass per-tenant headers, for example +to forward a user token from the API request down to Trino: + + + +```python title="Python" +from cube import config + +@config('driver_factory') +def driver_factory(ctx: dict) -> dict: + security_context = ctx['securityContext'] + return { + 'type': 'trino', + 'headers': { + 'X-Trino-Client-Tags': f"user={security_context['user_id']}", + 'X-Custom-User-Token': security_context['token'] + } + } +``` + +```javascript title="JavaScript" +module.exports = { + driverFactory: ({ securityContext }) => ({ + type: "trino", + headers: { + "X-Trino-Client-Tags": `user=${securityContext.user_id}`, + "X-Custom-User-Token": securityContext.token + } + }) +}; +``` + + + +[trino-docs-client-protocol]: https://trino.io/docs/current/develop/client-protocol.html + [aws-s3]: https://aws.amazon.com/s3/ [google-cloud-storage]: https://cloud.google.com/storage [ref-caching-using-preaggs-build-strats]: /docs/pre-aggregations/using-pre-aggregations#pre-aggregation-build-strategies