Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ serde = { version = "1.0.228", features = ["derive", "rc"] }
serde_json = "1.0"
serde_json_borrow = "0.9"
serde_qs = { version = "0.15" }
serde_urlencoded = "0.7"
serde_with = "3.16"
serde_yaml = "0.9"
serial_test = { version = "3.2", features = ["file_locks"] }
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-serve/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ itertools = { workspace = true }
mockall = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
serde_urlencoded = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
Expand Down
5 changes: 3 additions & 2 deletions quickwit/quickwit-serve/src/elasticsearch_api/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use warp::{Filter, Rejection};

use super::model::{
CatIndexQueryParams, DeleteQueryParams, FieldCapabilityQueryParams, FieldCapabilityRequestBody,
MultiSearchQueryParams, SearchQueryParamsCount,
IndexMappingQueryParams, MultiSearchQueryParams, SearchQueryParamsCount,
};
use crate::Body;
use crate::decompression::get_body_bytes;
Expand Down Expand Up @@ -285,9 +285,10 @@ pub(crate) fn elastic_aliases_filter() -> impl Filter<Extract = (), Error = Reje
}

pub(crate) fn elastic_index_mapping_filter()
-> impl Filter<Extract = (String,), Error = Rejection> + Clone {
-> impl Filter<Extract = (String, IndexMappingQueryParams), Error = Rejection> + Clone {
warp::path!("_elastic" / String / "_mapping")
.or(warp::path!("_elastic" / String / "_mappings"))
.unify()
.and(warp::get())
.and(warp::query())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright 2021-Present Datadog, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use serde::{Deserialize, Serialize};

/// Query parameters accepted by `GET /_elastic/{index}/_mapping` and
/// `/_mappings`.
///
/// All fields are optional and absent by default. Unknown query
/// parameters are silently ignored to stay compatible with standard
/// Elasticsearch clients that pass extras like `pretty`,
/// `ignore_unavailable`, or `allow_no_indices`.
///
/// `start_timestamp` and `end_timestamp` are forwarded into
/// [`quickwit_proto::search::ListFieldsRequest`] verbatim, where they prune
/// the set of splits considered for field discovery. Unit is **epoch
/// seconds**, interval is half-open `[start_timestamp, end_timestamp)` —
/// matching the `ListFieldsRequest` proto contract exactly.
///
/// `fields` is an optional comma-separated list of column names. When
/// every requested name is a flat literal declared in some index's
/// `doc_mapping`, the handler filters the mapping response to those
/// columns and skips the per-split `list_fields` lookup entirely. Any
/// other shape (wildcards, dotted paths, or names not in `doc_mapping`)
/// falls back to listing all fields from the splits in the time range
/// and returning the full mapping — `fields=` then acts as a request
/// hint, not a strict filter. An empty value (`fields=`) is treated the
/// same as the parameter being absent.
#[serde_with::skip_serializing_none]
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct IndexMappingQueryParams {
#[serde(default)]
pub start_timestamp: Option<i64>,
#[serde(default)]
pub end_timestamp: Option<i64>,
#[serde(default)]
pub fields: Option<String>,
}

#[cfg(test)]
mod tests {
use super::IndexMappingQueryParams;

#[test]
fn empty_query_string_yields_none() {
let params: IndexMappingQueryParams = serde_urlencoded::from_str("").unwrap();
assert!(params.start_timestamp.is_none());
assert!(params.end_timestamp.is_none());
assert!(params.fields.is_none());
}

#[test]
fn both_params_present_yield_some() {
let qs = "start_timestamp=1712160204&end_timestamp=1712764984";
let params: IndexMappingQueryParams = serde_urlencoded::from_str(qs).unwrap();
assert_eq!(params.start_timestamp, Some(1712160204));
assert_eq!(params.end_timestamp, Some(1712764984));
assert!(params.fields.is_none());
}

#[test]
fn only_start_timestamp_present() {
let qs = "start_timestamp=1712160204";
let params: IndexMappingQueryParams = serde_urlencoded::from_str(qs).unwrap();
assert_eq!(params.start_timestamp, Some(1712160204));
assert!(params.end_timestamp.is_none());
}

#[test]
fn only_end_timestamp_present() {
let qs = "end_timestamp=1712764984";
let params: IndexMappingQueryParams = serde_urlencoded::from_str(qs).unwrap();
assert!(params.start_timestamp.is_none());
assert_eq!(params.end_timestamp, Some(1712764984));
}

#[test]
fn unknown_field_is_ignored() {
// ES clients commonly pass `pretty`, `ignore_unavailable`,
// `allow_no_indices`, etc. on `_mapping`. We ignore them rather
// than reject the request, matching the pre-column-hints route
// which did not parse the query string at all.
let qs = "start_timestamp=1&pretty=true&ignore_unavailable=true";
let params: IndexMappingQueryParams = serde_urlencoded::from_str(qs).unwrap();
assert_eq!(params.start_timestamp, Some(1));
assert!(params.end_timestamp.is_none());
assert!(params.fields.is_none());
}

#[test]
fn fields_param_present() {
let qs = "fields=host,message,status";
let params: IndexMappingQueryParams = serde_urlencoded::from_str(qs).unwrap();
assert_eq!(params.fields.as_deref(), Some("host,message,status"));
}

#[test]
fn fields_combined_with_timestamps() {
let qs = "start_timestamp=1&end_timestamp=2&fields=host";
let params: IndexMappingQueryParams = serde_urlencoded::from_str(qs).unwrap();
assert_eq!(params.start_timestamp, Some(1));
assert_eq!(params.end_timestamp, Some(2));
assert_eq!(params.fields.as_deref(), Some("host"));
}

#[test]
fn empty_fields_value() {
let qs = "fields=";
let params: IndexMappingQueryParams = serde_urlencoded::from_str(qs).unwrap();
// Empty string is parsed as Some(""); the handler treats it as
// "absent" (legacy full-mapping behavior). See
// `parse_field_hints` in `rest_handler.rs`.
assert_eq!(params.fields.as_deref(), Some(""));
}
}
123 changes: 119 additions & 4 deletions quickwit/quickwit-serve/src/elasticsearch_api/model/mappings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};

use quickwit_doc_mapper::{FieldMappingEntry, FieldMappingType};
use quickwit_metastore::IndexMetadata;
Expand Down Expand Up @@ -62,15 +62,43 @@ enum FieldMapping {
}

impl ElasticsearchMappingsResponse {
/// Builds the full mapping response for every requested index. Includes
/// every declared field plus any dynamic fields the search service
/// returned. This is the legacy code path used when the caller did not
/// opt into column hints.
pub fn from_doc_mapping(
indexes_metadata: Vec<IndexMetadata>,
list_fields_response: Option<&ListFieldsResponse>,
) -> Self {
Self::from_doc_mapping_filtered(indexes_metadata, list_fields_response, &[])
}

/// Builds the mapping response, optionally filtered to a set of
/// caller-requested column names. Matching is by exact equality on the
/// declared top-level field name.
///
/// The caller (the column-hints fast path in `rest_handler.rs`) is
/// responsible for ensuring `requested_fields` only contains flat
/// literal names that exist in some index's `doc_mapping` — wildcards
/// and dotted paths are rejected upstream because the static
/// `doc_mapping` never declares them.
///
/// An empty `requested_fields` slice means "no filter" and behaves
/// identically to [`Self::from_doc_mapping`].
pub fn from_doc_mapping_filtered(
indexes_metadata: Vec<IndexMetadata>,
list_fields_response: Option<&ListFieldsResponse>,
requested_fields: &[String],
) -> Self {
let filter: HashSet<&str> = requested_fields.iter().map(String::as_str).collect();
let indices = indexes_metadata
.into_iter()
.map(|index_metadata| {
let field_mappings = &index_metadata.index_config.doc_mapping.field_mappings;
let mut properties = build_properties(field_mappings);
if !filter.is_empty() {
properties.retain(|name, _| filter.contains(name.as_str()));
}
if let Some(list_fields) = list_fields_response {
merge_dynamic_fields(&mut properties, list_fields);
}
Expand Down Expand Up @@ -126,7 +154,9 @@ fn field_mapping_from_entry(entry: &FieldMappingEntry) -> Option<FieldMapping> {
/// Merges dynamic fields from a `ListFieldsResponse` into the properties map.
///
/// Fields already present in the map (from explicit doc mappings) are skipped,
/// as are internal fields (prefixed with `_`).
/// as are internal fields (prefixed with `_`). When the same `field_name`
/// appears multiple times in the response (e.g. observed in two splits with
/// different types) the first mappable observation wins.
fn merge_dynamic_fields(
properties: &mut HashMap<String, FieldMapping>,
list_fields_response: &ListFieldsResponse,
Expand Down Expand Up @@ -272,10 +302,10 @@ mod tests {
assert_eq!(meta["properties"]["source"]["type"], "keyword");
}

use quickwit_proto::search::ListFieldsEntryResponse;

#[test]
fn test_merge_dynamic_fields_skips_existing_and_internal() {
use quickwit_proto::search::ListFieldsEntryResponse;

let mut properties = HashMap::new();
properties.insert("title".to_string(), FieldMapping::Leaf { typ: "text" });

Expand Down Expand Up @@ -306,4 +336,89 @@ mod tests {
assert!(properties.contains_key("dynamic_field"));
assert!(!properties.contains_key("_timestamp"));
}

fn make_index_metadata(field_mappings: serde_json::Value) -> IndexMetadata {
let entries: Vec<FieldMappingEntry> = serde_json::from_value(field_mappings).unwrap();
let mut metadata = IndexMetadata::for_test("test", "ram:///indexes/test");
metadata.index_config.doc_mapping.field_mappings = entries;
metadata
}

fn properties_of(response: &ElasticsearchMappingsResponse) -> &HashMap<String, FieldMapping> {
&response.indices["test"].mappings.properties
}

#[test]
fn from_doc_mapping_filtered_keeps_only_requested() {
let index_metadata = make_index_metadata(serde_json::json!([
{ "name": "host", "type": "text" },
{ "name": "message", "type": "text" },
{ "name": "status", "type": "i64" },
{ "name": "service", "type": "text" },
{ "name": "trace_id", "type": "text" },
]));
let requested = vec!["host".to_string(), "message".to_string()];
let response = ElasticsearchMappingsResponse::from_doc_mapping_filtered(
vec![index_metadata],
None,
&requested,
);
let props = properties_of(&response);
assert_eq!(props.len(), 2);
assert!(props.contains_key("host"));
assert!(props.contains_key("message"));
assert!(!props.contains_key("status"));
assert!(!props.contains_key("service"));
assert!(!props.contains_key("trace_id"));
}

#[test]
fn from_doc_mapping_filtered_includes_object_subtree_for_top_level_match() {
let index_metadata = make_index_metadata(serde_json::json!([
{
"name": "host",
"type": "object",
"field_mappings": [
{ "name": "region", "type": "text" },
{ "name": "name", "type": "text" }
]
},
{ "name": "message", "type": "text" }
]));
let requested = vec!["host".to_string()];
let response = ElasticsearchMappingsResponse::from_doc_mapping_filtered(
vec![index_metadata],
None,
&requested,
);
let serialized = serde_json::to_value(&response).unwrap();
// The `host` object subtree is preserved verbatim — both nested fields stay.
let host_props = &serialized["test"]["mappings"]["properties"]["host"]["properties"];
assert_eq!(host_props["region"]["type"], "keyword");
assert_eq!(host_props["name"]["type"], "keyword");
// `message` is filtered out.
assert!(
serialized["test"]["mappings"]["properties"]
.get("message")
.is_none()
);
}

#[test]
fn from_doc_mapping_filtered_empty_request_returns_all() {
let index_metadata = make_index_metadata(serde_json::json!([
{ "name": "host", "type": "text" },
{ "name": "message", "type": "text" }
]));
let response_all =
ElasticsearchMappingsResponse::from_doc_mapping(vec![index_metadata.clone()], None);
let response_filtered = ElasticsearchMappingsResponse::from_doc_mapping_filtered(
vec![index_metadata],
None,
&[],
);
let serialized_all = serde_json::to_value(&response_all).unwrap();
let serialized_filtered = serde_json::to_value(&response_filtered).unwrap();
assert_eq!(serialized_all, serialized_filtered);
}
}
2 changes: 2 additions & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod bulk_query_params;
mod cat_indices;
mod error;
mod field_capability;
mod index_mapping_query_params;
mod mappings;
mod multi_search;
mod scroll;
Expand All @@ -36,6 +37,7 @@ pub use field_capability::{
FieldCapabilityQueryParams, FieldCapabilityRequestBody, FieldCapabilityResponse,
build_list_field_request_for_es_api, convert_to_es_field_capabilities_response,
};
pub use index_mapping_query_params::IndexMappingQueryParams;
pub(crate) use mappings::ElasticsearchMappingsResponse;
pub use multi_search::{
MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse, MultiSearchSingleResponse,
Expand Down
Loading
Loading