diff --git a/rust/fory-core/src/buffer.rs b/rust/fory-core/src/buffer.rs index 4cab51acf1..ab46b8ac20 100644 --- a/rust/fory-core/src/buffer.rs +++ b/rust/fory-core/src/buffer.rs @@ -18,6 +18,7 @@ use crate::error::Error; use crate::float16::float16; use crate::meta::buffer_rw_string::read_latin1_simd; +use crate::stream::ForyStreamBuf; use byteorder::{ByteOrder, LittleEndian}; use std::cmp::max; @@ -506,6 +507,7 @@ impl<'a> Writer<'a> { pub struct Reader<'a> { pub(crate) bf: &'a [u8], pub(crate) cursor: usize, + pub(crate) stream: Option>, } #[allow(clippy::needless_lifetimes)] @@ -514,7 +516,26 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn new(bf: &[u8]) -> Reader<'_> { - Reader { bf, cursor: 0 } + Reader { + bf, + cursor: 0, + stream: None, + } + } + + /// Construct a stream-backed `Reader`. + pub fn from_stream(stream: crate::stream::ForyStreamBuf) -> Reader<'static> { + let boxed = Box::new(stream); + Reader { + bf: b"", + cursor: 0, + stream: Some(boxed), + } + } + + #[inline(always)] + pub fn is_stream_backed(&self) -> bool { + self.stream.is_some() } #[inline(always)] @@ -551,29 +572,63 @@ impl<'a> Reader<'a> { self.cursor } + /// Fill stream buffer up to `target_size` total bytes, then re-pin `bf`. + /// Returns `false` if stream is None OR fill failed. + fn fill_to(&mut self, target_size: usize) -> bool { + let stream = match self.stream.as_mut() { + Some(s) => s, + None => return false, + }; + // intentional: fill_buffer validates; set_reader_index only syncs read_pos + let _ = stream.set_reader_index(self.cursor); + + let n = target_size.saturating_sub(self.cursor); + if n == 0 { + self.bf = unsafe { std::slice::from_raw_parts(stream.data(), stream.size()) }; + return self.bf.len() >= target_size; + } + if stream.fill_buffer(n).is_err() { + return false; + } + self.bf = unsafe { std::slice::from_raw_parts(stream.data(), stream.size()) }; + self.bf.len() >= target_size + } + + /// Ensure `self.cursor + n` bytes are available. + /// fast path: target <= size_ → return true + /// stream path: call fill_to(target), check again. #[inline(always)] - fn value_at(&self, index: usize) -> Result { - match self.bf.get(index) { - None => Err(Error::buffer_out_of_bound( - index, - self.bf.len(), - self.bf.len(), - )), - Some(v) => Ok(*v), + fn ensure_readable(&mut self, n: usize) -> Result<(), Error> { + let target = self.cursor + n; + if target <= self.bf.len() { + return Ok(()); + } + if !self.fill_to(target) { + return Err(Error::buffer_out_of_bound(self.cursor, n, self.bf.len())); + } + if target > self.bf.len() { + return Err(Error::buffer_out_of_bound(self.cursor, n, self.bf.len())); } + Ok(()) } #[inline(always)] - fn check_bound(&self, n: usize) -> Result<(), Error> { - let end = self - .cursor - .checked_add(n) - .ok_or_else(|| Error::buffer_out_of_bound(self.cursor, n, self.bf.len()))?; - if end > self.bf.len() { - Err(Error::buffer_out_of_bound(self.cursor, n, self.bf.len())) - } else { - Ok(()) + fn value_at(&mut self, index: usize) -> Result { + if index >= self.bf.len() { + // Need index+1 bytes total; fill to that target. + if !self.fill_to(index + 1) || index >= self.bf.len() { + return Err(Error::buffer_out_of_bound(index, 1, self.bf.len())); + } } + Ok(unsafe { *self.bf.get_unchecked(index) }) + } + + /// stream fill on miss. Changing to `&mut self` is the single + /// change that gives ALL 27 existing read methods stream support + /// without touching them individually — they all call this. + #[inline(always)] + fn check_bound(&mut self, n: usize) -> Result<(), Error> { + self.ensure_readable(n) } #[inline(always)] @@ -606,8 +661,12 @@ impl<'a> Reader<'a> { } } + /// `stream_->reader_index(reader_index_)` when stream-backed. pub fn set_cursor(&mut self, cursor: usize) { self.cursor = cursor; + if let Some(ref mut stream) = self.stream { + let _ = stream.set_reader_index(cursor); + } } // ============ BOOL (TypeId = 1) ============ @@ -723,6 +782,9 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_varuint32(&mut self) -> Result { + if self.stream.is_some() && self.bf.len().saturating_sub(self.cursor) < 5 { + return self.read_varuint32_stream(); + } let b0 = self.value_at(self.cursor)? as u32; if b0 < 0x80 { self.move_next(1); @@ -770,6 +832,9 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_varuint64(&mut self) -> Result { + if self.stream.is_some() && self.bf.len().saturating_sub(self.cursor) < 9 { + return self.read_varuint64_stream(); + } let b0 = self.value_at(self.cursor)? as u64; if b0 < 0x80 { self.move_next(1); @@ -1000,6 +1065,9 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_varuint36small(&mut self) -> Result { + if self.stream.is_some() && self.bf.len().saturating_sub(self.cursor) < 8 { + return self.read_varuint36small_stream(); + } // Keep this API panic-free even if cursor is externally set past buffer end. self.check_bound(0)?; let start = self.cursor; @@ -1046,6 +1114,55 @@ impl<'a> Reader<'a> { } Ok(result) } + + /// Byte-by-byte varuint32 decode for stream-backed path. + fn read_varuint32_stream(&mut self) -> Result { + let mut result = 0u32; + for i in 0..5 { + let b = self.value_at(self.cursor)? as u32; + self.cursor += 1; + result |= (b & 0x7F) << (i * 7); + if (b & 0x80) == 0 { + return Ok(result); + } + } + Err(Error::encode_error("Invalid var_uint32 encoding")) + } + + /// Byte-by-byte varuint64 decode for stream-backed path. + fn read_varuint64_stream(&mut self) -> Result { + let mut result = 0u64; + for i in 0..8u64 { + let b = self.value_at(self.cursor)? as u64; + self.cursor += 1; + result |= (b & 0x7F) << (i * 7); + if (b & 0x80) == 0 { + return Ok(result); + } + } + // 9th byte — full 8 bits + let b = self.value_at(self.cursor)? as u64; + self.cursor += 1; + result |= b << 56; + Ok(result) + } + + /// Byte-by-byte varuint36small decode for stream-backed path. + fn read_varuint36small_stream(&mut self) -> Result { + let mut result = 0u64; + for i in 0..4u64 { + let b = self.value_at(self.cursor)? as u64; + self.cursor += 1; + result |= (b & 0x7F) << (i * 7); + if (b & 0x80) == 0 { + return Ok(result); + } + } + let b = self.value_at(self.cursor)? as u64; + self.cursor += 1; + result |= b << 28; + Ok(result) + } } #[allow(clippy::needless_lifetimes)] diff --git a/rust/fory-core/src/fory.rs b/rust/fory-core/src/fory.rs index d4e2359b33..c43073369e 100644 --- a/rust/fory-core/src/fory.rs +++ b/rust/fory-core/src/fory.rs @@ -23,9 +23,11 @@ use crate::resolver::context::{ContextCache, ReadContext, WriteContext}; use crate::resolver::type_resolver::TypeResolver; use crate::serializer::ForyDefault; use crate::serializer::{Serializer, StructSerializer}; +use crate::stream::ForyStreamBuf; use crate::types::config_flags::{IS_CROSS_LANGUAGE_FLAG, IS_NULL_FLAG}; use crate::types::{RefMode, SIZE_OF_REF_AND_TYPE}; use std::cell::UnsafeCell; +use std::io::Read; use std::mem; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::OnceLock; @@ -990,13 +992,71 @@ impl Fory { reader: &mut Reader, ) -> Result { self.with_read_context(|context| { - let outlive_buffer = unsafe { mem::transmute::<&[u8], &[u8]>(reader.bf) }; - let mut new_reader = Reader::new(outlive_buffer); - new_reader.set_cursor(reader.cursor); - context.attach_reader(new_reader); + if reader.is_stream_backed() { + // STREAM PATH — single attach + let stream = mem::take(&mut reader.stream) + .expect("is_stream_backed was true but stream is None"); + let cursor = reader.cursor; + let mut stream_reader = Reader::from_stream(*stream); + stream_reader.set_cursor(cursor); + context.attach_reader(stream_reader); + + let result = self.deserialize_with_context(context); + let returned = context.detach_reader(); + + reader.cursor = returned.cursor; + reader.stream = returned.stream; + + if let Some(ref mut s) = reader.stream { + let _ = s.set_reader_index(reader.cursor); + s.shrink_buffer(); + reader.bf = unsafe { std::slice::from_raw_parts(s.data(), s.size()) }; + reader.cursor = s.reader_index(); + } + result + } else { + // IN-MEMORY PATH — unchanged fast path + let outlive_buffer = unsafe { mem::transmute::<&[u8], &[u8]>(reader.bf) }; + let mut new_reader = Reader::new(outlive_buffer); + new_reader.set_cursor(reader.cursor); + context.attach_reader(new_reader); + + let result = self.deserialize_with_context(context); + let end = context.detach_reader().get_cursor(); + reader.set_cursor(end); + result + } + }) + } + + /// Deserializes a single value of type `T` from any `Read` source. + /// + /// Equivalent of C++ `fory.deserialize(Buffer(ForyInputStream(source)))`. + /// Internally wraps the source in a [`crate::stream::ForyStreamBuf`] and calls + /// [`deserialize_from`](Self::deserialize_from). + /// + /// For deserializing **multiple values sequentially** from one stream + /// (e.g. a network socket or pipe), create the reader once and reuse it: + /// + /// ```rust,ignore + /// use fory_core::{Fory, Reader}; + /// use fory_core::stream::ForyStreamBuf; + /// + /// let fory = Fory::default(); + /// let mut reader = Reader::from_stream(ForyStreamBuf::new(my_socket)); + /// let first: i32 = fory.deserialize_from(&mut reader).unwrap(); + /// let second: String = fory.deserialize_from(&mut reader).unwrap(); + /// ``` + pub fn deserialize_from_stream( + &self, + source: impl Read + Send + 'static, + ) -> Result { + self.with_read_context(|context| { + context.attach_reader(Reader::from_stream(ForyStreamBuf::new(source))); let result = self.deserialize_with_context(context); - let end = context.detach_reader().get_cursor(); - reader.set_cursor(end); + if let Some(ref mut s) = context.detach_reader().stream { + s.shrink_buffer(); + } result }) } diff --git a/rust/fory-core/src/lib.rs b/rust/fory-core/src/lib.rs index 976a760af6..bee8b59b49 100644 --- a/rust/fory-core/src/lib.rs +++ b/rust/fory-core/src/lib.rs @@ -185,6 +185,7 @@ pub mod meta; pub mod resolver; pub mod row; pub mod serializer; +pub mod stream; pub mod types; pub use float16::float16 as Float16; pub mod util; @@ -201,3 +202,4 @@ pub use crate::resolver::type_resolver::{TypeInfo, TypeResolver}; pub use crate::serializer::weak::{ArcWeak, RcWeak}; pub use crate::serializer::{read_data, write_data, ForyDefault, Serializer, StructSerializer}; pub use crate::types::{RefFlag, RefMode, TypeId}; +pub use stream::ForyStreamBuf; diff --git a/rust/fory-core/src/stream.rs b/rust/fory-core/src/stream.rs new file mode 100644 index 0000000000..7a2d2c3934 --- /dev/null +++ b/rust/fory-core/src/stream.rs @@ -0,0 +1,346 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Streaming buffer for incremental deserialization. + +use crate::error::Error; +use std::io::{self, Read}; + +const DEFAULT_BUFFER_SIZE: usize = 4096; + +/// Single internal `Vec` window. `valid_len` = `egptr()-eback()`. +/// `read_pos` = `gptr()-eback()`. [`fill_buffer`] grows on demand. +/// +/// [`fill_buffer`]: ForyStreamBuf::fill_buffer +pub struct ForyStreamBuf { + source: Box, + /// Backing window — equivalent of C++ `buffer_` (`std::vector`) + buffer: Vec, + /// Bytes fetched from source — equivalent of `egptr() - eback()` + valid_len: usize, + /// Current read cursor — equivalent of `gptr() - eback()` + read_pos: usize, + /// Initial capacity for shrink_buffer target — mirrors C++ `initial_buffer_size_` + initial_buffer_size: usize, +} + +impl ForyStreamBuf { + pub fn new(source: impl Read + Send + 'static) -> Self { + Self::with_capacity(source, DEFAULT_BUFFER_SIZE) + } + + /// Allocates and zero-initialises the backing window immediately, + /// `std::vector(buffer_size)` in the constructor. + pub fn with_capacity(source: impl Read + Send + 'static, buffer_size: usize) -> Self { + let cap = buffer_size.max(1); + let buffer = vec![0u8; cap]; + Self { + source: Box::new(source), + buffer, + valid_len: 0, + read_pos: 0, + initial_buffer_size: cap, + } + } + + /// Pull bytes from source until `remaining() >= min_fill_size`. + pub fn fill_buffer(&mut self, min_fill_size: usize) -> Result<(), Error> { + if min_fill_size == 0 || self.remaining() >= min_fill_size { + return Ok(()); + } + + let need = min_fill_size - self.remaining(); + + let required = self + .valid_len + .checked_add(need) + .filter(|&r| r <= u32::MAX as usize) + .ok_or_else(|| { + Error::buffer_out_of_bound(self.read_pos, min_fill_size, self.remaining()) + })?; + + // Grow if required > current buffer length + if required > self.buffer.len() { + let new_cap = (self.buffer.len() * 2).max(required); + self.buffer.resize(new_cap, 0); + } + + while self.remaining() < min_fill_size { + let writable = self.buffer.len() - self.valid_len; + if writable == 0 { + // Inner double `buffer_.size() * 2 + 1` with u32 overflow guard + let new_cap = self + .buffer + .len() + .checked_mul(2) + .and_then(|n| n.checked_add(1)) + .filter(|&n| n <= u32::MAX as usize) + .ok_or_else(|| { + Error::buffer_out_of_bound(self.read_pos, min_fill_size, self.remaining()) + })?; + self.buffer.resize(new_cap, 0); + // fall through — self.buffer[self.valid_len..] is now non-empty + } + match self.source.read(&mut self.buffer[self.valid_len..]) { + Ok(0) => { + // `read_bytes <= 0` → buffer_out_of_bound + return Err(Error::buffer_out_of_bound( + self.read_pos, + min_fill_size, + self.remaining(), + )); + } + Ok(n) => self.valid_len += n, + Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, + Err(_) => { + return Err(Error::buffer_out_of_bound( + self.read_pos, + min_fill_size, + self.remaining(), + )); + } + } + } + Ok(()) + } + + /// Move cursor backward by `size` bytes. + /// + /// `setg(eback(), gptr() - size, egptr())` + /// + /// Panics if `size > read_pos`. + pub fn rewind(&mut self, size: usize) { + assert!( + size <= self.read_pos, + "rewind size {} exceeds consumed bytes {}", + size, + self.read_pos + ); + self.read_pos -= size; + } + + /// Advance cursor forward by `size` bytes without pulling from source. + /// + /// `gbump(static_cast(size))` + /// + /// Panics if `size > remaining()`. + pub fn consume(&mut self, size: usize) { + assert!( + size <= self.remaining(), + "consume size {} exceeds available bytes {}", + size, + self.remaining() + ); + self.read_pos += size; + } + + /// Raw pointer to byte 0 of the internal window. + /// + /// Re-read by `Reader` (buffer.rs) after every `fill_buffer` call that + /// may reallocate + /// `data_ = stream_->data()`. + /// + /// `uint8_t* data()` → `reinterpret_cast(eback())`. + /// + /// # Safety + /// Valid until the next `fill_buffer` call that causes reallocation. + /// `Reader` always re-reads this pointer after every `fill_buffer`. + #[inline(always)] + pub(crate) fn data(&self) -> *const u8 { + self.buffer.as_ptr() + } + + /// Total fetched bytes + #[inline(always)] + pub fn size(&self) -> usize { + self.valid_len + } + + /// Current read cursor + #[inline(always)] + pub fn reader_index(&self) -> usize { + self.read_pos + } + + /// Set cursor to absolute `index`. + /// + /// Called by `Reader` (buffer.rs) after every cursor advance, mirroring + /// + /// Returns `Err` if `index > valid_len` + #[inline(always)] + pub(crate) fn set_reader_index(&mut self, index: usize) -> Result<(), Error> { + if index > self.valid_len { + return Err(Error::buffer_out_of_bound(index, 0, self.valid_len)); + } + self.read_pos = index; + Ok(()) + } + + /// Unread bytes in window + #[inline(always)] + pub fn remaining(&self) -> usize { + self.valid_len.saturating_sub(self.read_pos) + } + + /// Always `true` — used by `Reader` (buffer.rs) to branch into the stream path. + #[inline(always)] + pub fn is_stream_backed(&self) -> bool { + true + } + + /// Compact consumed bytes and optionally shrink capacity. + /// + /// Mirrors C++ `ForyInputStream::shrink_buffer()` exactly: + /// 1. Memmove remaining bytes to front of buffer + /// 2. Reset read_pos = 0, valid_len = remaining + /// 3. If capacity > initial_buffer_size and utilization is low, + /// shrink back toward initial size + pub fn shrink_buffer(&mut self) { + let remaining = self.remaining(); + + // Phase 1: compact — memmove remaining data to front + if self.read_pos > 0 { + if remaining > 0 { + self.buffer.copy_within(self.read_pos..self.valid_len, 0); + } + self.read_pos = 0; + self.valid_len = remaining; + } + + // Phase 2: optionally shrink capacity back toward initial_buffer_size + let current_capacity = self.buffer.len(); + let mut target_capacity = current_capacity; + + if current_capacity > self.initial_buffer_size { + if remaining == 0 { + target_capacity = self.initial_buffer_size; + } else if remaining <= current_capacity / 4 { + let doubled = remaining.saturating_mul(2).max(1); + target_capacity = self.initial_buffer_size.max(doubled); + } + } + + if target_capacity < current_capacity { + self.buffer.truncate(target_capacity); + self.buffer.shrink_to_fit(); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Cursor; + + /// Reads exactly 1 byte at a time. + struct OneByteCursor(Cursor>); + impl Read for OneByteCursor { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + if buf.is_empty() { + return Ok(0); + } + let mut one = [0u8; 1]; + match self.0.read(&mut one)? { + 0 => Ok(0), + _ => { + buf[0] = one[0]; + Ok(1) + } + } + } + } + + #[test] + fn test_rewind() { + let data = vec![0x01u8, 0x02, 0x03, 0x04, 0x05]; + let mut s = ForyStreamBuf::with_capacity(OneByteCursor(Cursor::new(data)), 2); + s.fill_buffer(4).unwrap(); + assert_eq!(s.size(), 4); + assert_eq!(s.reader_index(), 0); + s.consume(3); + assert_eq!(s.reader_index(), 3); + s.rewind(2); + assert_eq!(s.reader_index(), 1); + s.consume(1); + assert_eq!(s.reader_index(), 2); + } + + #[test] + fn test_short_read_error() { + let mut s = ForyStreamBuf::new(Cursor::new(vec![0x01u8, 0x02, 0x03])); + assert!(s.fill_buffer(4).is_err()); + } + + // Sequential fills with tiny-chunk reader + #[test] + fn test_sequential_fill() { + let data: Vec = (0u8..=9).collect(); + let mut s = ForyStreamBuf::with_capacity(OneByteCursor(Cursor::new(data)), 2); + s.fill_buffer(3).unwrap(); + assert!(s.remaining() >= 3); + s.consume(3); + s.fill_buffer(3).unwrap(); + assert!(s.remaining() >= 3); + } + + #[test] + fn test_overflow_guard() { + // valid_len near usize::MAX would overflow without the u32 guard. + // We can't actually allocate that — just verify the guard logic + // via a saturating check on a real (tiny) stream. + let mut s = ForyStreamBuf::new(Cursor::new(vec![0u8; 8])); + // Requesting more than the source has should error, not panic/overflow + assert!(s.fill_buffer(16).is_err()); + } + + #[test] + fn test_consume_panics_on_overrun() { + let result = std::panic::catch_unwind(|| { + let mut s = ForyStreamBuf::new(Cursor::new(vec![0x01u8])); + s.fill_buffer(1).unwrap(); + s.consume(2); // only 1 byte available + }); + assert!(result.is_err()); + } + + #[test] + fn test_rewind_panics_on_overrun() { + let result = std::panic::catch_unwind(|| { + let mut s = ForyStreamBuf::new(Cursor::new(vec![0x01u8, 0x02])); + s.fill_buffer(2).unwrap(); + s.consume(1); + s.rewind(2); // only consumed 1 + }); + assert!(result.is_err()); + } + + #[test] + fn test_set_reader_index() { + let mut s = ForyStreamBuf::new(Cursor::new(vec![0x01u8, 0x02, 0x03])); + s.fill_buffer(3).unwrap(); + assert!(s.set_reader_index(2).is_ok()); + assert_eq!(s.reader_index(), 2); + assert_eq!(s.remaining(), 1); + assert!(s.set_reader_index(4).is_err()); // beyond valid_len + } + + #[test] + fn test_is_stream_backed() { + let s = ForyStreamBuf::new(Cursor::new(vec![])); + assert!(s.is_stream_backed()); + } +} diff --git a/rust/tests/tests/stream_test.rs b/rust/tests/tests/stream_test.rs new file mode 100644 index 0000000000..5b4bd36521 --- /dev/null +++ b/rust/tests/tests/stream_test.rs @@ -0,0 +1,206 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[cfg(test)] +mod stream_tests { + use fory_core::buffer::Reader; + use fory_core::stream::ForyStreamBuf; + use fory_core::Fory; + use std::fmt::Debug; + use std::io::Cursor; + + // ======================================================================== + // OneByteStream — mirrors C++ OneByteStreamBuf / OneByteIStream + // Delivers exactly 1 byte per read() call for maximum streaming stress. + // ======================================================================== + struct OneByte(Cursor>); + impl std::io::Read for OneByte { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + if buf.is_empty() { + return Ok(0); + } + let mut one = [0u8]; + match self.0.read(&mut one)? { + 0 => Ok(0), + _ => { + buf[0] = one[0]; + Ok(1) + } + } + } + } + + // ======================================================================== + // Deserialize helper — per maintainer requirement: + // "Create a Deserialize help methods in tests, then use that instead of + // fory.Deserialize for deserialization, and in the Deserialize test + // helper, first deserialize from bytes, then wrap it into a + // OneByteStream to deserialize it to ensure deserialization works." + // ======================================================================== + fn deserialize_helper(fory: &Fory, bytes: &[u8]) -> T + where + T: fory_core::Serializer + fory_core::ForyDefault + PartialEq + Debug, + { + // Path 1: deserialize from bytes (standard in-memory path) + let from_bytes: T = fory.deserialize(bytes).expect("bytes deserialize failed"); + + // Path 2: deserialize from OneByteStream (streaming path) + let from_stream: T = fory + .deserialize_from_stream(OneByte(Cursor::new(bytes.to_vec()))) + .expect("stream deserialize failed"); + + // Assert both paths produce the same result + assert_eq!( + from_bytes, from_stream, + "bytes vs stream deserialization mismatch" + ); + + from_bytes + } + + // ======================================================================== + // Test: PrimitiveAndStringRoundTrip + // Mirrors C++ StreamSerializationTest::PrimitiveAndStringRoundTrip + // ======================================================================== + #[test] + fn test_primitive_and_string_round_trip() { + let fory = Fory::default(); + + // i64 round-trip + let bytes = fory.serialize(&-9876543212345i64).unwrap(); + let result = deserialize_helper::(&fory, &bytes); + assert_eq!(result, -9876543212345i64); + + // String round-trip (with unicode) + let bytes = fory.serialize(&"stream-hello-世界".to_string()).unwrap(); + let result = deserialize_helper::(&fory, &bytes); + assert_eq!(result, "stream-hello-世界"); + } + + // ======================================================================== + // Test: SequentialDeserializeFromSingleStream + // Mirrors C++ StreamSerializationTest::SequentialDeserializeFromSingleStream + // ======================================================================== + #[test] + fn test_sequential_stream_reads() { + let fory = Fory::default(); + let mut bytes = Vec::new(); + fory.serialize_to(&mut bytes, &12345i32).unwrap(); + fory.serialize_to(&mut bytes, &"next-value".to_string()) + .unwrap(); + fory.serialize_to(&mut bytes, &99i64).unwrap(); + + let mut reader = Reader::from_stream(ForyStreamBuf::new(OneByte(Cursor::new(bytes)))); + let first: i32 = fory.deserialize_from(&mut reader).unwrap(); + let second: String = fory.deserialize_from(&mut reader).unwrap(); + let third: i64 = fory.deserialize_from(&mut reader).unwrap(); + + assert_eq!(first, 12345); + assert_eq!(second, "next-value"); + assert_eq!(third, 99); + } + + // ======================================================================== + // Test: TruncatedStreamReturnsError + // Mirrors C++ StreamSerializationTest::TruncatedStreamReturnsError + // ======================================================================== + #[test] + fn test_truncated_stream_returns_error() { + let fory = Fory::default(); + let mut bytes = fory.serialize(&"hello world".to_string()).unwrap(); + bytes.pop(); + let result: Result = fory.deserialize_from_stream(Cursor::new(bytes)); + assert!(result.is_err()); + } + + // ======================================================================== + // Test: ShrinkBuffer compacts consumed bytes + // Validates the C++ shrink_buffer() behavior is correctly implemented + // ======================================================================== + #[test] + fn test_shrink_buffer_compacts_consumed_bytes() { + let fory = Fory::default(); + + // Serialize multiple values into a single buffer + let mut bytes = Vec::new(); + fory.serialize_to(&mut bytes, &42i32).unwrap(); + fory.serialize_to(&mut bytes, &"shrink-test".to_string()) + .unwrap(); + fory.serialize_to(&mut bytes, &100i64).unwrap(); + + // Use a small initial buffer to force multiple fills + let mut reader = + Reader::from_stream(ForyStreamBuf::with_capacity(OneByte(Cursor::new(bytes)), 4)); + + // After each deserialize_from, shrink_buffer should compact the stream. + let first: i32 = fory.deserialize_from(&mut reader).unwrap(); + assert_eq!(first, 42); + + let second: String = fory.deserialize_from(&mut reader).unwrap(); + assert_eq!(second, "shrink-test"); + + let third: i64 = fory.deserialize_from(&mut reader).unwrap(); + assert_eq!(third, 100); + } + + // ======================================================================== + // Test: Additional primitive types through deserialize_helper + // ======================================================================== + #[test] + fn test_additional_primitive_types() { + let fory = Fory::default(); + + // bool + let bytes = fory.serialize(&true).unwrap(); + assert!(deserialize_helper::(&fory, &bytes)); + + // i32 + let bytes = fory.serialize(&-42i32).unwrap(); + assert_eq!(deserialize_helper::(&fory, &bytes), -42i32); + + // f64 + let bytes = fory.serialize(&std::f64::consts::PI).unwrap(); + assert_eq!( + deserialize_helper::(&fory, &bytes), + std::f64::consts::PI + ); + // Vec + let vec = vec![1i32, 2, 3, 5, 8]; + let bytes = fory.serialize(&vec).unwrap(); + assert_eq!(deserialize_helper::>(&fory, &bytes), vec); + } + + #[derive(Debug, PartialEq, fory_derive::ForyObject)] + struct Point { + x: i32, + y: i32, + } + #[test] + fn test_struct_round_trip() { + let mut fory = Fory::default(); + + fory.register::(1).unwrap(); + + let point = Point { x: 42, y: -7 }; + + let bytes = fory.serialize(&point).unwrap(); + + let result = deserialize_helper::(&fory, &bytes); + + assert_eq!(result, point); + } +}