Task Retries

One of the most powerful features of tasks is the automatic retry logic. This is very useful when for example an API is temporarily not available or a particular file or data partition has not landed yet. This functionality allows us to fix these issues by retrying the Task based on some delay. Let's see some examples

Task

Below is an example of a basis retry. It retries the task trhee times if not successful

from prefect import task
import random 

@task(retries = 3)
def unstable_api_call():
    if random.random() < .7:
        raise Exception("API temporarily unavaible")
    return {"status": "success"}

result = unstable_api_call()
result
OUTPUT23:32:02.845 | INFO | Task run 'unstable_api_call' - Task run failed with exception: Exception('API temporarily unavaible') - Retry 1/3 will start immediately 23:32:02.850 | INFO | Task run 'unstable_api_call' - Finished in state Completed() {'status': 'success'}

Retry Delays

Typically, when we implement a retry, it is often useful to specify a delay period as running a task almost immediately can result in the same issue. There are two types of delays.

Default delay

This type of delay means you set a specific number of seconds before the next retry.

@task(retries=3, retry_delay_seconds=60)
def rate_limited_api(data_size: int):
    """Wait 60 seconds between retries"""
    logger = get_run_logger()
    
    logger.info(f"Processing {data_size} items")
    
    # Simulate a rate limit with 50% failure rate
    if random.random() > 0.5:
        logger.warning("Rate limit hit, will retry after 60 seconds")
        raise Exception("Rate limit exceeded")
    
    # Process data successfully
    result = [i * 2 for i in range(data_size)]
    logger.info(f"Successfully processed {len(result)} items")
    return result
    

result = rate_limited_api(10)
result
OUTPUT23:30:47.947 | INFO | Task run 'rate_limited_api' - Processing 10 items 23:30:47.948 | WARNING | Task run 'rate_limited_api' - Rate limit hit, will retry after 60 seconds 23:30:47.956 | INFO | Task run 'rate_limited_api' - Task run failed with exception: Exception('Rate limit exceeded') - Retry 1/3 will start 60 second(s) from now 23:31:47.955 | INFO | Task run 'rate_limited_api' - Processing 10 items 23:31:47.956 | INFO | Task run 'rate_limited_api' - Successfully processed 10 items 23:31:47.959 | INFO | Task run 'rate_limited_api' - Finished in state Completed() [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Custom Exponetional Delay

Here you can pass on a list of seconds in increasing orders such that after each delay, the next execution happens a little bit longer.

from prefect import task, get_run_logger
import random

@task(retries=5, retry_delay_seconds=[1, 2, 4, 8, 16])
def api_with_backoff(number: int):
    """Retry with increasing delays: 1s, 2s, 4s, 8s, 16s"""
    logger = get_run_logger()
    
    logger.info(f"Computing factorial of {number}")
    
    # Simulate random failures (70% failure rate to show retries)
    if random.random() > 0.3:
        logger.error("Computation failed, will retry with exponential backoff")
        raise Exception("Computation error")
    
    # Calculate factorial successfully
    result = 1
    for i in range(1, number + 1):
        result *= i
    
    logger.info(f"Successfully computed factorial: {result}")
    return result

result = api_with_backoff(10)
result
OUTPUT00:02:00.461 | INFO | Task run 'api_with_backoff' - Computing factorial of 10 00:02:00.462 | ERROR | Task run 'api_with_backoff' - Computation failed, will retry with exponential backoff 00:02:00.464 | INFO | Task run 'api_with_backoff' - Task run failed with exception: Exception('Computation error') - Retry 1/5 will start 1 second(s) from now 00:02:01.475 | INFO | Task run 'api_with_backoff' - Computing factorial of 10 00:02:01.477 | INFO | Task run 'api_with_backoff' - Successfully computed factorial: 3628800 00:02:01.482 | INFO | Task run 'api_with_backoff' - Finished in state Completed() 3628800

Retry based on Condition

Another way to approach a retry is to check the status of te task or it's state and determine if we need to retry. This is different from an error within the task itself.

# Simple retry condition function
def should_retry(task, task_run, state):
    """Only retry if the error message contains 'temporary'"""
    if state.type == "FAILED":
        error = state.result()
        return "temporary" in str(error).lower()
    return False

@task(retries=3, retry_condition_fn=should_retry)
def smart_division(a: int, b: int):
    """Only retries on temporary errors, not permanent ones"""
    logger = get_run_logger()
    
    if b == 0:
        # Permanent error - don't retry
        logger.error("Division by zero - permanent error")
        raise ValueError("Cannot divide by zero")
    
    if b < 0:
        # Temporary error - will retry
        logger.warning("Temporary issue with negative divisor")
        raise Exception("Temporary error: negative divisor")
    
    result = a / b
    logger.info(f"{a} / {b} = {result}")
    return result
result = smart_division(10, 2)
print(f"Success: {result}")
OUTPUT00:15:03.370 | INFO | Task run 'smart_division' - 10 / 2 = 5.0 00:15:03.372 | INFO | Task run 'smart_division' - Finished in state Completed() Success: 5.0

Retry based on Condition

Another way to approach a retry is to check the status of te task or it's state and determine if we need to retry. This is different from an error within the task itself.

# Simple retry condition function
def should_retry(task, task_run, state):
    """Only retry if the error message contains 'temporary'"""
    if state.type == "FAILED":
        error = state.result()
        return "temporary" in str(error).lower()
    return False

@task(retries=3, retry_condition_fn=should_retry)
def smart_division(a: int, b: int):
    """Only retries on temporary errors, not permanent ones"""
    logger = get_run_logger()
    
    if b == 0:
        # Permanent error - don't retry
        logger.error("Division by zero - permanent error")
        raise ValueError("Cannot divide by zero")
    
    if b < 0:
        # Temporary error - will retry
        logger.warning("Temporary issue with negative divisor")
        raise Exception("Temporary error: negative divisor")
    
    result = a / b
    logger.info(f"{a} / {b} = {result}")
    return result
result = smart_division(10, 2)
print(f"Success: {result}")
OUTPUT00:15:03.370 | INFO | Task run 'smart_division' - 10 / 2 = 5.0 00:15:03.372 | INFO | Task run 'smart_division' - Finished in state Completed() Success: 5.0