From 9f86ab52140b25db730a1e6f8ee0da1966b45000 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 21 May 2026 16:43:18 +0800 Subject: [PATCH 1/2] feat: introduce a v2 createProducer API to carry error message when fail --- include/pulsar/Client.h | 17 ++- include/pulsar/Result.h | 16 +++ lib/BinaryProtoLookupService.cc | 72 +++++----- lib/BinaryProtoLookupService.h | 14 +- lib/Client.cc | 17 ++- lib/ClientConnection.cc | 193 ++++++++++++++++----------- lib/ClientConnection.h | 15 ++- lib/ClientConnectionAdaptor.h | 4 +- lib/ClientImpl.cc | 111 ++++++++------- lib/ClientImpl.h | 11 +- lib/ConnectionPool.cc | 26 ++-- lib/ConnectionPool.h | 13 +- lib/ConsumerImpl.cc | 53 ++++---- lib/ConsumerImpl.h | 2 +- lib/ConsumerImplBase.h | 5 +- lib/HTTPLookupService.cc | 97 +++++++------- lib/HTTPLookupService.h | 12 +- lib/HandlerBase.cc | 7 +- lib/HandlerBase.h | 2 +- lib/LookupDataResult.h | 2 +- lib/LookupService.h | 7 +- lib/MultiTopicsConsumerImpl.cc | 13 +- lib/PartitionedProducerImpl.cc | 25 ++-- lib/PartitionedProducerImpl.h | 6 +- lib/PendingRequest.h | 8 +- lib/ProducerImpl.cc | 47 +++---- lib/ProducerImpl.h | 10 +- lib/ProducerImplBase.h | 2 +- lib/RetryableLookupService.h | 47 ++++++- lib/RetryableOperation.h | 24 ++-- lib/RetryableOperationCache.h | 12 +- tests/ClientTest.cc | 8 +- tests/ConnectionTest.cc | 4 +- tests/LookupServiceTest.cc | 26 ++-- tests/MockClientImpl.h | 9 +- tests/MultiTopicsConsumerTest.cc | 2 +- tests/RetryableOperationCacheTest.cc | 14 +- tests/SchemaTest.cc | 16 ++- 38 files changed, 553 insertions(+), 416 deletions(-) diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h index e9813e3d..ee468dee 100644 --- a/include/pulsar/Client.h +++ b/include/pulsar/Client.h @@ -36,6 +36,7 @@ #include #include +#include namespace pulsar { typedef std::function CreateProducerCallback; @@ -45,6 +46,8 @@ typedef std::function TableViewCallback; typedef std::function&)> GetPartitionsCallback; typedef std::function CloseCallback; +using CreateProducerV2Callback = std::function)>; + class ClientImpl; class PulsarFriend; class PulsarWrapper; @@ -108,7 +111,9 @@ class PULSAR_PUBLIC Client { * @return ResultOk if the producer has been successfully created * @return ResultError if there was an error */ - Result createProducer(const std::string& topic, const ProducerConfiguration& conf, Producer& producer); + [[deprecated("use createProducerAsyncV2")]] Result createProducer(const std::string& topic, + const ProducerConfiguration& conf, + Producer& producer); /** * Asynchronously create a producer with the default ProducerConfiguration for publishing on a specific @@ -118,7 +123,8 @@ class PULSAR_PUBLIC Client { * @param callback the callback that is triggered when the producer is created successfully or not * @param callback Callback function that is invoked when the operation is completed */ - void createProducerAsync(const std::string& topic, const CreateProducerCallback& callback); + [[deprecated("use createProducerAsyncV2")]] void createProducerAsync( + const std::string& topic, const CreateProducerCallback& callback); /** * Asynchronously create a producer with the customized ProducerConfiguration for publishing on a specific @@ -127,8 +133,11 @@ class PULSAR_PUBLIC Client { * @param topic the name of the topic where to produce * @param conf the customized ProducerConfiguration */ - void createProducerAsync(const std::string& topic, const ProducerConfiguration& conf, - const CreateProducerCallback& callback); + [[deprecated("use createProducerAsyncV2")]] void createProducerAsync( + const std::string& topic, const ProducerConfiguration& conf, const CreateProducerCallback& callback); + + void createProducerAsyncV2(const std::string& topic, const ProducerConfiguration& conf, + CreateProducerV2Callback callback); /** * Subscribe to a given topic and subscription combination with the default ConsumerConfiguration diff --git a/include/pulsar/Result.h b/include/pulsar/Result.h index a6c30d4c..b94c08c1 100644 --- a/include/pulsar/Result.h +++ b/include/pulsar/Result.h @@ -23,6 +23,8 @@ #include #include +#include +#include namespace pulsar { @@ -101,6 +103,20 @@ enum Result : int8_t PULSAR_PUBLIC const char* strResult(Result result); PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, pulsar::Result result); + +struct PULSAR_PUBLIC Error { + Result result; + std::string message; +}; + +inline std::ostream& operator<<(std::ostream& os, const Error& error) { + os << error.result; + if (!error.message.empty()) { + os << " " << error.message; + } + return os; +} + } // namespace pulsar #endif /* ERROR_HPP_ */ diff --git a/lib/BinaryProtoLookupService.cc b/lib/BinaryProtoLookupService.cc index a110e965..7712e760 100644 --- a/lib/BinaryProtoLookupService.cc +++ b/lib/BinaryProtoLookupService.cc @@ -47,10 +47,10 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho // NOTE: we can use move capture for topic since C++14 cnxPool_.getConnectionAsync(address).addListener([this, promise, topic, address, authoritative, - redirectCount](Result result, + redirectCount](const auto& error, const ClientConnectionWeakPtr& weakCnx) { - if (result != ResultOk) { - promise->setFailed(result); + if (error.result != ResultOk) { + promise->setFailed(error.result); return; } auto cnx = weakCnx.lock(); @@ -62,10 +62,10 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho auto lookupPromise = std::make_shared(); cnx->newTopicLookup(topic, authoritative, listenerName_, newRequestId(), lookupPromise); lookupPromise->getFuture().addListener([this, cnx, promise, topic, address, redirectCount]( - Result result, const LookupDataResultPtr& data) { - if (result != ResultOk || !data) { - LOG_ERROR("Lookup failed for " << topic << ", result " << result); - promise->setFailed(result); + const Error& error, const LookupDataResultPtr& data) { + if (error.result != ResultOk || !data) { + LOG_ERROR("Lookup failed for " << topic << ", result " << error); + promise->setFailed(error.result); return; } @@ -96,15 +96,11 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho return promise->getFuture(); } -/* - * @param topicName topic to get number of partitions. - * - */ -Future BinaryProtoLookupService::getPartitionMetadataAsync( +Future BinaryProtoLookupService::getPartitionMetadataAsync( const TopicNamePtr& topicName) { LookupDataResultPromisePtr promise = std::make_shared(); if (!topicName) { - promise->setFailed(ResultInvalidTopicName); + promise->setFailed(Error{ResultInvalidTopicName, ""}); return promise->getFuture(); } std::string lookupName = topicName->toString(); @@ -115,16 +111,17 @@ Future BinaryProtoLookupService::getPartitionMetada return promise->getFuture(); } -void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::string& topicName, Result result, +void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::string& topicName, + const Error& error, const ClientConnectionWeakPtr& clientCnx, const LookupDataResultPromisePtr& promise) { - if (result != ResultOk) { - promise->setFailed(result); + if (error.result != ResultOk) { + promise->setFailed(error); return; } auto conn = clientCnx.lock(); if (!conn) { - promise->setFailed(ResultConnectError); + promise->setFailed(Error{ResultConnectError, ""}); return; } LookupDataResultPromisePtr lookupPromise = std::make_shared(); @@ -135,7 +132,7 @@ void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::str std::placeholders::_2, clientCnx, promise)); } -void BinaryProtoLookupService::handlePartitionMetadataLookup(const std::string& topicName, Result result, +void BinaryProtoLookupService::handlePartitionMetadataLookup(const std::string& topicName, const Error& error, const LookupDataResultPtr& data, const ClientConnectionWeakPtr& clientCnx, const LookupDataResultPromisePtr& promise) { @@ -144,8 +141,8 @@ void BinaryProtoLookupService::handlePartitionMetadataLookup(const std::string& << data->getBrokerUrl()); promise->setValue(data); } else { - LOG_DEBUG("PartitionMetadataLookup failed for " << topicName << ", result " << result); - promise->setFailed(result); + LOG_DEBUG("PartitionMetadataLookup failed for " << topicName << ", result " << error); + promise->setFailed(error); } } @@ -168,26 +165,28 @@ Future BinaryProtoLookupService::getTopicsOfNamespac return promise->getFuture(); } -Future BinaryProtoLookupService::getSchema(const TopicNamePtr& topicName, - const std::string& version) { - GetSchemaPromisePtr promise = std::make_shared>(); - +Future BinaryProtoLookupService::getSchema(const TopicNamePtr& topicName, + const std::string& version) { + GetSchemaPromisePtr promise = std::make_shared>(); if (!topicName) { - promise->setFailed(ResultInvalidTopicName); + promise->setFailed(Error{ResultInvalidTopicName, ""}); return promise->getFuture(); } - cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost()) + + const auto topic = topicName->toString(); + const auto address = serviceNameResolver_.resolveHost(); + cnxPool_.getConnectionAsync(address, address) .addListener(std::bind(&BinaryProtoLookupService::sendGetSchemaRequest, this, topicName->toString(), version, std::placeholders::_1, std::placeholders::_2, promise)); - return promise->getFuture(); } void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName, const std::string& version, - Result result, const ClientConnectionWeakPtr& clientCnx, + const Error& error, + const ClientConnectionWeakPtr& clientCnx, const GetSchemaPromisePtr& promise) { - if (result != ResultOk) { - promise->setFailed(result); + if (error.result != ResultOk) { + promise->setFailed(error); return; } @@ -195,11 +194,10 @@ void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName uint64_t requestId = newRequestId(); LOG_DEBUG("sendGetSchemaRequest. requestId: " << requestId << " topicName: " << topicName << " version: " << version); - conn->newGetSchema(topicName, version, requestId) - .addListener([promise](Result result, const SchemaInfo& schemaInfo) { - if (result != ResultOk) { - promise->setFailed(result); + .addListener([promise](const auto& error, const SchemaInfo& schemaInfo) { + if (error.result != ResultOk) { + promise->setFailed(error); return; } promise->setValue(schemaInfo); @@ -208,11 +206,11 @@ void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string& nsName, CommandGetTopicsOfNamespace_Mode mode, - Result result, + const Error& error, const ClientConnectionWeakPtr& clientCnx, const NamespaceTopicsPromisePtr& promise) { - if (result != ResultOk) { - promise->setFailed(result); + if (error.result != ResultOk) { + promise->setFailed(error.result); return; } diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h index 35dcb163..b5adc429 100644 --- a/lib/BinaryProtoLookupService.h +++ b/lib/BinaryProtoLookupService.h @@ -35,7 +35,7 @@ class ConnectionPool; class LookupDataResult; class ServiceNameResolver; using NamespaceTopicsPromisePtr = std::shared_ptr>; -using GetSchemaPromisePtr = std::shared_ptr>; +using GetSchemaPromisePtr = std::shared_ptr>; class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { public: @@ -48,12 +48,12 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { LookupResultFuture getBroker(const TopicName& topicName) override; - Future getPartitionMetadataAsync(const TopicNamePtr& topicName) override; + Future getPartitionMetadataAsync(const TopicNamePtr& topicName) override; Future getTopicsOfNamespaceAsync( const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override; - Future getSchema(const TopicNamePtr& topicName, const std::string& version) override; + Future getSchema(const TopicNamePtr& topicName, const std::string& version) override; ServiceNameResolver& getServiceNameResolver() override { return serviceNameResolver_; } @@ -71,20 +71,20 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { std::string listenerName_; const int32_t maxLookupRedirects_; - void sendPartitionMetadataLookupRequest(const std::string& topicName, Result result, + void sendPartitionMetadataLookupRequest(const std::string& topicName, const Error& error, const ClientConnectionWeakPtr& clientCnx, const LookupDataResultPromisePtr& promise); - void handlePartitionMetadataLookup(const std::string& topicName, Result result, + void handlePartitionMetadataLookup(const std::string& topicName, const Error& error, const LookupDataResultPtr& data, const ClientConnectionWeakPtr& clientCnx, const LookupDataResultPromisePtr& promise); void sendGetTopicsOfNamespaceRequest(const std::string& nsName, CommandGetTopicsOfNamespace_Mode mode, - Result result, const ClientConnectionWeakPtr& clientCnx, + const Error& error, const ClientConnectionWeakPtr& clientCnx, const NamespaceTopicsPromisePtr& promise); - void sendGetSchemaRequest(const std::string& topicName, const std::string& version, Result result, + void sendGetSchemaRequest(const std::string& topicName, const std::string& version, const Error& error, const ClientConnectionWeakPtr& clientCnx, const GetSchemaPromisePtr& promise); void getTopicsOfNamespaceListener(Result result, const NamespaceTopicsPtr& topicsPtr, diff --git a/lib/Client.cc b/lib/Client.cc index 48e92dda..3eca8a7b 100644 --- a/lib/Client.cc +++ b/lib/Client.cc @@ -65,7 +65,18 @@ void Client::createProducerAsync(const std::string& topic, const CreateProducerC void Client::createProducerAsync(const std::string& topic, const ProducerConfiguration& conf, const CreateProducerCallback& callback) { - impl_->createProducerAsync(topic, conf, callback); + impl_->createProducerAsync(topic, conf, [callback](const auto& v) { + if (const auto* error = std::get_if(&v)) { + callback(error->result, Producer()); + } else { + callback(ResultOk, std::get(v)); + } + }); +} + +void Client::createProducerAsyncV2(const std::string& topic, const ProducerConfiguration& conf, + CreateProducerV2Callback callback) { + impl_->createProducerAsync(topic, conf, std::move(callback)); } Result Client::subscribe(const std::string& topic, const std::string& subscriptionName, Consumer& consumer) { @@ -199,7 +210,9 @@ uint64_t Client::getNumberOfConsumers() { return impl_->getNumberOfConsumers(); void Client::getSchemaInfoAsync(const std::string& topic, int64_t version, std::function callback) { impl_->getSchema(TopicName::get(topic), (version >= 0) ? toBigEndianBytes(version) : "") - .addListener(std::move(callback)); + .addListener([callback{std::move(callback)}](const Error& error, const SchemaInfo& schemaInfo) { + callback(error.result, schemaInfo); + }); } ServiceInfo Client::getServiceInfo() const { return impl_->getServiceInfo(); } diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index a135ade7..95597034 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -209,8 +209,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: LOG_INFO(cnxString() << "Create ClientConnection, timeout=" << clientConfiguration.getConnectionTimeout()); if (!authentication_) { - LOG_ERROR("Invalid authentication plugin"); - throw ResultAuthenticationError; + throw Error{ResultAuthenticationError, "Invalid authentication plugin"}; return; } @@ -248,7 +247,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: ctx.load_verify_file(trustCertFilePath); } else { LOG_ERROR(trustCertFilePath << ": No such trustCertFile"); - throw ResultAuthenticationError; + throw Error{ResultAuthenticationError, trustCertFilePath + ": No such trustCertFile"}; } } else { ctx.set_default_verify_paths(); @@ -265,11 +264,11 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: tlsPrivateKey = authData->getTlsPrivateKey(); if (!file_exists(tlsCertificates)) { LOG_ERROR(tlsCertificates << ": No such tlsCertificates"); - throw ResultAuthenticationError; + throw Error{ResultAuthenticationError, tlsCertificates + ": No such tlsCertificates"}; } if (!file_exists(tlsCertificates)) { LOG_ERROR(tlsCertificates << ": No such tlsCertificates"); - throw ResultAuthenticationError; + throw Error{ResultAuthenticationError, tlsCertificates + ": No such tlsCertificates"}; } ctx.use_private_key_file(tlsPrivateKey, ASIO::ssl::context::pem); ctx.use_certificate_chain_file(tlsCertificates); @@ -304,7 +303,7 @@ ClientConnection::~ClientConnection() { void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdConnected) { if (!cmdConnected.has_server_version()) { LOG_ERROR(cnxString() << "Server version is not set"); - close(); + close(Error{ResultConnectError, cnxString() + "Server version is not set"}); return; } @@ -414,7 +413,7 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, const tcp::endp std::atomic_store(&cnxStringPtr_, std::make_shared(cnxStringStream.str())); } catch (const ASIO_SYSTEM_ERROR& e) { LOG_ERROR("Failed to get endpoint: " << e.what()); - close(ResultRetryable); + close(Error{ResultRetryable, ""}); return; } if (logicalAddress_ == physicalAddress_) { @@ -470,7 +469,7 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, const tcp::endp Url service_url; if (!Url::parse(physicalAddress_, service_url)) { LOG_ERROR(cnxString() << "Invalid Url, unable to parse: " << err << " " << err.message()); - close(); + close(Error{ResultConnectError, "Invalid Url, unable to parse: " + err.message()}); return; } } @@ -492,9 +491,9 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, const tcp::endp cancelTimer(*connectTimer_); } if (err == ASIO::error::operation_aborted) { - close(); + close(Error{ResultConnectError, "Connection attempt was canceled"}); } else { - close(ResultRetryable); + close(Error{ResultRetryable, ""}); } } } @@ -503,10 +502,10 @@ void ClientConnection::handleHandshake(const ASIO_ERROR& err) { if (err) { if (err.value() == ASIO::ssl::error::stream_truncated) { LOG_WARN(cnxString() << "Handshake failed: " << err.message()); - close(ResultRetryable); + close(Error{ResultRetryable, ""}); } else { LOG_ERROR(cnxString() << "Handshake failed: " << err.message()); - close(); + close(Error{ResultConnectError, "Handshake failed: " + err.message()}); } return; } @@ -515,16 +514,17 @@ void ClientConnection::handleHandshake(const ASIO_ERROR& err) { Result result = ResultOk; SharedBuffer buffer; try { + // TODO: pass error instead of result buffer = Commands::newConnect(authentication_, logicalAddress_, connectingThroughProxy, clientVersion_, result); } catch (const std::exception& e) { LOG_ERROR(cnxString() << "Failed to create Connect command: " << e.what()); - close(ResultAuthenticationError); + close(Error{ResultAuthenticationError, "Failed to create Connect command: " + std::string(e.what())}); return; } if (result != ResultOk) { LOG_ERROR(cnxString() << "Failed to establish connection: " << result); - close(result); + close(Error{result, ""}); return; } // Send CONNECT command to broker @@ -541,7 +541,7 @@ void ClientConnection::handleSentPulsarConnect(const ASIO_ERROR& err, const Shar } if (err) { LOG_ERROR(cnxString() << "Failed to establish connection: " << err.message()); - close(); + close(Error{ResultConnectError, "Failed to establish connection: " + err.message()}); return; } @@ -555,7 +555,7 @@ void ClientConnection::handleSentAuthResponse(const ASIO_ERROR& err, const Share } if (err) { LOG_WARN(cnxString() << "Failed to send auth response: " << err.message()); - close(); + close(Error{ResultRetryable, "Failed to send auth response: " + err.message()}); return; } } @@ -576,14 +576,15 @@ void ClientConnection::tcpConnectAsync() { std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_; if (!Url::parse(hostUrl, service_url)) { LOG_ERROR(cnxString() << "Invalid Url, unable to parse: " << err << " " << err.message()); - close(); + close(Error{ResultConnectError, "Invalid Url, unable to parse: " + err.message()}); return; } if (service_url.protocol() != "pulsar" && service_url.protocol() != "pulsar+ssl") { LOG_ERROR(cnxString() << "Invalid Url protocol '" << service_url.protocol() << "'. Valid values are 'pulsar' and 'pulsar+ssl'"); - close(); + close(Error{ResultConnectError, "Invalid Url protocol '" + service_url.protocol() + + "'. Valid values are 'pulsar' and 'pulsar+ssl'"}); return; } @@ -598,7 +599,7 @@ void ClientConnection::handleResolve(ASIO_ERROR err, const tcp::resolver::result if (err) { std::string hostUrl = isSniProxy_ ? cnxString() : proxyServiceUrl_; LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message()); - close(); + close(Error{ResultConnectError, hostUrl + " Resolve error: " + err.message()}); return; } @@ -625,7 +626,8 @@ void ClientConnection::handleResolve(ASIO_ERROR err, const tcp::resolver::result LOG_ERROR(cnxString() << "Connection to " << results << " was not established in " << connectTimeout_.count() << " ms"); lock.unlock(); - close(); + close(Error{ResultConnectError, "Connection was not established in " + + std::to_string(connectTimeout_.count()) + " ms"}); } // else: the connection is closed or already established }); @@ -660,7 +662,7 @@ void ClientConnection::handleRead(const ASIO_ERROR& err, size_t bytesTransferred } else { LOG_ERROR(cnxString() << "Read operation failed: " << err.message()); } - close(ResultDisconnected); + close(Error{ResultDisconnected, ""}); } else if (bytesTransferred < minReadSize) { // Read the remaining part, use a slice of buffer to write on the next // region @@ -711,7 +713,7 @@ void ClientConnection::processIncomingBuffer() { proto::BaseCommand incomingCmd; if (!incomingCmd.ParseFromArray(incomingBuffer_.data(), cmdSize)) { LOG_ERROR(cnxString() << "Error parsing protocol buffer command"); - close(ResultDisconnected); + close(Error{ResultDisconnected, ""}); return; } @@ -735,7 +737,7 @@ void ClientConnection::processIncomingBuffer() { << ", message ledger id " << incomingCmd.message().message_id().ledgerid() << ", entry id " << incomingCmd.message().message_id().entryid() << "] Error parsing broker entry metadata"); - close(ResultDisconnected); + close(Error{ResultDisconnected, ""}); return; } incomingBuffer_.setReaderIndex(readerIndex + 2 + 4 + brokerEntryMetadataSize); @@ -753,7 +755,7 @@ void ClientConnection::processIncomingBuffer() { << ", message ledger id " << incomingCmd.message().message_id().ledgerid() // << ", entry id " << incomingCmd.message().message_id().entryid() << "] Error parsing message metadata"); - close(ResultDisconnected); + close(Error{ResultDisconnected, ""}); return; } @@ -886,7 +888,8 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) { // Handle Pulsar Connected if (incomingCmd.type() != BaseCommand::CONNECTED) { // Wrong cmd - close(); + close(Error{ResultDisconnected, cnxString() + "Expected CONNECTED command but received " + + Commands::messageType(incomingCmd.type())}); } else { handlePulsarConnected(incomingCmd.connected()); } @@ -987,7 +990,7 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) { default: LOG_WARN(cnxString() << "Received invalid message from server"); - close(ResultDisconnected); + close(Error{ResultDisconnected, cnxString() + "Received invalid message from server"}); break; } } @@ -1032,11 +1035,11 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, co Lock lock(mutex_); if (isClosed()) { lock.unlock(); - promise->setFailed(ResultNotConnected); + promise->setFailed(Error{ResultNotConnected, ""}); return; } else if (numOfPendingLookupRequest_ >= maxPendingLookupRequest_) { lock.unlock(); - promise->setFailed(ResultTooManyLookupRequestException); + promise->setFailed(Error{ResultTooManyLookupRequestException, ""}); return; } @@ -1048,13 +1051,14 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, co self->numOfPendingLookupRequest_--; } }); - request->getFuture().addListener([promise](Result result, const LookupDataResultPtr& lookupDataResult) { - if (result == ResultOk) { - promise->setValue(lookupDataResult); - } else { - promise->setFailed(result); - } - }); + request->getFuture().addListener( + [promise](const auto& error, const LookupDataResultPtr& lookupDataResult) { + if (error.result == ResultOk) { + promise->setValue(lookupDataResult); + } else { + promise->setFailed(error); + } + }); numOfPendingLookupRequest_++; lock.unlock(); @@ -1112,7 +1116,7 @@ void ClientConnection::handleSend(const ASIO_ERROR& err, const SharedBuffer&) { } if (err) { LOG_WARN(cnxString() << "Could not send message on connection: " << err << " " << err.message()); - close(ResultDisconnected); + close(Error{ResultDisconnected, ""}); } else { sendPendingCommands(); } @@ -1124,7 +1128,7 @@ void ClientConnection::handleSendPair(const ASIO_ERROR& err) { } if (err) { LOG_WARN(cnxString() << "Could not send pair message on connection: " << err << " " << err.message()); - close(ResultDisconnected); + close(Error{ResultDisconnected, ""}); } else { sendPendingCommands(); } @@ -1164,16 +1168,16 @@ void ClientConnection::sendPendingCommands() { } } -Future ClientConnection::sendRequestWithId(const SharedBuffer& cmd, int requestId, - const char* requestType) { +Future ClientConnection::sendRequestWithId(const SharedBuffer& cmd, int requestId, + const char* requestType) { Lock lock(mutex_); if (isClosed()) { lock.unlock(); LOG_DEBUG(cnxString() << "Fail " << requestType << "(req_id: " << requestId << ") to a closed connection"); - Promise promise; - promise.setFailed(ResultNotConnected); + Promise promise; + promise.setFailed(Error{ResultNotConnected, ""}); return promise.getFuture(); } @@ -1206,7 +1210,7 @@ void ClientConnection::handleKeepAliveTimeout(const ASIO_ERROR& ec) { if (havePendingPingRequest_) { LOG_WARN(cnxString() << "Forcing connection to close after keep-alive timeout"); - close(ResultDisconnected); + close(Error{ResultDisconnected, ""}); } else { // Send keep alive probe to peer LOG_DEBUG(cnxString() << "Sending ping message"); @@ -1234,10 +1238,10 @@ void ClientConnection::handleConsumerStatsTimeout(const ASIO_ERROR& ec, startConsumerStatsTimer(consumerStatsRequests); } -const std::future& ClientConnection::close(Result result, bool switchCluster) { +const std::future& ClientConnection::close(Error&& error, bool switchCluster) { Lock lock(mutex_); if (closeFuture_) { - connectPromise_.setFailed(result); + connectPromise_.setFailed(std::move(error)); return *closeFuture_; } auto promise = std::make_shared>(); @@ -1279,6 +1283,7 @@ const std::future& ClientConnection::close(Result result, bool switchClust cancelTimer(*connectTimer_); lock.unlock(); int refCount = weak_from_this().use_count(); + auto result = error.result; if (result != ResultAlreadyClosed /* closed by the pool */ && !isResultRetryable(result)) { LOG_ERROR(cnxString() << "Connection closed with " << result << " (refCnt: " << refCount << ")"); } else { @@ -1327,37 +1332,33 @@ const std::future& ClientConnection::close(Result result, bool switchClust } self.reset(); - connectPromise_.setFailed(result); + connectPromise_.setFailed(error); // Fail all pending requests after releasing the lock. for (auto& kv : pendingRequests) { - kv.second->fail(result); + kv.second->fail(error); } for (auto& kv : pendingLookupRequests) { - kv.second->fail(result); + kv.second->fail(error); } for (auto& kv : pendingConsumerStatsMap) { LOG_ERROR(cnxString() << " Closing Client Connection, please try again later"); kv.second.setFailed(result); } for (auto& kv : pendingGetLastMessageIdRequests) { - kv.second->fail(result); + kv.second->fail(error); } for (auto& kv : pendingGetNamespaceTopicsRequests) { - kv.second->fail(result); + kv.second->fail(error); } for (auto& kv : pendingGetSchemaRequests) { - kv.second->fail(result); + kv.second->fail(error); } return *closeFuture_; } bool ClientConnection::isClosed() const { return state_ == Disconnected; } -Future ClientConnection::getConnectFuture() { - return connectPromise_.getFuture(); -} - void ClientConnection::registerProducer(int producerId, const ProducerImplPtr& producer) { Lock lock(mutex_); producers_.insert(std::make_pair(producerId, producer)); @@ -1405,12 +1406,21 @@ Future ClientConnection::newGetLastMessageId(u }); lock.unlock(); + // TODO: return Error instead + Promise promise; + request->getFuture().addListener([promise](const auto& error, const auto& response) { + if (error.result == ResultOk) { + promise.setValue(response); + } else { + promise.setFailed(error.result); + } + }); if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr && mockServer_->sendRequest("GET_LAST_MESSAGE_ID", requestId)) { - return request->getFuture(); + return promise.getFuture(); } sendCommand(Commands::newGetLastMessageId(consumerId, requestId)); - return request->getFuture(); + return promise.getFuture(); } Future ClientConnection::newGetTopicsOfNamespace( @@ -1429,23 +1439,34 @@ Future ClientConnection::newGetTopicsOfNamespace( LOG_WARN(cnxString << "GetTopicsOfNamespace request timeout to broker, req_id: " << requestId); }); lock.unlock(); + + // TODO: return Error instead + Promise promise; + request->getFuture().addListener([promise](const auto& error, const auto& response) { + if (error.result == ResultOk) { + promise.setValue(response); + } else { + promise.setFailed(error.result); + } + }); + if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr && mockServer_->sendRequest("GET_TOPICS_OF_NAMESPACE", requestId)) { - return request->getFuture(); + return promise.getFuture(); } sendCommand(Commands::newGetTopicsOfNamespace(nsName, mode, requestId)); - return request->getFuture(); + return promise.getFuture(); } -Future ClientConnection::newGetSchema(const std::string& topicName, - const std::string& version, uint64_t requestId) { +Future ClientConnection::newGetSchema(const std::string& topicName, + const std::string& version, uint64_t requestId) { Lock lock(mutex_); if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString() << "Client is not connected to the broker"); - Promise promise; - promise.setFailed(ResultNotConnected); + Promise promise; + promise.setFailed(Error{ResultNotConnected, ""}); return promise.getFuture(); } @@ -1453,14 +1474,15 @@ Future ClientConnection::newGetSchema(const std::string& top insertRequest(pendingGetSchemaRequests_, requestId, [cnxString = cnxString(), requestId]() { LOG_WARN(cnxString << "GetSchema request timeout to broker, req_id: " << requestId); }); + auto future = request->getFuture(); lock.unlock(); if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr && mockServer_->sendRequest("GET_SCHEMA", requestId)) { - return request->getFuture(); + return future; } sendCommand(Commands::newGetSchema(topicName, version, requestId)); - return request->getFuture(); + return future; } void ClientConnection::checkServerError(ServerError error, const std::string& message) { @@ -1486,7 +1508,7 @@ void ClientConnection::handleSendReceipt(const proto::CommandSendReceipt& sendRe if (!producer->ackReceived(sequenceId, messageId)) { // If the producer fails to process the ack, we need to close the connection // to give it a chance to recover from there - close(ResultDisconnected); + close(Error{ResultDisconnected, ""}); } } } else { @@ -1510,12 +1532,12 @@ void ClientConnection::handleSendError(const proto::CommandSendError& error) { if (!producer->removeCorruptMessage(sequenceId)) { // If the producer fails to remove corrupt msg, we need to close the // connection to give it a chance to recover from there - close(ResultDisconnected); + close(Error{ResultDisconnected, ""}); } } } } else { - close(ResultDisconnected); + close(Error{ResultDisconnected, ""}); } } @@ -1554,13 +1576,16 @@ void ClientConnection::handlePartitionedMetadataResponse( << partitionMetadataResponse.request_id() << " error: " << partitionMetadataResponse.error() << " msg: " << partitionMetadataResponse.message()); - checkServerError(partitionMetadataResponse.error(), partitionMetadataResponse.message()); - request->fail( - getResult(partitionMetadataResponse.error(), partitionMetadataResponse.message())); + auto msg = partitionMetadataResponse.message(); + checkServerError(partitionMetadataResponse.error(), msg); + auto result = getResult(partitionMetadataResponse.error(), msg); + request->fail(Error{result, std::move(msg)}); } else { LOG_ERROR(cnxString() << "Failed partition-metadata lookup req_id: " << partitionMetadataResponse.request_id() << " with empty response: "); - request->fail(ResultConnectError); + request->fail( + Error{ResultConnectError, "Empty response from broker for request " + + std::to_string(partitionMetadataResponse.request_id())}); } } else { LookupDataResultPtr lookupResultPtr = std::make_shared(); @@ -1627,12 +1652,16 @@ void ClientConnection::handleLookupTopicRespose( LOG_ERROR(cnxString() << "Failed lookup req_id: " << lookupTopicResponse.request_id() << " error: " << lookupTopicResponse.error() << " msg: " << lookupTopicResponse.message()); - checkServerError(lookupTopicResponse.error(), lookupTopicResponse.message()); - request->fail(getResult(lookupTopicResponse.error(), lookupTopicResponse.message())); + auto msg = lookupTopicResponse.message(); + checkServerError(lookupTopicResponse.error(), msg); + auto result = getResult(lookupTopicResponse.error(), lookupTopicResponse.message()); + request->fail(Error{result, std::move(msg)}); } else { LOG_ERROR(cnxString() << "Failed lookup req_id: " << lookupTopicResponse.request_id() << " with empty response: "); - request->fail(ResultConnectError); + request->fail( + Error{ResultConnectError, "Empty response from broker for request " + + std::to_string(lookupTopicResponse.request_id())}); } } else { LOG_DEBUG(cnxString() << "Received lookup response from server. req_id: " @@ -1702,6 +1731,7 @@ void ClientConnection::handleError(const proto::CommandError& error) { LOG_WARN(cnxString() << "Received error response from server: " << result << (error.has_message() ? (" (" + error.message() + ")") : "") << " -- req_id: " << error.request_id()); + Error requestError{result, error.message()}; Lock lock(mutex_); @@ -1711,7 +1741,7 @@ void ClientConnection::handleError(const proto::CommandError& error) { pendingRequests_.erase(it); lock.unlock(); - request->fail(result); + request->fail(requestError); } else { auto it = pendingGetLastMessageIdRequests_.find(error.request_id()); if (it != pendingGetLastMessageIdRequests_.end()) { @@ -1719,7 +1749,7 @@ void ClientConnection::handleError(const proto::CommandError& error) { pendingGetLastMessageIdRequests_.erase(it); lock.unlock(); - request->fail(result); + request->fail(requestError); } else { auto it = pendingGetNamespaceTopicsRequests_.find(error.request_id()); if (it != pendingGetNamespaceTopicsRequests_.end()) { @@ -1727,7 +1757,7 @@ void ClientConnection::handleError(const proto::CommandError& error) { pendingGetNamespaceTopicsRequests_.erase(it); lock.unlock(); - request->fail(result); + request->fail(requestError); } else { lock.unlock(); } @@ -1857,10 +1887,11 @@ void ClientConnection::handleAuthChallenge() { LOG_DEBUG(cnxString() << "Received auth challenge from broker"); Result result; + // TODO: use Error instead SharedBuffer buffer = Commands::newAuthResponse(authentication_, result); if (result != ResultOk) { LOG_ERROR(cnxString() << "Failed to send auth response: " << result); - close(result); + close(Error{result, "Failed to auth response"}); return; } auto self = shared_from_this(); @@ -1955,7 +1986,7 @@ void ClientConnection::handleGetSchemaResponse(const proto::CommandGetSchemaResp : "") << " -- req_id: " << response.request_id()); } - request->fail(result); + request->fail(Error{result, response.error_message()}); return; } @@ -1992,7 +2023,7 @@ void ClientConnection::handleAckResponse(const proto::CommandAckResponse& respon lock.unlock(); if (response.has_error()) { - request->fail(getResult(response.error(), "")); + request->fail(Error{getResult(response.error(), ""), response.message()}); } else { request->complete({}); } @@ -2003,7 +2034,7 @@ void ClientConnection::unsafeRemovePendingRequest(long requestId) { if (it != pendingRequests_.end()) { auto request = std::move(it->second); pendingRequests_.erase(it); - request->fail(ResultDisconnected); + request->fail(Error{ResultDisconnected, ""}); } } diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index c8cd86fe..fd89fae5 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -163,11 +163,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this& close(Result result = ResultConnectError, bool switchCluster = false); + const std::future& close(Error&& error = Error{ResultConnectError, ""}, bool switchCluster = false); bool isClosed() const; - Future getConnectFuture(); + auto getConnectFuture() const { return connectPromise_.getFuture(); } Future getCloseFuture(); @@ -191,8 +191,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this sendRequestWithId(const SharedBuffer& cmd, int requestId, - const char* requestType); + Future sendRequestWithId(const SharedBuffer& cmd, int requestId, + const char* requestType); const std::string& brokerAddress() const; @@ -212,8 +212,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this newGetSchema(const std::string& topicName, const std::string& version, - uint64_t requestId); + Future newGetSchema(const std::string& topicName, const std::string& version, + uint64_t requestId); void attachMockServer(const std::shared_ptr& mockServer) { mockServer_ = mockServer; @@ -334,7 +334,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this connectPromise_; + Promise connectPromise_; + const std::chrono::milliseconds connectTimeout_; const DeadlineTimerPtr connectTimer_; diff --git a/lib/ClientConnectionAdaptor.h b/lib/ClientConnectionAdaptor.h index 2c299373..780b35b9 100644 --- a/lib/ClientConnectionAdaptor.h +++ b/lib/ClientConnectionAdaptor.h @@ -45,13 +45,13 @@ inline void checkServerError(Connection& connection, ServerError error, const st message.find("KeeperException") == std::string::npos && message.find("is being unloaded") == std::string::npos && message.find("the broker do not have test listener") == std::string::npos) { - connection.close(ResultDisconnected); + connection.close(Error{ResultDisconnected, message}); } break; case proto::ServerError::TooManyRequests: // TODO: Implement maxNumberOfRejectedRequestPerConnection like // https://github.com/apache/pulsar/pull/274 - connection.close(ResultDisconnected); + connection.close(Error{ResultDisconnected, ""}); break; default: break; diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index b84c14c4..1decb125 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -28,6 +28,7 @@ #include #include #include +#include #include "BinaryProtoLookupService.h" #include "ClientConfigurationImpl.h" @@ -188,49 +189,54 @@ LookupServicePtr ClientImpl::getLookup(const std::string& redirectedClusterURI) } void ClientImpl::createProducerAsync(const std::string& topic, const ProducerConfiguration& conf, - const CreateProducerCallback& callback, bool autoDownloadSchema) { + CreateProducerV2Callback callback, bool autoDownloadSchema) { if (conf.isChunkingEnabled() && conf.getBatchingEnabled()) { - throw std::invalid_argument("Batching and chunking of messages can't be enabled together"); + callback( + Error{ResultInvalidConfiguration, "Batching and chunking of messages can't be enabled together"}); + return; } + TopicNamePtr topicName; { std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); - callback(ResultAlreadyClosed, Producer()); + callback(Error{ResultAlreadyClosed, ""}); return; } else if (!(topicName = TopicName::get(topic))) { lock.unlock(); - callback(ResultInvalidTopicName, Producer()); + // TODO: return an error in TopicName + callback(Error{ResultInvalidTopicName, ""}); return; } } if (autoDownloadSchema) { - auto self = shared_from_this(); - getSchema(topicName).addListener( - [self, topicName, callback](Result res, const SchemaInfo& topicSchema) { - if (res != ResultOk) { - callback(res, Producer()); - return; - } - ProducerConfiguration conf; - conf.setSchema(topicSchema); - self->getPartitionMetadataAsync(topicName).addListener( - std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1, - std::placeholders::_2, topicName, conf, callback)); - }); + getSchema(topicName).addListener([self{shared_from_this()}, topicName, callback{std::move(callback)}]( + const Error& error, const SchemaInfo& topicSchema) mutable { + if (error.result != ResultOk) { + callback(error); + return; + } + ProducerConfiguration conf; + conf.setSchema(topicSchema); + self->getPartitionMetadataAsync(topicName).addListener( + std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1, + std::placeholders::_2, topicName, conf, callback)); + }); } else { getPartitionMetadataAsync(topicName).addListener( - std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1, - std::placeholders::_2, topicName, conf, callback)); + [this, conf, topicName, callback{std::move(callback)}]( + const Error& error, const LookupDataResultPtr& partitionMetadata) { + handleCreateProducer(error, partitionMetadata, topicName, conf, callback); + }); } } -void ClientImpl::handleCreateProducer(Result result, const LookupDataResultPtr& partitionMetadata, +void ClientImpl::handleCreateProducer(const Error& error, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const ProducerConfiguration& conf, - const CreateProducerCallback& callback) { - if (!result) { + CreateProducerV2Callback callback) { + if (!error.result) { ProducerImplBasePtr producer; auto interceptors = std::make_shared(conf.getInterceptors()); @@ -244,36 +250,39 @@ void ClientImpl::handleCreateProducer(Result result, const LookupDataResultPtr& } } catch (const std::runtime_error& e) { LOG_ERROR("Failed to create producer: " << e.what()); - callback(ResultConnectError, {}); + callback(error); return; } producer->getProducerCreatedFuture().addListener( - std::bind(&ClientImpl::handleProducerCreated, shared_from_this(), std::placeholders::_1, - std::placeholders::_2, callback, producer)); + [this, self{shared_from_this()}, callback{std::move(callback)}, producer]( + const Error& error, const ProducerImplBaseWeakPtr& producerBaseWeakPtr) { + handleProducerCreated(error, producerBaseWeakPtr, callback, producer); + }); producer->start(); } else { LOG_ERROR("Error Checking/Getting Partition Metadata while creating producer on " - << topicName->toString() << " -- " << result); - callback(result, Producer()); + << topicName->toString() << " -- " << error.result); + callback(error); } } -void ClientImpl::handleProducerCreated(Result result, const ProducerImplBaseWeakPtr& producerBaseWeakPtr, - const CreateProducerCallback& callback, +void ClientImpl::handleProducerCreated(const Error& error, const ProducerImplBaseWeakPtr& producerBaseWeakPtr, + const CreateProducerV2Callback& callback, const ProducerImplBasePtr& producer) { - if (result == ResultOk) { + if (error.result == ResultOk) { auto address = producer.get(); auto existingProducer = producers_.putIfAbsent(address, producer); if (existingProducer) { auto producer = existingProducer.value().lock(); LOG_ERROR("Unexpected existing producer at the same address: " << address << ", producer: " << (producer ? producer->getProducerName() : "(null)")); - callback(ResultUnknownError, {}); + callback(Error{ResultUnknownError, + "Unexpected existing producer for name " + producer->getProducerName()}); return; } - callback(result, Producer(producer)); + callback(Producer(producer)); } else { - callback(result, {}); + callback(error); } } @@ -293,10 +302,11 @@ void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& st } } - MessageId msgId(startMessageId); - getPartitionMetadataAsync(topicName).addListener( - std::bind(&ClientImpl::handleReaderMetadataLookup, shared_from_this(), std::placeholders::_1, - std::placeholders::_2, topicName, msgId, conf, callback)); + getPartitionMetadataAsync(topicName).addListener([this, self{shared_from_this()}, topicName, + startMessageId, conf, + callback](const auto& error, const auto& metadata) { + handleReaderMetadataLookup(error.result, metadata, topicName, startMessageId, conf, callback); + }); } void ClientImpl::createTableViewAsync(const std::string& topic, const TableViewConfiguration& conf, @@ -507,9 +517,11 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& sub } } - getPartitionMetadataAsync(topicName).addListener( - std::bind(&ClientImpl::handleSubscribe, shared_from_this(), std::placeholders::_1, - std::placeholders::_2, topicName, subscriptionName, conf, callback)); + getPartitionMetadataAsync(topicName).addListener([this, self{shared_from_this()}, topicName, + subscriptionName, conf, + callback](const auto& error, const auto& metadata) { + handleSubscribe(error.result, metadata, topicName, subscriptionName, conf, callback); + }); } void ClientImpl::handleSubscribe(Result result, const LookupDataResultPtr& partitionMetadata, @@ -604,8 +616,8 @@ GetConnectionFuture ClientImpl::getConnection(const std::string& redirectedClust useProxy_ = data.proxyThroughServiceUrl; lookupCount_++; pool_.getConnectionAsync(data.logicalAddress, data.physicalAddress, key) - .addListener([promise](Result result, const ClientConnectionWeakPtr& weakCnx) { - if (result == ResultOk) { + .addListener([promise](const auto& error, const ClientConnectionWeakPtr& weakCnx) { + if (error.result == ResultOk) { auto cnx = weakCnx.lock(); if (cnx) { promise.setValue(cnx); @@ -613,7 +625,7 @@ GetConnectionFuture ClientImpl::getConnection(const std::string& redirectedClust promise.setFailed(ResultConnectError); } } else { - promise.setFailed(result); + promise.setFailed(error.result); } }); }); @@ -635,8 +647,8 @@ GetConnectionFuture ClientImpl::connect(const std::string& redirectedClusterURI, const auto& physicalAddress = getPhysicalAddress(redirectedClusterURI, logicalAddress); Promise promise; pool_.getConnectionAsync(logicalAddress, physicalAddress, key) - .addListener([promise](Result result, const ClientConnectionWeakPtr& weakCnx) { - if (result == ResultOk) { + .addListener([promise](const auto& error, const ClientConnectionWeakPtr& weakCnx) { + if (error.result == ResultOk) { auto cnx = weakCnx.lock(); if (cnx) { promise.setValue(cnx); @@ -644,7 +656,7 @@ GetConnectionFuture ClientImpl::connect(const std::string& redirectedClusterURI, promise.setFailed(ResultConnectError); } } else { - promise.setFailed(result); + promise.setFailed(error.result); } }); return promise.getFuture(); @@ -685,9 +697,10 @@ void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, const GetP return; } } - getPartitionMetadataAsync(topicName).addListener(std::bind(&ClientImpl::handleGetPartitions, - shared_from_this(), std::placeholders::_1, - std::placeholders::_2, topicName, callback)); + getPartitionMetadataAsync(topicName).addListener( + [this, self{shared_from_this()}, topicName, callback](const auto& error, const auto& metadata) { + handleGetPartitions(error.result, metadata, topicName, callback); + }); } void ClientImpl::closeAsync(const CloseCallback& callback) { diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index 7772b15b..c046d8e9 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -93,7 +93,7 @@ class ClientImpl : public std::enable_shared_from_this { * that exists for the topic. */ void createProducerAsync(const std::string& topic, const ProducerConfiguration& conf, - const CreateProducerCallback& callback, bool autoDownloadSchema = false); + CreateProducerV2Callback callback, bool autoDownloadSchema = false); void subscribeAsync(const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback); @@ -161,7 +161,6 @@ class ClientImpl : public std::enable_shared_from_this { std::shared_lock lock(mutex_); return lookupServicePtr_->getTopicsOfNamespaceAsync(nsName, mode); } - auto getSchema(const TopicNamePtr& topicName, const std::string& version = "") { std::shared_lock lock(mutex_); return lookupServicePtr_->getSchema(topicName, version); @@ -172,9 +171,9 @@ class ClientImpl : public std::enable_shared_from_this { friend class PulsarFriend; private: - void handleCreateProducer(Result result, const LookupDataResultPtr& partitionMetadata, + void handleCreateProducer(const Error& error, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const ProducerConfiguration& conf, - const CreateProducerCallback& callback); + CreateProducerV2Callback callback); void handleSubscribe(Result result, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const std::string& consumerName, @@ -187,8 +186,8 @@ class ClientImpl : public std::enable_shared_from_this { void handleGetPartitions(Result result, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const GetPartitionsCallback& callback); - void handleProducerCreated(Result result, const ProducerImplBaseWeakPtr& producerWeakPtr, - const CreateProducerCallback& callback, const ProducerImplBasePtr& producer); + void handleProducerCreated(const Error& error, const ProducerImplBaseWeakPtr& producerWeakPtr, + const CreateProducerV2Callback& callback, const ProducerImplBasePtr& producer); void handleConsumerCreated(Result result, const ConsumerImplBaseWeakPtr& consumerWeakPtr, const SubscribeCallback& callback, const ConsumerImplBasePtr& consumer); diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc index 6465ff77..35d815ab 100644 --- a/lib/ConnectionPool.cc +++ b/lib/ConnectionPool.cc @@ -59,7 +59,7 @@ bool ConnectionPool::close() { auto& cnx = kv.second; if (cnx) { // Close with a fatal error to not let client retry - auto& future = cnx->close(ResultAlreadyClosed); + auto& future = cnx->close(Error{ResultAlreadyClosed, ""}); using namespace std::chrono_literals; if (auto status = future.wait_for(5s); status != std::future_status::ready) { LOG_WARN("Connection close timed out for " << cnx.get()->cnxString()); @@ -86,7 +86,7 @@ bool ConnectionPool::close() { void ConnectionPool::closeAllConnectionsForNewCluster() { for (auto&& kv : releaseConnections()) { - kv.second->close(ResultDisconnected, true); + kv.second->close(Error{ResultDisconnected, ""}, true); } } @@ -97,12 +97,12 @@ static const std::string getKey(const std::string& logicalAddress, const std::st return ss.str(); } -Future ConnectionPool::getConnectionAsync(const std::string& logicalAddress, - const std::string& physicalAddress, - size_t keySuffix) { +Future ConnectionPool::getConnectionAsync(const std::string& logicalAddress, + const std::string& physicalAddress, + size_t keySuffix) { if (closed_) { - Promise promise; - promise.setFailed(ResultAlreadyClosed); + Promise promise; + promise.setFailed(Error{ResultAlreadyClosed, ""}); return promise.getFuture(); } @@ -133,21 +133,21 @@ Future ConnectionPool::getConnectionAsync(const cnx.reset(new ClientConnection(logicalAddress, physicalAddress, *serviceInfo_.load(), executorProvider_->get(keySuffix), clientConfiguration_, clientVersion_, *this, keySuffix)); - } catch (Result result) { - Promise promise; - promise.setFailed(result); + } catch (Error error) { + Promise promise; + promise.setFailed(std::move(error)); return promise.getFuture(); } catch (const std::runtime_error& e) { lock.unlock(); LOG_ERROR("Failed to create connection: " << e.what()) - Promise promise; - promise.setFailed(ResultConnectError); + Promise promise; + promise.setFailed(Error{ResultConnectError, e.what()}); return promise.getFuture(); } LOG_INFO("Created connection for " << key); - Future future = cnx->getConnectFuture(); + auto future = cnx->getConnectFuture(); pool_.insert(std::make_pair(key, cnx)); lock.unlock(); diff --git a/lib/ConnectionPool.h b/lib/ConnectionPool.h index f828ac69..201c1d01 100644 --- a/lib/ConnectionPool.h +++ b/lib/ConnectionPool.h @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -82,16 +83,16 @@ class PULSAR_PUBLIC ConnectionPool { * @param keySuffix the key suffix to choose which connection on the same broker * @return a future that will produce the ClientCnx object */ - Future getConnectionAsync(const std::string& logicalAddress, - const std::string& physicalAddress, - size_t keySuffix); + Future getConnectionAsync(const std::string& logicalAddress, + const std::string& physicalAddress, + size_t keySuffix); - Future getConnectionAsync(const std::string& logicalAddress, - const std::string& physicalAddress) { + Future getConnectionAsync(const std::string& logicalAddress, + const std::string& physicalAddress) { return getConnectionAsync(logicalAddress, physicalAddress, generateRandomIndex()); } - Future getConnectionAsync(const std::string& address) { + Future getConnectionAsync(const std::string& address) { return getConnectionAsync(address, address); } diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 85f49946..1060cd5c 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -225,13 +225,12 @@ void ConsumerImpl::onNegativeAcksSend(const std::set& messageIds) { interceptors_->onNegativeAcksSend(Consumer(shared_from_this()), messageIds); } -Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) { - // Do not use bool, only Result. - Promise promise; +Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) { + Promise promise; if (state_ == Closed) { LOG_DEBUG(getName() << "connectionOpened : Consumer is already closed"); - promise.setFailed(ResultAlreadyClosed); + promise.setFailed(Error{ResultAlreadyClosed, ""}); return promise.getFuture(); } @@ -262,10 +261,10 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c auto self = get_shared_this_ptr(); setFirstRequestIdAfterConnect(requestId); cnx->sendRequestWithId(cmd, requestId, "SUBSCRIBE") - .addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) { - Result handleResult = handleCreateConsumer(cnx, result); + .addListener([this, self, cnx, promise](const Error& error, const ResponseData& responseData) { + Result handleResult = handleCreateConsumer(cnx, error.result); if (handleResult != ResultOk) { - promise.setFailed(handleResult); + promise.setFailed(Error{handleResult, ""}); return; } promise.setSuccess(); @@ -314,11 +313,11 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result auto name = getName(); cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, "CLOSE_CONSUMER") - .addListener([name](Result result, const ResponseData&) { - if (result == ResultOk) { + .addListener([name](const Error& error, const ResponseData&) { + if (error.result == ResultOk) { LOG_INFO(name << "Closed consumer successfully after subscribe completed"); } else { - LOG_WARN(name << "Failed to close consumer: " << strResult(result)); + LOG_WARN(name << "Failed to close consumer: " << strResult(error.result)); } }); } else { @@ -328,7 +327,7 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result LOG_WARN(getName() << "Client already closed when subscribe completed, close the connection " << cnx->cnxString()); - cnx->close(ResultNotConnected); + cnx->close(Error{ResultNotConnected, ""}); } return ResultAlreadyClosed; } @@ -423,7 +422,8 @@ void ConsumerImpl::unsubscribeAsync(const ResultCallback& originalCallback) { SharedBuffer cmd = Commands::newUnsubscribe(consumerId_, requestId); auto self = get_shared_this_ptr(); cnx->sendRequestWithId(cmd, requestId, "UNSUBSCRIBE") - .addListener([self, callback](Result result, const ResponseData&) { callback(result); }); + .addListener( + [self, callback](const Error& error, const ResponseData&) { callback(error.result); }); } else { Result result = ResultNotConnected; lock.unlock(); @@ -1420,7 +1420,7 @@ void ConsumerImpl::closeAsync(const ResultCallback& originalCallback) { auto requestId = newRequestId(); auto self = get_shared_this_ptr(); cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, "CLOSE_CONSUMER") - .addListener([self, callback](Result result, const ResponseData&) { callback(result); }); + .addListener([self, callback](const Error& error, const ResponseData&) { callback(error.result); }); } const std::string& ConsumerImpl::getName() const { return consumerStr_; } @@ -1828,12 +1828,13 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c auto weakSelf = weak_from_this(); cnx->sendRequestWithId(seek, requestId, "SEEK") - .addListener([this, weakSelf, previousLastSeekArg](Result result, const ResponseData& responseData) { + .addListener([this, weakSelf, previousLastSeekArg](const Error& error, + const ResponseData& responseData) { auto self = weakSelf.lock(); if (!self) { return; } - if (result == ResultOk) { + if (error.result == ResultOk) { LockGuard lock(mutex_); if (getCnx().expired() || reconnectionPending_) { // Reconnection path: delay the seek callback until connectionOpened. clearReceiveQueue() @@ -1859,12 +1860,12 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c seekStatus_ = SeekStatus::NOT_STARTED; } // else: complete the seek future after connection is established } else { - LOG_ERROR(getName() << "Failed to seek: " << result); + LOG_ERROR(getName() << "Failed to seek: " << error.result); LockGuard lock{mutex_}; seekStatus_ = SeekStatus::NOT_STARTED; lastSeekArg_ = previousLastSeekArg; executor_->postWork([self, callback{std::exchange(seekCallback_, std::nullopt).value()}, - result]() { callback(result); }); + result{error.result}]() { callback(result); }); } }); } @@ -1912,13 +1913,13 @@ void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, const Proces if (client) { auto self = get_shared_this_ptr(); client->createProducerAsync( - deadLetterPolicy_.getDeadLetterTopic(), producerConfiguration, - [self](Result res, const Producer& producer) { - if (res == ResultOk) { - self->deadLetterProducer_->setValue(producer); + deadLetterPolicy_.getDeadLetterTopic(), producerConfiguration, [self](const auto& v) { + if (const auto* producer = std::get_if(&v)) { + self->deadLetterProducer_->setValue(*producer); } else { LOG_ERROR("Dead letter producer create exception with topic: " - << self->deadLetterPolicy_.getDeadLetterTopic() << " ex: " << res); + << self->deadLetterPolicy_.getDeadLetterTopic() + << " ex: " << std::get(v).result); self->deadLetterProducer_.reset(); } }); @@ -2003,9 +2004,9 @@ void ConsumerImpl::doImmediateAck(const ClientConnectionPtr& cnx, const MessageI cnx->sendRequestWithId( Commands::newAck(consumerId_, msgId.ledgerId(), msgId.entryId(), ackSet, ackType, requestId), requestId, "ACK") - .addListener([callback](Result result, const ResponseData&) { + .addListener([callback](const Error& error, const ResponseData&) { if (callback) { - callback(result); + callback(error.result); } }); } else { @@ -2034,9 +2035,9 @@ void ConsumerImpl::doImmediateAck(const ClientConnectionPtr& cnx, const std::set auto requestId = newRequestId(); cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId, "ACK") - .addListener([callback](Result result, const ResponseData&) { + .addListener([callback](const Error& error, const ResponseData&) { if (callback) { - callback(result); + callback(error.result); } }); } else { diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 6f287aa2..e2637624 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -166,7 +166,7 @@ class ConsumerImpl : public ConsumerImplBase { protected: // overrided methods from HandlerBase - Future connectionOpened(const ClientConnectionPtr& cnx) override; + Future connectionOpened(const ClientConnectionPtr& cnx) override; void connectionFailed(Result result) override; // impl methods from ConsumerImpl base diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h index ffc0e3cb..7e2cbb2f 100644 --- a/lib/ConsumerImplBase.h +++ b/lib/ConsumerImplBase.h @@ -86,9 +86,8 @@ class ConsumerImplBase : public HandlerBase { protected: // overrided methods from HandlerBase - Future connectionOpened(const ClientConnectionPtr& cnx) override { - // Do not use bool, only Result. - Promise promise; + Future connectionOpened(const ClientConnectionPtr& cnx) override { + Promise promise; promise.setSuccess(); return promise.getFuture(); } diff --git a/lib/HTTPLookupService.cc b/lib/HTTPLookupService.cc index 0be9713c..21f89281 100644 --- a/lib/HTTPLookupService.cc +++ b/lib/HTTPLookupService.cc @@ -22,6 +22,8 @@ #include #include +#include +#include #include "CurlWrapper.h" #include "ExecutorService.h" @@ -79,7 +81,7 @@ auto HTTPLookupService::getBroker(const TopicName &topicName) -> LookupResultFut auto self = shared_from_this(); executorProvider_->get()->postWork([this, self, promise, completeUrl] { std::string responseData; - Result result = sendHTTPRequest(completeUrl, responseData); + Result result = sendHTTPRequest(completeUrl, responseData).result; if (result != ResultOk) { promise.setFailed(result); @@ -93,7 +95,7 @@ auto HTTPLookupService::getBroker(const TopicName &topicName) -> LookupResultFut return promise.getFuture(); } -Future HTTPLookupService::getPartitionMetadataAsync( +Future HTTPLookupService::getPartitionMetadataAsync( const TopicNamePtr &topicName) { LookupPromise promise; std::stringstream completeUrlStream; @@ -148,9 +150,9 @@ Future HTTPLookupService::getTopicsOfNamespaceAsync( return promise.getFuture(); } -Future HTTPLookupService::getSchema(const TopicNamePtr &topicName, - const std::string &version) { - Promise promise; +Future HTTPLookupService::getSchema(const TopicNamePtr &topicName, + const std::string &version) { + Promise promise; std::stringstream completeUrlStream; const auto &url = serviceNameResolver_.resolveHost(); @@ -166,7 +168,6 @@ Future HTTPLookupService::getSchema(const TopicNamePtr &topi if (!version.empty()) { completeUrlStream << "/" << fromBigEndianBytes(version); } - executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleGetSchemaHTTPRequest, shared_from_this(), promise, completeUrlStream.str())); return promise.getFuture(); @@ -175,7 +176,7 @@ Future HTTPLookupService::getSchema(const TopicNamePtr &topi void HTTPLookupService::handleNamespaceTopicsHTTPRequest(const NamespaceTopicsPromise &promise, const std::string &completeUrl) { std::string responseData; - Result result = sendHTTPRequest(completeUrl, responseData); + Result result = sendHTTPRequest(completeUrl, responseData).result; if (result != ResultOk) { promise.setFailed(result); @@ -184,25 +185,27 @@ void HTTPLookupService::handleNamespaceTopicsHTTPRequest(const NamespaceTopicsPr } } -Result HTTPLookupService::sendHTTPRequest(const std::string &completeUrl, std::string &responseData) { +Error HTTPLookupService::sendHTTPRequest(const std::string &completeUrl, std::string &responseData) { long responseCode = -1; return sendHTTPRequest(completeUrl, responseData, responseCode); } -Result HTTPLookupService::sendHTTPRequest(const std::string &completeUrl, std::string &responseData, - long &responseCode) { - // Authorization data +Error HTTPLookupService::sendHTTPRequest(const std::string &completeUrl, std::string &responseData, + long &responseCode) { AuthenticationDataPtr authDataContent; Result authResult = authenticationPtr_->getAuthData(authDataContent); if (authResult != ResultOk) { - LOG_ERROR("Failed to getAuthData: " << authResult); - return authResult; + std::ostringstream message; + message << "Failed to getAuthData: " << authResult; + LOG_ERROR(message.str()); + return Error{authResult, message.str()}; } CurlWrapper curl; if (!curl.init()) { - LOG_ERROR("Unable to curl_easy_init for url " << completeUrl); - return ResultLookupError; + const std::string message = "Unable to curl_easy_init for url " + completeUrl; + LOG_ERROR(message); + return Error{ResultLookupError, message}; } std::unique_ptr tlsContext; @@ -226,41 +229,39 @@ Result HTTPLookupService::sendHTTPRequest(const std::string &completeUrl, std::s options.userAgent = std::string("Pulsar-CPP-v") + PULSAR_VERSION_STR; options.maxLookupRedirects = maxLookupRedirects_; auto result = curl.get(completeUrl, authDataContent->getHttpHeaders(), options, tlsContext.get()); - const auto &error = result.error; - if (!error.empty()) { - LOG_ERROR(completeUrl << " failed: " << error); - return ResultConnectError; - } responseData = result.responseData; responseCode = result.responseCode; - auto res = result.code; + + const auto res = result.code; if (res == CURLE_OK) { LOG_INFO("Response received for url " << completeUrl << " responseCode " << responseCode); - } else if (res == CURLE_TOO_MANY_REDIRECTS) { - LOG_ERROR("Response received for url " << completeUrl << ": " << curl_easy_strerror(res) - << ", curl error: " << result.serverError - << ", redirect URL: " << result.redirectUrl); + return Error{}; + } + + std::ostringstream message; + if (res == CURLE_TOO_MANY_REDIRECTS) { + message << "Response received for url " << completeUrl << ": " << curl_easy_strerror(res) + << ", curl error: " << result.serverError << ", redirect URL: " << result.redirectUrl; } else { - LOG_ERROR("Response failed for url " << completeUrl << ": " << curl_easy_strerror(res) - << ", curl error: " << result.serverError); + message << "Response failed for url " << completeUrl << ": " << curl_easy_strerror(res) + << ", curl error: " << result.serverError; } + LOG_ERROR(message.str()); switch (res) { - case CURLE_OK: - return ResultOk; case CURLE_COULDNT_CONNECT: - return ResultRetryable; + return Error{ResultRetryable, message.str()}; case CURLE_COULDNT_RESOLVE_PROXY: case CURLE_COULDNT_RESOLVE_HOST: case CURLE_HTTP_RETURNED_ERROR: - return ResultConnectError; + return Error{ResultConnectError, message.str()}; case CURLE_READ_ERROR: - return ResultReadError; + return Error{ResultReadError, message.str()}; case CURLE_OPERATION_TIMEDOUT: - return ResultTimeout; + return Error{ResultTimeout, message.str()}; default: - return ResultLookupError; + return Error{ResultLookupError, message.str()}; } } @@ -353,10 +354,10 @@ NamespaceTopicsPtr HTTPLookupService::parseNamespaceTopicsData(const std::string void HTTPLookupService::handleLookupHTTPRequest(const LookupPromise &promise, const std::string &completeUrl, RequestType requestType) { std::string responseData; - Result result = sendHTTPRequest(completeUrl, responseData); + Error error = sendHTTPRequest(completeUrl, responseData); - if (result != ResultOk) { - promise.setFailed(result); + if (error.result != ResultOk) { + promise.setFailed(std::move(error)); } else { promise.setValue((requestType == PartitionMetaData) ? parsePartitionData(responseData) : parseLookupData(responseData)); @@ -367,12 +368,12 @@ void HTTPLookupService::handleGetSchemaHTTPRequest(const GetSchemaPromise &promi const std::string &completeUrl) { std::string responseData; long responseCode = -1; - Result result = sendHTTPRequest(completeUrl, responseData, responseCode); + Error error = sendHTTPRequest(completeUrl, responseData, responseCode); if (responseCode == 404) { - promise.setFailed(ResultTopicNotFound); - } else if (result != ResultOk) { - promise.setFailed(result); + promise.setFailed(Error{ResultTopicNotFound, ""}); + } else if (error.result != ResultOk) { + promise.setFailed(std::move(error)); } else { ptree::ptree root; std::stringstream stream(responseData); @@ -381,20 +382,24 @@ void HTTPLookupService::handleGetSchemaHTTPRequest(const GetSchemaPromise &promi } catch (ptree::json_parser_error &e) { LOG_ERROR("Failed to parse json of Partition Metadata: " << e.what() << "\nInput Json = " << responseData); - promise.setFailed(ResultInvalidMessage); + promise.setFailed(Error{ResultInvalidMessage, + "Failed to parse json of Partition Metadata: " + std::string(e.what()) + + "\nInput Json = " + responseData}); return; } const std::string defaultNotFoundString = "Not found"; auto schemaTypeStr = root.get("type", defaultNotFoundString); if (schemaTypeStr == defaultNotFoundString) { LOG_ERROR("malformed json! - type not present" << responseData); - promise.setFailed(ResultInvalidMessage); + promise.setFailed( + Error{ResultInvalidMessage, "malformed json! - type not present" + responseData}); return; } auto schemaData = root.get("data", defaultNotFoundString); if (schemaData == defaultNotFoundString) { LOG_ERROR("malformed json! - data not present" << responseData); - promise.setFailed(ResultInvalidMessage); + promise.setFailed( + Error{ResultInvalidMessage, "malformed json! - data not present" + responseData}); return; } @@ -407,7 +412,9 @@ void HTTPLookupService::handleGetSchemaHTTPRequest(const GetSchemaPromise &promi } catch (ptree::json_parser_error &e) { LOG_ERROR("Failed to parse json of Partition Metadata: " << e.what() << "\nInput Json = " << schemaData); - promise.setFailed(ResultInvalidMessage); + promise.setFailed(Error{ResultInvalidMessage, "Failed to parse json of Partition Metadata: " + + std::string(e.what()) + + "\nInput Json = " + schemaData}); return; } const auto keyData = toJson(kvRoot.get_child("key")); diff --git a/lib/HTTPLookupService.h b/lib/HTTPLookupService.h index 61a06155..9a217fc8 100644 --- a/lib/HTTPLookupService.h +++ b/lib/HTTPLookupService.h @@ -32,7 +32,7 @@ namespace pulsar { class ServiceNameResolver; using NamespaceTopicsPromise = Promise; using NamespaceTopicsPromisePtr = std::shared_ptr; -using GetSchemaPromise = Promise; +using GetSchemaPromise = Promise; class HTTPLookupService : public LookupService, public std::enable_shared_from_this { enum RequestType : uint8_t @@ -41,7 +41,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t PartitionMetaData }; - typedef Promise LookupPromise; + typedef Promise LookupPromise; ExecutorServiceProviderPtr executorProvider_; ServiceNameResolver serviceNameResolver_; @@ -64,18 +64,18 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t const std::string& completeUrl); void handleGetSchemaHTTPRequest(const GetSchemaPromise& promise, const std::string& completeUrl); - Result sendHTTPRequest(const std::string& completeUrl, std::string& responseData); + Error sendHTTPRequest(const std::string& completeUrl, std::string& responseData); - Result sendHTTPRequest(const std::string& completeUrl, std::string& responseData, long& responseCode); + Error sendHTTPRequest(const std::string& completeUrl, std::string& responseData, long& responseCode); public: HTTPLookupService(const ServiceInfo& serviceInfo, const ClientConfiguration& config); LookupResultFuture getBroker(const TopicName& topicName) override; - Future getPartitionMetadataAsync(const TopicNamePtr&) override; + Future getPartitionMetadataAsync(const TopicNamePtr& topicName) override; - Future getSchema(const TopicNamePtr& topicName, const std::string& version) override; + Future getSchema(const TopicNamePtr& topicName, const std::string& version) override; Future getTopicsOfNamespaceAsync( const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override; diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc index bc8cd530..5bccc0bc 100644 --- a/lib/HandlerBase.cc +++ b/lib/HandlerBase.cc @@ -123,16 +123,15 @@ void HandlerBase::grabCnx(const optional& assignedBrokerUrl) { auto before = high_resolution_clock::now(); cnxFuture.addListener([this, self, before](Result result, const ClientConnectionPtr& cnx) { if (result == ResultOk) { - connectionOpened(cnx).addListener([this, self, before](Result result, bool) { - // Do not use bool, only Result. + connectionOpened(cnx).addListener([this, self, before](const Error& error, bool) { reconnectionPending_ = false; - if (result == ResultOk) { + if (error.result == ResultOk) { connectionTimeMs_ = duration_cast(high_resolution_clock::now() - before).count(); // Prevent the creationTimer_ from cancelling the timer_ in future cancelTimer(*creationTimer_); LOG_INFO("Finished connecting to broker after " << connectionTimeMs_ << " ms") - } else if (isResultRetryable(result)) { + } else if (isResultRetryable(error.result)) { scheduleReconnection(); } }); diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h index 229404ee..16124156 100644 --- a/lib/HandlerBase.h +++ b/lib/HandlerBase.h @@ -93,7 +93,7 @@ class HandlerBase : public std::enable_shared_from_this { * @return ResultError if there was a failure. ResultRetryable if reconnection is needed. * @return Do not use bool, only Result. */ - virtual Future connectionOpened(const ClientConnectionPtr& connection) = 0; + virtual Future connectionOpened(const ClientConnectionPtr& connection) = 0; virtual void connectionFailed(Result result) = 0; diff --git a/lib/LookupDataResult.h b/lib/LookupDataResult.h index 81e50ccd..8f6e740e 100644 --- a/lib/LookupDataResult.h +++ b/lib/LookupDataResult.h @@ -29,7 +29,7 @@ namespace pulsar { class LookupDataResult; typedef std::shared_ptr LookupDataResultPtr; -typedef Promise LookupDataResultPromise; +typedef Promise LookupDataResultPromise; typedef std::shared_ptr LookupDataResultPromisePtr; class LookupDataResult { diff --git a/lib/LookupService.h b/lib/LookupService.h index 684984fc..a18facec 100644 --- a/lib/LookupService.h +++ b/lib/LookupService.h @@ -50,6 +50,7 @@ class LookupService { << ", physical address: " << lookupResult.physicalAddress; } }; + // TODO: change it to Error using LookupResultFuture = Future; using LookupResultPromise = Promise; @@ -67,7 +68,7 @@ class LookupService { * * Gets Partition metadata */ - virtual Future getPartitionMetadataAsync(const TopicNamePtr& topicName) = 0; + virtual Future getPartitionMetadataAsync(const TopicNamePtr& topicName) = 0; /** * @param namespace - namespace-name @@ -84,8 +85,8 @@ class LookupService { * @param version the schema version byte array, if it's empty, use the latest version * @return SchemaInfo */ - virtual Future getSchema(const TopicNamePtr& topicName, - const std::string& version = "") = 0; + virtual Future getSchema(const TopicNamePtr& topicName, + const std::string& version = "") = 0; virtual ServiceNameResolver& getServiceNameResolver() = 0; diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index a5699781..5e1b5171 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -188,11 +188,11 @@ Future MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s return topicPromise->getFuture(); } client->getPartitionMetadataAsync(topicName).addListener( - [this, topicName, topicPromise](Result result, const LookupDataResultPtr& lookupDataResult) { - if (result != ResultOk) { + [this, topicName, topicPromise](const auto& error, const LookupDataResultPtr& lookupDataResult) { + if (error.result != ResultOk) { LOG_ERROR("Error Checking/Getting Partition Metadata while MultiTopics Subscribing- " - << consumerStr_ << " result: " << result) - topicPromise->setFailed(result); + << consumerStr_ << " result: " << error) + topicPromise->setFailed(error.result); return; } subscribeTopicPartitions(lookupDataResult->getPartitions(), topicName, subscriptionName_, @@ -1010,11 +1010,12 @@ void MultiTopicsConsumerImpl::topicPartitionUpdate() { return; } client->getPartitionMetadataAsync(topicName).addListener( - [this, weakSelf, topicName, currentNumPartitions](Result result, + [this, weakSelf, topicName, currentNumPartitions](const auto& error, const LookupDataResultPtr& lookupDataResult) { auto self = weakSelf.lock(); if (self) { - this->handleGetPartitions(topicName, result, lookupDataResult, currentNumPartitions); + this->handleGetPartitions(topicName, error.result, lookupDataResult, + currentNumPartitions); } }); } diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc index 1aa5c87b..586a0edf 100644 --- a/lib/PartitionedProducerImpl.cc +++ b/lib/PartitionedProducerImpl.cc @@ -143,7 +143,7 @@ void PartitionedProducerImpl::start() { } void PartitionedProducerImpl::handleSinglePartitionProducerCreated( - Result result, const ProducerImplBaseWeakPtr& producerWeakPtr, unsigned int partitionIndex) { + const Error& error, const ProducerImplBaseWeakPtr& producerWeakPtr, unsigned int partitionIndex) { // to indicate, we are doing cleanup using closeAsync after producer create // has failed and the invocation of closeAsync is not from client const auto numPartitions = getNumPartitionsWithLock(); @@ -161,9 +161,10 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated( return; } - if (result != ResultOk) { - LOG_ERROR("Unable to create Producer for partition - " << partitionIndex << " Error - " << result); - partitionedProducerCreatedPromise_.setFailed(result); + if (error.result != ResultOk) { + LOG_ERROR("Unable to create Producer for partition - " << partitionIndex << " Error - " + << error.result); + partitionedProducerCreatedPromise_.setFailed(error); state_ = Failed; if (++numProducersCreated_ == numPartitions) { closeAsync(nullptr); @@ -231,11 +232,11 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac } else { // Wrapping the callback into a lambda has overhead, so we check if the producer is ready first producer->getProducerCreatedFuture().addListener( - [msg, callback](Result result, const ProducerImplBaseWeakPtr& weakProducer) { - if (result == ResultOk) { + [msg, callback](const Error& error, const ProducerImplBaseWeakPtr& weakProducer) { + if (error.result == ResultOk) { weakProducer.lock()->sendAsync(msg, callback); } else if (callback) { - callback(result, {}); + callback(error.result, {}); } }); } @@ -251,7 +252,7 @@ void PartitionedProducerImpl::internalShutdown() { if (client) { client->cleanupProducer(this); } - partitionedProducerCreatedPromise_.setFailed(ResultAlreadyClosed); + partitionedProducerCreatedPromise_.setFailed(Error{ResultAlreadyClosed, ""}); state_ = Closed; } @@ -353,14 +354,14 @@ void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result, // is set second time here, first time it was successful. So check // if there's any adverse effect of setting it again. It should not // be but must check. MUSTCHECK changeme - partitionedProducerCreatedPromise_.setFailed(ResultUnknownError); + partitionedProducerCreatedPromise_.setFailed(Error{ResultUnknownError, ""}); callback(result); return; } } // override -Future PartitionedProducerImpl::getProducerCreatedFuture() { +Future PartitionedProducerImpl::getProducerCreatedFuture() { return partitionedProducerCreatedPromise_.getFuture(); } @@ -436,10 +437,10 @@ void PartitionedProducerImpl::getPartitionMetadata() { return; } client->getPartitionMetadataAsync(topicName_) - .addListener([weakSelf](Result result, const LookupDataResultPtr& lookupDataResult) { + .addListener([weakSelf](const auto& error, const LookupDataResultPtr& lookupDataResult) { auto self = weakSelf.lock(); if (self) { - self->handleGetPartitions(result, lookupDataResult); + self->handleGetPartitions(error.result, lookupDataResult); } }); } diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h index 94ba7179..dc06af02 100644 --- a/lib/PartitionedProducerImpl.h +++ b/lib/PartitionedProducerImpl.h @@ -78,12 +78,12 @@ class PartitionedProducerImpl : public ProducerImplBase, void internalShutdown(); bool isClosed() override; const std::string& getTopic() const override; - Future getProducerCreatedFuture() override; + Future getProducerCreatedFuture() override; void triggerFlush() override; void flushAsync(FlushCallback callback) override; bool isConnected() const override; uint64_t getNumberOfConnectedProducer() override; - void handleSinglePartitionProducerCreated(Result result, + void handleSinglePartitionProducerCreated(const Error& error, const ProducerImplBaseWeakPtr& producerBaseWeakPtr, const unsigned int partitionIndex); void createLazyPartitionProducer(const unsigned int partitionIndex); @@ -121,7 +121,7 @@ class PartitionedProducerImpl : public ProducerImplBase, std::atomic state_{Pending}; // only set this promise to value, when producers on all partitions are created. - Promise partitionedProducerCreatedPromise_; + Promise partitionedProducerCreatedPromise_; std::unique_ptr topicMetadata_; diff --git a/lib/PendingRequest.h b/lib/PendingRequest.h index 465073f6..7014a025 100644 --- a/lib/PendingRequest.h +++ b/lib/PendingRequest.h @@ -43,7 +43,7 @@ class PendingRequest : public std::enable_shared_from_this> { return; } timeoutCallback_(); - promise_.setFailed(ResultTimeout); + promise_.setFailed(Error{ResultTimeout, ""}); }); } @@ -52,8 +52,8 @@ class PendingRequest : public std::enable_shared_from_this> { cancelTimer(timer_); } - void fail(Result result) { - promise_.setFailed(result); + void fail(const Error& error) { + promise_.setFailed(error); cancelTimer(timer_); } @@ -65,7 +65,7 @@ class PendingRequest : public std::enable_shared_from_this> { private: ASIO::steady_timer timer_; - Promise promise_; + Promise promise_; std::function timeoutCallback_; std::atomic_bool timeoutDisabled_{false}; }; diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 7632581e..873796dc 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -135,13 +135,12 @@ void ProducerImpl::beforeConnectionChange(ClientConnection& connection) { connection.removeProducer(producerId_); } -Future ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) { - // Do not use bool, only Result. - Promise promise; +Future ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) { + Promise promise; if (state_ == Closed) { LOG_DEBUG(getName() << "connectionOpened : Producer is already closed"); - promise.setFailed(ResultAlreadyClosed); + promise.setFailed(Error{ResultAlreadyClosed, ""}); return promise.getFuture(); } @@ -162,12 +161,12 @@ Future ProducerImpl::connectionOpened(const ClientConnectionPtr& c auto self = shared_from_this(); setFirstRequestIdAfterConnect(requestId); cnx->sendRequestWithId(cmd, requestId, "PRODUCER") - .addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) { - Result handleResult = handleCreateProducer(cnx, result, responseData); - if (handleResult == ResultOk) { + .addListener([this, self, cnx, promise](const Error& error, const ResponseData& responseData) { + auto handledError = handleCreateProducer(cnx, error, responseData); + if (handledError.result == ResultOk) { promise.setSuccess(); } else { - promise.setFailed(handleResult); + promise.setFailed(handledError); } }); @@ -182,22 +181,23 @@ void ProducerImpl::connectionFailed(Result result) { // if producers are lazy, then they should always try to restart // so don't change the state and allow reconnections return; - } else if (!isResultRetryable(result) && producerCreatedPromise_.setFailed(result)) { + } else if (!isResultRetryable(result) && producerCreatedPromise_.setFailed(Error{result, ""})) { state_ = Failed; } } -Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result result, - const ResponseData& responseData) { +Error ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, const Error& error, + const ResponseData& responseData) { Result handleResult = ResultOk; Lock lock(mutex_); - LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << strResult(result)); + LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << error); // make sure we're still in the Pending/Ready state, closeAsync could have been invoked // while waiting for this response if using lazy producers const auto state = state_.load(); + const auto result = error.result; if (state != Ready && state != Pending) { LOG_DEBUG("Producer created response received but producer already closed"); failPendingMessages(ResultAlreadyClosed, false); @@ -211,9 +211,9 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result } if (!producerCreatedPromise_.isComplete()) { lock.unlock(); - producerCreatedPromise_.setFailed(ResultAlreadyClosed); + producerCreatedPromise_.setFailed(Error{ResultAlreadyClosed, ""}); } - return ResultAlreadyClosed; + return Error{ResultAlreadyClosed, ""}; } if (result == ResultOk) { @@ -281,7 +281,7 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result client->cleanupProducer(this); } lock.unlock(); - producerCreatedPromise_.setFailed(result); + producerCreatedPromise_.setFailed(error); handleResult = result; } else if (producerCreatedPromise_.isComplete() || retryOnCreationError_) { if (result == ResultProducerBlockedQuotaExceededException) { @@ -292,24 +292,25 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result } // Producer had already been initially created, we need to retry connecting in any case - LOG_WARN(getName() << "Failed to reconnect producer: " << strResult(result)); + LOG_WARN(getName() << "Failed to reconnect producer: " << error); handleResult = ResultRetryable; } else { // Producer was not yet created, retry to connect to broker if it's possible handleResult = convertToTimeoutIfNecessary(result, creationTimestamp_); + Error convertedError{handleResult, error.message}; if (isResultRetryable(handleResult)) { - LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(handleResult)); + LOG_WARN(getName() << "Temporary error in creating producer: " << convertedError); } else { - LOG_ERROR(getName() << "Failed to create producer: " << strResult(handleResult)); + LOG_ERROR(getName() << "Failed to create producer: " << error); failPendingMessages(handleResult, false); state_ = Failed; lock.unlock(); - producerCreatedPromise_.setFailed(handleResult); + producerCreatedPromise_.setFailed(convertedError); } } } - return handleResult; + return handleResult == error.result ? error : Error{handleResult, error.message}; } auto ProducerImpl::getPendingCallbacksWhenFailed() -> decltype(pendingMessagesQueue_) { @@ -825,10 +826,10 @@ void ProducerImpl::closeAsync(CloseCallback originalCallback) { int requestId = client->newRequestId(); auto self = shared_from_this(); cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId, "CLOSE_PRODUCER") - .addListener([self, callback](Result result, const ResponseData&) { callback(result); }); + .addListener([self, callback](const Error& error, const ResponseData&) { callback(error.result); }); } -Future ProducerImpl::getProducerCreatedFuture() { +Future ProducerImpl::getProducerCreatedFuture() { return producerCreatedPromise_.getFuture(); } @@ -1022,7 +1023,7 @@ void ProducerImpl::internalShutdown() { client->cleanupProducer(this); } cancelTimers(); - producerCreatedPromise_.setFailed(ResultAlreadyClosed); + producerCreatedPromise_.setFailed(Error{ResultAlreadyClosed, ""}); state_ = Closed; } diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h index 26207f80..e7502533 100644 --- a/lib/ProducerImpl.h +++ b/lib/ProducerImpl.h @@ -87,7 +87,7 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase { void internalShutdown(); bool isClosed() override; const std::string& getTopic() const override; - Future getProducerCreatedFuture() override; + Future getProducerCreatedFuture() override; void triggerFlush() override; void flushAsync(FlushCallback callback) override; bool isConnected() const override; @@ -133,15 +133,15 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase { // overrided methods from HandlerBase void beforeConnectionChange(ClientConnection& connection) override; - Future connectionOpened(const ClientConnectionPtr& connection) override; + Future connectionOpened(const ClientConnectionPtr& connection) override; void connectionFailed(Result result) override; const std::string& getName() const override { return producerStr_; } private: void printStats(); - Result handleCreateProducer(const ClientConnectionPtr& cnx, Result result, - const ResponseData& responseData); + Error handleCreateProducer(const ClientConnectionPtr& cnx, const Error& error, + const ResponseData& responseData); void resendMessages(const ClientConnectionPtr& cnx); @@ -195,7 +195,7 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase { using DurationType = TimeDuration; void asyncWaitSendTimeout(DurationType expiryTime); - Promise producerCreatedPromise_; + Promise producerCreatedPromise_; struct PendingCallbacks; decltype(pendingMessagesQueue_) getPendingCallbacksWhenFailed(); diff --git a/lib/ProducerImplBase.h b/lib/ProducerImplBase.h index 25a12c8a..72607e2a 100644 --- a/lib/ProducerImplBase.h +++ b/lib/ProducerImplBase.h @@ -44,7 +44,7 @@ class ProducerImplBase { virtual bool isClosed() = 0; virtual void shutdown() = 0; virtual const std::string& getTopic() const = 0; - virtual Future getProducerCreatedFuture() = 0; + virtual Future getProducerCreatedFuture() = 0; virtual void triggerFlush() = 0; virtual void flushAsync(FlushCallback callback) = 0; virtual bool isConnected() const = 0; diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h index 7f50cf12..3066d489 100644 --- a/lib/RetryableLookupService.h +++ b/lib/RetryableLookupService.h @@ -53,11 +53,12 @@ class RetryableLookupService : public LookupService { } LookupResultFuture getBroker(const TopicName& topicName) override { - return lookupCache_->run("get-broker-" + topicName.toString(), - [this, topicName] { return lookupService_->getBroker(topicName); }); + return toResultFuture(lookupCache_->run("get-broker-" + topicName.toString(), [this, topicName] { + return toErrorFuture(lookupService_->getBroker(topicName)); + })); } - Future getPartitionMetadataAsync(const TopicNamePtr& topicName) override { + Future getPartitionMetadataAsync(const TopicNamePtr& topicName) override { return partitionLookupCache_->run( "get-partition-metadata-" + topicName->toString(), [this, topicName] { return lookupService_->getPartitionMetadataAsync(topicName); }); @@ -65,13 +66,15 @@ class RetryableLookupService : public LookupService { Future getTopicsOfNamespaceAsync( const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override { - return namespaceLookupCache_->run( + return toResultFuture(namespaceLookupCache_->run( "get-topics-of-namespace-" + nsName->toString() + "-" + std::to_string(mode), - [this, nsName, mode] { return lookupService_->getTopicsOfNamespaceAsync(nsName, mode); }); + [this, nsName, mode] { + return toErrorFuture(lookupService_->getTopicsOfNamespaceAsync(nsName, mode)); + })); } - Future getSchema(const TopicNamePtr& topicName, const std::string& version) override { - return getSchemaCache_->run("get-schema" + topicName->toString(), [this, topicName, version] { + Future getSchema(const TopicNamePtr& topicName, const std::string& version) override { + return getSchemaCache_->run(getSchemaCacheKey(topicName, version), [this, topicName, version] { return lookupService_->getSchema(topicName, version); }); } @@ -81,6 +84,36 @@ class RetryableLookupService : public LookupService { } private: + template + static Future toErrorFuture(Future future) { + Promise promise; + future.addListener([promise](Result result, const T& value) { + if (result == ResultOk) { + promise.setValue(value); + } else { + promise.setFailed({result, ""}); + } + }); + return promise.getFuture(); + } + + template + static Future toResultFuture(Future future) { + Promise promise; + future.addListener([promise](const Error& error, const T& value) { + if (error.result == ResultOk) { + promise.setValue(value); + } else { + promise.setFailed(error.result); + } + }); + return promise.getFuture(); + } + + static std::string getSchemaCacheKey(const TopicNamePtr& topicName, const std::string& version) { + return "get-schema-" + topicName->toString() + "-" + version; + } + const std::shared_ptr lookupService_; RetryableOperationCachePtr lookupCache_; RetryableOperationCachePtr partitionLookupCache_; diff --git a/lib/RetryableOperation.h b/lib/RetryableOperation.h index f4b056ed..28263908 100644 --- a/lib/RetryableOperation.h +++ b/lib/RetryableOperation.h @@ -41,7 +41,7 @@ class RetryableOperation : public std::enable_shared_from_this()>&& func, + RetryableOperation(const std::string& name, std::function()>&& func, TimeDuration timeout, DeadlineTimerPtr timer) : name_(name), func_(std::move(func)), @@ -58,7 +58,7 @@ class RetryableOperation : public std::enable_shared_from_this>(PassKey{}, std::forward(args)...); } - Future run() { + Future run() { bool expected = false; if (!started_.compare_exchange_strong(expected, true)) { return promise_.getFuture(); @@ -67,16 +67,16 @@ class RetryableOperation : public std::enable_shared_from_this()> func_; + std::function()> func_; const TimeDuration timeout_; Backoff backoff_; - Promise promise_; + Promise promise_; std::atomic_bool started_{false}; DeadlineTimerPtr timer_; @@ -84,24 +84,24 @@ class RetryableOperation : public std::enable_shared_from_this + Future runImpl(TimeDuration remainingTime) { std::weak_ptr> weakSelf{this->shared_from_this()}; - func_().addListener([this, weakSelf, remainingTime](Result result, const T& value) { + func_().addListener([this, weakSelf, remainingTime](const Error& error, const T& value) { auto self = weakSelf.lock(); if (!self) { return; } - if (result == ResultOk) { + if (error.result == ResultOk) { promise_.setValue(value); return; } - if (!isResultRetryable(result)) { - promise_.setFailed(result); + if (!isResultRetryable(error.result)) { + promise_.setFailed(error); return; } if (toMillis(remainingTime) <= 0) { - promise_.setFailed(ResultTimeout); + promise_.setFailed({ResultTimeout, ""}); return; } @@ -119,7 +119,7 @@ class RetryableOperation : public std::enable_shared_from_this(PassKey{}, std::forward(args)...); } - Future run(const std::string& key, std::function()>&& func) { + Future run(const std::string& key, std::function()>&& func) { std::unique_lock lock{mutex_}; if (closed_) { - Promise promise; - promise.setFailed(ResultAlreadyClosed); + Promise promise; + promise.setFailed({ResultAlreadyClosed, ""}); return promise.getFuture(); } auto it = operations_.find(key); @@ -70,8 +70,8 @@ class RetryableOperationCache : public std::enable_shared_from_thisget()->createDeadlineTimer(); } catch (const std::runtime_error& e) { LOG_ERROR("Failed to retry lookup for " << key << ": " << e.what()); - Promise promise; - promise.setFailed(ResultConnectError); + Promise promise; + promise.setFailed({ResultConnectError, e.what()}); return promise.getFuture(); } @@ -81,7 +81,7 @@ class RetryableOperationCache : public std::enable_shared_from_thisweak_from_this(); - future.addListener([this, weakSelf, key, operation](Result, const T&) { + future.addListener([this, weakSelf, key, operation](const Error&, const T&) { auto self = weakSelf.lock(); if (!self) { return; diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index 63ac9d16..e83db4e2 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -303,11 +303,11 @@ TEST(ClientTest, testTimedOutPendingRequestsAreErasedFromConnectionMaps) { "persistent://public/default/testTimedOutPendingRequests-" + suffix, "", requestIdGenerator++); ResponseData responseData; - ASSERT_EQ(ResultTimeout, pingFuture.get(responseData)); + ASSERT_EQ(ResultTimeout, pingFuture.get(responseData).result); ASSERT_EQ(0u, PulsarFriend::getPendingRequests(*connection)); LookupDataResultPtr lookupData; - ASSERT_EQ(ResultTimeout, lookupFuture.get(lookupData)); + ASSERT_EQ(ResultTimeout, lookupFuture.get(lookupData).result); ASSERT_EQ(0u, PulsarFriend::getPendingLookupRequests(*connection)); ASSERT_EQ(0u, PulsarFriend::getNumOfPendingLookupRequests(*connection)); @@ -320,11 +320,11 @@ TEST(ClientTest, testTimedOutPendingRequestsAreErasedFromConnectionMaps) { ASSERT_EQ(0u, PulsarFriend::getPendingGetTopicsOfNamespaceRequests(*connection)); SchemaInfo schemaInfo; - ASSERT_EQ(ResultTimeout, getSchemaFuture.get(schemaInfo)); + ASSERT_EQ(ResultTimeout, getSchemaFuture.get(schemaInfo).result); ASSERT_EQ(0u, PulsarFriend::getPendingGetSchemaRequests(*connection)); mockServer->close(); - connection->close(ResultDisconnected).wait(); + connection->close(Error{ResultDisconnected, ""}).wait(); executorProvider->close(); } diff --git a/tests/ConnectionTest.cc b/tests/ConnectionTest.cc index e0d063e9..dd4641ba 100644 --- a/tests/ConnectionTest.cc +++ b/tests/ConnectionTest.cc @@ -24,7 +24,7 @@ using namespace pulsar; class MockClientConnection { public: - MOCK_METHOD(void, close, (Result)); + MOCK_METHOD(void, close, (Error)); void checkServerError(ServerError error, const std::string& message) { ::pulsar::adaptor::checkServerError(*this, error, message); @@ -41,7 +41,7 @@ static const std::vector retryableErrorMessages{ TEST(ConnectionTest, testCheckServerError) { MockClientConnection conn; - EXPECT_CALL(conn, close(ResultDisconnected)).Times(0); + EXPECT_CALL(conn, close(::testing::_)).Times(0); for (auto&& msg : retryableErrorMessages) { conn.checkServerError(pulsar::proto::ServiceNotReady, msg); } diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc index 53cb76e1..e98d972e 100644 --- a/tests/LookupServiceTest.cc +++ b/tests/LookupServiceTest.cc @@ -91,7 +91,7 @@ TEST(LookupServiceTest, basicLookup) { TopicNamePtr topicName = TopicName::get("topic"); - Future partitionFuture = lookupService.getPartitionMetadataAsync(topicName); + Future partitionFuture = lookupService.getPartitionMetadataAsync(topicName); LookupDataResultPtr lookupData; partitionFuture.get(lookupData); ASSERT_TRUE(lookupData != NULL); @@ -129,9 +129,9 @@ static void testMultiAddresses(LookupService& lookupService) { results.clear(); for (int i = 0; i < numRequests; i++) { LookupDataResultPtr data; - const auto result = lookupService.getPartitionMetadataAsync(TopicName::get("topic")).get(data); - LOG_INFO("getPartitionMetadataAsync [" << i << "] " << result); - results.emplace_back(result); + const auto error = lookupService.getPartitionMetadataAsync(TopicName::get("topic")).get(data); + LOG_INFO("getPartitionMetadataAsync [" << i << "] " << error.result); + results.emplace_back(error.result); } verifySuccessCount(); @@ -186,7 +186,7 @@ TEST(LookupServiceTest, testRetry) { PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0); auto future2 = lookupService->getPartitionMetadataAsync(topicNamePtr); LookupDataResultPtr lookupDataResultPtr; - ASSERT_EQ(ResultOk, future2.get(lookupDataResultPtr)); + ASSERT_EQ(ResultOk, future2.get(lookupDataResultPtr).result); LOG_INFO("getPartitionMetadataAsync returns " << lookupDataResultPtr->getPartitions() << " partitions"); PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0); @@ -234,7 +234,7 @@ TEST(LookupServiceTest, testTimeout) { beforeMethod(); auto future2 = lookupService->getPartitionMetadataAsync(topicNamePtr); LookupDataResultPtr lookupDataResultPtr; - ASSERT_EQ(ResultTimeout, future2.get(lookupDataResultPtr)); + ASSERT_EQ(ResultTimeout, future2.get(lookupDataResultPtr).result); afterMethod("getPartitionMetadataAsync"); beforeMethod(); @@ -307,7 +307,7 @@ TEST_P(LookupServiceTest, testGetSchema) { SchemaInfo schemaInfo; auto future = PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic)); - ASSERT_EQ(ResultOk, future.get(schemaInfo)); + ASSERT_EQ(ResultOk, future.get(schemaInfo).result); ASSERT_EQ(jsonSchema, schemaInfo.getSchema()); ASSERT_EQ(SchemaType::JSON, schemaInfo.getSchemaType()); ASSERT_EQ(properties, schemaInfo.getProperties()); @@ -322,7 +322,7 @@ TEST_P(LookupServiceTest, testGetSchemaNotFound) { SchemaInfo schemaInfo; auto future = PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic)); - ASSERT_EQ(ResultTopicNotFound, future.get(schemaInfo)); + ASSERT_EQ(ResultTopicNotFound, future.get(schemaInfo).result); } TEST_P(LookupServiceTest, testGetKeyValueSchema) { @@ -344,7 +344,7 @@ TEST_P(LookupServiceTest, testGetKeyValueSchema) { SchemaInfo schemaInfo; auto future = PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic)); - ASSERT_EQ(ResultOk, future.get(schemaInfo)); + ASSERT_EQ(ResultOk, future.get(schemaInfo).result); ASSERT_EQ(keyValueSchema.getSchema(), schemaInfo.getSchema()); ASSERT_EQ(SchemaType::KEY_VALUE, schemaInfo.getSchemaType()); ASSERT_FALSE(schemaInfo.getProperties().empty()); @@ -510,13 +510,13 @@ class MockLookupService : public BinaryProtoLookupService { public: using BinaryProtoLookupService::BinaryProtoLookupService; - Future getPartitionMetadataAsync(const TopicNamePtr& topicName) override { + Future getPartitionMetadataAsync(const TopicNamePtr& topicName) override { bool expected = true; if (firstTime_.compare_exchange_strong(expected, false)) { // Trigger the retry LOG_INFO("Fail the lookup for " << topicName->toString() << " intentionally"); - Promise promise; - promise.setFailed(ResultRetryable); + Promise promise; + promise.setFailed(Error{ResultRetryable, ""}); return promise.getFuture(); } return BinaryProtoLookupService::getPartitionMetadataAsync(topicName); @@ -564,7 +564,7 @@ TEST(LookupServiceTest, testRetryAfterDestroyed) { lookupService->close(); Result result = ResultOk; lookupService->getPartitionMetadataAsync(TopicName::get("lookup-service-test-retry-after-destroyed")) - .addListener([&result](Result innerResult, const LookupDataResultPtr&) { result = innerResult; }); + .addListener([&result](const auto& error, const LookupDataResultPtr&) { result = error.result; }); EXPECT_EQ(ResultAlreadyClosed, result); pool.close(); executorProvider->close(); diff --git a/tests/MockClientImpl.h b/tests/MockClientImpl.h index 074808fc..7fa902bc 100644 --- a/tests/MockClientImpl.h +++ b/tests/MockClientImpl.h @@ -24,6 +24,7 @@ #include #include #include +#include #include "lib/ClientImpl.h" @@ -46,9 +47,13 @@ class MockClientImpl : public ClientImpl { using namespace std::chrono; auto start = high_resolution_clock::now(); auto promise = createPromise(); - createProducerAsync(topic, {}, [&start, promise](Result result, Producer) { + createProducerAsync(topic, {}, [&start, promise](const auto& v) { auto timeMs = duration_cast(high_resolution_clock::now() - start).count(); - promise->set_value({result, timeMs}); + if (const auto* error = std::get_if(&v)) { + promise->set_value({error->result, timeMs}); + } else { + promise->set_value({ResultOk, timeMs}); + } }); return wait(promise); } diff --git a/tests/MultiTopicsConsumerTest.cc b/tests/MultiTopicsConsumerTest.cc index 57407fbd..07e0c503 100644 --- a/tests/MultiTopicsConsumerTest.cc +++ b/tests/MultiTopicsConsumerTest.cc @@ -167,7 +167,7 @@ TEST(MultiTopicsConsumerTest, testGetConsumerStatsFail) { return PulsarFriend::getPendingConsumerStatsRequests(*connection) == expectedRequests; })); - connection->close(ResultDisconnected); + connection->close(Error{ResultDisconnected, ""}); ASSERT_EQ(ResultDisconnected, future.get()); mockServer->close(); diff --git a/tests/RetryableOperationCacheTest.cc b/tests/RetryableOperationCacheTest.cc index 2daaf3f3..a1bb2856 100644 --- a/tests/RetryableOperationCacheTest.cc +++ b/tests/RetryableOperationCacheTest.cc @@ -27,13 +27,13 @@ namespace pulsar { -using IntFuture = Future; +using IntFuture = Future; static int wait(IntFuture future) { int value; - auto result = future.get(value); - if (result != ResultOk) { - throw std::runtime_error(strResult(result)); + auto error = future.get(value); + if (error.result != ResultOk) { + throw std::runtime_error(strResult(error.result)); } return value; } @@ -50,9 +50,9 @@ class CountdownFunc { : result_(rhs.result_), totalRetryCount_(rhs.totalRetryCount_), current_(rhs.current_.load()) {} IntFuture operator()() { - Promise promise; + Promise promise; if (++current_ < totalRetryCount_) { - promise.setFailed(ResultRetryable); + promise.setFailed({ResultRetryable, ""}); } else { promise.setValue(result_); } @@ -141,7 +141,7 @@ TEST_F(RetryableOperationCacheTest, testClose) { for (auto&& future : futures_) { int value; // All cancelled futures complete with ResultDisconnected and the default int value - ASSERT_EQ(ResultDisconnected, future.get(value)); + ASSERT_EQ(ResultDisconnected, future.get(value).result); ASSERT_EQ(value, 0); } ASSERT_EQ(getSize(*cache), 0); diff --git a/tests/SchemaTest.cc b/tests/SchemaTest.cc index 1fad081c..8b175144 100644 --- a/tests/SchemaTest.cc +++ b/tests/SchemaTest.cc @@ -172,10 +172,18 @@ TEST(SchemaTest, testAutoDownloadSchema) { auto clientImplPtr = PulsarFriend::getClientImplPtr(client); - Promise promise; - clientImplPtr->createProducerAsync(topic, producerConfiguration, WaitForCallbackValue(promise), - true); - ASSERT_EQ(ResultOk, promise.getFuture().get(producer)); + Promise promise; + clientImplPtr->createProducerAsync( + topic, producerConfiguration, + [promise](const auto& v) { + if (const auto* error = std::get_if(&v)) { + promise.setFailed(*error); + } else { + promise.setValue(std::get(v)); + } + }, + true); + ASSERT_EQ(ResultOk, promise.getFuture().get(producer).result); Message msg = MessageBuilder().setContent("content").build(); ASSERT_EQ(ResultOk, producer.send(msg)); From c55cde4c2ee6f142bc53b2041fa2f458811f7b79 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 22 May 2026 21:39:22 +0800 Subject: [PATCH 2/2] fix style --- lib/HTTPLookupService.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/HTTPLookupService.cc b/lib/HTTPLookupService.cc index 21f89281..08352eb7 100644 --- a/lib/HTTPLookupService.cc +++ b/lib/HTTPLookupService.cc @@ -191,7 +191,7 @@ Error HTTPLookupService::sendHTTPRequest(const std::string &completeUrl, std::st } Error HTTPLookupService::sendHTTPRequest(const std::string &completeUrl, std::string &responseData, - long &responseCode) { + long &responseCode) { AuthenticationDataPtr authDataContent; Result authResult = authenticationPtr_->getAuthData(authDataContent); if (authResult != ResultOk) {