Skip to content

Fix mqtt client not have the consumer metric#2056

Open
Technoboy- wants to merge 1 commit intomasterfrom
fix-pulsar-stats
Open

Fix mqtt client not have the consumer metric#2056
Technoboy- wants to merge 1 commit intomasterfrom
fix-pulsar-stats

Conversation

@Technoboy-
Copy link
Copy Markdown
Contributor

@Technoboy- Technoboy- commented Apr 22, 2026

Motivation

Message consumption is not reflected in the subscription and consumer stats when using a MQTT client.

Scenario: MQTT consumer subscribes to topic and starts consuming messages.

Expected: msgOutCounter, bytesOutCounter etc. under the MQTT subscription (and consumer) increase as messages are consumed.

Besides the message consumption stats, it would be great to have the clientVersion at least hard-coded to mqtt, similar to how the Kafka adapter does, as well as passing the appId.

Here is the example:

Subscription: 99fde046-a922-4a20-9a5a-49117214a0b6_deployed-clients
    "99fde046-a922-4a20-9a5a-49117214a0b6_deployed-clients-consumer-QA" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 0,
      "msgOutCounter" : 0,
      "msgRateRedeliver" : 0.0,
      "messageAckRate" : 0.0,
      "chunkedMessageRate" : 0.0,
      "msgBacklog" : 1837,
      "backlogSize" : 214929,
      "earliestMsgPublishTimeInBacklog" : 0,
      "msgBacklogNoDelayed" : 1837,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "msgInReplay" : 0,
      "unackedMessages" : 0,
      "type" : "None",
      "msgRateExpired" : 2.500064182522716,
      "totalMsgExpired" : 366939,
      "lastExpireTimestamp" : 1776747927393,
      "lastConsumedFlowTimestamp" : 0,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "lastMarkDeleteAdvancedTimestamp" : 1776747927394,
      "consumers" : [ ],
      "isDurable" : true,
      "isReplicated" : false,
      "allowOutOfOrderDelivery" : false,
      "consumersAfterMarkDeletePosition" : { },
      "drainingHashesCount" : 0,
      "drainingHashesClearedTotal" : 0,
      "drainingHashesUnackedMessages" : 0,
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
      "delayedMessageIndexSizeInBytes" : 0,
      "subscriptionProperties" : { },
      "filterProcessedMsgCount" : 0,
      "filterAcceptedMsgCount" : 0,
      "filterRejectedMsgCount" : 0,
      "filterRescheduledMsgCount" : 0,
      "dispatchThrottledMsgEventsBySubscriptionLimit" : 0,
      "dispatchThrottledBytesEventsBySubscriptionLimit" : 0,
      "dispatchThrottledMsgEventsByTopicLimit" : 0,
      "dispatchThrottledBytesEventsByTopicLimit" : 0,
      "dispatchThrottledMsgEventsByBrokerLimit" : 0,
      "dispatchThrottledBytesEventsByBrokerLimit" : 0,
      "replicated" : false,
      "durable" : true
    }

Documentation

Check the box below.

Need to update docs?

  • doc-required

    (If you need help on updating docs, create a doc issue)

  • no-need-doc

    (Please explain why)

  • doc

    (If this PR contains doc changes)

@github-actions
Copy link
Copy Markdown

@Technoboy-:Thanks for your contribution. For this PR, do we need to update docs?
(The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant