From c14edfa2725d111960c53f15fd6ef6d023db4fdb Mon Sep 17 00:00:00 2001 From: Yaniv Michael Kaul Date: Wed, 13 May 2026 19:56:23 +0300 Subject: [PATCH 1/2] policies: treat SERIAL/LOCAL_SERIAL consistency as LWT for routing Statements with SERIAL or LOCAL_SERIAL consistency level are serialized through the Paxos path on the server, but TokenAwarePolicy only checked is_lwt() (from server prepare metadata) when deciding whether to skip replica shuffling. This meant serial-consistency reads could be routed with shuffled replicas instead of the deterministic order needed for optimal Paxos coordination. Now TokenAwarePolicy also checks the statement's consistency level and skips shuffling for SERIAL/LOCAL_SERIAL, matching LWT routing behavior. Fixes: https://github.com/scylladb/python-driver/issues/886 --- cassandra/policies.py | 2 +- tests/unit/test_policies.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/cassandra/policies.py b/cassandra/policies.py index ceb5ebdc45..14c79fd70e 100644 --- a/cassandra/policies.py +++ b/cassandra/policies.py @@ -514,7 +514,7 @@ def make_query_plan(self, working_keyspace=None, query=None): else: replicas = self._cluster_metadata.get_replicas(keyspace, query.routing_key) - if self.shuffle_replicas and not query.is_lwt(): + if self.shuffle_replicas and not query.is_lwt() and not ConsistencyLevel.is_serial(query.consistency_level): shuffle(replicas) def yield_in_order(hosts): diff --git a/tests/unit/test_policies.py b/tests/unit/test_policies.py index 6142af1aa1..41bd42481c 100644 --- a/tests/unit/test_policies.py +++ b/tests/unit/test_policies.py @@ -944,6 +944,35 @@ def _assert_shuffle(self, patched_shuffle, cluster, keyspace, routing_key): assert patched_shuffle.call_count == 1 + @patch('cassandra.policies.shuffle') + def test_no_shuffle_for_serial_consistency(self, patched_shuffle): + """ + Test to validate that replicas are not shuffled when the statement + has SERIAL or LOCAL_SERIAL consistency level, since such statements + should be routed like LWT requests. + @jira_ticket PYTHON-1394 + @expected_result shuffle should not be called for serial consistency + + @test_category policy + """ + for cl in (ConsistencyLevel.SERIAL, ConsistencyLevel.LOCAL_SERIAL): + for cluster in (self._prepare_cluster_with_vnodes(), self._prepare_cluster_with_tablets()): + patched_shuffle.reset_mock() + hosts = cluster.metadata.all_hosts() + child_policy = Mock() + child_policy.make_query_plan.return_value = hosts + child_policy.distance.return_value = HostDistance.LOCAL + + policy = TokenAwarePolicy(child_policy, shuffle_replicas=True) + policy.populate(cluster, hosts) + + query = Statement(routing_key='routing_key') + query.consistency_level = cl + list(policy.make_query_plan('keyspace', query)) + assert patched_shuffle.call_count == 0, \ + "shuffle should not be called for consistency level %s" % cl + + class ConvictionPolicyTest(unittest.TestCase): def test_not_implemented(self): """ From 3b69e2a77869e03d8135db3f76582cd6e4f89619 Mon Sep 17 00:00:00 2001 From: Yaniv Michael Kaul Date: Wed, 20 May 2026 15:56:51 +0300 Subject: [PATCH 2/2] policies: prevent retry downgrade from serial to non-serial consistency Add a guard in the retry execution path that prevents any retry policy from downgrading SERIAL/LOCAL_SERIAL to a non-serial consistency level, which would break serial read (Paxos) guarantees. Also add a unit test verifying DowngradingConsistencyRetryPolicy does not downgrade serial consistency on read timeout or unavailable. --- cassandra/cluster.py | 15 ++++++++++++++- tests/unit/test_policies.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 1181c6f686..41a9158d0b 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -5433,7 +5433,20 @@ def _retry(self, reuse_connection, consistency_level, host, delay): if self._metrics is not None: self._metrics.on_retry() if consistency_level is not None: - self.message.consistency_level = consistency_level + # Never downgrade from serial to non-serial consistency, as that + # would break serial read (Paxos) guarantees. + if not ConsistencyLevel.is_serial(consistency_level): + original_cl = self.message.consistency_level + if ConsistencyLevel.is_serial(original_cl): + log.debug( + "Retry policy attempted to downgrade serial consistency %s to %s; " + "keeping original consistency level.", + ConsistencyLevel.value_to_name.get(original_cl, original_cl), + ConsistencyLevel.value_to_name.get(consistency_level, consistency_level)) + else: + self.message.consistency_level = consistency_level + else: + self.message.consistency_level = consistency_level # don't retry on the event loop thread self.session.cluster.scheduler.schedule(delay, self._retry_task, reuse_connection, host) diff --git a/tests/unit/test_policies.py b/tests/unit/test_policies.py index 41bd42481c..63a3c3d12d 100644 --- a/tests/unit/test_policies.py +++ b/tests/unit/test_policies.py @@ -1418,6 +1418,35 @@ def test_unavailable(self): assert retry == RetryPolicy.RETRY assert consistency == ConsistencyLevel.ONE + def test_serial_consistency_not_downgraded(self): + """ + Test that SERIAL/LOCAL_SERIAL consistency is never downgraded + to a non-serial consistency level by the retry policy. + @jira_ticket PYTHON-1394 + @expected_result retry policy should rethrow or retry on next host + without downgrading serial consistency + + @test_category policy + """ + policy = DowngradingConsistencyRetryPolicy() + + for cl in (ConsistencyLevel.SERIAL, ConsistencyLevel.LOCAL_SERIAL): + # on_read_timeout should rethrow for serial consistency + retry, consistency = policy.on_read_timeout( + query=None, consistency=cl, required_responses=3, + received_responses=1, data_retrieved=True, retry_num=0) + assert retry == RetryPolicy.RETHROW, \ + "Expected RETHROW for serial consistency %s on read timeout" % cl + assert consistency is None + + # on_unavailable should retry on next host without downgrading + retry, consistency = policy.on_unavailable( + query=None, consistency=cl, required_replicas=3, + alive_replicas=1, retry_num=0) + assert retry == RetryPolicy.RETRY_NEXT_HOST, \ + "Expected RETRY_NEXT_HOST for serial consistency %s on unavailable" % cl + assert consistency is None + class ExponentialRetryPolicyTest(unittest.TestCase): def test_calculate_backoff(self):