Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions hazelcast/asyncio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
__all__ = [
"AtomicLong",
"CPSubsystem",
"EntryEventCallable",
"Executor",
"HazelcastClient",
Expand Down Expand Up @@ -27,3 +29,5 @@
from hazelcast.internal.asyncio_proxy.set import Set
from hazelcast.internal.asyncio_proxy.vector_collection import VectorCollection
from hazelcast.internal.asyncio_proxy.reliable_topic import ReliableTopic, ReliableMessageListener
from hazelcast.internal.asyncio_proxy.cp_manager import CPSubsystem
from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong
7 changes: 7 additions & 0 deletions hazelcast/internal/asyncio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from hazelcast.discovery import HazelcastCloudAddressProvider
from hazelcast.errors import IllegalStateError, InvalidConfigurationError
from hazelcast.internal.asyncio_invocation import InvocationService, Invocation
from hazelcast.internal.asyncio_proxy.cp_manager import CPSubsystem
from hazelcast.internal.asyncio_proxy.pn_counter import PNCounter
from hazelcast.internal.asyncio_proxy.vector_collection import VectorCollection
from hazelcast.internal.asyncio_sql import _InternalSqlService, SqlService
Expand Down Expand Up @@ -207,6 +208,7 @@ def __init__(self, config: Config | None = None, **kwargs):
self._compact_schema_service,
)
self._proxy_manager = ProxyManager(self._context)
self._cp_subsystem = CPSubsystem(self._context)
self._lock_reference_id_generator = AtomicInteger(1)
self._statistics = Statistics(
self,
Expand Down Expand Up @@ -528,6 +530,11 @@ def sql(self) -> SqlService:
"""Returns a service to execute distributed SQL queries."""
return self._sql_service

@property
def cp_subsystem(self) -> CPSubsystem:
"""CP Subsystem offers set of in-memory linearizable data structures."""
return self._cp_subsystem

def _create_address_provider(self):
config = self._config
cluster_members = config.cluster_members
Expand Down
255 changes: 255 additions & 0 deletions hazelcast/internal/asyncio_proxy/atomic_long.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
import typing

from hazelcast.internal.asyncio_proxy.cp import BaseCPProxy
from hazelcast.protocol.codec import (
atomic_long_add_and_get_codec,
atomic_long_compare_and_set_codec,
atomic_long_get_codec,
atomic_long_get_and_add_codec,
atomic_long_get_and_set_codec,
atomic_long_alter_codec,
atomic_long_apply_codec,
)
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.util import check_is_int, check_not_none


class AtomicLong(BaseCPProxy):
"""AtomicLong is a redundant and highly available distributed counter
for 64-bit integers (``long`` type in Java).

It works on top of the Raft consensus algorithm. It offers linearizability
during crash failures and network partitions. It is CP with respect to
the CAP principle. If a network partition occurs, it remains available
on at most one side of the partition.

AtomicLong implementation does not offer exactly-once / effectively-once
execution semantics. It goes with at-least-once execution semantics
by default and can cause an API call to be committed multiple times
in case of CP member failures. It can be tuned to offer at-most-once
execution semantics. Please see `fail-on-indeterminate-operation-state`
server-side setting.
"""

async def add_and_get(self, delta: int) -> int:
"""Atomically adds the given value to the current value.

Args:
delta: The value to add to the current value.

Returns:
The updated value, the given value added to the current value.
"""
check_is_int(delta)
codec = atomic_long_add_and_get_codec
request = codec.encode_request(self._group_id, self._object_name, delta)
return await self._ainvoke(request, codec.decode_response)

async def compare_and_set(self, expect: int, update: int) -> bool:
"""Atomically sets the value to the given updated value
only if the current value equals the expected value.

Args:
expect: The expected value.
update: The new value.

Returns:
``True`` if successful; or ``False`` if the actual value was not
equal to the expected value.
"""
check_is_int(expect)
check_is_int(update)
codec = atomic_long_compare_and_set_codec
request = codec.encode_request(self._group_id, self._object_name, expect, update)
return await self._ainvoke(request, codec.decode_response)

async def decrement_and_get(self) -> int:
"""Atomically decrements the current value by one.

Returns:
The updated value, the current value decremented by one.
"""
return await self.add_and_get(-1)

async def get_and_decrement(self) -> int:
"""Atomically decrements the current value by one.

Returns:
The old value.
"""
return await self.get_and_add(-1)

async def get(self) -> int:
"""Gets the current value.

Returns:
The current value.
"""
codec = atomic_long_get_codec
request = codec.encode_request(self._group_id, self._object_name)
return await self._ainvoke(request, codec.decode_response)

async def get_and_add(self, delta: int) -> int:
"""Atomically adds the given value to the current value.

Args:
delta: The value to add to the current value.

Returns:
The old value before the add.
"""
check_is_int(delta)
codec = atomic_long_get_and_add_codec
request = codec.encode_request(self._group_id, self._object_name, delta)
return await self._ainvoke(request, codec.decode_response)

async def get_and_set(self, new_value: int) -> int:
"""Atomically sets the given value and returns the old value.

Args:
new_value: The new value.

Returns:
The old value.
"""
check_is_int(new_value)
codec = atomic_long_get_and_set_codec
request = codec.encode_request(self._group_id, self._object_name, new_value)
return await self._ainvoke(request, codec.decode_response)

async def increment_and_get(self) -> int:
"""Atomically increments the current value by one.

Returns:
The updated value, the current value incremented by one.
"""
return await self.add_and_get(1)

async def get_and_increment(self) -> int:
"""Atomically increments the current value by one.

Returns:
The old value.
"""
return await self.get_and_add(1)

async def set(self, new_value: int) -> None:
"""Atomically sets the given value.

Args:
new_value: The new value
"""
check_is_int(new_value)
codec = atomic_long_get_and_set_codec
request = codec.encode_request(self._group_id, self._object_name, new_value)
return await self._ainvoke(request)

async def alter(self, function: typing.Any) -> None:
"""Alters the currently stored value by applying a function on it.

Notes:
``function`` must be an instance of Hazelcast serializable type.
It must have a counterpart registered in the server-side that
implements the ``com.hazelcast.core.IFunction`` interface with
the actual logic of the function to be applied.

Args:
function: The function that alters the currently stored value.
"""
check_not_none(function, "Function cannot be None")
try:
function_data = self._to_data(function)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.alter, function)

