Prefect Tasks

Prefect has two main APIs that let us develop data pipelines the Pythonic way: tasks and flows. In this section, we focus on tasks.

Task

A task is a Python function decorated with @task that represents a discrete unit of work in your workflow. Tasks are the atomic operations that make up your data pipeline. Decorating a function with @task gives it the following features:

  • Automatic retries on failure
  • Caching based on inputs
  • Parallel execution capabilities
  • Granular observability and logging

Basic Task

Let's look at a basic task.

from prefect import task

@task
def add_numbers(a: int, b: int) -> int:
    return a + b


result = add_numbers(5, 3)
print(result)
Output:
20:12:53.734 | INFO | prefect - Starting temporary server on http://127.0.0.1:8106
See https://docs.prefect.io/v3/concepts/server#how-to-guides for more information on running a dedicated Prefect server.
20:12:56.518 | INFO | Task run 'add_numbers' - Finished in state Completed()
8

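That's it! The @task decorator turns your function into a Prefect task. Notice that the output contains more than the return value 8: because no dedicated Prefect server is configured, Prefect starts a temporary local server to track the task run, and it logs the run's final state before the result is printed.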

Task with Logging

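For tasks more complex than the one above, some form of logging is essential for tracing what happened during a run and diagnosing problems. Calling get_run_logger() inside a task returns a logger tied to the current task run, so each message is labeled with the run it came from, as the output below shows.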

from prefect import task, get_run_logger

@task
def process_data(data: list) -> int:
    logger = get_run_logger()
    logger.info(f"Processing {len(data)} items")
    result = sum(data)
    logger.info(f"Sum calculated: {result}")
    return result

result = process_data([1, 2, 3, 4, 5])
print(result)
Output:
20:19:33.432 | INFO | Task run 'process_data' - Processing 5 items
20:19:33.434 | INFO | Task run 'process_data' - Sum calculated: 15
20:19:33.436 | INFO | Task run 'process_data' - Finished in state Completed()
15

Task Configuration

So far, we have used tasks with their default settings. However, the @task decorator accepts keyword arguments that configure a task in many ways; for example, we can assign a name, a description, tags, and other attributes such as the number of retries.

Task Name, Description and Tags

Let's see how to define a task's name, description, and tags. We will use these features later, when we combine tasks into a flow.

from prefect import task

@task(name="fetch-data")
def fetch_data():
    pass

@task(name="transform-data", description="Transforms data from API")
def transform_data():
    pass

@task(tags=["processed-data"])
def load_data():
    pass