diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 1181c6f686..41a9158d0b 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -5433,7 +5433,20 @@ def _retry(self, reuse_connection, consistency_level, host, delay): if self._metrics is not None: self._metrics.on_retry() if consistency_level is not None: - self.message.consistency_level = consistency_level + # Never downgrade from serial to non-serial consistency, as that + # would break serial read (Paxos) guarantees. + if not ConsistencyLevel.is_serial(consistency_level): + original_cl = self.message.consistency_level + if ConsistencyLevel.is_serial(original_cl): + log.debug( + "Retry policy attempted to downgrade serial consistency %s to %s; " + "keeping original consistency level.", + ConsistencyLevel.value_to_name.get(original_cl, original_cl), + ConsistencyLevel.value_to_name.get(consistency_level, consistency_level)) + else: + self.message.consistency_level = consistency_level + else: + self.message.consistency_level = consistency_level # don't retry on the event loop thread self.session.cluster.scheduler.schedule(delay, self._retry_task, reuse_connection, host) diff --git a/cassandra/policies.py b/cassandra/policies.py index ceb5ebdc45..14c79fd70e 100644 --- a/cassandra/policies.py +++ b/cassandra/policies.py @@ -514,7 +514,7 @@ def make_query_plan(self, working_keyspace=None, query=None): else: replicas = self._cluster_metadata.get_replicas(keyspace, query.routing_key) - if self.shuffle_replicas and not query.is_lwt(): + if self.shuffle_replicas and not query.is_lwt() and not ConsistencyLevel.is_serial(query.consistency_level): shuffle(replicas) def yield_in_order(hosts): diff --git a/tests/unit/test_policies.py b/tests/unit/test_policies.py index 6142af1aa1..63a3c3d12d 100644 --- a/tests/unit/test_policies.py +++ b/tests/unit/test_policies.py @@ -944,6 +944,35 @@ def _assert_shuffle(self, patched_shuffle, cluster, keyspace, routing_key): assert patched_shuffle.call_count == 1 + @patch('cassandra.policies.shuffle') + def test_no_shuffle_for_serial_consistency(self, patched_shuffle): + """ + Test to validate that replicas are not shuffled when the statement + has SERIAL or LOCAL_SERIAL consistency level, since such statements + should be routed like LWT requests. + @jira_ticket PYTHON-1394 + @expected_result shuffle should not be called for serial consistency + + @test_category policy + """ + for cl in (ConsistencyLevel.SERIAL, ConsistencyLevel.LOCAL_SERIAL): + for cluster in (self._prepare_cluster_with_vnodes(), self._prepare_cluster_with_tablets()): + patched_shuffle.reset_mock() + hosts = cluster.metadata.all_hosts() + child_policy = Mock() + child_policy.make_query_plan.return_value = hosts + child_policy.distance.return_value = HostDistance.LOCAL + + policy = TokenAwarePolicy(child_policy, shuffle_replicas=True) + policy.populate(cluster, hosts) + + query = Statement(routing_key='routing_key') + query.consistency_level = cl + list(policy.make_query_plan('keyspace', query)) + assert patched_shuffle.call_count == 0, \ + "shuffle should not be called for consistency level %s" % cl + + class ConvictionPolicyTest(unittest.TestCase): def test_not_implemented(self): """ @@ -1389,6 +1418,35 @@ def test_unavailable(self): assert retry == RetryPolicy.RETRY assert consistency == ConsistencyLevel.ONE + def test_serial_consistency_not_downgraded(self): + """ + Test that SERIAL/LOCAL_SERIAL consistency is never downgraded + to a non-serial consistency level by the retry policy. + @jira_ticket PYTHON-1394 + @expected_result retry policy should rethrow or retry on next host + without downgrading serial consistency + + @test_category policy + """ + policy = DowngradingConsistencyRetryPolicy() + + for cl in (ConsistencyLevel.SERIAL, ConsistencyLevel.LOCAL_SERIAL): + # on_read_timeout should rethrow for serial consistency + retry, consistency = policy.on_read_timeout( + query=None, consistency=cl, required_responses=3, + received_responses=1, data_retrieved=True, retry_num=0) + assert retry == RetryPolicy.RETHROW, \ + "Expected RETHROW for serial consistency %s on read timeout" % cl + assert consistency is None + + # on_unavailable should retry on next host without downgrading + retry, consistency = policy.on_unavailable( + query=None, consistency=cl, required_replicas=3, + alive_replicas=1, retry_num=0) + assert retry == RetryPolicy.RETRY_NEXT_HOST, \ + "Expected RETRY_NEXT_HOST for serial consistency %s on unavailable" % cl + assert consistency is None + class ExponentialRetryPolicyTest(unittest.TestCase): def test_calculate_backoff(self):