diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp index a59580e321..8dee422598 100644 --- a/src/brpc/selective_channel.cpp +++ b/src/brpc/selective_channel.cpp @@ -419,9 +419,13 @@ void Sender::Clear() { if (_main_cntl == NULL) { return; } - delete _alloc_resources[1].response; - delete _alloc_resources[1].sub_done; - _alloc_resources[1] = Resource(); + for (int i = 0; i < _nalloc; ++i) { + delete _alloc_resources[i].response; + if (_alloc_resources[i].sub_done != &_sub_done0) { + delete _alloc_resources[i].sub_done; + } + _alloc_resources[i] = Resource(); + } const CallId cid = _main_cntl->call_id(); _main_cntl = NULL; if (_user_done) { @@ -434,7 +438,7 @@ inline Resource Sender::PopFree() { if (_nfree == 0) { if (_nalloc == 0) { Resource r; - r.response = _response; + r.response = _response->New(); r.sub_done = &_sub_done0; _alloc_resources[_nalloc++] = r; return r; diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp index b17fc73809..db6e2ac777 100644 --- a/test/brpc_channel_unittest.cpp +++ b/test/brpc_channel_unittest.cpp @@ -1487,6 +1487,57 @@ class ChannelTest : public ::testing::Test{ EXPECT_EQ(cntl.response_attachment().to_string(), "123"); StopAndJoin(); } + + void TestBackupRequestSelectiveResponseRace() { + ASSERT_EQ(0, StartAccept(_ep)); + + const size_t NCHANS = 8; + brpc::SelectiveChannel channel; + ASSERT_EQ(0, channel.Init("rr", NULL)); + for (size_t i = 0; i < NCHANS; ++i) { + brpc::Channel* subchan = new brpc::Channel; + SetUpChannel(subchan, false, false); + ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i; + } + + const int kRounds = 150; + const int kCodeListSize = 20000; + std::atomic call_cnt(0); + _svc.SetMockFunc([&call_cnt](google::protobuf::RpcController*, + const ::test::EchoRequest*, + ::test::EchoResponse* res, + google::protobuf::Closure*) { + const int seen = call_cnt.fetch_add(1, std::memory_order_relaxed); + const bool slow = ((seen & 1) == 0); + if (slow) { + bthread_usleep(1500); + } + res->clear_code_list(); + const int base = slow ? 1000000 : 2000000; + for (int i = 0; i < kCodeListSize; ++i) { + res->add_code_list(base + i); + } + res->set_message(slow ? "slow" : "fast"); + }); + + for (int round = 0; round < kRounds; ++round) { + brpc::Controller cntl; + test::EchoRequest req; + test::EchoResponse res; + req.set_message(__FUNCTION__); + cntl.set_backup_request_ms(1); + cntl.set_timeout_ms(3000); + CallMethod(&channel, &cntl, &req, &res, true); + ASSERT_FALSE(cntl.Failed()) << "round=" << round + << " err=" << cntl.ErrorText(); + ASSERT_EQ(kCodeListSize, res.code_list_size()) << "round=" << round; + ASSERT_TRUE(res.message() == "slow" || res.message() == "fast") + << "round=" << round; + } + + EXPECT_EQ(kRounds * 2, call_cnt.load(std::memory_order_relaxed)); + StopAndJoin(); + } void TestCloseFD(bool single_server, bool async, bool short_connection) { std::cout << " *** single=" << single_server @@ -2783,6 +2834,10 @@ TEST_F(ChannelTest, backuprequest_selective) { } } +TEST_F(ChannelTest, backuprequest_selective_response_race) { + TestBackupRequestSelectiveResponseRace(); +} + TEST_F(ChannelTest, close_fd) { for (int i = 0; i <= 1; ++i) { // Flag SingleServer for (int j = 0; j <= 1; ++j) { // Flag Asynchronous