diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index b55ace70153..aa265a30bd8 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -9067,6 +9067,7 @@ dependencies = [ "serde", "serde_json", "serde_qs 0.15.0", + "serde_urlencoded", "serde_with", "tempfile", "thiserror 2.0.18", diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index e844fc086ac..05af80de868 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -234,6 +234,7 @@ serde = { version = "1.0.228", features = ["derive", "rc"] } serde_json = "1.0" serde_json_borrow = "0.9" serde_qs = { version = "0.15" } +serde_urlencoded = "0.7" serde_with = "3.16" serde_yaml = "0.9" serial_test = { version = "3.2", features = ["file_locks"] } diff --git a/quickwit/quickwit-serve/Cargo.toml b/quickwit/quickwit-serve/Cargo.toml index 79c4307d682..883efda889e 100644 --- a/quickwit/quickwit-serve/Cargo.toml +++ b/quickwit/quickwit-serve/Cargo.toml @@ -89,6 +89,7 @@ itertools = { workspace = true } mockall = { workspace = true } opentelemetry = { workspace = true } opentelemetry_sdk = { workspace = true } +serde_urlencoded = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs b/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs index 5f8ddd69f9f..79b990d7330 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs @@ -20,7 +20,7 @@ use warp::{Filter, Rejection}; use super::model::{ CatIndexQueryParams, DeleteQueryParams, FieldCapabilityQueryParams, FieldCapabilityRequestBody, - MultiSearchQueryParams, SearchQueryParamsCount, + IndexMappingQueryParams, MultiSearchQueryParams, SearchQueryParamsCount, }; use crate::Body; use crate::decompression::get_body_bytes; @@ -285,9 +285,10 @@ pub(crate) fn elastic_aliases_filter() -> impl Filter impl Filter + Clone { +-> impl Filter + Clone { warp::path!("_elastic" / String / "_mapping") .or(warp::path!("_elastic" / String / "_mappings")) .unify() .and(warp::get()) + .and(warp::query()) } diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/model/index_mapping_query_params.rs b/quickwit/quickwit-serve/src/elasticsearch_api/model/index_mapping_query_params.rs new file mode 100644 index 00000000000..29b625849b7 --- /dev/null +++ b/quickwit/quickwit-serve/src/elasticsearch_api/model/index_mapping_query_params.rs @@ -0,0 +1,126 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use serde::{Deserialize, Serialize}; + +/// Query parameters accepted by `GET /_elastic/{index}/_mapping` and +/// `/_mappings`. +/// +/// All fields are optional and absent by default. Unknown query +/// parameters are silently ignored to stay compatible with standard +/// Elasticsearch clients that pass extras like `pretty`, +/// `ignore_unavailable`, or `allow_no_indices`. +/// +/// `start_timestamp` and `end_timestamp` are forwarded into +/// [`quickwit_proto::search::ListFieldsRequest`] verbatim, where they prune +/// the set of splits considered for field discovery. Unit is **epoch +/// seconds**, interval is half-open `[start_timestamp, end_timestamp)` — +/// matching the `ListFieldsRequest` proto contract exactly. +/// +/// `fields` is an optional comma-separated list of column names. When +/// every requested name is a flat literal declared in some index's +/// `doc_mapping`, the handler filters the mapping response to those +/// columns and skips the per-split `list_fields` lookup entirely. Any +/// other shape (wildcards, dotted paths, or names not in `doc_mapping`) +/// falls back to listing all fields from the splits in the time range +/// and returning the full mapping — `fields=` then acts as a request +/// hint, not a strict filter. An empty value (`fields=`) is treated the +/// same as the parameter being absent. +#[serde_with::skip_serializing_none] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] +pub struct IndexMappingQueryParams { + #[serde(default)] + pub start_timestamp: Option, + #[serde(default)] + pub end_timestamp: Option, + #[serde(default)] + pub fields: Option, +} + +#[cfg(test)] +mod tests { + use super::IndexMappingQueryParams; + + #[test] + fn empty_query_string_yields_none() { + let params: IndexMappingQueryParams = serde_urlencoded::from_str("").unwrap(); + assert!(params.start_timestamp.is_none()); + assert!(params.end_timestamp.is_none()); + assert!(params.fields.is_none()); + } + + #[test] + fn both_params_present_yield_some() { + let qs = "start_timestamp=1712160204&end_timestamp=1712764984"; + let params: IndexMappingQueryParams = serde_urlencoded::from_str(qs).unwrap(); + assert_eq!(params.start_timestamp, Some(1712160204)); + assert_eq!(params.end_timestamp, Some(1712764984)); + assert!(params.fields.is_none()); + } + + #[test] + fn only_start_timestamp_present() { + let qs = "start_timestamp=1712160204"; + let params: IndexMappingQueryParams = serde_urlencoded::from_str(qs).unwrap(); + assert_eq!(params.start_timestamp, Some(1712160204)); + assert!(params.end_timestamp.is_none()); + } + + #[test] + fn only_end_timestamp_present() { + let qs = "end_timestamp=1712764984"; + let params: IndexMappingQueryParams = serde_urlencoded::from_str(qs).unwrap(); + assert!(params.start_timestamp.is_none()); + assert_eq!(params.end_timestamp, Some(1712764984)); + } + + #[test] + fn unknown_field_is_ignored() { + // ES clients commonly pass `pretty`, `ignore_unavailable`, + // `allow_no_indices`, etc. on `_mapping`. We ignore them rather + // than reject the request, matching the pre-column-hints route + // which did not parse the query string at all. + let qs = "start_timestamp=1&pretty=true&ignore_unavailable=true"; + let params: IndexMappingQueryParams = serde_urlencoded::from_str(qs).unwrap(); + assert_eq!(params.start_timestamp, Some(1)); + assert!(params.end_timestamp.is_none()); + assert!(params.fields.is_none()); + } + + #[test] + fn fields_param_present() { + let qs = "fields=host,message,status"; + let params: IndexMappingQueryParams = serde_urlencoded::from_str(qs).unwrap(); + assert_eq!(params.fields.as_deref(), Some("host,message,status")); + } + + #[test] + fn fields_combined_with_timestamps() { + let qs = "start_timestamp=1&end_timestamp=2&fields=host"; + let params: IndexMappingQueryParams = serde_urlencoded::from_str(qs).unwrap(); + assert_eq!(params.start_timestamp, Some(1)); + assert_eq!(params.end_timestamp, Some(2)); + assert_eq!(params.fields.as_deref(), Some("host")); + } + + #[test] + fn empty_fields_value() { + let qs = "fields="; + let params: IndexMappingQueryParams = serde_urlencoded::from_str(qs).unwrap(); + // Empty string is parsed as Some(""); the handler treats it as + // "absent" (legacy full-mapping behavior). See + // `parse_field_hints` in `rest_handler.rs`. + assert_eq!(params.fields.as_deref(), Some("")); + } +} diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/model/mappings.rs b/quickwit/quickwit-serve/src/elasticsearch_api/model/mappings.rs index c639ed3daa9..f5dd6fe5b87 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/model/mappings.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/model/mappings.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use quickwit_doc_mapper::{FieldMappingEntry, FieldMappingType}; use quickwit_metastore::IndexMetadata; @@ -62,15 +62,43 @@ enum FieldMapping { } impl ElasticsearchMappingsResponse { + /// Builds the full mapping response for every requested index. Includes + /// every declared field plus any dynamic fields the search service + /// returned. This is the legacy code path used when the caller did not + /// opt into column hints. pub fn from_doc_mapping( indexes_metadata: Vec, list_fields_response: Option<&ListFieldsResponse>, ) -> Self { + Self::from_doc_mapping_filtered(indexes_metadata, list_fields_response, &[]) + } + + /// Builds the mapping response, optionally filtered to a set of + /// caller-requested column names. Matching is by exact equality on the + /// declared top-level field name. + /// + /// The caller (the column-hints fast path in `rest_handler.rs`) is + /// responsible for ensuring `requested_fields` only contains flat + /// literal names that exist in some index's `doc_mapping` — wildcards + /// and dotted paths are rejected upstream because the static + /// `doc_mapping` never declares them. + /// + /// An empty `requested_fields` slice means "no filter" and behaves + /// identically to [`Self::from_doc_mapping`]. + pub fn from_doc_mapping_filtered( + indexes_metadata: Vec, + list_fields_response: Option<&ListFieldsResponse>, + requested_fields: &[String], + ) -> Self { + let filter: HashSet<&str> = requested_fields.iter().map(String::as_str).collect(); let indices = indexes_metadata .into_iter() .map(|index_metadata| { let field_mappings = &index_metadata.index_config.doc_mapping.field_mappings; let mut properties = build_properties(field_mappings); + if !filter.is_empty() { + properties.retain(|name, _| filter.contains(name.as_str())); + } if let Some(list_fields) = list_fields_response { merge_dynamic_fields(&mut properties, list_fields); } @@ -126,7 +154,9 @@ fn field_mapping_from_entry(entry: &FieldMappingEntry) -> Option { /// Merges dynamic fields from a `ListFieldsResponse` into the properties map. /// /// Fields already present in the map (from explicit doc mappings) are skipped, -/// as are internal fields (prefixed with `_`). +/// as are internal fields (prefixed with `_`). When the same `field_name` +/// appears multiple times in the response (e.g. observed in two splits with +/// different types) the first mappable observation wins. fn merge_dynamic_fields( properties: &mut HashMap, list_fields_response: &ListFieldsResponse, @@ -272,10 +302,10 @@ mod tests { assert_eq!(meta["properties"]["source"]["type"], "keyword"); } + use quickwit_proto::search::ListFieldsEntryResponse; + #[test] fn test_merge_dynamic_fields_skips_existing_and_internal() { - use quickwit_proto::search::ListFieldsEntryResponse; - let mut properties = HashMap::new(); properties.insert("title".to_string(), FieldMapping::Leaf { typ: "text" }); @@ -306,4 +336,89 @@ mod tests { assert!(properties.contains_key("dynamic_field")); assert!(!properties.contains_key("_timestamp")); } + + fn make_index_metadata(field_mappings: serde_json::Value) -> IndexMetadata { + let entries: Vec = serde_json::from_value(field_mappings).unwrap(); + let mut metadata = IndexMetadata::for_test("test", "ram:///indexes/test"); + metadata.index_config.doc_mapping.field_mappings = entries; + metadata + } + + fn properties_of(response: &ElasticsearchMappingsResponse) -> &HashMap { + &response.indices["test"].mappings.properties + } + + #[test] + fn from_doc_mapping_filtered_keeps_only_requested() { + let index_metadata = make_index_metadata(serde_json::json!([ + { "name": "host", "type": "text" }, + { "name": "message", "type": "text" }, + { "name": "status", "type": "i64" }, + { "name": "service", "type": "text" }, + { "name": "trace_id", "type": "text" }, + ])); + let requested = vec!["host".to_string(), "message".to_string()]; + let response = ElasticsearchMappingsResponse::from_doc_mapping_filtered( + vec![index_metadata], + None, + &requested, + ); + let props = properties_of(&response); + assert_eq!(props.len(), 2); + assert!(props.contains_key("host")); + assert!(props.contains_key("message")); + assert!(!props.contains_key("status")); + assert!(!props.contains_key("service")); + assert!(!props.contains_key("trace_id")); + } + + #[test] + fn from_doc_mapping_filtered_includes_object_subtree_for_top_level_match() { + let index_metadata = make_index_metadata(serde_json::json!([ + { + "name": "host", + "type": "object", + "field_mappings": [ + { "name": "region", "type": "text" }, + { "name": "name", "type": "text" } + ] + }, + { "name": "message", "type": "text" } + ])); + let requested = vec!["host".to_string()]; + let response = ElasticsearchMappingsResponse::from_doc_mapping_filtered( + vec![index_metadata], + None, + &requested, + ); + let serialized = serde_json::to_value(&response).unwrap(); + // The `host` object subtree is preserved verbatim — both nested fields stay. + let host_props = &serialized["test"]["mappings"]["properties"]["host"]["properties"]; + assert_eq!(host_props["region"]["type"], "keyword"); + assert_eq!(host_props["name"]["type"], "keyword"); + // `message` is filtered out. + assert!( + serialized["test"]["mappings"]["properties"] + .get("message") + .is_none() + ); + } + + #[test] + fn from_doc_mapping_filtered_empty_request_returns_all() { + let index_metadata = make_index_metadata(serde_json::json!([ + { "name": "host", "type": "text" }, + { "name": "message", "type": "text" } + ])); + let response_all = + ElasticsearchMappingsResponse::from_doc_mapping(vec![index_metadata.clone()], None); + let response_filtered = ElasticsearchMappingsResponse::from_doc_mapping_filtered( + vec![index_metadata], + None, + &[], + ); + let serialized_all = serde_json::to_value(&response_all).unwrap(); + let serialized_filtered = serde_json::to_value(&response_filtered).unwrap(); + assert_eq!(serialized_all, serialized_filtered); + } } diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/model/mod.rs b/quickwit/quickwit-serve/src/elasticsearch_api/model/mod.rs index 4351a26b65b..f64af3e5413 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/model/mod.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/model/mod.rs @@ -17,6 +17,7 @@ mod bulk_query_params; mod cat_indices; mod error; mod field_capability; +mod index_mapping_query_params; mod mappings; mod multi_search; mod scroll; @@ -36,6 +37,7 @@ pub use field_capability::{ FieldCapabilityQueryParams, FieldCapabilityRequestBody, FieldCapabilityResponse, build_list_field_request_for_es_api, convert_to_es_field_capabilities_response, }; +pub use index_mapping_query_params::IndexMappingQueryParams; pub(crate) use mappings::ElasticsearchMappingsResponse; pub use multi_search::{ MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse, MultiSearchSingleResponse, diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs index 6242d3b85ca..47e8ba02afa 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::str::from_utf8; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -58,9 +58,9 @@ use super::model::{ CatIndexQueryParams, DeleteQueryParams, ElasticsearchCatIndexResponse, ElasticsearchError, ElasticsearchResolveIndexEntryResponse, ElasticsearchResolveIndexResponse, ElasticsearchResponse, ElasticsearchStatsResponse, FieldCapabilityQueryParams, - FieldCapabilityRequestBody, FieldCapabilityResponse, MultiSearchHeader, MultiSearchQueryParams, - MultiSearchResponse, MultiSearchSingleResponse, ScrollQueryParams, SearchBody, - SearchQueryParams, SearchQueryParamsCount, StatsResponseEntry, + FieldCapabilityRequestBody, FieldCapabilityResponse, IndexMappingQueryParams, + MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse, MultiSearchSingleResponse, + ScrollQueryParams, SearchBody, SearchQueryParams, SearchQueryParamsCount, StatsResponseEntry, build_list_field_request_for_es_api, convert_to_es_field_capabilities_response, }; use super::{TrackTotalHits, make_elastic_api_response}; @@ -199,8 +199,49 @@ async fn get_index_metadata( Ok(index_metadata) } +/// Parses the `fields` URL query parameter. Returns `None` when the +/// parameter is absent, empty, or contains only whitespace and commas. +/// Otherwise returns the trimmed, non-empty tokens. +fn parse_field_hints(params: &IndexMappingQueryParams) -> Option> { + let raw = params.fields.as_deref()?; + let tokens: Vec = raw + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + if tokens.is_empty() { + None + } else { + Some(tokens) + } +} + +/// Collects the set of declared top-level field names across every index +/// in `indexes_metadata`. Used by the column-hints handler to decide +/// whether a `?fields=…` request can be served directly from `doc_mapping` +/// without going to the splits. +fn collect_declared_top_level_names(indexes_metadata: &[IndexMetadata]) -> HashSet { + let mut names = HashSet::new(); + for metadata in indexes_metadata { + for entry in &metadata.index_config.doc_mapping.field_mappings { + names.insert(entry.name.clone()); + } + } + names +} + +/// `_mapping?fields=…` column-hints fast path. +/// +/// When every requested name is a flat literal (no `*`, no `?`, no `.`) +/// declared in the union of the indexes' `doc_mapping`, we can serve the +/// response purely from the declared mapping — no `list_fields` call, +/// no split I/O. Anything else (wildcards, dotted paths, names that +/// aren't declared) falls through to the full-mapping path, which lists +/// all fields from the splits in the time range and returns the +/// unfiltered mapping. pub(crate) async fn es_compat_index_mapping( index_id: String, + params: IndexMappingQueryParams, mut metastore: MetastoreServiceClient, search_service: Arc, ) -> Result { @@ -214,22 +255,46 @@ pub(crate) async fn es_compat_index_mapping( .iter() .map(|m| m.index_id().to_string()) .collect(); + + // Take the fast path when `?fields=…` is present and every requested + // name is a flat declared field on some index. In that case the + // declared mapping is authoritative and we skip `list_fields`. + if let Some(requested_fields) = parse_field_hints(¶ms) { + let declared_top: HashSet = collect_declared_top_level_names(&indexes_metadata); + let all_declared = requested_fields.iter().all(|requested| { + !requested.contains('*') + && !requested.contains('?') + && !requested.contains('.') + && declared_top.contains(requested.as_str()) + }); + if all_declared { + return Ok(ElasticsearchMappingsResponse::from_doc_mapping_filtered( + indexes_metadata, + None, + &requested_fields, + )); + } + } + + // Full-mapping fallback (no `?fields=`, or some requested name needs + // split-side discovery): list all fields from the splits in the time + // range and return the full mapping. Same shape as the pre-PR route, + // just with the timestamp-pushdown optimization applied. let list_fields_request = quickwit_proto::search::ListFieldsRequest { index_id_patterns, fields: Vec::new(), - start_timestamp: None, - end_timestamp: None, + start_timestamp: params.start_timestamp, + end_timestamp: params.end_timestamp, query_ast: None, }; let list_fields_response = search_service .root_list_fields(list_fields_request) .await .ok(); - let response = ElasticsearchMappingsResponse::from_doc_mapping( + Ok(ElasticsearchMappingsResponse::from_doc_mapping( indexes_metadata, list_fields_response.as_ref(), - ); - Ok(response) + )) } /// GET or POST _elastic/_search