Examples¶
Hold a lock with auto-renewal¶
Acquire a lock, hold it for an extended period while the client automatically renews the lease in the background:
import asyncio
from dflockd_client.client import DistributedLock
async def main():
async with DistributedLock("foo", acquire_timeout_s=10, lease_ttl_s=20) as lock:
print(f"acquired key={lock.key} token={lock.token} lease={lock.lease}")
await asyncio.sleep(45) # lease renews automatically
print("done critical section")
asyncio.run(main())
FIFO lock ordering¶
Multiple workers competing for the same lock are granted access in FIFO order:
import asyncio
from dflockd_client.client import DistributedLock
async def worker(worker_id: int):
async with DistributedLock("foo", acquire_timeout_s=12) as lock:
print(f"acquired ({worker_id}): {lock.token}")
await asyncio.sleep(1)
print(f"released ({worker_id}): {lock.token}")
async def main():
tasks = [worker(i) for i in range(9)]
await asyncio.gather(*tasks)
asyncio.run(main())
import threading
import time
from dflockd_client.sync_client import DistributedLock
def worker(worker_id: int):
with DistributedLock("foo", acquire_timeout_s=30) as lock:
print(f"acquired ({worker_id}): {lock.token}")
time.sleep(1)
print(f"released ({worker_id}): {lock.token}")
threads = [threading.Thread(target=worker, args=(i,)) for i in range(9)]
for t in threads:
t.start()
for t in threads:
t.join()
Two-phase lock acquisition¶
Split enqueue and wait to notify an external system between joining the queue and blocking:
import asyncio
from dflockd_client.client import DistributedLock
async def main():
lock = DistributedLock("my-key", acquire_timeout_s=10, lease_ttl_s=20)
status = await lock.enqueue() # "acquired" or "queued"
print(f"enqueue: {status}")
await notify_external_system() # your application logic
if await lock.wait(timeout_s=10): # blocks until granted
try:
print(f"lock held: {lock.token}")
await asyncio.sleep(1)
finally:
await lock.release()
asyncio.run(main())
from dflockd_client.sync_client import DistributedLock
lock = DistributedLock("my-key", acquire_timeout_s=10, lease_ttl_s=20)
status = lock.enqueue() # "acquired" or "queued"
print(f"enqueue: {status}")
notify_external_system() # your application logic
if lock.wait(timeout_s=10): # blocks until granted
try:
print(f"lock held: {lock.token}")
finally:
lock.release()
If the lock is free at enqueue time, it is acquired immediately (fast path) and wait() returns True without blocking. The lease auto-renews in the background from the moment of acquisition.
Semaphore — bounded concurrency¶
Use DistributedSemaphore to allow up to N concurrent holders on the same key:
import asyncio
from dflockd_client.client import DistributedSemaphore
async def worker(worker_id: int):
async with DistributedSemaphore("pool", limit=3, acquire_timeout_s=30) as sem:
print(f"acquired ({worker_id}): {sem.token}")
await asyncio.sleep(1)
print(f"released ({worker_id}): {sem.token}")
async def main():
tasks = [worker(i) for i in range(9)]
await asyncio.gather(*tasks)
asyncio.run(main())
import threading
import time
from dflockd_client.sync_client import DistributedSemaphore
def worker(worker_id: int):
with DistributedSemaphore("pool", limit=3, acquire_timeout_s=30) as sem:
print(f"acquired ({worker_id}): {sem.token}")
time.sleep(1)
print(f"released ({worker_id}): {sem.token}")
threads = [threading.Thread(target=worker, args=(i,)) for i in range(9)]
for t in threads:
t.start()
for t in threads:
t.join()
Up to 3 workers run concurrently; the remaining workers wait in FIFO order for a slot to open.
Two-phase semaphore acquisition¶
Split enqueue and wait for semaphores, just like locks:
import asyncio
from dflockd_client.client import DistributedSemaphore
async def main():
sem = DistributedSemaphore("my-key", limit=3, acquire_timeout_s=10, lease_ttl_s=20)
status = await sem.enqueue() # "acquired" or "queued"
print(f"enqueue: {status}")
await notify_external_system() # your application logic
if await sem.wait(timeout_s=10): # blocks until granted
try:
print(f"semaphore held: {sem.token}")
await asyncio.sleep(1)
finally:
await sem.release()
asyncio.run(main())
from dflockd_client.sync_client import DistributedSemaphore
sem = DistributedSemaphore("my-key", limit=3, acquire_timeout_s=10, lease_ttl_s=20)
status = sem.enqueue() # "acquired" or "queued"
print(f"enqueue: {status}")
notify_external_system() # your application logic
if sem.wait(timeout_s=10): # blocks until granted
try:
print(f"semaphore held: {sem.token}")
finally:
sem.release()
Server stats¶
Query the server for its current state — connections, held locks, and active semaphores:
import asyncio
from dflockd_client.client import stats
async def main():
reader, writer = await asyncio.open_connection("127.0.0.1", 6388)
result = await stats(reader, writer)
print(f"connections: {result['connections']}")
print(f"locks: {result['locks']}")
print(f"semaphores: {result['semaphores']}")
writer.close()
await writer.wait_closed()
asyncio.run(main())
import socket
from dflockd_client.sync_client import stats
sock = socket.create_connection(("127.0.0.1", 6388))
rfile = sock.makefile("r", encoding="utf-8")
result = stats(sock, rfile)
print(f"connections: {result['connections']}")
print(f"locks: {result['locks']}")
print(f"semaphores: {result['semaphores']}")
rfile.close()
sock.close()
Authentication¶
Connect to a dflockd server started with --auth-token:
The same auth_token parameter works on DistributedSemaphore. A PermissionError is raised if the token is wrong.
TLS connection¶
Connect to a TLS-enabled dflockd server using an ssl.SSLContext:
import asyncio
import ssl
from dflockd_client.client import DistributedLock
async def main():
ctx = ssl.create_default_context() # uses system CA bundle
# or: ctx = ssl.create_default_context(cafile="/path/to/ca.pem")
async with DistributedLock("my-key", ssl_context=ctx) as lock:
print(f"token={lock.token} lease={lock.lease}")
asyncio.run(main())
The same ssl_context parameter works on DistributedSemaphore.
Signals — pub/sub messaging¶
Subscribe to channels with wildcard patterns and receive signals in real time:
import asyncio
from dflockd_client.client import SignalConn
async def main():
async with SignalConn(server=("127.0.0.1", 6388)) as listener:
await listener.listen("events.>")
# Emit from a separate connection
async with SignalConn(server=("127.0.0.1", 6388)) as emitter:
n = await emitter.emit("events.user.login", "alice")
print(f"delivered to {n} listener(s)")
async for sig in listener:
print(f"{sig.channel}: {sig.payload}")
break
asyncio.run(main())
from dflockd_client.sync_client import SignalConn
with SignalConn(server=("127.0.0.1", 6388)) as listener:
listener.listen("events.>")
with SignalConn(server=("127.0.0.1", 6388)) as emitter:
n = emitter.emit("events.user.login", "alice")
print(f"delivered to {n} listener(s)")
for sig in listener:
print(f"{sig.channel}: {sig.payload}")
break
Signal queue groups¶
Load-balance signals across multiple consumers using queue groups. Within a group, each signal is delivered to exactly one member via round-robin:
import asyncio
from dflockd_client.client import SignalConn
async def worker(name: str, host: str, port: int):
async with SignalConn(server=(host, port)) as sc:
await sc.listen("jobs.>", group="workers")
async for sig in sc:
print(f"{name} got: {sig.channel} {sig.payload}")
async def main():
tasks = [worker(f"w{i}", "127.0.0.1", 6388) for i in range(3)]
await asyncio.gather(*tasks)
asyncio.run(main())
import threading
from dflockd_client.sync_client import SignalConn
def worker(name: str, host: str, port: int):
with SignalConn(server=(host, port)) as sc:
sc.listen("jobs.>", group="workers")
for sig in sc:
print(f"{name} got: {sig.channel} {sig.payload}")
threads = [threading.Thread(target=worker, args=(f"w{i}", "127.0.0.1", 6388)) for i in range(3)]
for t in threads:
t.start()
Multi-server sharding¶
Distribute keys across multiple dflockd instances. Each key deterministically routes to the same server:
from dflockd_client.sync_client import DistributedLock
servers = [("server1", 6388), ("server2", 6388), ("server3", 6388)]
with DistributedLock("my-key", servers=servers) as lock:
print(f"token={lock.token} lease={lock.lease}")
Custom sharding strategy¶
Override the default CRC-32 sharding with your own logic:
from dflockd_client.sync_client import DistributedLock
def my_strategy(key: str, num_servers: int) -> int:
"""Route all keys to the first server."""
return 0
servers = [("server1", 6388), ("server2", 6388)]
with DistributedLock("my-key", servers=servers, sharding_strategy=my_strategy) as lock:
print(f"token={lock.token}")