Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions be/src/common/thread_safety_annotations.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,54 @@ class SCOPED_CAPABILITY LockGuard {
private:
MutexType& _mu;
};

// RAII unique lock annotated for thread safety analysis.
// Supports manual lock/unlock while preserving capability tracking.
template <typename MutexType>
class SCOPED_CAPABILITY UniqueLock {
public:
explicit UniqueLock(MutexType& mu) ACQUIRE(mu) : _mu(&mu), _locked(true) {
#ifdef BE_TEST
doris::mock_random_sleep();
#endif
_mu->lock();
}

UniqueLock(MutexType& mu, std::adopt_lock_t) REQUIRES(mu) : _mu(&mu), _locked(true) {}

UniqueLock(MutexType& mu, std::defer_lock_t) EXCLUDES(mu) : _mu(&mu), _locked(false) {}

~UniqueLock() RELEASE() {
if (_locked) {
_mu->unlock();
#ifdef BE_TEST
doris::mock_random_sleep();
#endif
}
}

void lock() ACQUIRE() {
#ifdef BE_TEST
doris::mock_random_sleep();
#endif
_mu->lock();
_locked = true;
}

void unlock() RELEASE() {
_mu->unlock();
_locked = false;
#ifdef BE_TEST
doris::mock_random_sleep();
#endif
}

bool owns_lock() const { return _locked; }

UniqueLock(const UniqueLock&) = delete;
UniqueLock& operator=(const UniqueLock&) = delete;

private:
MutexType* _mu;
bool _locked;
};
4 changes: 2 additions & 2 deletions be/src/exec/operator/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ void AnalyticSinkLocalState::_init_result_columns() {
void AnalyticSinkLocalState::_refresh_buffer_and_dependency_state(Block* block) {
size_t buffer_size = 0;
{
std::unique_lock<std::mutex> lc(_shared_state->buffer_mutex);
LockGuard lc(_shared_state->buffer_mutex);
_shared_state->blocks_buffer.push(std::move(*block));
buffer_size = _shared_state->blocks_buffer.size();
}
Expand Down Expand Up @@ -755,7 +755,7 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, Block* input_bloc
RETURN_IF_ERROR(_add_input_block(state, input_block));
RETURN_IF_ERROR(local_state._execute_impl());
if (local_state._input_eos) {
std::unique_lock<std::mutex> lc(local_state._shared_state->sink_eos_lock);
LockGuard lc(local_state._shared_state->sink_eos_lock);
local_state._shared_state->sink_eos = true;
local_state._dependency->set_ready_to_read(); // ready for source to read
}
Expand Down
9 changes: 4 additions & 5 deletions be/src/exec/operator/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,26 +52,25 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, Block* output_blo
output_block->clear_column_data();
size_t output_rows = 0;
{
std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex);
LockGuard lock(local_state._shared_state->buffer_mutex);
if (!local_state._shared_state->blocks_buffer.empty()) {
local_state._shared_state->blocks_buffer.front().swap(*output_block);
local_state._shared_state->blocks_buffer.pop();
output_rows = output_block->rows();
// If the buffer has no data and the sink is not at eos, block reading and wait for the signal again.
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, output_block));
if (local_state._shared_state->blocks_buffer.empty() &&
!local_state._shared_state->sink_eos) {
if (local_state._shared_state->blocks_buffer.empty()) {
// Take this mutex for the check: the source may be calling block() while the sink is concurrently setting eos.
// Holding the mutex while calling block() avoids the case where the sink has already set eos and ready, but block() is then called here by mistake.
std::unique_lock<std::mutex> lc(local_state._shared_state->sink_eos_lock);
LockGuard lc(local_state._shared_state->sink_eos_lock);
if (!local_state._shared_state->sink_eos) {
local_state._dependency->block(); // block self source
local_state._dependency->set_ready_to_write(); // ready for sink write
}
}
} else {
// The buffer has no data here, so report eos if and only if the sink has reached eos.
std::unique_lock<std::mutex> lc(local_state._shared_state->sink_eos_lock);
LockGuard lc(local_state._shared_state->sink_eos_lock);
*eos = local_state._shared_state->sink_eos;
}
}
Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/operator/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <algorithm>
#include <cstdint>
#include <memory>
#include <mutex>
#include <random>
#include <string>

Expand Down Expand Up @@ -214,7 +213,7 @@ void ExchangeSinkLocalState::_create_channels() {
}

