Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* Metrics: Renamed `cortex_parquet_queryable_cache_*` to `cortex_parquet_cache_*`.
* Flags: Renamed `-querier.parquet-queryable-shard-cache-size` to `-querier.parquet-shard-cache-size` and `-querier.parquet-queryable-shard-cache-ttl` to `-querier.parquet-shard-cache-ttl`.
* Config: Renamed `parquet_queryable_shard_cache_size` to `parquet_shard_cache_size` and `parquet_queryable_shard_cache_ttl` to `parquet_shard_cache_ttl`.
* [FEATURE] HATracker: Add experimental support for `memberlist` and `multi` as a KV store backend. #7284
* [FEATURE] StoreGateway: Introduces a new parquet mode. #7046
* [FEATURE] StoreGateway: Add a parquet shard cache to parquet mode. #7166
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/arguments.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ The next three options only apply when the querier is used together with the Que

### Ring/HA Tracker Store

The KVStore client is used by both the Ring and HA Tracker (HA Tracker doesn't support memberlist as KV store).
The KVStore client is used by both the Ring and HA Tracker (HA Tracker supports memberlist as a KV store as an experimental feature).
- `{ring,distributor.ha-tracker}.prefix`
The prefix for the keys in the store. Should end with a /. For example with a prefix of foo/, the key bar would be stored under foo/bar.
- `{ring,distributor.ha-tracker}.store`
Expand Down
5 changes: 2 additions & 3 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3103,9 +3103,8 @@ ha_tracker:
# CLI flag: -distributor.ha-tracker.enable-startup-sync
[enable_startup_sync: <boolean> | default = false]
# Backend storage to use for the ring. Please be aware that memberlist is not
# supported by the HA tracker since gossip propagation is too slow for HA
# purposes.
# Backend storage to use for the ring. Memberlist support in the HA tracker is
# experimental, as gossip propagation delays may impact HA performance.
kvstore:
# Backend storage to use for the ring. Supported values are: consul,
# dynamodb, etcd, inmemory, memberlist, multi.
Expand Down
378 changes: 378 additions & 0 deletions integration/integration_memberlist_single_binary_test.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cortexproject/cortex/pkg/flusher"
"github.com/cortexproject/cortex/pkg/frontend"
"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/ha"
"github.com/cortexproject/cortex/pkg/ingester"
"github.com/cortexproject/cortex/pkg/overrides"
"github.com/cortexproject/cortex/pkg/parquetconverter"
Expand Down Expand Up @@ -821,6 +822,7 @@ func (t *Cortex) initMemberlistKV() (services.Service, error) {
t.Cfg.MemberlistKV.MetricsRegisterer = reg
t.Cfg.MemberlistKV.Codecs = []codec.Codec{
ring.GetCodec(),
ha.GetReplicaDescCodec(),
}
dnsProviderReg := prometheus.WrapRegistererWithPrefix(
"cortex_",
Expand All @@ -835,6 +837,7 @@ func (t *Cortex) initMemberlistKV() (services.Service, error) {

// Update the config.
t.Cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Distributor.HATrackerConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.StoreGateway.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Compactor.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
Expand Down
100 changes: 94 additions & 6 deletions pkg/ha/ha_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
)
Expand Down Expand Up @@ -72,14 +73,100 @@ type HATrackerConfig struct {
// of tracked keys is large.
EnableStartupSync bool `yaml:"enable_startup_sync"`

KVStore kv.Config `yaml:"kvstore" doc:"description=Backend storage to use for the ring. Please be aware that memberlist is not supported by the HA tracker since gossip propagation is too slow for HA purposes."`
KVStore kv.Config `yaml:"kvstore" doc:"description=Backend storage to use for the ring. Memberlist support in the HA tracker is experimental, as gossip propagation delays may impact HA performance."`
}

// RegisterFlags adds the flags required to config this to the given FlagSet with a specified prefix
func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", "", f)
}

func (d *ReplicaDesc) Clone() any {
return proto.Clone(d)
}

// Merge merges other ReplicaDesc into this one and can be sent out to other clients.
func (d *ReplicaDesc) Merge(mergeable memberlist.Mergeable, _ bool) (memberlist.Mergeable, error) {
if mergeable == nil {
return nil, nil
}

other, ok := mergeable.(*ReplicaDesc)
if !ok {
return nil, fmt.Errorf("expected *ha.ReplicaDesc, got %T", mergeable)
}

if other == nil {
return nil, nil
}

getLatestTime := func(desc *ReplicaDesc) int64 {
if desc.DeletedAt > desc.ReceivedAt {
return desc.DeletedAt
}
return desc.ReceivedAt
}

curLatest := getLatestTime(d)
otherLatest := getLatestTime(other)

if otherLatest > curLatest {
// If other is more recent, take it.
return d.apply(other), nil
}

if otherLatest < curLatest {
// If the current is more recent, ignore the incoming data.
return nil, nil
}

// If timestamps are the same, we take deleted one.
isCurDeleted := d.DeletedAt == curLatest && d.DeletedAt > 0
isOtherIsDeleted := other.DeletedAt == otherLatest && other.DeletedAt > 0

if isOtherIsDeleted && !isCurDeleted {
// If other has been deleted, take it.
return d.apply(other), nil
}
if isCurDeleted && !isOtherIsDeleted {
// If the current has been deleted, ignore the incoming data.
return nil, nil
}

// If timestamps are exactly equal but replicas differ, use lexicographic ordering
if other.Replica != d.Replica {
if other.Replica < d.Replica {
return d.apply(other), nil
}
}

// No change (same timestamp, same replica)
return nil, nil
}

// apply performs an in-place update of the current descriptor and returns a cloned result.
func (d *ReplicaDesc) apply(other *ReplicaDesc) *ReplicaDesc {
d.Replica = other.Replica
d.ReceivedAt = other.ReceivedAt
d.DeletedAt = other.DeletedAt
return proto.Clone(d).(*ReplicaDesc)
}

// MergeContent describes content of this Mergeable.
// For ReplicaDesc, we return the replica name.
func (d *ReplicaDesc) MergeContent() []string {
if d.Replica == "" {
return nil
}
return []string{d.Replica}
}

// RemoveTombstones is a no-op for ReplicaDesc.
func (d *ReplicaDesc) RemoveTombstones(_ time.Time) (total, removed int) {
// No-op: HATracker manages tombstones via cleanupOldReplicas
return
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *HATrackerConfig) RegisterFlagsWithPrefix(flagPrefix string, kvPrefix string, f *flag.FlagSet) {
finalFlagPrefix := ""
Expand Down Expand Up @@ -116,12 +203,13 @@ func (cfg *HATrackerConfig) Validate() error {
return fmt.Errorf(errInvalidFailoverTimeout, cfg.FailoverTimeout, minFailureTimeout)
}

// Tracker kv store only supports consul and etcd.
storeAllowedList := []string{"consul", "etcd"}
if slices.Contains(storeAllowedList, cfg.KVStore.Store) {
return nil
// Tracker kv store only supports consul, etcd, memberlist, and multi.
storeAllowedList := []string{"consul", "etcd", "memberlist", "multi"}
if !slices.Contains(storeAllowedList, cfg.KVStore.Store) {
return fmt.Errorf("invalid HATracker KV store type: %s", cfg.KVStore.Store)
}
return fmt.Errorf("invalid HATracker KV store type: %s", cfg.KVStore.Store)

return nil
}

func GetReplicaDescCodec() codec.Proto {
Expand Down
Loading
Loading