codec = atomic_long_alter_codec
# 1 means return the new value.
# There is no way to tell server to return nothing as of now (30.09.2020)
# The new value is `long` (comes with the initial frame) and we
# don't try to decode it. So, this shouldn't cause any problems.
request = codec.encode_request(self._group_id, self._object_name, function_data, 1)
return await self._ainvoke(request)

async def alter_and_get(self, function: typing.Any) -> int:
"""Alters the currently stored value by applying a function on it and
gets the result.

Notes:
``function`` must be an instance of Hazelcast serializable type.
It must have a counterpart registered in the server-side that
implements the ``com.hazelcast.core.IFunction`` interface with
the actual logic of the function to be applied.

Args:
function: The function that alters the currently stored value.

Returns:
The new value.
"""
check_not_none(function, "Function cannot be None")
try:
function_data = self._to_data(function)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.alter_and_get, function)

codec = atomic_long_alter_codec
# 1 means return the new value.
request = codec.encode_request(self._group_id, self._object_name, function_data, 1)
return await self._ainvoke(request, codec.decode_response)

async def get_and_alter(self, function: typing.Any) -> int:
"""Alters the currently stored value by applying a function on it and
gets the old value.

Notes:
``function`` must be an instance of Hazelcast serializable type.
It must have a counterpart registered in the server-side that
implements the ``com.hazelcast.core.IFunction`` interface with
the actual logic of the function to be applied.

Args:
function: The function that alters the currently stored value.

Returns:
The old value.
"""
check_not_none(function, "Function cannot be None")
try:
function_data = self._to_data(function)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.get_and_alter, function)

codec = atomic_long_alter_codec
# 0 means return the old value.
request = codec.encode_request(self._group_id, self._object_name, function_data, 0)
return await self._ainvoke(request, codec.decode_response)

async def apply(self, function: typing.Any) -> typing.Any:
"""Applies a function on the value, the actual stored value will not
change.

Notes:
``function`` must be an instance of Hazelcast serializable type.
It must have a counterpart registered in the server-side that
implements the ``com.hazelcast.core.IFunction`` interface with
the actual logic of the function to be applied.

Args:
function: The function applied to the currently stored value.

Returns:
The result of the function application.
"""
check_not_none(function, "Function cannot be None")
try:
function_data = self._to_data(function)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.apply, function)

codec = atomic_long_apply_codec
request = codec.encode_request(self._group_id, self._object_name, function_data)

def handler(response):
return self._to_object(codec.decode_response(response))

return await self._ainvoke(request, handler)
34 changes: 34 additions & 0 deletions hazelcast/internal/asyncio_proxy/cp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from hazelcast.internal.asyncio_invocation import Invocation
from hazelcast.protocol.codec import cp_group_destroy_cp_object_codec


def _no_op_response_handler(_):
return None


class BaseCPProxy:
def __init__(self, context, group_id, service_name, proxy_name, object_name):
self._group_id = group_id
self._service_name = service_name
self._proxy_name = proxy_name
self._object_name = object_name
self._invocation_service = context.invocation_service
serialization_service = context.serialization_service
self._to_data = serialization_service.to_data
self._to_object = serialization_service.to_object
self._send_schema_and_retry = context.compact_schema_service.send_schema_and_retry

async def destroy(self) -> None:
"""Destroys this proxy."""
codec = cp_group_destroy_cp_object_codec
request = codec.encode_request(self._group_id, self._service_name, self._object_name)
return await self._ainvoke(request)

def _invoke(self, request, response_handler=_no_op_response_handler):
invocation = Invocation(request, response_handler=response_handler)
self._invocation_service.invoke(invocation)
return invocation.future

async def _ainvoke(self, request, response_handler=_no_op_response_handler):
fut = self._invoke(request, response_handler)
return await fut
Loading
Loading