Retry Logic
Retry failed operations with configurable backoff.
Basic Retry
from kompoz import Retry
# Simple retry
fetch = Retry(fetch_from_api, max_attempts=3)
# Exponential backoff
fetch = Retry(
    fetch_from_api,
    max_attempts=5,
    backoff=1.0,       # Initial delay in seconds
    exponential=True,  # Double delay each attempt
    jitter=0.1         # Random jitter to avoid thundering herd
)
ok, result = fetch.run(request)
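To make the effect of these parameters concrete, here is a minimal sketch of how a delay schedule could be derived from them. It does not reproduce kompoz's internals: the helper `backoff_delays` is hypothetical, and it assumes that `jitter` adds a uniform random offset of up to that many seconds to each delay, which may differ from what the library actually does.

```python
import random


def backoff_delays(max_attempts, backoff, exponential=False, jitter=0.0, rng=None):
    """Return the delays (in seconds) slept between consecutive attempts.

    Hypothetical helper for illustration only; not part of kompoz.
    """
    rng = rng or random.Random()
    delays = []
    # One delay sits between each pair of attempts: max_attempts - 1 in total.
    for retry_index in range(max_attempts - 1):
        # Exponential mode doubles the base delay on every retry: 1x, 2x, 4x, ...
        delay = backoff * (2 ** retry_index) if exponential else backoff
        # Assumption: jitter adds a uniform offset in [0, jitter] seconds.
        delay += rng.uniform(0.0, jitter)
        delays.append(delay)
    return delays


print(backoff_delays(max_attempts=5, backoff=1.0, exponential=True))
# jitter defaults to 0.0 here, so this prints [1.0, 2.0, 4.0, 8.0]
```

Under these assumptions, five attempts with a one-second initial backoff wait at most about 15 seconds in total before giving up, plus whatever jitter is added.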
Observability Hooks
Retry combinators support observability via callbacks and state tracking:
from kompoz import Retry, AsyncRetry
# Callback for monitoring retries
def on_retry(attempt: int, error: Exception | None, delay: float):
    print(f"Retry {attempt}: error={error}, waiting {delay}s")
    metrics.increment("api.retries", tags={"attempt": attempt})

fetch = Retry(
    fetch_from_api,
    max_attempts=3,
    backoff=1.0,
    on_retry=on_retry  # Called before each retry
)
ok, result = fetch.run(request)
# After execution, check state
print(f"Total attempts: {fetch.attempts_made}")
print(f"Last error: {fetch.last_error}")
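The point at which such a hook fires can be illustrated with a small stand-alone retry loop. This is a sketch, not the kompoz implementation: `retry_with_hook` and its `sleep` parameter are hypothetical, and it assumes that `attempt` is the number of the attempt that just failed and that the hook runs before the wait.

```python
import time


def retry_with_hook(operation, max_attempts=3, backoff=1.0, on_retry=None, sleep=time.sleep):
    """Call `operation` until it succeeds or the attempts run out.

    Hypothetical stand-in for illustration; not the kompoz implementation.
    Returns (ok, result, attempts_made, last_error).
    """
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return True, operation(), attempt, last_error
        except Exception as exc:  # remember the failure, then decide whether to retry
            last_error = exc
        if attempt < max_attempts:
            # The hook fires before the wait, so it can report the upcoming delay.
            if on_retry is not None:
                on_retry(attempt, last_error, backoff)
            sleep(backoff)
    return False, None, max_attempts, last_error


calls = []
events = []


def flaky():
    """Fail twice, then succeed."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("temporary failure")
    return "payload"


def record(attempt, error, delay):
    events.append((attempt, type(error).__name__, delay))


ok, result, attempts, _ = retry_with_hook(
    flaky, max_attempts=3, backoff=0.5, on_retry=record, sleep=lambda seconds: None
)
print(ok, result, attempts)  # True payload 3
print(events)  # [(1, 'ConnectionError', 0.5), (2, 'ConnectionError', 0.5)]
```

Passing `sleep` as a parameter keeps the example fast and makes the loop easy to test; the hook is not called after the final attempt because no retry follows it.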
Thread-safe alternative: run_with_info()
Like last_error on transforms, the attempts_made and last_error attributes on
Retry / AsyncRetry are overwritten on every call, so callers that share one
instance across threads can read values left by another call. Use
run_with_info() for a pure alternative that returns all metadata in a
RetryResult:
info = fetch.run_with_info(request)
print(f"ok={info.ok}, attempts={info.attempts_made}, error={info.last_error}")
Note: run_with_info() is available on both Retry and AsyncRetry.
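Why returning the metadata is safer than storing it on the instance can be shown with a stand-alone sketch. Everything here is an assumption made for illustration: `RetryInfo` only mirrors the fields used above (`ok`, `attempts_made`, `last_error`) plus a hypothetical `value` field, and this free function `run_with_info` is not the kompoz method of the same name.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class RetryInfo:
    """Hypothetical per-call record; kompoz's RetryResult may carry other fields."""
    ok: bool
    value: Any
    attempts_made: int
    last_error: Optional[Exception]


def run_with_info(operation: Callable[[Any], Any], arg: Any, max_attempts: int = 3) -> RetryInfo:
    """Retry `operation(arg)` and return all metadata without touching shared state."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return RetryInfo(True, operation(arg), attempt, last_error)
        except Exception as exc:
            last_error = exc
    return RetryInfo(False, None, max_attempts, last_error)


def parse_positive(text: str) -> int:
    number = int(text)  # raises ValueError for non-numeric input
    if number <= 0:
        raise ValueError(f"not positive: {number}")
    return number


# Each call builds its own RetryInfo, so concurrent calls cannot overwrite one another.
with ThreadPoolExecutor(max_workers=2) as pool:
    good, bad = pool.map(lambda text: run_with_info(parse_positive, text), ["7", "oops"])

print(good.ok, good.attempts_made)  # True 1
print(bad.ok, bad.attempts_made)    # False 3
```

Because the record is frozen and local to the call, there is nothing for a second thread to mutate; the same reasoning is what makes a returned result preferable to reading attributes after the fact.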
Async Retry
For async retries, the callback can be sync or async:
async def on_retry_async(attempt, error, delay):
    await log_to_service(f"Retry {attempt}")

fetch = AsyncRetry(
    fetch_from_api,
    max_attempts=3,
    on_retry=on_retry_async  # Async callback supported
)
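One common way to accept both kinds of callback is to call the hook and await the result only when it is awaitable. The sketch below illustrates that pattern with the standard library; `call_hook` and `retry_async` are hypothetical names, and nothing here claims to match how AsyncRetry is implemented.

```python
import asyncio
import inspect


async def call_hook(hook, *args):
    """Invoke `hook`, awaiting the result only if it turns out to be awaitable."""
    outcome = hook(*args)
    if inspect.isawaitable(outcome):
        await outcome


async def retry_async(operation, max_attempts=3, delay=0.0, on_retry=None):
    """Hypothetical async retry loop for illustration; not the kompoz implementation."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return True, await operation()
        except Exception as exc:
            last_error = exc
        if attempt < max_attempts:
            if on_retry is not None:
                await call_hook(on_retry, attempt, last_error, delay)
            await asyncio.sleep(delay)
    return False, last_error


async def demo():
    seen = []

    def sync_hook(attempt, error, delay):
        seen.append(("sync", attempt))

    async def async_hook(attempt, error, delay):
        seen.append(("async", attempt))

    async def always_fails():
        raise TimeoutError("no response")

    await retry_async(always_fails, max_attempts=2, on_retry=sync_hook)
    await retry_async(always_fails, max_attempts=2, on_retry=async_hook)
    return seen


print(asyncio.run(demo()))  # [('sync', 1), ('async', 1)]
```

Checking the return value rather than the function itself also covers hooks wrapped in `functools.partial` or implemented as callable objects.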
DSL Integration
Use the :retry modifier in DSL expressions: