Skip to content

Sync Client

The sync client uses the standard library's socket and threading for blocking operations. Use it from synchronous scripts, threaded servers, or any place asyncio isn't appropriate.

Imports

from dflockd_client import (
    SyncDistributedLock,
    SyncDistributedSemaphore,
    SyncConn,
)

DistributedLock

from dflockd_client import SyncDistributedLock

with SyncDistributedLock("my-key", acquire_timeout_s=10) as lock:
    print(f"token={lock.token} lease={lock.lease}")

If acquire_timeout_s elapses without a grant, __enter__ raises TimeoutError. The lease auto-renews in a daemon thread for as long as the lock is held; the thread is stopped on exit.

Manual acquire / release

lock = SyncDistributedLock("my-key", acquire_timeout_s=10)
if lock.acquire():
    try:
        ...
    finally:
        lock.release()

acquire() returns False on a server-side timeout, raises any other error.

Parameters

key is the only positional parameter; everything else is keyword-only.

Parameter Type Default Description
key str (required) Lock name
acquire_timeout_s int 10 Server-side wait, in seconds. 0 is non-blocking try-lock
lease_ttl_s int \| None None Lease duration. None uses the server's --default-lease-ttl
servers list[tuple[str, int]] [("127.0.0.1", 6388)] Server addresses
sharding_strategy ShardingStrategy stable_hash_shard (key, n) → index
renew_ratio float 0.5 Renew at lease * ratio seconds
ssl_context ssl.SSLContext \| None None TLS context
auth_token str \| None None Shared secret for --auth-token servers
connect_timeout_s float 10.0 TCP connect timeout

Methods

Method Returns Notes
acquire() bool False on server-side timeout
enqueue() str Two-phase step 1: "acquired" or "queued"
wait(timeout_s=None) bool Two-phase step 2; False on timeout
release() bool Stop renewal, send release, close connection
close() None Stop renewal, close connection (no release sent)

Attributes

Attribute Type Description
token str \| None Token issued by the server. None if not held
lease int Most recent server-reported remaining seconds

Two-phase acquisition

lock = SyncDistributedLock("my-key", acquire_timeout_s=10)
status = lock.enqueue()           # "acquired" or "queued"

if status == "acquired":
    ...
else:
    if lock.wait(timeout_s=10):   # blocks server-side up to 10s
        try:
            ...
        finally:
            lock.release()

enqueue() is non-blocking. The fast-path return is "acquired", in which case renewal starts immediately and wait() is a no-op. Otherwise the call returns "queued" and the same instance must be used for the follow-up wait().

If wait() times out, the connection is closed; you must enqueue() again to re-queue.

Background renewal

Once a lock is held, a daemon threading.Thread sends renew requests every lease * renew_ratio seconds. Renewal failure is logged and the thread exits. The client closes the broken connection, clears token and lease, and the server-side lease expires on its own if needed. Renewal stops on release(), close(), or context-manager exit.

If the instance is garbage-collected while still holding a connection, __del__ closes the underlying socket and emits a ResourceWarning.

DistributedSemaphore

Same shape as DistributedLock plus a required limit:

from dflockd_client import SyncDistributedSemaphore

with SyncDistributedSemaphore("pool", limit=3, acquire_timeout_s=10) as sem:
    print(f"token={sem.token}")

A Semaphore with limit=1 is exactly equivalent to a Lock. Mixing the two on the same key surfaces LimitMismatchError.

Methods

Identical to DistributedLock — the difference is purely the limit-aware wire commands (sl/se/sw/sr/sn).

Low-level API

For applications that want explicit connection management — e.g. holding many short-lived locks under a single TCP connection — drop down to SyncConn and the dflockd_client._sync module-level functions.

import socket
from dflockd_client import SyncConn
from dflockd_client._sync import acquire, release, renew

sock = socket.create_connection(("127.0.0.1", 6388))
conn = SyncConn(sock)
try:
    token, lease = acquire(conn, "my-key", acquire_timeout_s=10)
    remaining = renew(conn, "my-key", token)
    release(conn, "my-key", token)
finally:
    conn.close()

SyncConn is safe for concurrent use from multiple threads — an internal mutex serialises request/response pairs, so concurrent acquire(conn, "k1", ...) and acquire(conn, "k2", ...) from different threads share one TCP socket.

Functions

# locks
def acquire(conn, key, acquire_timeout_s, lease_ttl_s=None, *, prefix="", limit=None) -> tuple[str, int]
def release(conn, key, token, *, prefix="") -> None
def renew(conn, key, token, lease_ttl_s=None, *, prefix="", read_timeout=30.0) -> int
def enqueue(conn, key, lease_ttl_s=None, *, prefix="", limit=None) -> tuple[str, str | None, int | None]
def wait(conn, key, wait_timeout_s, *, prefix="") -> tuple[str, int]

# semaphore convenience wrappers (same with prefix="s" and limit set)
def sem_acquire(conn, key, acquire_timeout_s, limit, lease_ttl_s=None) -> tuple[str, int]
def sem_release(conn, key, token) -> None
def sem_renew(conn, key, token, lease_ttl_s=None) -> int
def sem_enqueue(conn, key, limit, lease_ttl_s=None) -> tuple[str, str | None, int | None]
def sem_wait(conn, key, wait_timeout_s) -> tuple[str, int]

# misc
def stats(conn) -> StatsResult
def authenticate(conn, auth_token) -> None
def open_conn(host, port, *, ssl_context, connect_timeout_s) -> SyncConn

acquire/enqueue enforce a (prefix, limit) invariant: prefix="" (lock) rejects limit; prefix="s" (semaphore) requires it. The sem_* wrappers exist so callers don't have to spell out prefix="s" manually.

If a low-level call raises, treat the connection as broken — close it and dial a fresh one.

Async equivalents

The async client provides the same surface with async/await. See Async Client.