diff --git a/be/src/common/thread_safety_annotations.h b/be/src/common/thread_safety_annotations.h index 41c50711db3e0c..6cd8d4b0cae45c 100644 --- a/be/src/common/thread_safety_annotations.h +++ b/be/src/common/thread_safety_annotations.h @@ -118,3 +118,54 @@ class SCOPED_CAPABILITY LockGuard { private: MutexType& _mu; }; + +// RAII unique lock annotated for thread safety analysis. +// Supports manual lock/unlock while preserving capability tracking. +template +class SCOPED_CAPABILITY UniqueLock { +public: + explicit UniqueLock(MutexType& mu) ACQUIRE(mu) : _mu(&mu), _locked(true) { +#ifdef BE_TEST + doris::mock_random_sleep(); +#endif + _mu->lock(); + } + + UniqueLock(MutexType& mu, std::adopt_lock_t) REQUIRES(mu) : _mu(&mu), _locked(true) {} + + UniqueLock(MutexType& mu, std::defer_lock_t) EXCLUDES(mu) : _mu(&mu), _locked(false) {} + + ~UniqueLock() RELEASE() { + if (_locked) { + _mu->unlock(); +#ifdef BE_TEST + doris::mock_random_sleep(); +#endif + } + } + + void lock() ACQUIRE() { +#ifdef BE_TEST + doris::mock_random_sleep(); +#endif + _mu->lock(); + _locked = true; + } + + void unlock() RELEASE() { + _mu->unlock(); + _locked = false; +#ifdef BE_TEST + doris::mock_random_sleep(); +#endif + } + + bool owns_lock() const { return _locked; } + + UniqueLock(const UniqueLock&) = delete; + UniqueLock& operator=(const UniqueLock&) = delete; + +private: + MutexType* _mu; + bool _locked; +}; diff --git a/be/src/exec/operator/analytic_sink_operator.cpp b/be/src/exec/operator/analytic_sink_operator.cpp index 501a92869bce4d..f4fb437d31c023 100644 --- a/be/src/exec/operator/analytic_sink_operator.cpp +++ b/be/src/exec/operator/analytic_sink_operator.cpp @@ -460,7 +460,7 @@ void AnalyticSinkLocalState::_init_result_columns() { void AnalyticSinkLocalState::_refresh_buffer_and_dependency_state(Block* block) { size_t buffer_size = 0; { - std::unique_lock lc(_shared_state->buffer_mutex); + LockGuard lc(_shared_state->buffer_mutex); _shared_state->blocks_buffer.push(std::move(*block)); buffer_size = _shared_state->blocks_buffer.size(); } @@ -755,7 +755,7 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, Block* input_bloc RETURN_IF_ERROR(_add_input_block(state, input_block)); RETURN_IF_ERROR(local_state._execute_impl()); if (local_state._input_eos) { - std::unique_lock lc(local_state._shared_state->sink_eos_lock); + LockGuard lc(local_state._shared_state->sink_eos_lock); local_state._shared_state->sink_eos = true; local_state._dependency->set_ready_to_read(); // ready for source to read } diff --git a/be/src/exec/operator/analytic_source_operator.cpp b/be/src/exec/operator/analytic_source_operator.cpp index 0911a2a743c286..3d25b20c7a40fa 100644 --- a/be/src/exec/operator/analytic_source_operator.cpp +++ b/be/src/exec/operator/analytic_source_operator.cpp @@ -52,18 +52,17 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, Block* output_blo output_block->clear_column_data(); size_t output_rows = 0; { - std::lock_guard lock(local_state._shared_state->buffer_mutex); + LockGuard lock(local_state._shared_state->buffer_mutex); if (!local_state._shared_state->blocks_buffer.empty()) { local_state._shared_state->blocks_buffer.front().swap(*output_block); local_state._shared_state->blocks_buffer.pop(); output_rows = output_block->rows(); //if buffer have no data and sink not eos, block reading and wait for signal again RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, output_block)); - if (local_state._shared_state->blocks_buffer.empty() && - !local_state._shared_state->sink_eos) { + if (local_state._shared_state->blocks_buffer.empty()) { // add this mutex to check, as in some case maybe is doing block(), and the sink is doing set eos. // so have to hold mutex to set block(), avoid to sink have set eos and set ready, but here set block() by mistake - std::unique_lock lc(local_state._shared_state->sink_eos_lock); + LockGuard lc(local_state._shared_state->sink_eos_lock); if (!local_state._shared_state->sink_eos) { local_state._dependency->block(); // block self source local_state._dependency->set_ready_to_write(); // ready for sink write @@ -71,7 +70,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, Block* output_blo } } else { //iff buffer have no data and sink eos, set eos - std::unique_lock lc(local_state._shared_state->sink_eos_lock); + LockGuard lc(local_state._shared_state->sink_eos_lock); *eos = local_state._shared_state->sink_eos; } } diff --git a/be/src/exec/operator/exchange_sink_operator.cpp b/be/src/exec/operator/exchange_sink_operator.cpp index d6355493ce05a4..35698f5217d709 100644 --- a/be/src/exec/operator/exchange_sink_operator.cpp +++ b/be/src/exec/operator/exchange_sink_operator.cpp @@ -25,7 +25,6 @@ #include #include #include -#include #include #include @@ -214,7 +213,7 @@ void ExchangeSinkLocalState::_create_channels() { } void ExchangeSinkLocalState::on_channel_finished(InstanceLoId channel_id) { - std::lock_guard lock(_finished_channels_mutex); + LockGuard lock(_finished_channels_mutex); if (_finished_channels.contains(channel_id)) { LOG(WARNING) << "Query: " << print_id(_state->query_id()) diff --git a/be/src/exec/operator/exchange_sink_operator.h b/be/src/exec/operator/exchange_sink_operator.h index 74101712ddd26e..ea224ed99bd4de 100644 --- a/be/src/exec/operator/exchange_sink_operator.h +++ b/be/src/exec/operator/exchange_sink_operator.h @@ -22,9 +22,9 @@ #include #include #include -#include #include "common/status.h" +#include "common/thread_safety_annotations.h" #include "exec/exchange/exchange_writer.h" #include "exec/exchange/vdata_stream_sender.h" #include "exec/operator/exchange_sink_buffer.h" @@ -180,8 +180,8 @@ class ExchangeSinkLocalState MOCK_REMOVE(final) : public PipelineXSinkLocalState int _last_local_channel_idx = -1; std::atomic_int _working_channels_count = 0; - std::set _finished_channels; - std::mutex _finished_channels_mutex; + std::set _finished_channels GUARDED_BY(_finished_channels_mutex); + AnnotatedMutex _finished_channels_mutex; }; class ExchangeSinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX { diff --git a/be/src/exec/operator/hashjoin_build_sink.cpp b/be/src/exec/operator/hashjoin_build_sink.cpp index 9f1a05876f8bb5..b75bd253026dbf 100644 --- a/be/src/exec/operator/hashjoin_build_sink.cpp +++ b/be/src/exec/operator/hashjoin_build_sink.cpp @@ -66,7 +66,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _dependency->block(); _finish_dependency->block(); { - std::lock_guard guard(p._mutex); + LockGuard guard(p._mutex); p._finish_dependencies.push_back(_finish_dependency); } } else { @@ -242,7 +242,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu } if (p._use_shared_hash_table) { - std::unique_lock lock(p._mutex); + LockGuard lock(p._mutex); // Only signal non-builder tasks when the builder actually built the hash table. // When the builder is terminated (woken up early because the probe side finished // first), it never called process_build_block() so the hash table variant is still diff --git a/be/src/exec/operator/hashjoin_build_sink.h b/be/src/exec/operator/hashjoin_build_sink.h index dcc76031c2c6df..be77ef6cc690bc 100644 --- a/be/src/exec/operator/hashjoin_build_sink.h +++ b/be/src/exec/operator/hashjoin_build_sink.h @@ -17,6 +17,7 @@ #pragma once +#include "common/thread_safety_annotations.h" #include "exec/operator/join_build_sink_operator.h" #include "exec/operator/operator.h" #include "exec/runtime_filter/runtime_filter_producer_helper.h" @@ -196,8 +197,8 @@ class HashJoinBuildSinkOperatorX MOCK_REMOVE(final) bool _use_shared_hash_table = false; std::atomic _signaled = false; - std::mutex _mutex; - std::vector> _finish_dependencies; + AnnotatedMutex _mutex; + std::vector> _finish_dependencies GUARDED_BY(_mutex); std::map> _runtime_filters; }; diff --git a/be/src/exec/operator/multi_cast_data_streamer.cpp b/be/src/exec/operator/multi_cast_data_streamer.cpp index 0a569803664e8f..538af373b4a95a 100644 --- a/be/src/exec/operator/multi_cast_data_streamer.cpp +++ b/be/src/exec/operator/multi_cast_data_streamer.cpp @@ -47,10 +47,11 @@ MultiCastBlock::MultiCastBlock(Block* block, int un_finish_copy, size_t mem_size block->clear(); } -Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, Block* block, bool* eos) { +Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, Block* block, + bool* eos) NO_THREAD_SAFETY_ANALYSIS { MultiCastBlock* multi_cast_block = nullptr; { - INJECT_MOCK_SLEEP(std::unique_lock l(_mutex)); + UniqueLock l(_mutex); for (auto it = _spill_readers[sender_idx].begin(); it != _spill_readers[sender_idx].end();) { if ((*it)->all_data_read) { @@ -92,13 +93,15 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, Block* b auto spill_func = [this, reader_item, sender_idx]() { Block block; bool spill_eos = false; + bool has_cached_blocks = false; size_t read_size = 0; while (!spill_eos) { RETURN_IF_ERROR(reader_item->reader->read(&block, &spill_eos)); if (!block.empty()) { - std::lock_guard l(_mutex); + LockGuard l(_mutex); read_size += block.allocated_bytes(); _cached_blocks[sender_idx].emplace_back(std::move(block)); + has_cached_blocks = true; if (_cached_blocks[sender_idx].size() >= 32 || read_size > 2 * 1024 * 1024) { break; @@ -106,7 +109,7 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, Block* b } } - if (spill_eos || !_cached_blocks[sender_idx].empty()) { + if (spill_eos || has_cached_blocks) { reader_item->all_data_read = spill_eos; _set_ready_for_read(sender_idx); } @@ -158,7 +161,7 @@ Status MultiCastDataStreamer::_copy_block(RuntimeState* state, int32_t sender_id block->get_by_position(i).column = block->get_by_position(i).column->clone_resized(rows); } - INJECT_MOCK_SLEEP(std::lock_guard l(_mutex)); + LockGuard l(_mutex); multi_cast_block._un_finish_copy--; auto copying_count = _copying_count.fetch_sub(1) - 1; if (multi_cast_block._un_finish_copy == 0) { @@ -292,7 +295,7 @@ Status MultiCastDataStreamer::push(RuntimeState* state, doris::Block* block, boo const auto block_mem_size = block->allocated_bytes(); { - INJECT_MOCK_SLEEP(std::lock_guard l(_mutex)); + LockGuard l(_mutex); if (_pending_block) { DCHECK_GT(_pending_block->rows(), 0); const auto pending_size = _pending_block->allocated_bytes(); @@ -345,7 +348,7 @@ Status MultiCastDataStreamer::push(RuntimeState* state, doris::Block* block, boo _eos = eos; } - if (_eos) { + if (eos) { for (auto* read_dep : _dependencies) { read_dep->set_always_ready(); } @@ -376,7 +379,7 @@ std::string MultiCastDataStreamer::debug_string() { size_t pos_at_end_count = 0; size_t blocks_count = 0; { - std::unique_lock l(_mutex); + LockGuard l(_mutex); blocks_count = _multi_cast_blocks.size(); for (int32_t i = 0; i != _cast_sender_count; ++i) { if (!_dependencies[i]->is_blocked_by()) { diff --git a/be/src/exec/operator/multi_cast_data_streamer.h b/be/src/exec/operator/multi_cast_data_streamer.h index 5b77277a4d64f8..461e7ff34d2afd 100644 --- a/be/src/exec/operator/multi_cast_data_streamer.h +++ b/be/src/exec/operator/multi_cast_data_streamer.h @@ -22,6 +22,7 @@ #include #include +#include "common/thread_safety_annotations.h" #include "core/block/block.h" #include "exec/exchange/vdata_stream_sender.h" #include "exec/pipeline/dependency.h" @@ -99,16 +100,16 @@ class MultiCastDataStreamer { Status _copy_block(RuntimeState* state, int32_t sender_idx, Block* block, MultiCastBlock& multi_cast_block); - Status _start_spill_task(RuntimeState* state, SpillFileSPtr spill_file); + Status _start_spill_task(RuntimeState* state, SpillFileSPtr spill_file) REQUIRES(_mutex); - Status _trigger_spill_if_need(RuntimeState* state, bool* triggered); + Status _trigger_spill_if_need(RuntimeState* state, bool* triggered) REQUIRES(_mutex); RuntimeProfile* _profile = nullptr; - std::list _multi_cast_blocks; - std::vector> _cached_blocks; - std::vector::iterator> _sender_pos_to_read; - std::mutex _mutex; - bool _eos = false; + std::list _multi_cast_blocks GUARDED_BY(_mutex); + std::vector> _cached_blocks GUARDED_BY(_mutex); + std::vector::iterator> _sender_pos_to_read GUARDED_BY(_mutex); + AnnotatedMutex _mutex; + bool _eos GUARDED_BY(_mutex) = false; int _cast_sender_count = 0; int _node_id; std::atomic_int64_t _cumulative_mem_size = 0; @@ -119,9 +120,9 @@ class MultiCastDataStreamer { Dependency* _write_dependency; std::vector _dependencies; - BlockUPtr _pending_block; + BlockUPtr _pending_block GUARDED_BY(_mutex); - std::vector>> _spill_readers; + std::vector>> _spill_readers GUARDED_BY(_mutex); RuntimeProfile* _sink_operator_profile; // operator_profile of each source operator diff --git a/be/src/exec/operator/partition_sort_sink_operator.cpp b/be/src/exec/operator/partition_sort_sink_operator.cpp index ebbe31e1120a18..76695b8ee3240c 100644 --- a/be/src/exec/operator/partition_sort_sink_operator.cpp +++ b/be/src/exec/operator/partition_sort_sink_operator.cpp @@ -127,7 +127,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, Block* input_block, if (local_state._is_need_passthrough) { { COUNTER_UPDATE(local_state._passthrough_rows_counter, (int64_t)current_rows); - std::lock_guard lock(local_state._shared_state->buffer_mutex); + LockGuard lock(local_state._shared_state->buffer_mutex); local_state._shared_state->blocks_buffer.push(std::move(*input_block)); // buffer have data, source could read this. local_state._dependency->set_ready_to_read(); @@ -158,8 +158,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, Block* input_block, } local_state._value_places[i]->_blocks.clear(); RETURN_IF_ERROR(sorter->prepare_for_read(false)); - INJECT_MOCK_SLEEP(std::unique_lock lc( - local_state._shared_state->prepared_finish_lock)); + LockGuard lc(local_state._shared_state->prepared_finish_lock); sorter->set_prepared_finish(); // iff one sorter have data, then could set source ready to read local_state._dependency->set_ready_to_read(); @@ -170,7 +169,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, Block* input_block, local_state._sorted_partition_input_rows); //so all data from child have sink completed { - std::unique_lock lc(local_state._shared_state->sink_eos_lock); + LockGuard lc(local_state._shared_state->sink_eos_lock); local_state._shared_state->sink_eos = true; // this ready is also need, as source maybe block by self in some case local_state._dependency->set_ready_to_read(); @@ -261,8 +260,7 @@ Status PartitionSortSinkOperatorX::_emplace_into_hash_table( { COUNTER_UPDATE(local_state._passthrough_rows_counter, (int64_t)(row + 1)); - std::lock_guard lock( - local_state._shared_state->buffer_mutex); + LockGuard lock(local_state._shared_state->buffer_mutex); // have emplace (num_rows - row) to hashtable, and now have row remaining needed in block; // set_num_rows(x) retains the range [0, x - 1], so row + 1 is needed here. input_block->set_num_rows(row + 1); diff --git a/be/src/exec/operator/partition_sort_source_operator.cpp b/be/src/exec/operator/partition_sort_source_operator.cpp index c79265d85ebb82..3db89fb4cd292d 100644 --- a/be/src/exec/operator/partition_sort_source_operator.cpp +++ b/be/src/exec/operator/partition_sort_source_operator.cpp @@ -40,17 +40,16 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, Block* outpu output_block->clear_column_data(); auto get_data_from_blocks_buffer = false; { - std::lock_guard lock(local_state._shared_state->buffer_mutex); + LockGuard lock(local_state._shared_state->buffer_mutex); get_data_from_blocks_buffer = !local_state._shared_state->blocks_buffer.empty(); if (get_data_from_blocks_buffer) { local_state._shared_state->blocks_buffer.front().swap(*output_block); local_state._shared_state->blocks_buffer.pop(); - if (local_state._shared_state->blocks_buffer.empty() && - !local_state._shared_state->sink_eos) { + if (local_state._shared_state->blocks_buffer.empty()) { // add this mutex to check, as in some case maybe is doing block(), and the sink is doing set eos. // so have to hold mutex to set block(), avoid to sink have set eos and set ready, but here set block() by mistake - std::unique_lock lc(local_state._shared_state->sink_eos_lock); + LockGuard lc(local_state._shared_state->sink_eos_lock); //if buffer have no data and sink not eos, block reading and wait for signal again if (!local_state._shared_state->sink_eos) { local_state._dependency->block(); @@ -92,7 +91,7 @@ Status PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state, Block if (current_eos) { // current sort have eos, so get next idx local_state._sort_idx++; - std::unique_lock lc(local_state._shared_state->prepared_finish_lock); + LockGuard lc(local_state._shared_state->prepared_finish_lock); if (local_state._sort_idx < sorter_size && !sorters[local_state._sort_idx]->prepared_finish()) { local_state._dependency->block(); diff --git a/be/src/exec/operator/scan_operator.cpp b/be/src/exec/operator/scan_operator.cpp index 5a731ae580b64e..e734b65ac24be9 100644 --- a/be/src/exec/operator/scan_operator.cpp +++ b/be/src/exec/operator/scan_operator.cpp @@ -73,7 +73,7 @@ bool ScanLocalState::should_run_serial() const { Status ScanLocalStateBase::update_late_arrival_runtime_filter(RuntimeState* state, int& arrived_rf_num) { // Lock needed because _conjuncts can be accessed concurrently by multiple scanner threads - std::unique_lock lock(_conjuncts_lock); + LockGuard lock(_conjuncts_lock); RETURN_IF_ERROR(_helper.try_append_late_arrival_runtime_filter(state, _parent->row_descriptor(), arrived_rf_num, _conjuncts)); if (state->enable_adjust_conjunct_order_by_cost()) { @@ -86,7 +86,7 @@ Status ScanLocalStateBase::update_late_arrival_runtime_filter(RuntimeState* stat Status ScanLocalStateBase::clone_conjunct_ctxs(VExprContextSPtrs& scanner_conjuncts) { // Lock needed because _conjuncts can be accessed concurrently by multiple scanner threads - std::unique_lock lock(_conjuncts_lock); + LockGuard lock(_conjuncts_lock); scanner_conjuncts.resize(_conjuncts.size()); for (size_t i = 0; i != _conjuncts.size(); ++i) { RETURN_IF_ERROR(_conjuncts[i]->clone(_state, scanner_conjuncts[i])); diff --git a/be/src/exec/operator/scan_operator.h b/be/src/exec/operator/scan_operator.h index 834e534f252451..591bdcac5ee594 100644 --- a/be/src/exec/operator/scan_operator.h +++ b/be/src/exec/operator/scan_operator.h @@ -18,11 +18,11 @@ #pragma once #include -#include #include #include #include "common/status.h" +#include "common/thread_safety_annotations.h" #include "core/field.h" #include "exec/common/util.hpp" #include "exec/operator/operator.h" @@ -125,7 +125,7 @@ class ScanLocalStateBase : public PipelineXLocalState<> { RuntimeProfile::Counter* _scan_rows = nullptr; RuntimeProfile::Counter* _scan_bytes = nullptr; - std::mutex _conjuncts_lock; + AnnotatedMutex _conjuncts_lock; RuntimeFilterConsumerHelper _helper; // magic number as seed to generate hash value for condition cache uint64_t _condition_cache_digest = 0; diff --git a/be/src/exec/pipeline/dependency.h b/be/src/exec/pipeline/dependency.h index bb36c12b2616ee..dbb4b938ad4b47 100644 --- a/be/src/exec/pipeline/dependency.h +++ b/be/src/exec/pipeline/dependency.h @@ -35,6 +35,7 @@ #include "common/config.h" #include "common/logging.h" +#include "common/thread_safety_annotations.h" #include "core/block/block.h" #include "core/types.h" #include "exec/common/agg_utils.h" @@ -685,10 +686,10 @@ struct AnalyticSharedState : public BasicSharedState { public: AnalyticSharedState() = default; - std::queue blocks_buffer; - std::mutex buffer_mutex; - bool sink_eos = false; - std::mutex sink_eos_lock; + std::queue blocks_buffer GUARDED_BY(buffer_mutex); + AnnotatedMutex buffer_mutex; + bool sink_eos GUARDED_BY(sink_eos_lock) = false; + AnnotatedMutex sink_eos_lock; Arena agg_arena_pool; }; @@ -776,12 +777,12 @@ struct NestedLoopJoinSharedState : public JoinSharedState { struct PartitionSortNodeSharedState : public BasicSharedState { ENABLE_FACTORY_CREATOR(PartitionSortNodeSharedState) public: - std::queue blocks_buffer; - std::mutex buffer_mutex; + std::queue blocks_buffer GUARDED_BY(buffer_mutex); + AnnotatedMutex buffer_mutex; std::vector> partition_sorts; - bool sink_eos = false; - std::mutex sink_eos_lock; - std::mutex prepared_finish_lock; + bool sink_eos GUARDED_BY(sink_eos_lock) = false; + AnnotatedMutex sink_eos_lock; + AnnotatedMutex prepared_finish_lock; }; struct SetSharedState : public BasicSharedState { diff --git a/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp b/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp index d50523605d0964..61589327909d63 100644 --- a/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp +++ b/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp @@ -316,13 +316,16 @@ TEST_F(MultiCastDataStreamerTest, SpillTest) { output2.join(); output3.join(); - ASSERT_EQ(multi_cast_data_streamer->_multi_cast_blocks.size(), 0); - ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[0].size(), 0); - ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[1].size(), 0); - ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[2].size(), 0); - ASSERT_EQ(multi_cast_data_streamer->_spill_readers[0].size(), 0); - ASSERT_EQ(multi_cast_data_streamer->_spill_readers[1].size(), 0); - ASSERT_EQ(multi_cast_data_streamer->_spill_readers[2].size(), 0); + { + LockGuard l(multi_cast_data_streamer->_mutex); + ASSERT_EQ(multi_cast_data_streamer->_multi_cast_blocks.size(), 0); + ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[0].size(), 0); + ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[1].size(), 0); + ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[2].size(), 0); + ASSERT_EQ(multi_cast_data_streamer->_spill_readers[0].size(), 0); + ASSERT_EQ(multi_cast_data_streamer->_spill_readers[1].size(), 0); + ASSERT_EQ(multi_cast_data_streamer->_spill_readers[2].size(), 0); + } auto debug_string = multi_cast_data_streamer->debug_string(); EXPECT_TRUE(debug_string.find("MemSize:") != std::string::npos);