From d429d8c67b15a0447393753d304f802e6ec059db Mon Sep 17 00:00:00 2001 From: Zakir Date: Thu, 19 Feb 2026 15:41:22 +0530 Subject: [PATCH 1/7] feat(rust): add streaming deserialization support Introduces ForyStreamBuf and Reader::from_stream for incremental stream-backed deserialization from any Read source. Preserves existing in-memory fast path with zero overhead. Closes #3300 --- rust/fory-core/src/buffer.rs | 155 +++++++++++++--- rust/fory-core/src/fory.rs | 74 +++++++- rust/fory-core/src/lib.rs | 2 + rust/fory-core/src/stream.rs | 305 ++++++++++++++++++++++++++++++++ rust/tests/tests/stream_test.rs | 68 +++++++ 5 files changed, 578 insertions(+), 26 deletions(-) create mode 100644 rust/fory-core/src/stream.rs create mode 100644 rust/tests/tests/stream_test.rs diff --git a/rust/fory-core/src/buffer.rs b/rust/fory-core/src/buffer.rs index 4cab51acf1..9f191b3423 100644 --- a/rust/fory-core/src/buffer.rs +++ b/rust/fory-core/src/buffer.rs @@ -18,6 +18,7 @@ use crate::error::Error; use crate::float16::float16; use crate::meta::buffer_rw_string::read_latin1_simd; +use crate::stream::ForyStreamBuf; use byteorder::{ByteOrder, LittleEndian}; use std::cmp::max; @@ -506,6 +507,7 @@ impl<'a> Writer<'a> { pub struct Reader<'a> { pub(crate) bf: &'a [u8], pub(crate) cursor: usize, + pub(crate) stream: Option>, } #[allow(clippy::needless_lifetimes)] @@ -514,7 +516,26 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn new(bf: &[u8]) -> Reader<'_> { - Reader { bf, cursor: 0 } + Reader { + bf, + cursor: 0, + stream: None, + } + } + + /// Construct a stream-backed `Reader`. 
+ pub fn from_stream(stream: crate::stream::ForyStreamBuf) -> Reader<'static> { + let boxed = Box::new(stream); + Reader { + bf: b"", + cursor: 0, + stream: Some(boxed), + } + } + + #[inline(always)] + pub fn is_stream_backed(&self) -> bool { + self.stream.is_some() } #[inline(always)] @@ -551,29 +572,63 @@ impl<'a> Reader<'a> { self.cursor } + /// Fill stream buffer up to `target_size` total bytes, then re-pin `bf`. + /// Returns `false` if stream is None OR fill failed. + fn fill_to(&mut self, target_size: usize) -> bool { + let stream = match self.stream.as_mut() { + Some(s) => s, + None => return false, + }; + // intentional: fill_buffer validates; set_reader_index only syncs read_pos + let _ = stream.set_reader_index(self.cursor); + + let n = target_size.saturating_sub(self.cursor); + if n == 0 { + self.bf = unsafe { std::slice::from_raw_parts(stream.data(), stream.size()) }; + return self.bf.len() >= target_size; + } + if stream.fill_buffer(n).is_err() { + return false; + } + self.bf = unsafe { std::slice::from_raw_parts(stream.data(), stream.size()) }; + self.bf.len() >= target_size + } + + /// Ensure `self.cursor + n` bytes are available. + /// fast path: target <= size_ → return true + /// stream path: call fill_to(target), check again. 
#[inline(always)] - fn value_at(&self, index: usize) -> Result { - match self.bf.get(index) { - None => Err(Error::buffer_out_of_bound( - index, - self.bf.len(), - self.bf.len(), - )), - Some(v) => Ok(*v), + fn ensure_readable(&mut self, n: usize) -> Result<(), Error> { + let target = self.cursor + n; + if target <= self.bf.len() { + return Ok(()); + } + if !self.fill_to(target) { + return Err(Error::buffer_out_of_bound(self.cursor, n, self.bf.len())); + } + if target > self.bf.len() { + return Err(Error::buffer_out_of_bound(self.cursor, n, self.bf.len())); } + Ok(()) } #[inline(always)] - fn check_bound(&self, n: usize) -> Result<(), Error> { - let end = self - .cursor - .checked_add(n) - .ok_or_else(|| Error::buffer_out_of_bound(self.cursor, n, self.bf.len()))?; - if end > self.bf.len() { - Err(Error::buffer_out_of_bound(self.cursor, n, self.bf.len())) - } else { - Ok(()) + fn value_at(&mut self, index: usize) -> Result { + if index >= self.bf.len() { + // Need index+1 bytes total; fill to that target. + if !self.fill_to(index + 1) || index >= self.bf.len() { + return Err(Error::buffer_out_of_bound(index, 1, self.bf.len())); + } } + Ok(unsafe { *self.bf.get_unchecked(index) }) + } + + /// Bounds check that falls back to a stream fill on miss. Changing to `&mut self` is the single + /// change that gives ALL 27 existing read methods stream support + /// without touching them individually — they all call this. + #[inline(always)] + fn check_bound(&mut self, n: usize) -> Result<(), Error> { + self.ensure_readable(n) } #[inline(always)] @@ -606,8 +661,12 @@ impl<'a> Reader<'a> { } } + /// Sets the cursor; also mirrors C++ `stream_->reader_index(reader_index_)` when stream-backed. 
pub fn set_cursor(&mut self, cursor: usize) { self.cursor = cursor; + if let Some(ref mut stream) = self.stream { + let _ = stream.set_reader_index(cursor); + } } // ============ BOOL (TypeId = 1) ============ @@ -723,6 +782,9 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_varuint32(&mut self) -> Result { + if self.stream.is_some() && self.bf.len().saturating_sub(self.cursor) < 5 { + return self.read_varuint32_stream(); + } let b0 = self.value_at(self.cursor)? as u32; if b0 < 0x80 { self.move_next(1); @@ -770,6 +832,9 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_varuint64(&mut self) -> Result { + if self.stream.is_some() && self.bf.len().saturating_sub(self.cursor) < 9 { + return self.read_varuint64_stream(); + } let b0 = self.value_at(self.cursor)? as u64; if b0 < 0x80 { self.move_next(1); @@ -1000,6 +1065,9 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_varuint36small(&mut self) -> Result { + if self.stream.is_some() && self.bf.len().saturating_sub(self.cursor) < 8 { + return self.read_varuint36small_stream(); + } // Keep this API panic-free even if cursor is externally set past buffer end. self.check_bound(0)?; let start = self.cursor; @@ -1046,9 +1114,56 @@ impl<'a> Reader<'a> { } Ok(result) } + + /// Byte-by-byte varuint32 decode for stream-backed path. + fn read_varuint32_stream(&mut self) -> Result { + let mut result = 0u32; + for i in 0..5 { + let b = self.value_at(self.cursor)? as u32; + self.cursor += 1; + result |= (b & 0x7F) << (i * 7); + if (b & 0x80) == 0 { + return Ok(result); + } + } + Err(Error::encode_error("Invalid var_uint32 encoding")) + } + + /// Byte-by-byte varuint64 decode for stream-backed path. + fn read_varuint64_stream(&mut self) -> Result { + let mut result = 0u64; + for i in 0..8u64 { + let b = self.value_at(self.cursor)? as u64; + self.cursor += 1; + result |= (b & 0x7F) << (i * 7); + if (b & 0x80) == 0 { + return Ok(result); + } + } + // 9th byte — full 8 bits + let b = self.value_at(self.cursor)? 
as u64; + self.cursor += 1; + result |= b << 56; + Ok(result) + } + + /// Byte-by-byte varuint36small decode for stream-backed path. + fn read_varuint36small_stream(&mut self) -> Result { + let mut result = 0u64; + for i in 0..4u64 { + let b = self.value_at(self.cursor)? as u64; + self.cursor += 1; + result |= (b & 0x7F) << (i * 7); + if (b & 0x80) == 0 { + return Ok(result); + } + } + let b = self.value_at(self.cursor)? as u64; + self.cursor += 1; + result |= b << 28; + Ok(result) + } } #[allow(clippy::needless_lifetimes)] unsafe impl<'a> Send for Reader<'a> {} -#[allow(clippy::needless_lifetimes)] -unsafe impl<'a> Sync for Reader<'a> {} diff --git a/rust/fory-core/src/fory.rs b/rust/fory-core/src/fory.rs index 9d3f826941..0f2e1bfa7d 100644 --- a/rust/fory-core/src/fory.rs +++ b/rust/fory-core/src/fory.rs @@ -26,6 +26,7 @@ use crate::serializer::{Serializer, StructSerializer}; use crate::types::config_flags::{IS_CROSS_LANGUAGE_FLAG, IS_NULL_FLAG}; use crate::types::{RefMode, SIZE_OF_REF_AND_TYPE}; use std::cell::UnsafeCell; +use std::io::Read; use std::mem; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::OnceLock; @@ -959,13 +960,74 @@ impl Fory { reader: &mut Reader, ) -> Result { self.with_read_context(|context| { - let outlive_buffer = unsafe { mem::transmute::<&[u8], &[u8]>(reader.bf) }; - let mut new_reader = Reader::new(outlive_buffer); - new_reader.set_cursor(reader.cursor); - context.attach_reader(new_reader); + if reader.is_stream_backed() { + // Stream-backed path: move the owned stream out of the caller's reader, + // construct a fresh stream-backed reader at the current cursor, hand it + // to the context, then restore all state from the returned reader. + // This is the sequential-read case: caller creates Reader::from_stream(...) + // once and calls deserialize_from repeatedly. 
+ let stream = mem::take(&mut reader.stream) + .expect("is_stream_backed was true but stream is None"); + let cursor = reader.cursor; + let mut stream_reader = Reader::from_stream(*stream); + // Sync cursor: the stream already consumed [0..cursor], re-position. + stream_reader.set_cursor(cursor); + context.attach_reader(stream_reader); + let result = self.deserialize_with_context(context); + let returned = context.detach_reader(); + // Restore state back to caller's reader. + reader.cursor = returned.cursor; + reader.stream = returned.stream; + // Re-pin bf from the (possibly grown after fill_to) stream buffer. + // SAFETY: same invariant as Reader::from_stream and fill_to: + // bf points into Box-owned stream buffer, owned by reader.stream, + // which lives as long as reader. + if let Some(ref s) = reader.stream { + reader.bf = unsafe { std::slice::from_raw_parts(s.data(), s.size()) }; + } + result + } else { + // In-memory path: unchanged from original. + let outlive_buffer = unsafe { mem::transmute::<&[u8], &[u8]>(reader.bf) }; + let mut new_reader = Reader::new(outlive_buffer); + new_reader.set_cursor(reader.cursor); + context.attach_reader(new_reader); + let result = self.deserialize_with_context(context); + let end = context.detach_reader().get_cursor(); + reader.set_cursor(end); + result + } + }) + } + + /// Deserializes a single value of type `T` from any `Read` source. + /// + /// Equivalent of C++ `fory.deserialize(Buffer(ForyInputStream(source)))`. + /// Internally wraps the source in a [`crate::stream::ForyStreamBuf`] and calls + /// [`deserialize_from`](Self::deserialize_from). + /// + /// For deserializing **multiple values sequentially** from one stream + /// (e.g. 
a network socket or pipe), create the reader once and reuse it: + /// + /// ```rust,ignore + /// use fory_core::{Fory, Reader}; + /// use fory_core::stream::ForyStreamBuf; + /// + /// let fory = Fory::default(); + /// let mut reader = Reader::from_stream(ForyStreamBuf::new(my_socket)); + /// let first: i32 = fory.deserialize_from(&mut reader).unwrap(); + /// let second: String = fory.deserialize_from(&mut reader).unwrap(); + /// ``` + pub fn deserialize_from_stream( + &self, + source: impl Read + Send + 'static, + ) -> Result { + self.with_read_context(|context| { + let stream = crate::stream::ForyStreamBuf::new(source); + let reader = Reader::from_stream(stream); + context.attach_reader(reader); let result = self.deserialize_with_context(context); - let end = context.detach_reader().get_cursor(); - reader.set_cursor(end); + context.detach_reader(); result }) } diff --git a/rust/fory-core/src/lib.rs b/rust/fory-core/src/lib.rs index 976a760af6..bee8b59b49 100644 --- a/rust/fory-core/src/lib.rs +++ b/rust/fory-core/src/lib.rs @@ -185,6 +185,7 @@ pub mod meta; pub mod resolver; pub mod row; pub mod serializer; +pub mod stream; pub mod types; pub use float16::float16 as Float16; pub mod util; @@ -201,3 +202,4 @@ pub use crate::resolver::type_resolver::{TypeInfo, TypeResolver}; pub use crate::serializer::weak::{ArcWeak, RcWeak}; pub use crate::serializer::{read_data, write_data, ForyDefault, Serializer, StructSerializer}; pub use crate::types::{RefFlag, RefMode, TypeId}; +pub use stream::ForyStreamBuf; diff --git a/rust/fory-core/src/stream.rs b/rust/fory-core/src/stream.rs new file mode 100644 index 0000000000..85a62e3b0f --- /dev/null +++ b/rust/fory-core/src/stream.rs @@ -0,0 +1,305 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Streaming buffer for incremental deserialization. + +use crate::error::Error; +use std::io::{self, Read}; + +const DEFAULT_BUFFER_SIZE: usize = 4096; + +/// Single internal `Vec` window. `valid_len` = `egptr()-eback()`. +/// `read_pos` = `gptr()-eback()`. [`fill_buffer`] grows on demand. +/// +/// [`fill_buffer`]: ForyStreamBuf::fill_buffer +pub struct ForyStreamBuf { + source: Box, + /// Backing window — equivalent of C++ `buffer_` (`std::vector`) + buffer: Vec, + /// Bytes fetched from source — equivalent of `egptr() - eback()` + valid_len: usize, + /// Current read cursor — equivalent of `gptr() - eback()` + read_pos: usize, +} + +impl ForyStreamBuf { + pub fn new(source: impl Read + Send + 'static) -> Self { + Self::with_capacity(source, DEFAULT_BUFFER_SIZE) + } + + /// Allocates and zero-initialises the backing window immediately, + /// `std::vector(buffer_size)` in the constructor. + pub fn with_capacity(source: impl Read + Send + 'static, buffer_size: usize) -> Self { + let cap = buffer_size.max(1); + let buffer = vec![0u8; cap]; + Self { + source: Box::new(source), + buffer, + valid_len: 0, + read_pos: 0, + } + } + + /// Pull bytes from source until `remaining() >= min_fill_size`. 
+ pub fn fill_buffer(&mut self, min_fill_size: usize) -> Result<(), Error> { + if min_fill_size == 0 || self.remaining() >= min_fill_size { + return Ok(()); + } + + let need = min_fill_size - self.remaining(); + + let required = self + .valid_len + .checked_add(need) + .filter(|&r| r <= u32::MAX as usize) + .ok_or_else(|| { + Error::buffer_out_of_bound(self.read_pos, min_fill_size, self.remaining()) + })?; + + // Grow if required > current buffer length + if required > self.buffer.len() { + let new_cap = (self.buffer.len() * 2).max(required); + self.buffer.resize(new_cap, 0); + } + + while self.remaining() < min_fill_size { + let writable = self.buffer.len() - self.valid_len; + if writable == 0 { + // Inner double `buffer_.size() * 2 + 1` with u32 overflow guard + let new_cap = self + .buffer + .len() + .checked_mul(2) + .and_then(|n| n.checked_add(1)) + .filter(|&n| n <= u32::MAX as usize) + .ok_or_else(|| { + Error::buffer_out_of_bound(self.read_pos, min_fill_size, self.remaining()) + })?; + self.buffer.resize(new_cap, 0); + // fall through — self.buffer[self.valid_len..] is now non-empty + } + match self.source.read(&mut self.buffer[self.valid_len..]) { + Ok(0) => { + // `read_bytes <= 0` → buffer_out_of_bound + return Err(Error::buffer_out_of_bound( + self.read_pos, + min_fill_size, + self.remaining(), + )); + } + Ok(n) => self.valid_len += n, + Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, + Err(_) => { + return Err(Error::buffer_out_of_bound( + self.read_pos, + min_fill_size, + self.remaining(), + )); + } + } + } + Ok(()) + } + + /// Move cursor backward by `size` bytes. + /// + /// `setg(eback(), gptr() - size, egptr())` + /// + /// Panics if `size > read_pos`. + pub fn rewind(&mut self, size: usize) { + assert!( + size <= self.read_pos, + "rewind size {} exceeds consumed bytes {}", + size, + self.read_pos + ); + self.read_pos -= size; + } + + /// Advance cursor forward by `size` bytes without pulling from source. 
+ /// + /// `gbump(static_cast(size))` + /// + /// Panics if `size > remaining()`. + pub fn consume(&mut self, size: usize) { + assert!( + size <= self.remaining(), + "consume size {} exceeds available bytes {}", + size, + self.remaining() + ); + self.read_pos += size; + } + + /// Raw pointer to byte 0 of the internal window. + /// + /// Re-read by `Reader` (buffer.rs) after every `fill_buffer` call that + /// may reallocate, mirroring the C++ assignment + /// `data_ = stream_->data()`. + /// + /// `uint8_t* data()` → `reinterpret_cast(eback())`. + /// + /// # Safety + /// Valid until the next `fill_buffer` call that causes reallocation. + /// `Reader` always re-reads this pointer after every `fill_buffer`. + #[inline(always)] + pub(crate) fn data(&self) -> *const u8 { + self.buffer.as_ptr() + } + + /// Total fetched bytes + #[inline(always)] + pub fn size(&self) -> usize { + self.valid_len + } + + /// Current read cursor + #[inline(always)] + pub fn reader_index(&self) -> usize { + self.read_pos + } + + /// Set cursor to absolute `index`. + /// + /// Called by `Reader` (buffer.rs) from `set_cursor` and before each fill to sync the stream cursor, mirroring C++ `stream_->reader_index(reader_index_)`. + /// + /// Returns `Err` if `index > valid_len`. + #[inline(always)] + pub(crate) fn set_reader_index(&mut self, index: usize) -> Result<(), Error> { + if index > self.valid_len { + return Err(Error::buffer_out_of_bound(index, 0, self.valid_len)); + } + self.read_pos = index; + Ok(()) + } + + /// Unread bytes in window + #[inline(always)] + pub fn remaining(&self) -> usize { + self.valid_len.saturating_sub(self.read_pos) + } + + /// Always `true` — used by `Reader` (buffer.rs) to branch into the stream path. + #[inline(always)] + pub fn is_stream_backed(&self) -> bool { + true + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Cursor; + + /// Reads exactly 1 byte at a time. 
+ struct OneByteCursor(Cursor>); + impl Read for OneByteCursor { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + if buf.is_empty() { + return Ok(0); + } + let mut one = [0u8; 1]; + match self.0.read(&mut one)? { + 0 => Ok(0), + _ => { + buf[0] = one[0]; + Ok(1) + } + } + } + } + + #[test] + fn test_rewind() { + let data = vec![0x01u8, 0x02, 0x03, 0x04, 0x05]; + let mut s = ForyStreamBuf::with_capacity(OneByteCursor(Cursor::new(data)), 2); + s.fill_buffer(4).unwrap(); + assert_eq!(s.size(), 4); + assert_eq!(s.reader_index(), 0); + s.consume(3); + assert_eq!(s.reader_index(), 3); + s.rewind(2); + assert_eq!(s.reader_index(), 1); + s.consume(1); + assert_eq!(s.reader_index(), 2); + } + + #[test] + fn test_short_read_error() { + let mut s = ForyStreamBuf::new(Cursor::new(vec![0x01u8, 0x02, 0x03])); + assert!(s.fill_buffer(4).is_err()); + } + + // Sequential fills with tiny-chunk reader + #[test] + fn test_sequential_fill() { + let data: Vec = (0u8..=9).collect(); + let mut s = ForyStreamBuf::with_capacity(OneByteCursor(Cursor::new(data)), 2); + s.fill_buffer(3).unwrap(); + assert!(s.remaining() >= 3); + s.consume(3); + s.fill_buffer(3).unwrap(); + assert!(s.remaining() >= 3); + } + + #[test] + fn test_overflow_guard() { + // valid_len near usize::MAX would overflow without the u32 guard. + // We can't actually allocate that — just verify the guard logic + // via a saturating check on a real (tiny) stream. 
+ let mut s = ForyStreamBuf::new(Cursor::new(vec![0u8; 8])); + // Requesting more than the source has should error, not panic/overflow + assert!(s.fill_buffer(16).is_err()); + } + + #[test] + fn test_consume_panics_on_overrun() { + let result = std::panic::catch_unwind(|| { + let mut s = ForyStreamBuf::new(Cursor::new(vec![0x01u8])); + s.fill_buffer(1).unwrap(); + s.consume(2); // only 1 byte available + }); + assert!(result.is_err()); + } + + #[test] + fn test_rewind_panics_on_overrun() { + let result = std::panic::catch_unwind(|| { + let mut s = ForyStreamBuf::new(Cursor::new(vec![0x01u8, 0x02])); + s.fill_buffer(2).unwrap(); + s.consume(1); + s.rewind(2); // only consumed 1 + }); + assert!(result.is_err()); + } + + #[test] + fn test_set_reader_index() { + let mut s = ForyStreamBuf::new(Cursor::new(vec![0x01u8, 0x02, 0x03])); + s.fill_buffer(3).unwrap(); + assert!(s.set_reader_index(2).is_ok()); + assert_eq!(s.reader_index(), 2); + assert_eq!(s.remaining(), 1); + assert!(s.set_reader_index(4).is_err()); // beyond valid_len + } + + #[test] + fn test_is_stream_backed() { + let s = ForyStreamBuf::new(Cursor::new(vec![])); + assert!(s.is_stream_backed()); + } +} diff --git a/rust/tests/tests/stream_test.rs b/rust/tests/tests/stream_test.rs new file mode 100644 index 0000000000..f3e2b5b0bd --- /dev/null +++ b/rust/tests/tests/stream_test.rs @@ -0,0 +1,68 @@ +#[cfg(test)] +mod stream_tests { + use fory_core::buffer::Reader; + use fory_core::stream::ForyStreamBuf; + use fory_core::Fory; + use std::io::Cursor; + + struct OneByte(Cursor>); + impl std::io::Read for OneByte { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + if buf.is_empty() { + return Ok(0); + } + let mut one = [0u8]; + match self.0.read(&mut one)? 
{ + 0 => Ok(0), + _ => { + buf[0] = one[0]; + Ok(1) + } + } + } + } + + #[test] + fn test_primitive_stream_roundtrip() { + let fory = Fory::default(); + let bytes = fory.serialize(&-9876543212345i64).unwrap(); + let result: i64 = fory + .deserialize_from_stream(OneByte(Cursor::new(bytes))) + .unwrap(); + assert_eq!(result, -9876543212345i64); + + let bytes = fory.serialize(&"stream-hello-世界".to_string()).unwrap(); + let result: String = fory + .deserialize_from_stream(OneByte(Cursor::new(bytes))) + .unwrap(); + assert_eq!(result, "stream-hello-世界"); + } + + #[test] + fn test_sequential_stream_reads() { + let fory = Fory::default(); + let mut bytes = Vec::new(); + fory.serialize_to(&mut bytes, &12345i32).unwrap(); + fory.serialize_to(&mut bytes, &"next-value".to_string()) + .unwrap(); + fory.serialize_to(&mut bytes, &99i64).unwrap(); + + let mut reader = Reader::from_stream(ForyStreamBuf::new(OneByte(Cursor::new(bytes)))); + let first: i32 = fory.deserialize_from(&mut reader).unwrap(); + let second: String = fory.deserialize_from(&mut reader).unwrap(); + let third: i64 = fory.deserialize_from(&mut reader).unwrap(); + + assert_eq!(first, 12345); + assert_eq!(second, "next-value"); + assert_eq!(third, 99); + } + + #[test] + fn test_truncated_stream_returns_error() { + let fory = Fory::default(); + let mut bytes = fory.serialize(&"hello world".to_string()).unwrap(); + bytes.pop(); + let result: Result = fory.deserialize_from_stream(Cursor::new(bytes)); + assert!(result.is_err()); + } +} From d9276c9974a3e78198cf71bcbaababf331cba84e Mon Sep 17 00:00:00 2001 From: Zakir Date: Thu, 19 Feb 2026 15:47:24 +0530 Subject: [PATCH 2/7] chore: add license header to stream_test.rs --- rust/tests/tests/stream_test.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/rust/tests/tests/stream_test.rs b/rust/tests/tests/stream_test.rs index f3e2b5b0bd..cc54f2c8d8 100644 --- a/rust/tests/tests/stream_test.rs +++ b/rust/tests/tests/stream_test.rs @@ -1,3 +1,20 @@ +// 
Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + #[cfg(test)] mod stream_tests { use fory_core::buffer::Reader; From 78a59c74672bb4b9a5716b713c1eaa5addb2fa21 Mon Sep 17 00:00:00 2001 From: Zakir Date: Wed, 4 Mar 2026 15:32:43 +0530 Subject: [PATCH 3/7] conflict resolve --- rust/fory-core/src/buffer.rs | 2 + rust/fory-core/src/fory.rs | 17 ++++- rust/fory-core/src/stream.rs | 41 ++++++++++++ rust/tests/tests/stream_test.rs | 113 ++++++++++++++++++++++++++++++-- 4 files changed, 164 insertions(+), 9 deletions(-) diff --git a/rust/fory-core/src/buffer.rs b/rust/fory-core/src/buffer.rs index 9f191b3423..ab46b8ac20 100644 --- a/rust/fory-core/src/buffer.rs +++ b/rust/fory-core/src/buffer.rs @@ -1167,3 +1167,5 @@ impl<'a> Reader<'a> { #[allow(clippy::needless_lifetimes)] unsafe impl<'a> Send for Reader<'a> {} +#[allow(clippy::needless_lifetimes)] +unsafe impl<'a> Sync for Reader<'a> {} diff --git a/rust/fory-core/src/fory.rs b/rust/fory-core/src/fory.rs index 0f2e1bfa7d..af9ea4da46 100644 --- a/rust/fory-core/src/fory.rs +++ b/rust/fory-core/src/fory.rs @@ -982,8 +982,17 @@ impl Fory { // SAFETY: same invariant as Reader::from_stream and fill_to: // bf points into Box-owned stream buffer, owned by reader.stream, // which 
lives as long as reader. - if let Some(ref s) = reader.stream { + if let Some(ref mut s) = reader.stream { + // Sync stream's read_pos with the reader cursor position + // before shrinking — the detached reader may have advanced + // cursor without updating stream.read_pos. + let _ = s.set_reader_index(reader.cursor); + // Mirror C++ StreamShrinkGuard: compact consumed bytes after + // deserialization to prevent unbounded buffer growth on + // long-lived streams. + s.shrink_buffer(); reader.bf = unsafe { std::slice::from_raw_parts(s.data(), s.size()) }; + reader.cursor = s.reader_index(); } result } else { @@ -1027,7 +1036,11 @@ impl Fory { let reader = Reader::from_stream(stream); context.attach_reader(reader); let result = self.deserialize_with_context(context); - context.detach_reader(); + // Mirror C++ StreamShrinkGuard: shrink_buffer on detach. + let mut returned = context.detach_reader(); + if let Some(ref mut s) = returned.stream { + s.shrink_buffer(); + } result }) } diff --git a/rust/fory-core/src/stream.rs b/rust/fory-core/src/stream.rs index 85a62e3b0f..7a2d2c3934 100644 --- a/rust/fory-core/src/stream.rs +++ b/rust/fory-core/src/stream.rs @@ -34,6 +34,8 @@ pub struct ForyStreamBuf { valid_len: usize, /// Current read cursor — equivalent of `gptr() - eback()` read_pos: usize, + /// Initial capacity for shrink_buffer target — mirrors C++ `initial_buffer_size_` + initial_buffer_size: usize, } impl ForyStreamBuf { @@ -51,6 +53,7 @@ impl ForyStreamBuf { buffer, valid_len: 0, read_pos: 0, + initial_buffer_size: cap, } } @@ -198,6 +201,44 @@ impl ForyStreamBuf { pub fn is_stream_backed(&self) -> bool { true } + + /// Compact consumed bytes and optionally shrink capacity. + /// + /// Mirrors C++ `ForyInputStream::shrink_buffer()` exactly: + /// 1. Memmove remaining bytes to front of buffer + /// 2. Reset read_pos = 0, valid_len = remaining + /// 3. 
If capacity > initial_buffer_size and utilization is low, + /// shrink back toward initial size + pub fn shrink_buffer(&mut self) { + let remaining = self.remaining(); + + // Phase 1: compact — memmove remaining data to front + if self.read_pos > 0 { + if remaining > 0 { + self.buffer.copy_within(self.read_pos..self.valid_len, 0); + } + self.read_pos = 0; + self.valid_len = remaining; + } + + // Phase 2: optionally shrink capacity back toward initial_buffer_size + let current_capacity = self.buffer.len(); + let mut target_capacity = current_capacity; + + if current_capacity > self.initial_buffer_size { + if remaining == 0 { + target_capacity = self.initial_buffer_size; + } else if remaining <= current_capacity / 4 { + let doubled = remaining.saturating_mul(2).max(1); + target_capacity = self.initial_buffer_size.max(doubled); + } + } + + if target_capacity < current_capacity { + self.buffer.truncate(target_capacity); + self.buffer.shrink_to_fit(); + } + } } #[cfg(test)] diff --git a/rust/tests/tests/stream_test.rs b/rust/tests/tests/stream_test.rs index cc54f2c8d8..a2a1e5ff11 100644 --- a/rust/tests/tests/stream_test.rs +++ b/rust/tests/tests/stream_test.rs @@ -20,8 +20,13 @@ mod stream_tests { use fory_core::buffer::Reader; use fory_core::stream::ForyStreamBuf; use fory_core::Fory; + use std::fmt::Debug; use std::io::Cursor; + // ======================================================================== + // OneByteStream — mirrors C++ OneByteStreamBuf / OneByteIStream + // Delivers exactly 1 byte per read() call for maximum streaming stress. 
+ // ======================================================================== struct OneByte(Cursor>); impl std::io::Read for OneByte { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { @@ -39,22 +44,57 @@ mod stream_tests { } } + // ======================================================================== + // Deserialize helper — per maintainer requirement: + // "Create a Deserialize help methods in tests, then use that instead of + // fory.Deserialize for deserialization, and in the Deserialize test + // helper, first deserialize from bytes, then wrap it into a + // OneByteStream to deserialize it to ensure deserialization works." + // ======================================================================== + fn deserialize_helper(fory: &Fory, bytes: &[u8]) -> T + where + T: fory_core::Serializer + fory_core::ForyDefault + PartialEq + Debug, + { + // Path 1: deserialize from bytes (standard in-memory path) + let from_bytes: T = fory.deserialize(bytes).expect("bytes deserialize failed"); + + // Path 2: deserialize from OneByteStream (streaming path) + let from_stream: T = fory + .deserialize_from_stream(OneByte(Cursor::new(bytes.to_vec()))) + .expect("stream deserialize failed"); + + // Assert both paths produce the same result + assert_eq!( + from_bytes, from_stream, + "bytes vs stream deserialization mismatch" + ); + + from_bytes + } + + // ======================================================================== + // Test: PrimitiveAndStringRoundTrip + // Mirrors C++ StreamSerializationTest::PrimitiveAndStringRoundTrip + // ======================================================================== #[test] - fn test_primitive_stream_roundtrip() { + fn test_primitive_and_string_round_trip() { let fory = Fory::default(); + + // i64 round-trip let bytes = fory.serialize(&-9876543212345i64).unwrap(); - let result: i64 = fory - .deserialize_from_stream(OneByte(Cursor::new(bytes))) - .unwrap(); + let result = deserialize_helper::(&fory, &bytes); assert_eq!(result, 
-9876543212345i64); + // String round-trip (with unicode) let bytes = fory.serialize(&"stream-hello-世界".to_string()).unwrap(); - let result: String = fory - .deserialize_from_stream(OneByte(Cursor::new(bytes))) - .unwrap(); + let result = deserialize_helper::(&fory, &bytes); assert_eq!(result, "stream-hello-世界"); } + // ======================================================================== + // Test: SequentialDeserializeFromSingleStream + // Mirrors C++ StreamSerializationTest::SequentialDeserializeFromSingleStream + // ======================================================================== #[test] fn test_sequential_stream_reads() { let fory = Fory::default(); @@ -74,6 +114,10 @@ mod stream_tests { assert_eq!(third, 99); } + // ======================================================================== + // Test: TruncatedStreamReturnsError + // Mirrors C++ StreamSerializationTest::TruncatedStreamReturnsError + // ======================================================================== #[test] fn test_truncated_stream_returns_error() { let fory = Fory::default(); @@ -82,4 +126,59 @@ mod stream_tests { let result: Result = fory.deserialize_from_stream(Cursor::new(bytes)); assert!(result.is_err()); } + + // ======================================================================== + // Test: ShrinkBuffer compacts consumed bytes + // Validates the C++ shrink_buffer() behavior is correctly implemented + // ======================================================================== + #[test] + fn test_shrink_buffer_compacts_consumed_bytes() { + let fory = Fory::default(); + + // Serialize multiple values into a single buffer + let mut bytes = Vec::new(); + fory.serialize_to(&mut bytes, &42i32).unwrap(); + fory.serialize_to(&mut bytes, &"shrink-test".to_string()) + .unwrap(); + fory.serialize_to(&mut bytes, &100i64).unwrap(); + + // Use a small initial buffer to force multiple fills + let mut reader = + 
Reader::from_stream(ForyStreamBuf::with_capacity(OneByte(Cursor::new(bytes)), 4)); + + // After each deserialize_from, shrink_buffer should compact the stream. + let first: i32 = fory.deserialize_from(&mut reader).unwrap(); + assert_eq!(first, 42); + + let second: String = fory.deserialize_from(&mut reader).unwrap(); + assert_eq!(second, "shrink-test"); + + let third: i64 = fory.deserialize_from(&mut reader).unwrap(); + assert_eq!(third, 100); + } + + // ======================================================================== + // Test: Additional primitive types through deserialize_helper + // ======================================================================== + #[test] + fn test_additional_primitive_types() { + let fory = Fory::default(); + + // bool + let bytes = fory.serialize(&true).unwrap(); + assert_eq!(deserialize_helper::(&fory, &bytes), true); + + // i32 + let bytes = fory.serialize(&-42i32).unwrap(); + assert_eq!(deserialize_helper::(&fory, &bytes), -42i32); + + // f64 + let bytes = fory.serialize(&3.14159f64).unwrap(); + assert_eq!(deserialize_helper::(&fory, &bytes), 3.14159f64); + + // Vec + let vec = vec![1i32, 2, 3, 5, 8]; + let bytes = fory.serialize(&vec).unwrap(); + assert_eq!(deserialize_helper::>(&fory, &bytes), vec); + } } From 010ae6835cc3a1ed911cb679d25ff74e64b0d53d Mon Sep 17 00:00:00 2001 From: Zakir Date: Fri, 6 Mar 2026 22:48:30 +0530 Subject: [PATCH 4/7] ci fix --- rust/fory-core/src/fory.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/rust/fory-core/src/fory.rs b/rust/fory-core/src/fory.rs index af9ea4da46..fabd5240c0 100644 --- a/rust/fory-core/src/fory.rs +++ b/rust/fory-core/src/fory.rs @@ -1032,15 +1032,20 @@ impl Fory { source: impl Read + Send + 'static, ) -> Result { self.with_read_context(|context| { + // Wrap source in stream buffer and attach as the active reader let stream = crate::stream::ForyStreamBuf::new(source); let reader = Reader::from_stream(stream); context.attach_reader(reader); + + 
// Perform deserialization using the stream-backed reader let result = self.deserialize_with_context(context); - // Mirror C++ StreamShrinkGuard: shrink_buffer on detach. + + // Detach the reader once, recover the owned stream, and shrink buffer let mut returned = context.detach_reader(); if let Some(ref mut s) = returned.stream { s.shrink_buffer(); } + result }) } From e46c7887c626413cbea5d334e6dd345cdc5b6953 Mon Sep 17 00:00:00 2001 From: Zakir Date: Fri, 6 Mar 2026 22:57:08 +0530 Subject: [PATCH 5/7] format stream test --- rust/tests/tests/stream_test.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/rust/tests/tests/stream_test.rs b/rust/tests/tests/stream_test.rs index a2a1e5ff11..1785cdb4f0 100644 --- a/rust/tests/tests/stream_test.rs +++ b/rust/tests/tests/stream_test.rs @@ -166,16 +166,18 @@ mod stream_tests { // bool let bytes = fory.serialize(&true).unwrap(); - assert_eq!(deserialize_helper::(&fory, &bytes), true); + assert!(deserialize_helper::(&fory, &bytes)); // i32 let bytes = fory.serialize(&-42i32).unwrap(); assert_eq!(deserialize_helper::(&fory, &bytes), -42i32); // f64 - let bytes = fory.serialize(&3.14159f64).unwrap(); - assert_eq!(deserialize_helper::(&fory, &bytes), 3.14159f64); - + let bytes = fory.serialize(&std::f64::consts::PI).unwrap(); + assert_eq!( + deserialize_helper::(&fory, &bytes), + std::f64::consts::PI + ); // Vec let vec = vec![1i32, 2, 3, 5, 8]; let bytes = fory.serialize(&vec).unwrap(); From 69b5cf6c1231accdb2c04eae7964f329b520941d Mon Sep 17 00:00:00 2001 From: Zakir Date: Sat, 7 Mar 2026 01:12:04 +0530 Subject: [PATCH 6/7] fix(rust): finalize streaming deserialization parity with C++/Go - fix reader attach/detach lifecycle in deserialize_from_stream - isolate stream and in-memory paths in deserialize_from - add struct round-trip test for reference parity with C++/Go - fix clippy lints in stream_test.rs --- rust/fory-core/src/fory.rs | 38 ++++++++------------------------- 
rust/tests/tests/stream_test.rs | 20 +++++++++++++++++ 2 files changed, 29 insertions(+), 29 deletions(-) diff --git a/rust/fory-core/src/fory.rs b/rust/fory-core/src/fory.rs index 8ada831329..ff2d1824e5 100644 --- a/rust/fory-core/src/fory.rs +++ b/rust/fory-core/src/fory.rs @@ -25,6 +25,7 @@ use crate::serializer::ForyDefault; use crate::serializer::{Serializer, StructSerializer}; use crate::types::config_flags::{IS_CROSS_LANGUAGE_FLAG, IS_NULL_FLAG}; use crate::types::{RefMode, SIZE_OF_REF_AND_TYPE}; +use crate::stream::ForyStreamBuf; use std::cell::UnsafeCell; use std::io::Read; use std::mem; @@ -992,46 +993,34 @@ impl Fory { ) -> Result { self.with_read_context(|context| { if reader.is_stream_backed() { - // Stream-backed path: move the owned stream out of the caller's reader, - // construct a fresh stream-backed reader at the current cursor, hand it - // to the context, then restore all state from the returned reader. - // This is the sequential-read case: caller creates Reader::from_stream(...) - // once and calls deserialize_from repeatedly. + // STREAM PATH — single attach let stream = mem::take(&mut reader.stream) .expect("is_stream_backed was true but stream is None"); let cursor = reader.cursor; let mut stream_reader = Reader::from_stream(*stream); - // Sync cursor: the stream already consumed [0..cursor], re-position. stream_reader.set_cursor(cursor); context.attach_reader(stream_reader); + let result = self.deserialize_with_context(context); let returned = context.detach_reader(); - // Restore state back to caller's reader. + reader.cursor = returned.cursor; reader.stream = returned.stream; - // Re-pin bf from the (possibly grown after fill_to) stream buffer. - // SAFETY: same invariant as Reader::from_stream and fill_to: - // bf points into Box-owned stream buffer, owned by reader.stream, - // which lives as long as reader. 
+ if let Some(ref mut s) = reader.stream { - // Sync stream's read_pos with the reader cursor position - // before shrinking — the detached reader may have advanced - // cursor without updating stream.read_pos. let _ = s.set_reader_index(reader.cursor); - // Mirror C++ StreamShrinkGuard: compact consumed bytes after - // deserialization to prevent unbounded buffer growth on - // long-lived streams. s.shrink_buffer(); reader.bf = unsafe { std::slice::from_raw_parts(s.data(), s.size()) }; reader.cursor = s.reader_index(); } result } else { - // In-memory path: unchanged from original. + // IN-MEMORY PATH — unchanged fast path let outlive_buffer = unsafe { mem::transmute::<&[u8], &[u8]>(reader.bf) }; let mut new_reader = Reader::new(outlive_buffer); new_reader.set_cursor(reader.cursor); context.attach_reader(new_reader); + let result = self.deserialize_with_context(context); let end = context.detach_reader().get_cursor(); reader.set_cursor(end); @@ -1063,20 +1052,11 @@ impl Fory { source: impl Read + Send + 'static, ) -> Result { self.with_read_context(|context| { - // Wrap source in stream buffer and attach as the active reader - let stream = crate::stream::ForyStreamBuf::new(source); - let reader = Reader::from_stream(stream); - context.attach_reader(reader); - - // Perform deserialization using the stream-backed reader + context.attach_reader(Reader::from_stream(ForyStreamBuf::new(source))); let result = self.deserialize_with_context(context); - - // Detach the reader once, recover the owned stream, and shrink buffer - let mut returned = context.detach_reader(); - if let Some(ref mut s) = returned.stream { + if let Some(ref mut s) = context.detach_reader().stream { s.shrink_buffer(); } - result }) } diff --git a/rust/tests/tests/stream_test.rs b/rust/tests/tests/stream_test.rs index 1785cdb4f0..5b4bd36521 100644 --- a/rust/tests/tests/stream_test.rs +++ b/rust/tests/tests/stream_test.rs @@ -183,4 +183,24 @@ mod stream_tests { let bytes = 
fory.serialize(&vec).unwrap(); assert_eq!(deserialize_helper::>(&fory, &bytes), vec); } + + #[derive(Debug, PartialEq, fory_derive::ForyObject)] + struct Point { + x: i32, + y: i32, + } + #[test] + fn test_struct_round_trip() { + let mut fory = Fory::default(); + + fory.register::(1).unwrap(); + + let point = Point { x: 42, y: -7 }; + + let bytes = fory.serialize(&point).unwrap(); + + let result = deserialize_helper::(&fory, &bytes); + + assert_eq!(result, point); + } } From 579ba37f9ec2f13d0cb4ae7eaea847ed0a4cce03 Mon Sep 17 00:00:00 2001 From: Zakir Date: Sat, 7 Mar 2026 01:27:45 +0530 Subject: [PATCH 7/7] style(rust): fix import order in fory.rs per rustfmt --- rust/fory-core/src/fory.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/fory-core/src/fory.rs b/rust/fory-core/src/fory.rs index ff2d1824e5..c43073369e 100644 --- a/rust/fory-core/src/fory.rs +++ b/rust/fory-core/src/fory.rs @@ -23,9 +23,9 @@ use crate::resolver::context::{ContextCache, ReadContext, WriteContext}; use crate::resolver::type_resolver::TypeResolver; use crate::serializer::ForyDefault; use crate::serializer::{Serializer, StructSerializer}; +use crate::stream::ForyStreamBuf; use crate::types::config_flags::{IS_CROSS_LANGUAGE_FLAG, IS_NULL_FLAG}; use crate::types::{RefMode, SIZE_OF_REF_AND_TYPE}; -use crate::stream::ForyStreamBuf; use std::cell::UnsafeCell; use std::io::Read; use std::mem;