From 68989aef302d2ab91778322915ad30a3cd1c240d Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Fri, 27 Feb 2026 20:05:01 +0900 Subject: [PATCH 1/4] Enable HATracker memberlist Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + docs/configuration/arguments.md | 2 +- docs/configuration/config-file-reference.md | 5 +- ...tegration_memberlist_single_binary_test.go | 244 +++++++++++++++ pkg/cortex/modules.go | 3 + pkg/ha/ha_tracker.go | 100 +++++- pkg/ha/ha_tracker_test.go | 292 +++++++++++++++++- pkg/ring/kv/memberlist/memberlist_client.go | 16 +- schemas/cortex-config-schema.json | 2 +- 9 files changed, 651 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d1aa1c467b5..b7b6afc202d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ * Metrics: Renamed `cortex_parquet_queryable_cache_*` to `cortex_parquet_cache_*`. * Flags: Renamed `-querier.parquet-queryable-shard-cache-size` to `-querier.parquet-shard-cache-size` and `-querier.parquet-queryable-shard-cache-ttl` to `-querier.parquet-shard-cache-ttl`. * Config: Renamed `parquet_queryable_shard_cache_size` to `parquet_shard_cache_size` and `parquet_queryable_shard_cache_ttl` to `parquet_shard_cache_ttl`. +* [FEATURE] HATracker: Add experimental support for `memberlist` and `multi` as a KV store backend. #7284 * [FEATURE] StoreGateway: Introduces a new parquet mode. #7046 * [FEATURE] StoreGateway: Add a parquet shard cache to parquet mode. #7166 * [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. 
#7077 diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md index 9ba9f317100..fa32ba010e2 100644 --- a/docs/configuration/arguments.md +++ b/docs/configuration/arguments.md @@ -117,7 +117,7 @@ The next three options only apply when the querier is used together with the Que ### Ring/HA Tracker Store -The KVStore client is used by both the Ring and HA Tracker (HA Tracker doesn't support memberlist as KV store). +The KVStore client is used by both the Ring and HA Tracker (HA Tracker supports memberlist as a KV store as an experimental feature). - `{ring,distributor.ha-tracker}.prefix` The prefix for the keys in the store. Should end with a /. For example with a prefix of foo/, the key bar would be stored under foo/bar. - `{ring,distributor.ha-tracker}.store` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 81b85fb018f..c47eede8dc1 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3103,9 +3103,8 @@ ha_tracker: # CLI flag: -distributor.ha-tracker.enable-startup-sync [enable_startup_sync: | default = false] - # Backend storage to use for the ring. Please be aware that memberlist is not - # supported by the HA tracker since gossip propagation is too slow for HA - # purposes. + # Backend storage to use for the ring. Memberlist support in the HA tracker is + # experimental, as gossip propagation delays may impact HA performance. kvstore: # Backend storage to use for the ring. Supported values are: consul, # dynamodb, etcd, inmemory, memberlist, multi. 
diff --git a/integration/integration_memberlist_single_binary_test.go b/integration/integration_memberlist_single_binary_test.go index a79642e1dd5..14a720d9a93 100644 --- a/integration/integration_memberlist_single_binary_test.go +++ b/integration/integration_memberlist_single_binary_test.go @@ -11,6 +11,8 @@ import ( "testing" "time" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -246,3 +248,245 @@ func TestSingleBinaryWithMemberlistScaling(t *testing.T) { "expected all instances to have %f ring members and %f tombstones", expectedRingMembers, expectedTombstones) } + +func TestHATrackerWithMemberlistClusterSync(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + minio := e2edb.NewMinio(9000, bucketName) + require.NoError(t, s.StartAndWaitReady(minio)) + + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-distributor.ha-tracker.enable": "true", + "-distributor.ha-tracker.enable-for-all-users": "true", + "-distributor.ha-tracker.cluster": "cluster", + "-distributor.ha-tracker.replica": "__replica__", + // Use memberlist as the KV store for the HA Tracker + "-distributor.ha-tracker.store": "memberlist", + + // To fast failover + "-distributor.ha-tracker.update-timeout": "1s", + "-distributor.ha-tracker.update-timeout-jitter-max": "0s", + "-distributor.ha-tracker.failover-timeout": "2s", + + // memberlist config + "-ring.store": "memberlist", + "-memberlist.bind-port": "8000", + }) + + cortex1 := newSingleBinary("cortex-1", "", "", flags) + cortex2 := newSingleBinary("cortex-2", "", networkName+"-cortex-1:8000", flags) + cortex3 := newSingleBinary("cortex-3", "", networkName+"-cortex-1:8000", flags) + + require.NoError(t, s.StartAndWaitReady(cortex1)) + require.NoError(t, 
s.StartAndWaitReady(cortex2, cortex3)) + + // Ensure all three Cortex instances have successfully discovered each other in the memberlist cluster. + require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(3), "memberlist_client_cluster_members_count")) + require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(3), "memberlist_client_cluster_members_count")) + require.NoError(t, cortex3.WaitSumMetrics(e2e.Equals(3), "memberlist_client_cluster_members_count")) + + // All Cortex servers should have 512 tokens, altogether 3 * 512. + require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(3*512), "cortex_ring_tokens_total")) + require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(3*512), "cortex_ring_tokens_total")) + require.NoError(t, cortex3.WaitSumMetrics(e2e.Equals(3*512), "cortex_ring_tokens_total")) + + now := time.Now() + userID := "user-1" + + client1, err := e2ecortex.NewClient(cortex1.HTTPEndpoint(), "", "", "", userID) + require.NoError(t, err) + + series, _ := generateSeries("foo", now, prompb.Label{Name: "__replica__", Value: "replica0"}, prompb.Label{Name: "cluster", Value: "cluster0"}) + // send to cortex1 + res, err := client1.Push([]prompb.TimeSeries{series[0]}) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(1), "cortex_ha_tracker_elected_replica_changes_total")) + // cortex-2 should be notified of the HA leader via memberlist gossip + require.NoError(t, cortex2.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ha_tracker_elected_replica_changes_total"}, e2e.WaitMissingMetrics)) + // cortex-3 should be notified of the HA leader via memberlist gossip + require.NoError(t, cortex3.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ha_tracker_elected_replica_changes_total"}, e2e.WaitMissingMetrics)) + + // Wait 5 seconds to ensure the FailoverTimeout (2s) has comfortably passed.
+ time.Sleep(5 * time.Second) + + client2, err := e2ecortex.NewClient(cortex2.HTTPEndpoint(), "", "", "", userID) + require.NoError(t, err) + + series2, _ := generateSeries("foo", now.Add(time.Second*30), prompb.Label{Name: "__replica__", Value: "replica1"}, prompb.Label{Name: "cluster", Value: "cluster0"}) + // send to cortex2 + res2, err := client2.Push([]prompb.TimeSeries{series2[0]}) + require.NoError(t, err) + require.Equal(t, 200, res2.StatusCode) + + // cortex-2 should fail over to replica1 + require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(2), "cortex_ha_tracker_elected_replica_changes_total")) + // cortex-1 should be notified of the changed HA leader via memberlist gossip + require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(2), "cortex_ha_tracker_elected_replica_changes_total")) + // cortex-3 should be notified of the changed HA leader via memberlist gossip + require.NoError(t, cortex3.WaitSumMetrics(e2e.Equals(2), "cortex_ha_tracker_elected_replica_changes_total")) +} + +func TestHATrackerWithMemberlist(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + minio := e2edb.NewMinio(9000, bucketName) + require.NoError(t, s.StartAndWaitReady(minio)) + + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-distributor.ha-tracker.enable": "true", + "-distributor.ha-tracker.enable-for-all-users": "true", + "-distributor.ha-tracker.cluster": "cluster", + "-distributor.ha-tracker.replica": "__replica__", + // Use memberlist as the KV store for the HA Tracker + "-distributor.ha-tracker.store": "memberlist", + + // To fast failover + "-distributor.ha-tracker.update-timeout": "1s", + "-distributor.ha-tracker.update-timeout-jitter-max": "0s", + "-distributor.ha-tracker.failover-timeout": "2s", + + // memberlist config + "-ring.store": "memberlist", + "-memberlist.bind-port": "8000", + }) + + cortex :=
newSingleBinary("cortex", "", "", flags) + require.NoError(t, s.StartAndWaitReady(cortex)) + + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(1), "memberlist_client_cluster_members_count")) + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + now := time.Now() + numUsers := 100 + + for i := 1; i <= numUsers; i++ { + userID := fmt.Sprintf("user-%d", i) + client, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), "", "", "", userID) + require.NoError(t, err) + + series, _ := generateSeries("foo", now, prompb.Label{Name: "__replica__", Value: "replica0"}, prompb.Label{Name: "cluster", Value: "cluster0"}) + res, err := client.Push([]prompb.TimeSeries{series[0]}) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + } + + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(numUsers)), "cortex_ha_tracker_elected_replica_changes_total")) + + // Wait 5 seconds to ensure the FailoverTimeout (2s) has comfortably passed. + time.Sleep(5 * time.Second) + + for i := 1; i <= numUsers; i++ { + userID := fmt.Sprintf("user-%d", i) + client, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), "", "", "", userID) + require.NoError(t, err) + + // This time, we send data from replica1 instead of replica0. 
+ series, _ := generateSeries("foo", now.Add(time.Second*30), prompb.Label{Name: "__replica__", Value: "replica1"}, prompb.Label{Name: "cluster", Value: "cluster0"}) + res, err := client.Push([]prompb.TimeSeries{series[0]}) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + } + + // Since the leader successfully failed over to replica1, the change count increments by 1 per user + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(numUsers*2)), "cortex_ha_tracker_elected_replica_changes_total")) +} + +func TestHATrackerWithMultiKV(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, bucketName) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-distributor.ha-tracker.enable": "true", + "-distributor.ha-tracker.enable-for-all-users": "true", + "-distributor.ha-tracker.cluster": "cluster", + "-distributor.ha-tracker.replica": "__replica__", + // Use the multi KV store (consul primary, memberlist secondary) for the HA Tracker + "-distributor.ha-tracker.store": "multi", + + // To fast failover + "-distributor.ha-tracker.update-timeout": "1s", + "-distributor.ha-tracker.update-timeout-jitter-max": "0s", + "-distributor.ha-tracker.failover-timeout": "2s", + + // multi KV config + "-distributor.ha-tracker.multi.primary": "consul", + "-distributor.ha-tracker.multi.secondary": "memberlist", + "-distributor.ha-tracker.consul.hostname": consul.NetworkHTTPEndpoint(), + + // Enable data mirroring + "-distributor.ha-tracker.multi.mirror-enabled": "true", + + // memberlist config + "-ring.store": "memberlist", + "-memberlist.bind-port": "8000", + }) + + cortex := newSingleBinary("cortex", "", "", flags) + require.NoError(t, s.StartAndWaitReady(cortex)) + + require.NoError(t,
cortex.WaitSumMetrics(e2e.Equals(1), "memberlist_client_cluster_members_count")) + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + // mirror enabled + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(1)), "cortex_multikv_mirror_enabled")) + // consul as primary KV Store + require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_multikv_primary_store"}, e2e.WaitMissingMetrics, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "store", "consul"))), + ) + + now := time.Now() + numUsers := 100 + + for i := 1; i <= numUsers; i++ { + userID := fmt.Sprintf("user-%d", i) + client, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), "", "", "", userID) + require.NoError(t, err) + + series, _ := generateSeries("foo", now, prompb.Label{Name: "__replica__", Value: "replica0"}, prompb.Label{Name: "cluster", Value: "cluster0"}) + res, err := client.Push([]prompb.TimeSeries{series[0]}) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + } + + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(numUsers)), "cortex_ha_tracker_elected_replica_changes_total")) + + // Wait 5 seconds to ensure the FailoverTimeout (2s) has comfortably passed. + time.Sleep(5 * time.Second) + + for i := 1; i <= numUsers; i++ { + userID := fmt.Sprintf("user-%d", i) + client, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), "", "", "", userID) + require.NoError(t, err) + + // This time, we send data from replica1 instead of replica0. 
+ series, _ := generateSeries("foo", now.Add(time.Second*30), prompb.Label{Name: "__replica__", Value: "replica1"}, prompb.Label{Name: "cluster", Value: "cluster0"}) + res, err := client.Push([]prompb.TimeSeries{series[0]}) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + } + + // Since the leader successfully failed over to replica1, the change count increments by 1 per user + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(numUsers*2)), "cortex_ha_tracker_elected_replica_changes_total")) + // Two keys (1 cluster with 2 replicas) per user should be written to the memberlist (secondary store) + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(numUsers*2)), "cortex_multikv_mirror_writes_total")) +} diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index fbc6db605f0..4fe4502d89d 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -37,6 +37,7 @@ import ( "github.com/cortexproject/cortex/pkg/flusher" "github.com/cortexproject/cortex/pkg/frontend" "github.com/cortexproject/cortex/pkg/frontend/transport" + "github.com/cortexproject/cortex/pkg/ha" "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/overrides" "github.com/cortexproject/cortex/pkg/parquetconverter" @@ -821,6 +822,7 @@ func (t *Cortex) initMemberlistKV() (services.Service, error) { t.Cfg.MemberlistKV.MetricsRegisterer = reg t.Cfg.MemberlistKV.Codecs = []codec.Codec{ ring.GetCodec(), + ha.GetReplicaDescCodec(), } dnsProviderReg := prometheus.WrapRegistererWithPrefix( "cortex_", @@ -835,6 +837,7 @@ func (t *Cortex) initMemberlistKV() (services.Service, error) { // Update the config. 
t.Cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + t.Cfg.Distributor.HATrackerConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.StoreGateway.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.Compactor.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV diff --git a/pkg/ha/ha_tracker.go b/pkg/ha/ha_tracker.go index 11b38eb79c6..d798e07d184 100644 --- a/pkg/ha/ha_tracker.go +++ b/pkg/ha/ha_tracker.go @@ -21,6 +21,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/ring/kv/codec" + "github.com/cortexproject/cortex/pkg/ring/kv/memberlist" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -72,7 +73,7 @@ type HATrackerConfig struct { // of tracked keys is large. EnableStartupSync bool `yaml:"enable_startup_sync"` - KVStore kv.Config `yaml:"kvstore" doc:"description=Backend storage to use for the ring. Please be aware that memberlist is not supported by the HA tracker since gossip propagation is too slow for HA purposes."` + KVStore kv.Config `yaml:"kvstore" doc:"description=Backend storage to use for the ring. Memberlist support in the HA tracker is experimental, as gossip propagation delays may impact HA performance."` } // RegisterFlags adds the flags required to config this to the given FlagSet with a specified prefix @@ -80,6 +81,92 @@ func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet) { cfg.RegisterFlagsWithPrefix("", "", f) } +func (d *ReplicaDesc) Clone() any { + return proto.Clone(d) +} + +// Merge merges other ReplicaDesc into this one and returns the change, which can be sent out to other clients.
+func (d *ReplicaDesc) Merge(mergeable memberlist.Mergeable, _ bool) (memberlist.Mergeable, error) { + if mergeable == nil { + return nil, nil + } + + other, ok := mergeable.(*ReplicaDesc) + if !ok { + return nil, fmt.Errorf("expected *ha.ReplicaDesc, got %T", mergeable) + } + + if other == nil { + return nil, nil + } + + getLatestTime := func(desc *ReplicaDesc) int64 { + if desc.DeletedAt > desc.ReceivedAt { + return desc.DeletedAt + } + return desc.ReceivedAt + } + + curLatest := getLatestTime(d) + otherLatest := getLatestTime(other) + + if otherLatest > curLatest { + // If other is more recent, take it. + return d.apply(other), nil + } + + if otherLatest < curLatest { + // If the current is more recent, ignore the incoming data. + return nil, nil + } + + // If timestamps are the same, we take deleted one. + isCurDeleted := d.DeletedAt == curLatest && d.DeletedAt > 0 + isOtherIsDeleted := other.DeletedAt == otherLatest && other.DeletedAt > 0 + + if isOtherIsDeleted && !isCurDeleted { + // If other has been deleted, take it. + return d.apply(other), nil + } + if isCurDeleted && !isOtherIsDeleted { + // If the current has been deleted, ignore the incoming data. + return nil, nil + } + + // If timestamps are exactly equal but replicas differ, use lexicographic ordering + if other.Replica != d.Replica { + if other.Replica < d.Replica { + return d.apply(other), nil + } + } + + // No change (same timestamp, same replica) + return nil, nil +} + +// apply performs an in-place update of the current descriptor and returns a cloned result. +func (d *ReplicaDesc) apply(other *ReplicaDesc) *ReplicaDesc { + d.Replica = other.Replica + d.ReceivedAt = other.ReceivedAt + d.DeletedAt = other.DeletedAt + return proto.Clone(d).(*ReplicaDesc) +} + +// MergeContent describes content of this Mergeable. +// For ReplicaDesc, we return the replica name. 
+func (d *ReplicaDesc) MergeContent() []string { + if d.Replica == "" { + return nil + } + return []string{d.Replica} +} + +// RemoveTombstones is a no-op for ReplicaDesc. +func (d *ReplicaDesc) RemoveTombstones(_ time.Time) (total, removed int) { + // No-op: HATracker manages tombstones via cleanupOldReplicas + return +} + // RegisterFlags adds the flags required to config this to the given FlagSet. func (cfg *HATrackerConfig) RegisterFlagsWithPrefix(flagPrefix string, kvPrefix string, f *flag.FlagSet) { finalFlagPrefix := "" @@ -116,12 +203,13 @@ func (cfg *HATrackerConfig) Validate() error { return fmt.Errorf(errInvalidFailoverTimeout, cfg.FailoverTimeout, minFailureTimeout) } - // Tracker kv store only supports consul and etcd. - storeAllowedList := []string{"consul", "etcd"} - if slices.Contains(storeAllowedList, cfg.KVStore.Store) { - return nil + // Tracker kv store only supports consul, etcd, memberlist, and multi. + storeAllowedList := []string{"consul", "etcd", "memberlist", "multi"} + if !slices.Contains(storeAllowedList, cfg.KVStore.Store) { + return fmt.Errorf("invalid HATracker KV store type: %s", cfg.KVStore.Store) } - return fmt.Errorf("invalid HATracker KV store type: %s", cfg.KVStore.Store) + + return nil } func GetReplicaDescCodec() codec.Proto { diff --git a/pkg/ha/ha_tracker_test.go b/pkg/ha/ha_tracker_test.go index 882a4c88681..db74eda24fa 100644 --- a/pkg/ha/ha_tracker_test.go +++ b/pkg/ha/ha_tracker_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/gogo/protobuf/proto" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" @@ -20,6 +21,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/ring/kv/consul" + "github.com/cortexproject/cortex/pkg/ring/kv/memberlist" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" 
util_log "github.com/cortexproject/cortex/pkg/util/log" @@ -108,14 +110,23 @@ func TestHATrackerConfig_Validate(t *testing.T) { }(), expectedErr: nil, }, - "should failed with invalid kv store": { + "should pass with memberlist kv store": { cfg: func() HATrackerConfig { cfg := HATrackerConfig{} flagext.DefaultValues(&cfg) cfg.KVStore.Store = "memberlist" return cfg }(), - expectedErr: fmt.Errorf("invalid HATracker KV store type: %s", "memberlist"), + expectedErr: nil, + }, + "should pass with multi kv store": { + cfg: func() HATrackerConfig { + cfg := HATrackerConfig{} + flagext.DefaultValues(&cfg) + cfg.KVStore.Store = "multi" + return cfg + }(), + expectedErr: nil, }, } @@ -945,3 +956,280 @@ func checkReplicaDeletionState(t *testing.T, duration time.Duration, c *HATracke require.Equal(t, expectedMarkedForDeletion, markedForDeletion, "KV entry marked for deletion") } } + +func TestReplicaDesc_Merge(t *testing.T) { + now := time.Now() + + tests := []struct { + name string + current *ReplicaDesc + other *ReplicaDesc + expectChange bool + expectedResult *ReplicaDesc + }{ + { + name: "merge with more recent replica", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + other: &ReplicaDesc{ + Replica: "replica2", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: 0, + }, + expectChange: true, + expectedResult: &ReplicaDesc{ + Replica: "replica2", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: 0, + }, + }, + { + name: "merge with older replica - no change", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: 0, + }, + other: &ReplicaDesc{ + Replica: "replica2", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + expectChange: false, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: 0, + }, + }, + { + name: "merge with deleted 
replica", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + other: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + }, + expectChange: true, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(time.Minute)), + DeletedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + }, + }, + { + name: "undelete with more recent replica", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: timestamp.FromTime(now.Add(time.Minute)), + }, + other: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + DeletedAt: 0, + }, + expectChange: true, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + DeletedAt: 0, + }, + }, + { + name: "merge with nil other", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + other: nil, + expectChange: false, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + }, + { + name: "merge deleted with more recent deleted", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: timestamp.FromTime(now.Add(time.Minute)), + }, + other: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + }, + expectChange: true, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: timestamp.FromTime(now.Add(2 * time.Minute)), + }, + }, + { + name: "same timestamp, different replica - choose lexicographically smaller", + current: &ReplicaDesc{ + Replica: "replica-b", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + 
}, + other: &ReplicaDesc{ + Replica: "replica-a", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + expectChange: true, + expectedResult: &ReplicaDesc{ + Replica: "replica-a", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + }, + { + name: "same timestamp, same replica - no change", + current: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + other: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + expectChange: false, + expectedResult: &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(now), + DeletedAt: 0, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var change memberlist.Mergeable + var err error + + if tt.other != nil { + change, err = tt.current.Merge(tt.other, false) + } else { + change, err = tt.current.Merge(nil, false) + } + + require.NoError(t, err) + + if tt.expectChange { + require.NotNil(t, change, "expected a change to be returned") + } else { + require.Nil(t, change, "expected no change to be returned") + } + + assert.Equal(t, tt.expectedResult.Replica, tt.current.Replica) + assert.Equal(t, tt.expectedResult.ReceivedAt, tt.current.ReceivedAt) + assert.Equal(t, tt.expectedResult.DeletedAt, tt.current.DeletedAt) + }) + } +} + +func TestReplicaDesc_Merge_Commutativity(t *testing.T) { + tests := []struct { + name string + descA *ReplicaDesc + descB *ReplicaDesc + }{ + { + name: "Same replica: New vs Older", + descA: &ReplicaDesc{ + Replica: "replica-A", + ReceivedAt: 200, + DeletedAt: 0, + }, + descB: &ReplicaDesc{ + Replica: "replica-A", + ReceivedAt: 50, + DeletedAt: 100, + }, + }, + { + name: "Same Timestamps - Lexicographical Tie-break", + descA: &ReplicaDesc{ + Replica: "replica-A", + ReceivedAt: 100, + DeletedAt: 0, + }, + descB: &ReplicaDesc{ + Replica: "replica-B", + ReceivedAt: 100, + DeletedAt: 0, + }, + }, + { + name: "Concurrent Deletions with Different 
Timestamps", + descA: &ReplicaDesc{ + Replica: "replica-A", + ReceivedAt: 50, + DeletedAt: 150, + }, + descB: &ReplicaDesc{ + Replica: "replica-A", + ReceivedAt: 50, + DeletedAt: 120, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // A merges B + nodeA := proto.Clone(tt.descA).(*ReplicaDesc) + incomingB := proto.Clone(tt.descB).(*ReplicaDesc) + _, _ = nodeA.Merge(incomingB, false) + + // B merges A + nodeB := proto.Clone(tt.descB).(*ReplicaDesc) + incomingA := proto.Clone(tt.descA).(*ReplicaDesc) + _, _ = nodeB.Merge(incomingA, false) + + // Check if both nodes converged to the exact same state + isSame := (nodeA.Replica == nodeB.Replica) && + (nodeA.ReceivedAt == nodeB.ReceivedAt) && + (nodeA.DeletedAt == nodeB.DeletedAt) + + if !isSame { + t.Errorf("Commutativity violation in '%s'!\n"+ + "Result of A.Merge(B): Replica=%s, ReceivedAt=%d, DeletedAt=%d\n"+ + "Result of B.Merge(A): Replica=%s, ReceivedAt=%d, DeletedAt=%d", + tt.name, + nodeA.Replica, nodeA.ReceivedAt, nodeA.DeletedAt, + nodeB.Replica, nodeB.ReceivedAt, nodeB.DeletedAt) + } + }) + } +} + +func TestReplicaDesc_MergeContent(t *testing.T) { + desc := &ReplicaDesc{ + Replica: "replica1", + ReceivedAt: timestamp.FromTime(time.Now()), + DeletedAt: 0, + } + + content := desc.MergeContent() + require.Equal(t, []string{"replica1"}, content) + + emptyDesc := &ReplicaDesc{} + emptyContent := emptyDesc.MergeContent() + require.Nil(t, emptyContent) +} diff --git a/pkg/ring/kv/memberlist/memberlist_client.go b/pkg/ring/kv/memberlist/memberlist_client.go index 48b5bd9266f..c0e26b69232 100644 --- a/pkg/ring/kv/memberlist/memberlist_client.go +++ b/pkg/ring/kv/memberlist/memberlist_client.go @@ -72,7 +72,12 @@ func (c *Client) Get(ctx context.Context, key string) (any, error) { // Delete is part of kv.Client interface. 
func (c *Client) Delete(ctx context.Context, key string) error { - return errors.New("memberlist does not support Delete") + err := c.awaitKVRunningOrStopping(ctx) + if err != nil { + return err + } + + return c.kv.Delete(key) } // CAS is part of kv.Client interface @@ -679,6 +684,15 @@ func (m *KV) get(key string, codec codec.Codec) (out any, version uint, err erro return v.value, v.version, nil } +func (m *KV) Delete(key string) error { + // TODO(Sungjin1212): Mark the key as deleted and broadcast the deletion to peers; until then this removal is local-only and a peer's gossip can bring the key back. + m.storeMu.Lock() + defer m.storeMu.Unlock() + + delete(m.store, key) + return nil +} + // WatchKey watches for value changes for given key. When value changes, 'f' function is called with the // latest value. Notifications that arrive while 'f' is running are coalesced into one subsequent 'f' call. // diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index dfbd85f685c..378fc916942 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -3750,7 +3750,7 @@ "x-format": "duration" }, "kvstore": { - "description": "Backend storage to use for the ring. Please be aware that memberlist is not supported by the HA tracker since gossip propagation is too slow for HA purposes.", + "description": "Backend storage to use for the ring. 
Memberlist support in the HA tracker is experimental, as gossip propagation delays may impact HA performance.", "properties": { "consul": { "$ref": "#/definitions/consul_config" From bade89cc134af4a48f57214bc856b443985050dc Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Fri, 27 Feb 2026 20:41:46 +0900 Subject: [PATCH 2/4] Wrap error Signed-off-by: SungJin1212 --- pkg/ring/kv/memberlist/memberlist_client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/ring/kv/memberlist/memberlist_client.go b/pkg/ring/kv/memberlist/memberlist_client.go index c0e26b69232..1496064a321 100644 --- a/pkg/ring/kv/memberlist/memberlist_client.go +++ b/pkg/ring/kv/memberlist/memberlist_client.go @@ -902,7 +902,7 @@ outer: } m.casFailures.Inc() - return fmt.Errorf("failed to CAS-update key %s: %v", key, lastError) + return fmt.Errorf("failed to CAS-update key %s: %w", key, lastError) } // returns change, error (or nil, if CAS succeeded), and whether to retry or not. @@ -915,7 +915,7 @@ func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in any) (out any out, retry, err := f(val) if err != nil { - return nil, 0, retry, fmt.Errorf("fn returned error: %v", err) + return nil, 0, retry, fmt.Errorf("fn returned error: %w", err) } if out == nil { From cc52a025a61cd56ed12fffbec6a943173ec3285c Mon Sep 17 00:00:00 2001 From: Friedrich Gonzalez <1517449+friedrichg@users.noreply.github.com> Date: Wed, 4 Mar 2026 01:45:35 -0800 Subject: [PATCH 3/4] Testing memberlist changes Signed-off-by: Friedrich Gonzalez <1517449+friedrichg@users.noreply.github.com> --- ...tegration_memberlist_single_binary_test.go | 134 ++++++++++++++++++ .../kv/memberlist/memberlist_client_test.go | 66 +++++++++ 2 files changed, 200 insertions(+) diff --git a/integration/integration_memberlist_single_binary_test.go b/integration/integration_memberlist_single_binary_test.go index 14a720d9a93..197c46b74f6 100644 --- a/integration/integration_memberlist_single_binary_test.go +++ 
b/integration/integration_memberlist_single_binary_test.go @@ -490,3 +490,137 @@ func TestHATrackerWithMultiKV(t *testing.T) { // Two keys (1 cluster with 2 replicas) per user should be written to the memberlist (secondary store) require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(numUsers*2)), "cortex_multikv_mirror_writes_total")) } + +// TestHATrackerMemberlistDeleteNotPropagated demonstrates that memberlist gossip +// re-propagates KV store entries to nodes that have lost them. +// +// This is relevant to KV.Delete in the HA tracker: when cleanupOldReplicas calls +// client.Delete to remove stale entries, it only removes the key from the local +// memberlist KV store without broadcasting the deletion to peers. Since the peer +// nodes still hold the key, the next push-pull sync (MergeRemoteState) will +// re-gossip it back to the node that deleted it, effectively un-doing the cleanup. +// +// The test demonstrates the re-gossip mechanism by: +// 1. Setting up 2 nodes with HA tracker using memberlist +// 2. Pushing HA data to node1 → both nodes track it via gossip +// 3. Stopping node1 (simulating loss of local state, equivalent to a local Delete) +// 4. Restarting node1 → node2 gossips ring + HA data back via push-pull +// 5. 
Verifying that the HA tracker on the restarted node works with the re-gossiped +// state by performing a failover +func TestHATrackerMemberlistDeleteNotPropagated(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + minio := e2edb.NewMinio(9000, bucketName) + require.NoError(t, s.StartAndWaitReady(minio)) + + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-distributor.ha-tracker.enable": "true", + "-distributor.ha-tracker.enable-for-all-users": "true", + "-distributor.ha-tracker.cluster": "cluster", + "-distributor.ha-tracker.replica": "__replica__", + "-distributor.ha-tracker.store": "memberlist", + + "-distributor.ha-tracker.update-timeout": "1s", + "-distributor.ha-tracker.update-timeout-jitter-max": "0s", + "-distributor.ha-tracker.failover-timeout": "2s", + + "-ring.store": "memberlist", + "-memberlist.bind-port": "8000", + "-memberlist.gossip-interval": "200ms", + "-memberlist.pullpush-interval": "1s", + "-memberlist.left-ingesters-timeout": "600s", + }) + + cortex1 := newSingleBinary("cortex-1", "", "", flags) + cortex2 := newSingleBinary("cortex-2", "", networkName+"-cortex-1:8000", flags) + + require.NoError(t, s.StartAndWaitReady(cortex1)) + require.NoError(t, s.StartAndWaitReady(cortex2)) + + // Wait for both nodes to see each other in the memberlist cluster. 
+ require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(2), "memberlist_client_cluster_members_count")) + require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(2), "memberlist_client_cluster_members_count")) + + require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(2*512), "cortex_ring_tokens_total")) + require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(2*512), "cortex_ring_tokens_total")) + + userID := "user-1" + now := time.Now() + + client1, err := e2ecortex.NewClient(cortex1.HTTPEndpoint(), "", "", "", userID) + require.NoError(t, err) + + // Push HA data to cortex1 with replica0. + series, _ := generateSeries("foo", now, + prompb.Label{Name: "__replica__", Value: "replica0"}, + prompb.Label{Name: "cluster", Value: "cluster0"}, + ) + res, err := client1.Push([]prompb.TimeSeries{series[0]}) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + // Both nodes should see the HA tracker entry via memberlist gossip. + require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(1), "cortex_ha_tracker_elected_replica_changes_total")) + require.NoError(t, cortex2.WaitSumMetricsWithOptions(e2e.Equals(1), + []string{"cortex_ha_tracker_elected_replica_changes_total"}, e2e.WaitMissingMetrics)) + + // Stop cortex1 — this clears its local memberlist store (simulates local-only Delete). + require.NoError(t, s.Stop(cortex1)) + + // Wait for cortex2 to see only 1 memberlist member. + require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(1), "memberlist_client_cluster_members_count")) + + // Restart cortex1 with the same config. It starts with an empty memberlist store. + // During memberlist join, push-pull sync re-populates the KV store from cortex2, + // including both ring data AND the HA tracker entry. This is the same mechanism + // that would re-gossip a locally-deleted key back to the node that deleted it. 
+ cortex1Restarted := newSingleBinary("cortex-1", "", networkName+"-cortex-2:8000", flags) + require.NoError(t, s.StartAndWaitReady(cortex1Restarted)) + + // Wait for both nodes to see each other again. + require.NoError(t, cortex1Restarted.WaitSumMetrics(e2e.Equals(2), "memberlist_client_cluster_members_count")) + require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(2), "memberlist_client_cluster_members_count")) + + // Verify the ring is fully restored via push-pull gossip from cortex2. + // This proves that cortex2 re-gossiped its full KV state (including HA tracker + // entries) to the restarted cortex1. The ring and HA tracker share the same + // memberlist KV store and push-pull mechanism. + require.NoError(t, cortex1Restarted.WaitSumMetrics(e2e.Equals(2*512), "cortex_ring_tokens_total")) + require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(2*512), "cortex_ring_tokens_total")) + + // Wait for the failover timeout to pass so we can trigger a replica change. + time.Sleep(5 * time.Second) + + // Push from replica1 to cortex2. Since the failover timeout (2s) has passed, + // cortex2's HA tracker should accept the failover from replica0 → replica1. + client2, err := e2ecortex.NewClient(cortex2.HTTPEndpoint(), "", "", "", userID) + require.NoError(t, err) + + series2, _ := generateSeries("foo", now.Add(time.Second*30), + prompb.Label{Name: "__replica__", Value: "replica1"}, + prompb.Label{Name: "cluster", Value: "cluster0"}, + ) + res2, err := client2.Push([]prompb.TimeSeries{series2[0]}) + require.NoError(t, err) + require.Equal(t, 200, res2.StatusCode) + + // cortex2 should see the failover (replica change count goes from 1 → 2). + require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(2), "cortex_ha_tracker_elected_replica_changes_total")) + + // The restarted cortex1 should also see the failover via gossip. 
+ // It sees only 1 change (the failover), not 2, because the initial replica + // election arrived via push-pull before the HA tracker registered its + // WatchPrefix, so that notification was missed. But the failover is a new + // CAS after WatchPrefix was registered, so it IS observed. + // + // This proves the full cycle: data deleted from a node (via restart/clearing + // the store) gets re-gossiped from peers, and subsequent changes continue + // to propagate. This is exactly why KV.Delete without broadcasting is broken: + // the peer's copy always wins and the deleted key comes back. + require.NoError(t, cortex1Restarted.WaitSumMetricsWithOptions(e2e.Equals(1), + []string{"cortex_ha_tracker_elected_replica_changes_total"}, e2e.WaitMissingMetrics)) +} diff --git a/pkg/ring/kv/memberlist/memberlist_client_test.go b/pkg/ring/kv/memberlist/memberlist_client_test.go index 2ac167665a4..b0c1f77c0b7 100644 --- a/pkg/ring/kv/memberlist/memberlist_client_test.go +++ b/pkg/ring/kv/memberlist/memberlist_client_test.go @@ -1356,6 +1356,72 @@ func (p dnsProviderMock) Addresses() []string { return p.resolved } +// TestDeleteIsNotPropagatedToOtherNodes demonstrates that KV.Delete only +// removes the key locally and does not broadcast the deletion to peers. +// When a peer gossips its copy back (simulated via NotifyMsg), the deleted +// key reappears on the node that deleted it. +// +// This test requires network binding (memberlist needs TCP ports). 
+// Run with: go test -run TestDeleteIsNotPropagatedToOtherNodes -v ./pkg/ring/kv/memberlist/ +func TestDeleteIsNotPropagatedToOtherNodes(t *testing.T) { + c := dataCodec{} + + cfg := KVConfig{} + cfg.RetransmitMult = 1 + cfg.Codecs = append(cfg.Codecs, c) + + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) + defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck + + client, err := NewClient(mkv, c) + require.NoError(t, err) + + const testKey = "ha-tracker/user1/cluster1" + now := time.Now() + + // Write a value via CAS (simulates node1 writing). + cas(t, client, testKey, func(in *data) (*data, bool, error) { + d := &data{Members: map[string]member{}} + d.Members["replica0"] = member{Timestamp: now.Unix(), State: ACTIVE} + return d, true, nil + }) + + // Drain any broadcast messages from the CAS above. + mkv.GetBroadcasts(0, math.MaxInt32) + + // Verify the key exists. + d := getData(t, client, testKey) + require.NotNil(t, d) + require.Contains(t, d.Members, "replica0") + + // Delete the key locally (this is what KV.Delete in PR #7284 does). + mkv.storeMu.Lock() + delete(mkv.store, testKey) + mkv.storeMu.Unlock() + + // The key should be gone locally. + d = getData(t, client, testKey) + require.Nil(t, d, "key should be deleted locally") + + // No broadcast was generated for the delete (this is the bug). + broadcasts := mkv.GetBroadcasts(0, math.MaxInt32) + require.Empty(t, broadcasts, "Delete should not generate any broadcast — the deletion is local-only") + + // Simulate another node (node2) gossiping back the value it still has. + // This is what happens during push-pull sync or regular gossip. + mkv.NotifyMsg(marshalKeyValuePair(t, testKey, c, &data{ + Members: map[string]member{ + "replica0": {Timestamp: now.Unix(), State: ACTIVE}, + }, + })) + + // The key reappears because the gossip brought it back. 
+ d = getData(t, client, testKey) + require.NotNil(t, d, "key reappeared after gossip from peer — Delete was not propagated") + require.Contains(t, d.Members, "replica0", "replica0 came back via gossip") +} + func BenchmarkCASTimerAllocation(b *testing.B) { c := dataCodec{} From e3589b87bbdcdc21ca77ae3929c9faf1e50ff961 Mon Sep 17 00:00:00 2001 From: Friedrich Gonzalez <1517449+friedrichg@users.noreply.github.com> Date: Wed, 4 Mar 2026 01:53:21 -0800 Subject: [PATCH 4/4] Fix lint Signed-off-by: Friedrich Gonzalez <1517449+friedrichg@users.noreply.github.com> --- .../integration_memberlist_single_binary_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/integration/integration_memberlist_single_binary_test.go b/integration/integration_memberlist_single_binary_test.go index 197c46b74f6..2a09990381f 100644 --- a/integration/integration_memberlist_single_binary_test.go +++ b/integration/integration_memberlist_single_binary_test.go @@ -501,12 +501,12 @@ func TestHATrackerWithMultiKV(t *testing.T) { // re-gossip it back to the node that deleted it, effectively un-doing the cleanup. // // The test demonstrates the re-gossip mechanism by: -// 1. Setting up 2 nodes with HA tracker using memberlist -// 2. Pushing HA data to node1 → both nodes track it via gossip -// 3. Stopping node1 (simulating loss of local state, equivalent to a local Delete) -// 4. Restarting node1 → node2 gossips ring + HA data back via push-pull -// 5. Verifying that the HA tracker on the restarted node works with the re-gossiped -// state by performing a failover +// 1. Setting up 2 nodes with HA tracker using memberlist +// 2. Pushing HA data to node1 → both nodes track it via gossip +// 3. Stopping node1 (simulating loss of local state, equivalent to a local Delete) +// 4. Restarting node1 → node2 gossips ring + HA data back via push-pull +// 5. 
Verifying that the HA tracker on the restarted node works with the re-gossiped +// state by performing a failover func TestHATrackerMemberlistDeleteNotPropagated(t *testing.T) { s, err := e2e.NewScenario(networkName) require.NoError(t, err)