From 4f73759e6b8a131c32e5631ee751278125fa917b Mon Sep 17 00:00:00 2001 From: chakkk309 Date: Sun, 17 May 2026 21:44:20 +0300 Subject: [PATCH] Fix lag i64::MIN offset overflow --- datafusion/functions-window/src/lead_lag.rs | 56 +++++++++++++++++---- 1 file changed, 47 insertions(+), 9 deletions(-) diff --git a/datafusion/functions-window/src/lead_lag.rs b/datafusion/functions-window/src/lead_lag.rs index 7569dac9ac106..aae21bd441562 100644 --- a/datafusion/functions-window/src/lead_lag.rs +++ b/datafusion/functions-window/src/lead_lag.rs @@ -22,7 +22,9 @@ use arrow::datatypes::FieldRef; use datafusion_common::arrow::array::ArrayRef; use datafusion_common::arrow::datatypes::DataType; use datafusion_common::arrow::datatypes::Field; -use datafusion_common::{DataFusionError, Result, ScalarValue, arrow_datafusion_err}; +use datafusion_common::{ + DataFusionError, Result, ScalarValue, arrow_datafusion_err, exec_err, +}; use datafusion_doc::window_doc_sections::DOC_SECTION_ANALYTICAL; use datafusion_expr::{ Documentation, LimitEffect, Literal, PartitionEvaluator, ReversedUDWF, Signature, @@ -37,7 +39,7 @@ use std::any::Any; use std::cmp::min; use std::collections::VecDeque; use std::hash::Hash; -use std::ops::{Neg, Range}; +use std::ops::Range; use std::sync::{Arc, LazyLock}; get_or_init_udwf!( @@ -112,14 +114,23 @@ impl WindowShiftKind { /// In [`WindowShiftEvaluator`] a positive offset is used to signal /// computation of `lag()`. So here we negate the input offset /// value when computing `lead()`. - fn shift_offset(&self, value: Option) -> i64 { + fn shift_offset(&self, value: Option) -> Result { match self { - WindowShiftKind::Lag => value.unwrap_or(1), - WindowShiftKind::Lead => value.map(|v| v.neg()).unwrap_or(-1), + WindowShiftKind::Lag => Ok(value.unwrap_or(1)), + WindowShiftKind::Lead => value + .map(|v| checked_neg_offset(v, self.name())) + .unwrap_or(Ok(-1)), } } } +fn checked_neg_offset(offset: i64, name: &str) -> Result { + match offset.checked_neg() { + Some(offset) => Ok(offset), + None => exec_err!("{name} offset must not be i64::MIN"), + } +} + /// window shift expression #[derive(Debug, PartialEq, Eq, Hash)] pub struct WindowShift { @@ -266,14 +277,17 @@ impl WindowUDFImpl for WindowShift { get_scalar_value_from_args(partition_evaluator_args.input_exprs(), 1)? .map(|v| get_signed_integer(&v)) .map_or(Ok(None), |v| v.map(Some)) - .map(|n| self.kind.shift_offset(n)) - .map(|offset| { + .and_then(|n| self.kind.shift_offset(n)) + .and_then(|offset| { if partition_evaluator_args.is_reversed() { - -offset + checked_neg_offset(offset, self.kind.name()) } else { - offset + Ok(offset) } })?; + if shift_offset == i64::MIN { + return exec_err!("{} offset must not be i64::MIN", self.kind.name()); + } let default_value = parse_default_value( partition_evaluator_args.input_exprs(), partition_evaluator_args.input_fields(), @@ -827,4 +841,28 @@ mod tests { .collect::(), ) } + + #[test] + fn test_lag_i64_min_offset_error() { + let expr = Arc::new(Column::new("c3", 0)) as Arc; + let shift_offset = Arc::new(Literal::new(ScalarValue::Int64(Some(i64::MIN)))) + as Arc; + let input_exprs = &[expr, shift_offset]; + let input_fields = [DataType::Int32, DataType::Int64] + .into_iter() + .map(|d| Field::new("f", d, true)) + .map(Arc::new) + .collect::>(); + + let err = WindowShift::lag() + .partition_evaluator(PartitionEvaluatorArgs::new( + input_exprs, + &input_fields, + false, + false, + )) + .expect_err("i64::MIN offset should be rejected"); + + assert!(err.to_string().contains("lag offset must not be i64::MIN")); + } }