Task Retries
One of the most powerful features of Prefect tasks is their built-in retry logic. This is very useful when, for example, an API is temporarily unavailable or a particular file or data partition has not landed yet. Instead of failing the whole run, Prefect can re-run the task after a delay, giving the transient problem time to resolve itself. Let's look at some examples.
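Basic Retry
Below is an example of a basic retry. The retries=3 argument tells Prefect to re-run the task up to three more times if it raises an exception, so the function runs at most four times in total.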
from prefect import task
import random

@task(retries=3)
def unstable_api_call():
    # Fail about 70% of the time to simulate a flaky API
    if random.random() < 0.7:
        raise Exception("API temporarily unavailable")
    return {"status": "success"}

result = unstable_api_call()
result
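The retry count is easy to misread: retries=3 allows three retries on top of the first attempt, so the function can run up to four times. The plain-Python loop below is a simplified sketch of that counting rule, not Prefect's actual implementation; run_with_retries and flaky are hypothetical names used only for illustration.

```python
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")

def run_with_retries(fn: Callable[[], T], retries: int) -> Tuple[T, int]:
    """Call fn, retrying up to `retries` extra times; return (result, attempts)."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return fn(), attempts
        except Exception:
            # Give up once the first attempt plus all retries have failed
            if attempts > retries:
                raise

# Hypothetical deterministic stand-in for the flaky API: fails twice, then succeeds
calls = {"n": 0}

def flaky() -> dict:
    calls["n"] += 1
    if calls["n"] <= 2:
        raise Exception("API temporarily unavailable")
    return {"status": "success"}

result, attempts = run_with_retries(flaky, retries=3)
print(result, attempts)  # {'status': 'success'} 3
```

A function that never succeeds would be called four times (one attempt plus three retries) before the last exception propagates.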
Retry Delays
When a task fails, retrying it almost immediately often runs into the same problem, so it is usually worth waiting before the next attempt. The retry_delay_seconds parameter controls this wait, and there are two common ways to set it: a fixed delay or a custom list of delays.
Fixed Delay
With a fixed delay, you pass a single number of seconds to retry_delay_seconds, and Prefect waits that long before every retry.
from prefect import task, get_run_logger
import random

@task(retries=3, retry_delay_seconds=60)
def rate_limited_api(data_size: int):
    """Wait 60 seconds between retries"""
    logger = get_run_logger()
    logger.info(f"Processing {data_size} items")
    # Simulate a rate limit with 50% failure rate
    if random.random() > 0.5:
        logger.warning("Rate limit hit, will retry after 60 seconds")
        raise Exception("Rate limit exceeded")
    # Process data successfully
    result = [i * 2 for i in range(data_size)]
    logger.info(f"Successfully processed {len(result)} items")
    return result

result = rate_limited_api(10)
result
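A fixed delay also makes the worst case easy to estimate: with retries=3 and retry_delay_seconds=60, a task that keeps failing spends 3 × 60 = 180 seconds just waiting. The sketch below is a hypothetical stdlib-only retry loop (retry_fixed_delay is not a Prefect function) that makes this visible by passing a recording function in place of time.sleep.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def retry_fixed_delay(
    fn: Callable[[], T],
    retries: int,
    delay_seconds: float,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, waiting the same delay_seconds before each of up to `retries` retries."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            # The last allowed attempt failed: give up and re-raise
            if attempt == retries:
                raise
            sleep(delay_seconds)
    raise AssertionError("unreachable for retries >= 0")

# Record the waits instead of actually sleeping, so the example runs instantly
waits = []

def always_rate_limited() -> list:
    raise Exception("Rate limit exceeded")

try:
    retry_fixed_delay(always_rate_limited, retries=3, delay_seconds=60, sleep=waits.append)
except Exception as exc:
    print(exc)  # Rate limit exceeded

print(waits, sum(waits))  # [60, 60, 60] 180
```

Injecting the sleep function is only a testing convenience here; in real use the default time.sleep performs the actual wait.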
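Custom Exponential Delay
Instead of a single number, you can pass a list of delays in seconds, one per retry. Listing them in increasing order gives exponential backoff: each retry waits longer than the previous one, which gives an overloaded service more time to recover.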
from prefect import task, get_run_logger
import random

@task(retries=5, retry_delay_seconds=[1, 2, 4, 8, 16])
def api_with_backoff(number: int):
    """Retry with increasing delays: 1s, 2s, 4s, 8s, 16s"""
    logger = get_run_logger()
    logger.info(f"Computing factorial of {number}")
    # Simulate random failures (70% failure rate to show retries)
    if random.random() > 0.3:
        logger.error("Computation failed, will retry with exponential backoff")
        raise Exception("Computation error")
    # Calculate factorial successfully
    result = 1
    for i in range(1, number + 1):
        result *= i
    logger.info(f"Successfully computed factorial: {result}")
    return result

result = api_with_backoff(10)
result
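Writing the delay list by hand works for a few retries, but the schedule can also be generated. The helper below is a hypothetical sketch (backoff_schedule is not part of Prefect) where the delay before retry i is base_seconds * factor**i; its output could be passed as retry_delay_seconds=backoff_schedule(5). Prefect also ships an exponential_backoff helper in prefect.tasks for the same purpose; check its signature in the documentation for your Prefect version.

```python
from typing import List

def backoff_schedule(retries: int, base_seconds: float = 1.0, factor: float = 2.0) -> List[float]:
    """Delay before retry i (0-based) is base_seconds * factor**i."""
    return [base_seconds * factor**i for i in range(retries)]

delays = backoff_schedule(5)
print(delays)       # [1.0, 2.0, 4.0, 8.0, 16.0]
print(sum(delays))  # 31.0 -> worst-case total wait before the last attempt
```

The sum is a useful sanity check: five doubling retries starting at one second add at most 31 seconds of waiting to a failing task.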
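Retry Based on a Condition
Not every failure is worth retrying: a division by zero will fail the same way no matter how many times it runs. The retry_condition_fn parameter takes a function that Prefect calls when a task run ends in a failed state, passing it the task, the task run, and that state. The function returns True to let the task continue with its retry policy, or False to stop and mark the task as failed. This way you can retry transient errors while letting permanent ones fail fast.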
from prefect import task, get_run_logger

# Simple retry condition function, called after a failed attempt
def should_retry(task, task_run, state) -> bool:
    """Only retry if the error message contains 'temporary'"""
    try:
        # For a failed state, result() re-raises the stored exception
        state.result()
    except Exception as error:
        return "temporary" in str(error).lower()
    # No exception was stored, so there is nothing to retry
    return False

@task(retries=3, retry_condition_fn=should_retry)
def smart_division(a: int, b: int):
    """Only retries on temporary errors, not permanent ones"""
    logger = get_run_logger()
    if b == 0:
        # Permanent error - don't retry
        logger.error("Division by zero - permanent error")
        raise ValueError("Cannot divide by zero")
    if b < 0:
        # Temporary error - will retry
        logger.warning("Temporary issue with negative divisor")
        raise Exception("Temporary error: negative divisor")
    result = a / b
    logger.info(f"{a} / {b} = {result}")
    return result

result = smart_division(10, 2)
print(f"Success: {result}")
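To see how the condition changes the outcome, here is a Prefect-free sketch that applies the same "temporary" rule inside a plain retry loop. is_retryable and run_with_condition are hypothetical helpers, and this is a simplification rather than how Prefect evaluates retry_condition_fn internally: a permanent error fails after a single call, while a temporary one uses up every retry.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")

def is_retryable(exc: BaseException) -> bool:
    """Same rule as should_retry: only messages mentioning 'temporary' qualify."""
    return "temporary" in str(exc).lower()

def run_with_condition(
    fn: Callable[[], T],
    retries: int,
    condition: Callable[[BaseException], bool] = is_retryable,
) -> T:
    failures = 0
    while True:
        try:
            return fn()
        except Exception as exc:
            failures += 1
            # Stop on a permanent error, or once every retry has been used
            if not condition(exc) or failures > retries:
                raise

calls: List[str] = []

def divide_by_zero() -> float:
    calls.append("zero")
    raise ValueError("Cannot divide by zero")

def negative_divisor() -> float:
    calls.append("negative")
    raise Exception("Temporary error: negative divisor")

for fn in (divide_by_zero, negative_divisor):
    try:
        run_with_condition(fn, retries=3)
    except Exception as exc:
        print(f"{fn.__name__} gave up: {exc}")

print(calls.count("zero"), calls.count("negative"))  # 1 4
```

Note that the "temporary" negative-divisor error in this demo can never succeed, so it simply burns through all retries; in practice the condition should match errors that really are transient.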