Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions docs/api/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ For the sake of clarity, in this document we have grouped API endpoints by servi
| [Remote read](#remote-read) | Querier, Query-frontend || `POST <prometheus-http-prefix>/api/v1/read` |
| [Build information](#build-information) | Querier, Query-frontend |v1.15.0| `GET <prometheus-http-prefix>/api/v1/status/buildinfo` |
| [Get tenant ingestion stats](#get-tenant-ingestion-stats) | Querier || `GET /api/v1/user_stats` |
| [Get tenant TSDB status](#get-tenant-tsdb-status) | Querier || `GET /api/v1/status/tsdb` |
| [Ruler ring status](#ruler-ring-status) | Ruler || `GET /ruler/ring` |
| [Ruler rules ](#ruler-rule-groups) | Ruler || `GET /ruler/rule_groups` |
| [List rules](#list-rules) | Ruler || `GET <prometheus-http-prefix>/api/v1/rules` |
Expand Down Expand Up @@ -505,6 +506,67 @@ Returns realtime ingestion rate, for the authenticated tenant, in `JSON` format.

_Requires [authentication](#authentication)._

### Get tenant TSDB status

```
GET /api/v1/status/tsdb

# Legacy
GET <legacy-http-prefix>/api/v1/status/tsdb
```

Returns TSDB cardinality statistics for the authenticated tenant's in-memory (head) data, in `JSON` format. This is useful for understanding which metrics, labels, and label-value pairs contribute the most series and for debugging high-cardinality issues.

The endpoint accepts an optional `limit` query parameter (default `10`) that controls how many entries are returned in each top-N list.

_Requires [authentication](#authentication)._

#### Example request

```
GET /api/v1/status/tsdb?limit=5
```

#### Example response

```json
{
"numSeries": 1234,
"minTime": 1709640000000,
"maxTime": 1709726400000,
"numLabelPairs": 42,
"seriesCountByMetricName": [
{ "name": "http_requests_total", "value": 500 },
{ "name": "process_cpu_seconds_total", "value": 200 }
],
"labelValueCountByLabelName": [
{ "name": "instance", "value": 150 },
{ "name": "job", "value": 10 }
],
"memoryInBytesByLabelName": [
{ "name": "instance", "value": 32000 },
{ "name": "job", "value": 4800 }
],
"seriesCountByLabelValuePair": [
{ "name": "job=cortex", "value": 800 },
{ "name": "job=prometheus", "value": 300 }
]
}
```

#### Response fields

| Field | Description |
|-------|-------------|
| `numSeries` | Total number of active series for the tenant. |
| `minTime` | Minimum timestamp (ms) across all samples in the TSDB head. |
| `maxTime` | Maximum timestamp (ms) across all samples in the TSDB head. |
| `numLabelPairs` | Total number of distinct label name-value pairs. |
| `seriesCountByMetricName` | Top metrics ranked by number of series (descending). |
| `labelValueCountByLabelName` | Top label names ranked by number of unique values (descending). |
| `memoryInBytesByLabelName` | Top label names ranked by estimated memory usage in bytes (descending). |
| `seriesCountByLabelValuePair` | Top label name=value pairs ranked by number of series (descending). |

## Ruler

The ruler API endpoints require a backend object storage to be configured, in which the recording rules and alerts are stored. The ruler API uses the concept of a "namespace" when creating rule groups. This is a stand-in for the name of the rule file in Prometheus, and rule groups must be named uniquely within a namespace.
Expand Down
65 changes: 65 additions & 0 deletions integration/api_endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/runutil"
Expand Down Expand Up @@ -134,3 +135,67 @@ func Test_AllUserStats_WhenIngesterRollingUpdate(t *testing.T) {
require.Len(t, userStats, 1)
require.Equal(t, uint64(2), userStats[0].QueriedIngesters)
}

// TestTSDBStatus pushes series with known cardinality and verifies the
// aggregated statistics returned by GET /api/v1/status/tsdb.
func TestTSDBStatus(t *testing.T) {
	scenario, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer scenario.Close()

	flags := BlocksStorageFlags()
	flags["-distributor.replication-factor"] = "1"

	// Dependencies must be up before Cortex starts.
	consul := e2edb.NewConsul()
	minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
	require.NoError(t, scenario.StartAndWaitReady(consul, minio))

	// Run Cortex in single binary mode.
	cortex := e2ecortex.NewSingleBinary("cortex-1", flags, "")
	require.NoError(t, scenario.StartAndWaitReady(cortex))

	// Block until the ingester has joined the ring in ACTIVE state.
	require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
		labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
		labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

	client, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "test-tenant")
	require.NoError(t, err)

	now := time.Now()

	// Build a workload with known cardinality:
	//   - http_requests_total: 3 distinct label combinations
	//   - process_cpu_seconds_total: 1 label combination
	specs := []struct {
		metric string
		extra  []prompb.Label
	}{
		{"http_requests_total", []prompb.Label{{Name: "method", Value: "GET"}, {Name: "status", Value: "200"}}},
		{"http_requests_total", []prompb.Label{{Name: "method", Value: "POST"}, {Name: "status", Value: "200"}}},
		{"http_requests_total", []prompb.Label{{Name: "method", Value: "GET"}, {Name: "status", Value: "500"}}},
		{"process_cpu_seconds_total", []prompb.Label{{Name: "instance", Value: "a"}}},
	}
	var toPush []prompb.TimeSeries
	for _, spec := range specs {
		series, _ := generateSeries(spec.metric, now, spec.extra...)
		toPush = append(toPush, series...)
	}

	res, err := client.Push(toPush)
	require.NoError(t, err)
	require.Equal(t, 200, res.StatusCode)

	// With the default limit all metrics should be reported.
	status, err := client.TSDBStatus(10)
	require.NoError(t, err)

	assert.Equal(t, uint64(4), status.NumSeries)
	require.GreaterOrEqual(t, len(status.SeriesCountByMetricName), 2)
	assert.Equal(t, "http_requests_total", status.SeriesCountByMetricName[0].Name)
	assert.Equal(t, uint64(3), status.SeriesCountByMetricName[0].Value)
	assert.Equal(t, "process_cpu_seconds_total", status.SeriesCountByMetricName[1].Name)
	assert.Equal(t, uint64(1), status.SeriesCountByMetricName[1].Value)
	assert.NotEmpty(t, status.LabelValueCountByLabelName)
	assert.Greater(t, status.MinTime, int64(0))
	assert.Greater(t, status.MaxTime, int64(0))

	// limit=1 must truncate each top-N list.
	status, err = client.TSDBStatus(1)
	require.NoError(t, err)
	assert.Len(t, status.SeriesCountByMetricName, 1)
	assert.Equal(t, "http_requests_total", status.SeriesCountByMetricName[0].Name)
}
35 changes: 35 additions & 0 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"

"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/ingester"
"github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/util/backoff"
Expand Down Expand Up @@ -164,6 +165,40 @@ func (c *Client) AllUserStats() ([]ingester.UserIDStats, error) {
return userStats, nil
}

// TSDBStatus fetches the tenant's TSDB cardinality statistics from the
// /api/v1/status/tsdb endpoint. limit caps the length of each top-N list in
// the response. The request is bounded by the client's configured timeout and
// is issued with the client's org ID so it hits the right tenant.
func (c *Client) TSDBStatus(limit int) (*distributor.TSDBStatusResult, error) {
	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
	defer cancel()

	url := fmt.Sprintf("http://%s/api/v1/status/tsdb?limit=%d", c.distributorAddress, limit)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Accept", "application/json")
	req.Header.Set("X-Scope-OrgID", c.orgID)

	res, err := c.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		// Include the body in the error to make integration failures debuggable.
		body, _ := io.ReadAll(res.Body)
		return nil, fmt.Errorf("unexpected status code %d: %s", res.StatusCode, string(body))
	}

	// Decode straight off the stream; no need to buffer the whole body first.
	var result distributor.TSDBStatusResult
	if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
		return nil, err
	}

	return &result, nil
}

// Push the input timeseries to the remote endpoint
func (c *Client) Push(timeseries []prompb.TimeSeries, metadata ...prompb.MetricMetadata) (*http.Response, error) {
// Create write request
Expand Down
3 changes: 3 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ func (a *API) RegisterCompactor(c *compactor.Compactor) {
// Distributor is the distributor-side interface consumed by the API module for
// the tenant-scoped routes registered in RegisterQueryable.
type Distributor interface {
	querier.Distributor
	// UserStatsHandler serves GET /api/v1/user_stats.
	UserStatsHandler(w http.ResponseWriter, r *http.Request)
	// TSDBStatusHandler serves GET /api/v1/status/tsdb.
	TSDBStatusHandler(w http.ResponseWriter, r *http.Request)
}

// RegisterQueryable registers the default routes associated with the querier
Expand All @@ -438,8 +439,10 @@ func (a *API) RegisterQueryable(
) {
// these routes are always registered to the default server
a.RegisterRoute("/api/v1/user_stats", http.HandlerFunc(distributor.UserStatsHandler), true, "GET")
a.RegisterRoute("/api/v1/status/tsdb", http.HandlerFunc(distributor.TSDBStatusHandler), true, "GET")

a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/user_stats"), http.HandlerFunc(distributor.UserStatsHandler), true, "GET")
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/api/v1/status/tsdb"), http.HandlerFunc(distributor.TSDBStatusHandler), true, "GET")
}

// RegisterQueryAPI registers the Prometheus API routes with the provided handler.
Expand Down
116 changes: 116 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"io"
"math"
"net/http"
"slices"
"sort"
Expand Down Expand Up @@ -1676,6 +1677,121 @@ func (d *Distributor) UserStats(ctx context.Context) (*ingester.UserStats, error
return totalStats, nil
}

// TSDBStatusResult holds the result of a TSDB status query across ingesters.
// The JSON field names match those of Prometheus' /api/v1/status/tsdb
// endpoint, so existing tooling can parse the response.
type TSDBStatusResult struct {
	// NumSeries is the tenant's in-memory series count: the cross-ingester
	// sum divided by the replication factor (see Distributor.TSDBStatus).
	NumSeries uint64 `json:"numSeries"`
	// MinTime and MaxTime are the min/max timestamps (ms) reported by any
	// ingester; zero when no ingester reported a valid time.
	MinTime int64 `json:"minTime"`
	MaxTime int64 `json:"maxTime"`
	// NumLabelPairs is the largest distinct label-pair count reported by a
	// single ingester (an approximation under replication).
	NumLabelPairs int32 `json:"numLabelPairs"`
	// The following lists are sorted by Value descending and truncated to
	// the request's limit.
	SeriesCountByMetricName []TSDBStatResult `json:"seriesCountByMetricName"`
	LabelValueCountByLabelName []TSDBStatResult `json:"labelValueCountByLabelName"`
	MemoryInBytesByLabelName []TSDBStatResult `json:"memoryInBytesByLabelName"`
	SeriesCountByLabelValuePair []TSDBStatResult `json:"seriesCountByLabelValuePair"`
}

// TSDBStatResult is a single name/value cardinality stat.
type TSDBStatResult struct {
	Name string `json:"name"` // metric name, label name, or "label=value" pair
	Value uint64 `json:"value"` // series count, value count, or byte size, per the parent list
}

// TSDBStatus returns TSDB cardinality statistics for the current tenant, aggregated
// across all ingesters.
//
// Aggregation rules (all values are approximations under replication):
//   - series counts, label memory, and label-pair series counts are summed
//     across ingesters and divided by the replication factor;
//   - unique-value counts per label name take the per-ingester max, since
//     replicas hold copies of the same values rather than disjoint sets;
//   - minTime/maxTime are the overall min/max across ingesters.
//
// limit caps the length of every top-N list in the result.
func (d *Distributor) TSDBStatus(ctx context.Context, limit int32) (*TSDBStatusResult, error) {
	replicationSet, err := d.GetIngestersForMetadata(ctx)
	if err != nil {
		return nil, err
	}

	// Require all ingesters to respond, otherwise the summed counts would be
	// silently incomplete.
	replicationSet.MaxErrors = 0

	req := &ingester_client.TSDBStatusRequest{Limit: limit}
	resps, err := d.ForReplicationSet(ctx, replicationSet, false, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
		return client.TSDBStatus(ctx, req)
	})
	if err != nil {
		return nil, err
	}

	// Merge responses from all ingesters.
	seriesByMetric := map[string]uint64{}
	labelValueCount := map[string]uint64{}
	memoryByLabel := map[string]uint64{}
	seriesByLabelPair := map[string]uint64{}

	var totalSeries uint64
	var numLabelPairs int32
	// Sentinels so the first response always wins the comparison; zeroed out
	// below if no response carried a valid time.
	minTime := int64(math.MaxInt64)
	maxTime := int64(math.MinInt64)

	for _, resp := range resps {
		r := resp.(*ingester_client.TSDBStatusResponse)
		totalSeries += r.NumSeries
		if r.MinTime < minTime {
			minTime = r.MinTime
		}
		if r.MaxTime > maxTime {
			maxTime = r.MaxTime
		}
		// Max, not sum: replicas report overlapping label pairs.
		if r.NumLabelPairs > numLabelPairs {
			numLabelPairs = r.NumLabelPairs
		}
		for _, item := range r.SeriesCountByMetricName {
			seriesByMetric[item.Name] += item.Value
		}
		// Max, not sum: the same label values are replicated across ingesters.
		for _, item := range r.LabelValueCountByLabelName {
			if item.Value > labelValueCount[item.Name] {
				labelValueCount[item.Name] = item.Value
			}
		}
		for _, item := range r.MemoryInBytesByLabelName {
			memoryByLabel[item.Name] += item.Value
		}
		for _, item := range r.SeriesCountByLabelValuePair {
			seriesByLabelPair[item.Name] += item.Value
		}
	}

	rf := uint64(d.ingestersRing.ReplicationFactor())

	result := &TSDBStatusResult{
		NumSeries: totalSeries / rf,
		MinTime: minTime,
		MaxTime: maxTime,
		NumLabelPairs: numLabelPairs,
		SeriesCountByMetricName: topNStats(seriesByMetric, int(limit), rf),
		LabelValueCountByLabelName: topNStats(labelValueCount, int(limit), 1), // don't divide unique value counts
		MemoryInBytesByLabelName: topNStats(memoryByLabel, int(limit), rf),
		SeriesCountByLabelValuePair: topNStats(seriesByLabelPair, int(limit), rf),
	}

	// If no ingesters responded with valid times, zero them out.
	if minTime == math.MaxInt64 {
		result.MinTime = 0
	}
	if maxTime == math.MinInt64 {
		result.MaxTime = 0
	}

	return result, nil
}

// topNStats divides each count in a name→count map by rf, sorts the entries
// by count descending (ties broken by name ascending so output order is
// deterministic — sort.Slice is unstable and map iteration order is random),
// and returns at most n items. A non-positive n yields an empty result rather
// than panicking on a negative slice bound.
func topNStats(m map[string]uint64, n int, rf uint64) []TSDBStatResult {
	if n < 0 {
		n = 0
	}
	items := make([]TSDBStatResult, 0, len(m))
	for name, value := range m {
		items = append(items, TSDBStatResult{Name: name, Value: value / rf})
	}
	sort.Slice(items, func(i, j int) bool {
		if items[i].Value != items[j].Value {
			return items[i].Value > items[j].Value
		}
		return items[i].Name < items[j].Name
	})
	if len(items) > n {
		items = items[:n]
	}
	return items
}

// AllUserStats returns statistics about all users.
// Note it does not divide by the ReplicationFactor like UserStats()
func (d *Distributor) AllUserStats(ctx context.Context) ([]ingester.UserIDStats, int, error) {
Expand Down
Loading
Loading