Concurrency¶
AsyncTimeout¶
Wrap an async combinator with a timeout. If execution exceeds the timeout, returns (False, ctx) or calls the on_timeout handler.
Fields:
inner: AsyncCombinator[T]-- The combinator to wraptimeout: float-- Timeout in secondson_timeout: Callable[[T], T] | None-- Optional callback to modify context on timeouttimed_out: bool-- Whether the last execution timed out (not concurrency-safe)
with_timeout¶
with_timeout(
combinator: AsyncCombinator[T],
timeout: float,
on_timeout: Callable[[T], T] | None = None,
) -> AsyncTimeout[T]
Factory function for AsyncTimeout.
AsyncLimited¶
Limit concurrent executions using a semaphore.
Constructor:
Parameters:
inner-- The combinator to wrapmax_concurrent-- Maximum number of concurrent executionsname-- Optional name to share semaphore across limiters
limited¶
limited(
combinator: AsyncCombinator[T],
max_concurrent: int,
name: str | None = None,
) -> AsyncLimited[T]
Factory function for AsyncLimited.
from kompoz import limited
# Instance-specific limit
limited_api = limited(api_check, max_concurrent=5)
# Shared limit across multiple combinators
check_a = limited(api_a, max_concurrent=10, name="api_pool")
check_b = limited(api_b, max_concurrent=10, name="api_pool")
AsyncCircuitBreaker¶
Circuit breaker pattern for fault tolerance.
Constructor:
AsyncCircuitBreaker(
inner: AsyncCombinator[T],
failure_threshold: int = 5,
recovery_timeout: float = 30.0,
half_open_max_calls: int = 1,
on_state_change: Callable | None = None,
)
Properties:
state: CircuitState-- Current circuit state
Methods:
async run(ctx: T) -> tuple[bool, T]-- Execute (or reject if circuit is open)get_stats() -> CircuitBreakerStats-- Get current statisticsasync reset() -> None-- Manually reset to CLOSED state
circuit_breaker¶
circuit_breaker(
combinator: AsyncCombinator[T],
failure_threshold: int = 5,
recovery_timeout: float = 30.0,
half_open_max_calls: int = 1,
on_state_change: Callable | None = None,
) -> AsyncCircuitBreaker[T]
Factory function for AsyncCircuitBreaker.
from kompoz import circuit_breaker
protected = circuit_breaker(flaky_api, failure_threshold=5, recovery_timeout=30.0)
CircuitState¶
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Requests rejected
HALF_OPEN = "half_open" # Testing recovery