void ExchangeSinkLocalState::on_channel_finished(InstanceLoId channel_id) {
std::lock_guard<std::mutex> lock(_finished_channels_mutex);
LockGuard lock(_finished_channels_mutex);

if (_finished_channels.contains(channel_id)) {
LOG(WARNING) << "Query: " << print_id(_state->query_id())
Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/operator/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
#include <algorithm>
#include <atomic>
#include <memory>
#include <mutex>

#include "common/status.h"
#include "common/thread_safety_annotations.h"
#include "exec/exchange/exchange_writer.h"
#include "exec/exchange/vdata_stream_sender.h"
#include "exec/operator/exchange_sink_buffer.h"
Expand Down Expand Up @@ -180,8 +180,8 @@ class ExchangeSinkLocalState MOCK_REMOVE(final) : public PipelineXSinkLocalState
int _last_local_channel_idx = -1;

std::atomic_int _working_channels_count = 0;
std::set<InstanceLoId> _finished_channels;
std::mutex _finished_channels_mutex;
std::set<InstanceLoId> _finished_channels GUARDED_BY(_finished_channels_mutex);
AnnotatedMutex _finished_channels_mutex;
};

class ExchangeSinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX<ExchangeSinkLocalState> {
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/operator/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
_dependency->block();
_finish_dependency->block();
{
std::lock_guard<std::mutex> guard(p._mutex);
LockGuard guard(p._mutex);
p._finish_dependencies.push_back(_finish_dependency);
}
} else {
Expand Down Expand Up @@ -242,7 +242,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
}

if (p._use_shared_hash_table) {
std::unique_lock lock(p._mutex);
LockGuard lock(p._mutex);
// Only signal non-builder tasks when the builder actually built the hash table.
// When the builder is terminated (woken up early because the probe side finished
// first), it never called process_build_block() so the hash table variant is still
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/operator/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include "common/thread_safety_annotations.h"
#include "exec/operator/join_build_sink_operator.h"
#include "exec/operator/operator.h"
#include "exec/runtime_filter/runtime_filter_producer_helper.h"
Expand Down Expand Up @@ -196,8 +197,8 @@ class HashJoinBuildSinkOperatorX MOCK_REMOVE(final)

bool _use_shared_hash_table = false;
std::atomic<bool> _signaled = false;
std::mutex _mutex;
std::vector<std::shared_ptr<Dependency>> _finish_dependencies;
AnnotatedMutex _mutex;
std::vector<std::shared_ptr<Dependency>> _finish_dependencies GUARDED_BY(_mutex);
std::map<int, std::shared_ptr<RuntimeFilterWrapper>> _runtime_filters;
};

Expand Down
19 changes: 11 additions & 8 deletions be/src/exec/operator/multi_cast_data_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ MultiCastBlock::MultiCastBlock(Block* block, int un_finish_copy, size_t mem_size
block->clear();
}

Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, Block* block, bool* eos) {
Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, Block* block,
bool* eos) NO_THREAD_SAFETY_ANALYSIS {
MultiCastBlock* multi_cast_block = nullptr;
{
INJECT_MOCK_SLEEP(std::unique_lock l(_mutex));
UniqueLock l(_mutex);
for (auto it = _spill_readers[sender_idx].begin();
it != _spill_readers[sender_idx].end();) {
if ((*it)->all_data_read) {
Expand Down Expand Up @@ -92,21 +93,23 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, Block* b
auto spill_func = [this, reader_item, sender_idx]() {
Block block;
bool spill_eos = false;
bool has_cached_blocks = false;
size_t read_size = 0;
while (!spill_eos) {
RETURN_IF_ERROR(reader_item->reader->read(&block, &spill_eos));
if (!block.empty()) {
std::lock_guard l(_mutex);
LockGuard l(_mutex);
read_size += block.allocated_bytes();
_cached_blocks[sender_idx].emplace_back(std::move(block));
has_cached_blocks = true;
if (_cached_blocks[sender_idx].size() >= 32 ||
read_size > 2 * 1024 * 1024) {
break;
}
}
}

if (spill_eos || !_cached_blocks[sender_idx].empty()) {
if (spill_eos || has_cached_blocks) {
reader_item->all_data_read = spill_eos;
_set_ready_for_read(sender_idx);
}
Expand Down Expand Up @@ -158,7 +161,7 @@ Status MultiCastDataStreamer::_copy_block(RuntimeState* state, int32_t sender_id
block->get_by_position(i).column = block->get_by_position(i).column->clone_resized(rows);
}

INJECT_MOCK_SLEEP(std::lock_guard l(_mutex));
LockGuard l(_mutex);
multi_cast_block._un_finish_copy--;
auto copying_count = _copying_count.fetch_sub(1) - 1;
if (multi_cast_block._un_finish_copy == 0) {
Expand Down Expand Up @@ -292,7 +295,7 @@ Status MultiCastDataStreamer::push(RuntimeState* state, doris::Block* block, boo
const auto block_mem_size = block->allocated_bytes();

{
INJECT_MOCK_SLEEP(std::lock_guard l(_mutex));
LockGuard l(_mutex);
if (_pending_block) {
DCHECK_GT(_pending_block->rows(), 0);
const auto pending_size = _pending_block->allocated_bytes();
Expand Down Expand Up @@ -345,7 +348,7 @@ Status MultiCastDataStreamer::push(RuntimeState* state, doris::Block* block, boo
_eos = eos;
}

if (_eos) {
if (eos) {
for (auto* read_dep : _dependencies) {
read_dep->set_always_ready();
}
Expand Down Expand Up @@ -376,7 +379,7 @@ std::string MultiCastDataStreamer::debug_string() {
size_t pos_at_end_count = 0;
size_t blocks_count = 0;
{
std::unique_lock l(_mutex);
LockGuard l(_mutex);
blocks_count = _multi_cast_blocks.size();
for (int32_t i = 0; i != _cast_sender_count; ++i) {
if (!_dependencies[i]->is_blocked_by()) {
Expand Down
19 changes: 10 additions & 9 deletions be/src/exec/operator/multi_cast_data_streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <string>
#include <vector>

#include "common/thread_safety_annotations.h"
#include "core/block/block.h"
#include "exec/exchange/vdata_stream_sender.h"
#include "exec/pipeline/dependency.h"
Expand Down Expand Up @@ -99,16 +100,16 @@ class MultiCastDataStreamer {
Status _copy_block(RuntimeState* state, int32_t sender_idx, Block* block,
MultiCastBlock& multi_cast_block);

Status _start_spill_task(RuntimeState* state, SpillFileSPtr spill_file);
Status _start_spill_task(RuntimeState* state, SpillFileSPtr spill_file) REQUIRES(_mutex);

Status _trigger_spill_if_need(RuntimeState* state, bool* triggered);
Status _trigger_spill_if_need(RuntimeState* state, bool* triggered) REQUIRES(_mutex);

RuntimeProfile* _profile = nullptr;
std::list<MultiCastBlock> _multi_cast_blocks;
std::vector<std::vector<Block>> _cached_blocks;
std::vector<std::list<MultiCastBlock>::iterator> _sender_pos_to_read;
std::mutex _mutex;
bool _eos = false;
std::list<MultiCastBlock> _multi_cast_blocks GUARDED_BY(_mutex);
std::vector<std::vector<Block>> _cached_blocks GUARDED_BY(_mutex);
std::vector<std::list<MultiCastBlock>::iterator> _sender_pos_to_read GUARDED_BY(_mutex);
AnnotatedMutex _mutex;
bool _eos GUARDED_BY(_mutex) = false;
int _cast_sender_count = 0;
int _node_id;
std::atomic_int64_t _cumulative_mem_size = 0;
Expand All @@ -119,9 +120,9 @@ class MultiCastDataStreamer {
Dependency* _write_dependency;
std::vector<Dependency*> _dependencies;

BlockUPtr _pending_block;
BlockUPtr _pending_block GUARDED_BY(_mutex);

std::vector<std::vector<std::shared_ptr<SpillingReader>>> _spill_readers;
std::vector<std::vector<std::shared_ptr<SpillingReader>>> _spill_readers GUARDED_BY(_mutex);

RuntimeProfile* _sink_operator_profile;
// operator_profile of each source operator
Expand Down
10 changes: 4 additions & 6 deletions be/src/exec/operator/partition_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, Block* input_block,
if (local_state._is_need_passthrough) {
{
COUNTER_UPDATE(local_state._passthrough_rows_counter, (int64_t)current_rows);
std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex);
LockGuard lock(local_state._shared_state->buffer_mutex);
local_state._shared_state->blocks_buffer.push(std::move(*input_block));
// buffer have data, source could read this.
local_state._dependency->set_ready_to_read();
Expand Down Expand Up @@ -158,8 +158,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, Block* input_block,
}
local_state._value_places[i]->_blocks.clear();
RETURN_IF_ERROR(sorter->prepare_for_read(false));
INJECT_MOCK_SLEEP(std::unique_lock<std::mutex> lc(
local_state._shared_state->prepared_finish_lock));
LockGuard lc(local_state._shared_state->prepared_finish_lock);
sorter->set_prepared_finish();
// iff one sorter have data, then could set source ready to read
local_state._dependency->set_ready_to_read();
Expand All @@ -170,7 +169,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, Block* input_block,
local_state._sorted_partition_input_rows);
//so all data from child have sink completed
{
std::unique_lock<std::mutex> lc(local_state._shared_state->sink_eos_lock);
LockGuard lc(local_state._shared_state->sink_eos_lock);
local_state._shared_state->sink_eos = true;
// this ready is also need, as source maybe block by self in some case
local_state._dependency->set_ready_to_read();
Expand Down Expand Up @@ -261,8 +260,7 @@ Status PartitionSortSinkOperatorX::_emplace_into_hash_table(
{
COUNTER_UPDATE(local_state._passthrough_rows_counter,
(int64_t)(row + 1));
std::lock_guard<std::mutex> lock(
local_state._shared_state->buffer_mutex);
LockGuard lock(local_state._shared_state->buffer_mutex);
// have emplace (num_rows - row) to hashtable, and now have row remaining needed in block;
// set_num_rows(x) retains the range [0, x - 1], so row + 1 is needed here.
input_block->set_num_rows(row + 1);
Expand Down
9 changes: 4 additions & 5 deletions be/src/exec/operator/partition_sort_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,16 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, Block* outpu
output_block->clear_column_data();
auto get_data_from_blocks_buffer = false;
{
std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex);
LockGuard lock(local_state._shared_state->buffer_mutex);
get_data_from_blocks_buffer = !local_state._shared_state->blocks_buffer.empty();
if (get_data_from_blocks_buffer) {
local_state._shared_state->blocks_buffer.front().swap(*output_block);
local_state._shared_state->blocks_buffer.pop();

if (local_state._shared_state->blocks_buffer.empty() &&
!local_state._shared_state->sink_eos) {
if (local_state._shared_state->blocks_buffer.empty()) {
// Take this mutex for the check: the source may be calling block() while the sink is concurrently setting eos.
// Holding the mutex while calling block() avoids the case where the sink has already set eos and ready, but block() is then called here by mistake.
std::unique_lock<std::mutex> lc(local_state._shared_state->sink_eos_lock);
LockGuard lc(local_state._shared_state->sink_eos_lock);
// If the buffer has no data and the sink is not at eos, block reading and wait for the signal again.
if (!local_state._shared_state->sink_eos) {
local_state._dependency->block();
Expand Down Expand Up @@ -92,7 +91,7 @@ Status PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state, Block
if (current_eos) {
// current sort have eos, so get next idx
local_state._sort_idx++;
std::unique_lock<std::mutex> lc(local_state._shared_state->prepared_finish_lock);
LockGuard lc(local_state._shared_state->prepared_finish_lock);
if (local_state._sort_idx < sorter_size &&
!sorters[local_state._sort_idx]->prepared_finish()) {
local_state._dependency->block();
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/operator/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ bool ScanLocalState<Derived>::should_run_serial() const {
Status ScanLocalStateBase::update_late_arrival_runtime_filter(RuntimeState* state,
int& arrived_rf_num) {
// Lock needed because _conjuncts can be accessed concurrently by multiple scanner threads
std::unique_lock lock(_conjuncts_lock);
LockGuard lock(_conjuncts_lock);
RETURN_IF_ERROR(_helper.try_append_late_arrival_runtime_filter(state, _parent->row_descriptor(),
arrived_rf_num, _conjuncts));
if (state->enable_adjust_conjunct_order_by_cost()) {
Expand All @@ -86,7 +86,7 @@ Status ScanLocalStateBase::update_late_arrival_runtime_filter(RuntimeState* stat

Status ScanLocalStateBase::clone_conjunct_ctxs(VExprContextSPtrs& scanner_conjuncts) {
// Lock needed because _conjuncts can be accessed concurrently by multiple scanner threads
std::unique_lock lock(_conjuncts_lock);
LockGuard lock(_conjuncts_lock);
scanner_conjuncts.resize(_conjuncts.size());
for (size_t i = 0; i != _conjuncts.size(); ++i) {
RETURN_IF_ERROR(_conjuncts[i]->clone(_state, scanner_conjuncts[i]));
Expand Down
Loading
Loading