# Python API

## dflockd_client (top-level exports)

The package provides convenience imports at the top level so you don't need to reach into submodules:
```python
from dflockd_client import (
    AsyncDistributedLock,
    AsyncDistributedSemaphore,
    AsyncSignalConn,
    SyncDistributedLock,
    SyncDistributedSemaphore,
    SyncSignalConn,
    Signal,
    StatsResult,
    __version__,
    DEFAULT_SERVERS,
    ShardingStrategy,
    stable_hash_shard,
)
```
| Name | Type | Description |
|---|---|---|
| `AsyncDistributedLock` | class | Alias for `dflockd_client.client.DistributedLock` |
| `AsyncDistributedSemaphore` | class | Alias for `dflockd_client.client.DistributedSemaphore` |
| `SyncDistributedLock` | class | Alias for `dflockd_client.sync_client.DistributedLock` |
| `SyncDistributedSemaphore` | class | Alias for `dflockd_client.sync_client.DistributedSemaphore` |
| `StatsResult` | TypedDict | Return type of `stats()` with `connections`, `locks`, `semaphores`, `idle_locks`, `idle_semaphores` |
| `__version__` | str | Installed package version (e.g. `"1.7.1"`) |
| `DEFAULT_SERVERS` | `tuple[tuple[str, int], ...]` | Default server list: `(("127.0.0.1", 6388),)` |
| `ShardingStrategy` | `Callable[[str, int], int]` | Type alias for sharding callables |
| `stable_hash_shard` | function | Default CRC-32 sharding strategy |
| `Signal` | NamedTuple | Received signal with `channel` and `payload` fields |
| `AsyncSignalConn` | class | Alias for `dflockd_client.client.SignalConn` |
| `SyncSignalConn` | class | Alias for `dflockd_client.sync_client.SignalConn` |
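The `Signal` and `StatsResult` shapes can be sketched from the table above (a reconstruction of the documented fields, not the library's source; the exact annotations are assumptions):

```python
from typing import NamedTuple, TypedDict

class Signal(NamedTuple):
    """A received signal, per the field list above."""
    channel: str
    payload: str

class StatsResult(TypedDict):
    """Shape of the stats() return value, per the description above."""
    connections: int
    locks: list
    semaphores: list
    idle_locks: list
    idle_semaphores: list

sig = Signal(channel="events.build", payload="done")
```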
## dflockd_client.client (async)

### DistributedLock

```python
@dataclass
class DistributedLock:
    key: str
    _: KW_ONLY
    acquire_timeout_s: int = 10
    lease_ttl_s: int | None = None
    servers: list[tuple[str, int]] = [("127.0.0.1", 6388)]
    sharding_strategy: ShardingStrategy = stable_hash_shard
    renew_ratio: float = 0.5
    ssl_context: ssl.SSLContext | None = None
    auth_token: str | None = None
    connect_timeout_s: float = 10
```

`key` is the only positional parameter; all others are keyword-only.
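As a worked example of the lease parameters, assuming `renew_ratio` is the fraction of the lease TTL after which background renewal fires (an interpretation of the `renew_ratio=0.5` default; this page does not spell the semantics out):

```python
# With a 30 s lease and the default renew_ratio of 0.5, renewal would
# fire roughly every 15 s, i.e. lease_ttl_s * renew_ratio.
lease_ttl_s = 30
renew_ratio = 0.5
renew_interval_s = lease_ttl_s * renew_ratio
```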
Methods:

| Method | Returns | Description |
|---|---|---|
| `await acquire()` | `bool` | Acquire the lock. Returns `False` on timeout |
| `await enqueue()` | `str` | Two-phase step 1: join queue. Returns `"acquired"` or `"queued"` |
| `await wait(timeout_s=None)` | `bool` | Two-phase step 2: block until granted. Returns `False` on timeout |
| `await release()` | `bool` | Release the lock and stop renewal |
| `await aclose()` | `None` | Close the connection and clean up |
Context manager: raises `TimeoutError` if the lock cannot be acquired.
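A minimal usage sketch (key name is illustrative; it assumes the package is installed and a dflockd server is reachable, so the import is kept inside the coroutine and the run call is commented out):

```python
import asyncio

async def process_exclusively() -> None:
    from dflockd_client import AsyncDistributedLock  # requires the package

    # Entering the block acquires the lock; exiting releases it.
    # Raises TimeoutError if not acquired within acquire_timeout_s.
    async with AsyncDistributedLock("jobs:nightly", acquire_timeout_s=5):
        ...  # critical section

# asyncio.run(process_exclusively())  # needs a running dflockd server
```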
### Low-level functions

#### acquire

```python
async def acquire(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    key: str,
    acquire_timeout_s: int,
    lease_ttl_s: int | None = None,
) -> tuple[str, int]
```

Send a lock request. Returns `(token, lease_ttl)`. Raises `TimeoutError` on timeout.
#### release

```python
async def release(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    key: str,
    token: str,
) -> None
```

Send a release request. Raises `RuntimeError` on failure.
#### renew

```python
async def renew(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    key: str,
    token: str,
    lease_ttl_s: int | None = None,
) -> int
```

Send a renew request. Returns seconds remaining, or `-1` if not reported by the server. Raises `RuntimeError` on failure.
#### enqueue

```python
async def enqueue(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    key: str,
    lease_ttl_s: int | None = None,
) -> tuple[str, str | None, int | None]
```

Two-phase step 1: join the FIFO queue. Returns `(status, token, lease)` where `status` is `"acquired"` or `"queued"`. Raises `RuntimeError` on failure.
#### wait

```python
async def wait(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    key: str,
    wait_timeout_s: int,
) -> tuple[str, int]
```

Two-phase step 2: block until the lock is granted. Returns `(token, lease)`. Raises `TimeoutError` on timeout.
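The two-phase flow maps onto the `DistributedLock` methods above roughly as follows (a sketch, not the library's documented example; the key name and timeout are illustrative, and the import sits inside the coroutine so the sketch stays importable without the package):

```python
import asyncio

async def acquire_two_phase() -> bool:
    from dflockd_client import AsyncDistributedLock  # requires the package

    lock = AsyncDistributedLock("jobs:rebuild")
    status = await lock.enqueue()          # "acquired" or "queued"
    if status == "queued" and not await lock.wait(timeout_s=30):
        await lock.aclose()                # timed out while queued
        return False
    try:
        ...  # critical section
    finally:
        await lock.release()
        await lock.aclose()
    return True

# asyncio.run(acquire_two_phase())  # needs a running dflockd server
```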
#### stats

Query server state. Returns a `StatsResult` TypedDict with `connections` (int), `locks` (list), `semaphores` (list), `idle_locks` (list), and `idle_semaphores` (list). Raises `RuntimeError` on failure.
### DistributedSemaphore

```python
@dataclass
class DistributedSemaphore:
    key: str
    _: KW_ONLY
    limit: int
    acquire_timeout_s: int = 10
    lease_ttl_s: int | None = None
    servers: list[tuple[str, int]] = [("127.0.0.1", 6388)]
    sharding_strategy: ShardingStrategy = stable_hash_shard
    renew_ratio: float = 0.5
    ssl_context: ssl.SSLContext | None = None
    auth_token: str | None = None
    connect_timeout_s: float = 10
```

`key` is the only positional parameter; all others (including `limit`) are keyword-only.
Methods:

| Method | Returns | Description |
|---|---|---|
| `await acquire()` | `bool` | Acquire a semaphore slot. Returns `False` on timeout |
| `await enqueue()` | `str` | Two-phase step 1: join queue. Returns `"acquired"` or `"queued"` |
| `await wait(timeout_s=None)` | `bool` | Two-phase step 2: block until granted. Returns `False` on timeout |
| `await release()` | `bool` | Release the slot and stop renewal |
| `await aclose()` | `None` | Close the connection and clean up |
Context manager: raises `TimeoutError` if a slot cannot be acquired.
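A minimal usage sketch (key name and limit are illustrative; the import is inside the coroutine and the run call commented out, since it needs the package and a running server):

```python
import asyncio

async def limited_worker() -> None:
    from dflockd_client import AsyncDistributedSemaphore  # requires the package

    # limit is keyword-only; all clients of the same key should agree on it
    # (the server reports error_limit_mismatch otherwise, per sem_acquire below).
    async with AsyncDistributedSemaphore("gpu-slots", limit=4):
        ...  # at most 4 concurrent holders across all processes

# asyncio.run(limited_worker())  # needs a running dflockd server
```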
### Semaphore low-level functions

#### sem_acquire

```python
async def sem_acquire(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    key: str,
    acquire_timeout_s: int,
    limit: int,
    lease_ttl_s: int | None = None,
) -> tuple[str, int]
```

Send a semaphore acquire request. Returns `(token, lease_ttl)`. Raises `TimeoutError` on timeout, `RuntimeError` on errors (including `error_limit_mismatch`).
#### sem_release

```python
async def sem_release(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    key: str,
    token: str,
) -> None
```

Semaphore counterpart of `release`.
#### sem_renew

```python
async def sem_renew(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    key: str,
    token: str,
    lease_ttl_s: int | None = None,
) -> int
```

Semaphore counterpart of `renew`.
#### sem_enqueue

```python
async def sem_enqueue(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    key: str,
    limit: int,
    lease_ttl_s: int | None = None,
) -> tuple[str, str | None, int | None]
```

Two-phase step 1: join the semaphore FIFO queue. Returns `(status, token, lease)` where `status` is `"acquired"` or `"queued"`.
#### sem_wait

```python
async def sem_wait(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    key: str,
    wait_timeout_s: int,
) -> tuple[str, int]
```

Two-phase step 2: block until a semaphore slot is granted. Returns `(token, lease)`. Raises `TimeoutError` on timeout.
### SignalConn

```python
@dataclass
class SignalConn:
    server: tuple[str, int] = ("127.0.0.1", 6388)
    ssl_context: ssl.SSLContext | None = None
    auth_token: str | None = None
    connect_timeout_s: float = 10
```
Methods:

| Method | Returns | Description |
|---|---|---|
| `await connect()` | `None` | Connect to the server and start the background reader |
| `await listen(pattern, *, group="")` | `None` | Subscribe to signals matching `pattern` |
| `await unlisten(pattern, *, group="")` | `None` | Remove a signal subscription |
| `await emit(channel, payload)` | `int` | Publish a signal. Returns the delivery count |
| `await aclose()` | `None` | Close the connection |
Context manager / iteration:

```python
async with SignalConn(server=("127.0.0.1", 6388)) as sc:
    await sc.listen("events.>")
    async for sig in sc:  # sig.channel, sig.payload
        ...
```
Property:

| Property | Type | Description |
|---|---|---|
| `signals` | `asyncio.Queue[Signal \| None]` | Queue of received signals. A `None` sentinel indicates the connection closed |
### Low-level signal functions

#### sig_emit

```python
async def sig_emit(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    channel: str,
    payload: str,
) -> int
```

Emit a signal on a literal channel (no wildcards). Returns the number of listeners the signal was delivered to. Works on a plain reader/writer pair without a `SignalConn`.
## dflockd_client.sync_client

### DistributedLock

```python
@dataclass
class DistributedLock:
    key: str
    _: KW_ONLY
    acquire_timeout_s: int = 10
    lease_ttl_s: int | None = None
    servers: list[tuple[str, int]] = [("127.0.0.1", 6388)]
    sharding_strategy: ShardingStrategy = stable_hash_shard
    renew_ratio: float = 0.5
    ssl_context: ssl.SSLContext | None = None
    auth_token: str | None = None
    connect_timeout_s: float = 10
```

`key` is the only positional parameter; all others are keyword-only.
Methods:

| Method | Returns | Description |
|---|---|---|
| `acquire()` | `bool` | Acquire the lock. Returns `False` on timeout |
| `enqueue()` | `str` | Two-phase step 1: join queue. Returns `"acquired"` or `"queued"` |
| `wait(timeout_s=None)` | `bool` | Two-phase step 2: block until granted. Returns `False` on timeout |
| `release()` | `bool` | Release the lock and stop renewal |
| `close()` | `None` | Close the connection and clean up |
Context manager:
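No example survives here; a minimal sketch mirroring the async version (key name is illustrative; the import is inside the function so the sketch stays importable without the package or a server):

```python
def process_exclusively() -> None:
    from dflockd_client import SyncDistributedLock  # requires the package

    # Entering the block acquires the lock; exiting releases it.
    with SyncDistributedLock("jobs:nightly"):
        ...  # critical section

# process_exclusively()  # needs a running dflockd server
```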
### Low-level functions

#### acquire

```python
def acquire(
    sock: socket.socket,
    rfile: io.TextIOWrapper,
    key: str,
    acquire_timeout_s: int,
    lease_ttl_s: int | None = None,
) -> tuple[str, int]
```

Synchronous counterpart of the async `acquire`.
#### release

Synchronous counterpart of the async `release`.
#### renew

```python
def renew(
    sock: socket.socket,
    rfile: io.TextIOWrapper,
    key: str,
    token: str,
    lease_ttl_s: int | None = None,
) -> int
```

Synchronous counterpart of the async `renew`.
#### enqueue

```python
def enqueue(
    sock: socket.socket,
    rfile: io.TextIOWrapper,
    key: str,
    lease_ttl_s: int | None = None,
) -> tuple[str, str | None, int | None]
```

Two-phase step 1: join the FIFO queue. Returns `(status, token, lease)` where `status` is `"acquired"` or `"queued"`.
#### wait

```python
def wait(
    sock: socket.socket,
    rfile: io.TextIOWrapper,
    key: str,
    wait_timeout_s: int,
) -> tuple[str, int]
```

Two-phase step 2: block until the lock is granted. Returns `(token, lease)`. Raises `TimeoutError` on timeout.
#### stats

Query server state. Returns a `StatsResult` TypedDict with `connections` (int), `locks` (list), `semaphores` (list), `idle_locks` (list), and `idle_semaphores` (list). Raises `RuntimeError` on failure.
### DistributedSemaphore

```python
@dataclass
class DistributedSemaphore:
    key: str
    _: KW_ONLY
    limit: int
    acquire_timeout_s: int = 10
    lease_ttl_s: int | None = None
    servers: list[tuple[str, int]] = [("127.0.0.1", 6388)]
    sharding_strategy: ShardingStrategy = stable_hash_shard
    renew_ratio: float = 0.5
    ssl_context: ssl.SSLContext | None = None
    auth_token: str | None = None
    connect_timeout_s: float = 10
```

`key` is the only positional parameter; all others (including `limit`) are keyword-only.
Methods:

| Method | Returns | Description |
|---|---|---|
| `acquire()` | `bool` | Acquire a semaphore slot. Returns `False` on timeout |
| `enqueue()` | `str` | Two-phase step 1: join queue. Returns `"acquired"` or `"queued"` |
| `wait(timeout_s=None)` | `bool` | Two-phase step 2: block until granted. Returns `False` on timeout |
| `release()` | `bool` | Release the slot and stop renewal |
| `close()` | `None` | Close the connection and clean up |
Context manager:
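No example survives here either; a minimal sketch mirroring the async version (key name and limit are illustrative; the import is inside the function so the sketch stays importable without the package or a server):

```python
def limited_worker() -> None:
    from dflockd_client import SyncDistributedSemaphore  # requires the package

    # limit is keyword-only; all clients of the same key should agree on it.
    with SyncDistributedSemaphore("gpu-slots", limit=4):
        ...  # at most 4 concurrent holders across all processes

# limited_worker()  # needs a running dflockd server
```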
### Semaphore low-level functions

#### sem_acquire

```python
def sem_acquire(
    sock: socket.socket,
    rfile: io.TextIOWrapper,
    key: str,
    acquire_timeout_s: int,
    limit: int,
    lease_ttl_s: int | None = None,
) -> tuple[str, int]
```

Synchronous counterpart of the async `sem_acquire`.
#### sem_release

Synchronous counterpart of the async `sem_release`.
#### sem_renew

```python
def sem_renew(
    sock: socket.socket,
    rfile: io.TextIOWrapper,
    key: str,
    token: str,
    lease_ttl_s: int | None = None,
) -> int
```

Synchronous counterpart of the async `sem_renew`.
#### sem_enqueue

```python
def sem_enqueue(
    sock: socket.socket,
    rfile: io.TextIOWrapper,
    key: str,
    limit: int,
    lease_ttl_s: int | None = None,
) -> tuple[str, str | None, int | None]
```

Two-phase step 1: join the semaphore FIFO queue. Returns `(status, token, lease)` where `status` is `"acquired"` or `"queued"`.
#### sem_wait

```python
def sem_wait(
    sock: socket.socket,
    rfile: io.TextIOWrapper,
    key: str,
    wait_timeout_s: int,
) -> tuple[str, int]
```

Two-phase step 2: block until a semaphore slot is granted. Returns `(token, lease)`. Raises `TimeoutError` on timeout.
### SignalConn

```python
@dataclass
class SignalConn:
    server: tuple[str, int] = ("127.0.0.1", 6388)
    ssl_context: ssl.SSLContext | None = None
    auth_token: str | None = None
    connect_timeout_s: float = 10
```
Methods:

| Method | Returns | Description |
|---|---|---|
| `connect()` | `None` | Connect to the server and start the background reader thread |
| `listen(pattern, *, group="")` | `None` | Subscribe to signals matching `pattern` |
| `unlisten(pattern, *, group="")` | `None` | Remove a signal subscription |
| `emit(channel, payload)` | `int` | Publish a signal. Returns the delivery count |
| `close()` | `None` | Close the connection |
Context manager / iteration:

```python
with SignalConn(server=("127.0.0.1", 6388)) as sc:
    sc.listen("events.>")
    for sig in sc:  # sig.channel, sig.payload
        ...
```
Property:

| Property | Type | Description |
|---|---|---|
| `signals` | `queue.Queue[Signal \| None]` | Queue of received signals. A `None` sentinel indicates the connection closed |
### Low-level signal functions

#### sig_emit

Emit a signal on a literal channel (no wildcards). Returns the number of listeners the signal was delivered to. Works on a plain sock/rfile pair without a `SignalConn`.
## dflockd_client.sharding

### ShardingStrategy

A callable that maps `(key, num_servers)` to a server index.

### stable_hash_shard

The default sharding strategy, using `zlib.crc32`. Deterministic across processes regardless of `PYTHONHASHSEED`.
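The documented behaviour can be sketched as follows (a reconstruction from the description above, not the library's code; the key is illustrative):

```python
import zlib

def stable_hash_shard_sketch(key: str, num_servers: int) -> int:
    # CRC-32 of the UTF-8 encoded key, reduced modulo the server count.
    # Unlike built-in hash(), crc32 is stable across processes and
    # unaffected by PYTHONHASHSEED.
    return zlib.crc32(key.encode("utf-8")) % num_servers

idx = stable_hash_shard_sketch("jobs:nightly", 3)
```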