Prefect Tasks
Prefect has two main APIs that allow us to develop data pipelines the Pythonic way. These are:
Task
A task is a Python function decorated with @task that represents a discrete unit of work in your workflow. Tasks are the atomic operations that make up your data pipeline. Decorating a function with @task gives you several capabilities out of the box (see the sketch after this list):
- Automatic retries on failure
- Caching based on inputs
- Parallel execution capabilities
- Granular observability and logging
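Most of these capabilities are switched on directly through decorator arguments. As a minimal sketch, here is a task using retries and input-based caching; fetch_page is a hypothetical stand-in, while retries, retry_delay_seconds, cache_key_fn, and cache_expiration are standard parameters of the decorator.
from datetime import timedelta
from prefect import task
from prefect.tasks import task_input_hash

@task(
    retries=3,                            # retry up to three times on failure
    retry_delay_seconds=10,               # wait ten seconds between attempts
    cache_key_fn=task_input_hash,         # reuse results when inputs repeat
    cache_expiration=timedelta(hours=1),  # discard cached results after an hour
)
def fetch_page(url: str) -> str:
    return f"contents of {url}"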
Basic Task
Let's look at a basic task.
from prefect import task

@task
def add_numbers(a: int, b: int) -> int:
    return a + b

result = add_numbers(5, 3)
print(result)
That's it! The @task decorator transforms your function into a Prefect task. Notice that the output is not simply the return value: Prefect starts a temporary local server to execute and track the task run.
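If you only want the plain return value with no orchestration at all, the original undecorated function is still reachable through the task's .fn attribute:
# Call the wrapped function directly; no task run is created.
print(add_numbers.fn(5, 3))  # 8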
Task with Logging
When defining more complicated tasks than the earlier example, we need some form of logging so that we can trace problems when they occur. Prefect provides get_run_logger for this.
from prefect import task, get_run_logger

@task
def process_data(data: list) -> int:
    logger = get_run_logger()
    logger.info(f"Processing {len(data)} items")
    result = sum(data)
    logger.info(f"Sum calculated: {result}")
    return result

result = process_data([1, 2, 3, 4, 5])
print(result)
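Prefect also offers a lighter-weight option: setting log_prints=True on the decorator captures ordinary print() calls as log records. A minimal sketch, assuming the same summing logic as above:
from prefect import task

@task(log_prints=True)
def process_data_simple(data: list) -> int:
    # print() output is routed into the Prefect run logs.
    print(f"Processing {len(data)} items")
    return sum(data)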
Task Configuration - Name, Tags and Description
So far, we have used tasks as is. However, tasks can be configured in many different ways. For example, we can assign names, tags, descriptions, and many other attributes.
Task Name, Description and Tags
Let's see how to define a task's name, description, and tags. We will use these features later, when we compose tasks into a flow.
@task(name="fetch-data")
def fetch_data():
pass
@task(name="transform-data", description="Transforms data from API")
def transform_data():
pass
@task(tags=["processed-data"])
def load_data():
pass
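These options combine freely with the other decorator parameters. As a sketch, here is one hypothetical task that sets a name, description, and tags together with retries and a timeout; retries and timeout_seconds are standard decorator arguments.
from prefect import task

@task(
    name="load-data",
    description="Loads transformed records into the warehouse",
    tags=["processed-data"],
    retries=2,           # retry twice before marking the run failed
    timeout_seconds=60,  # fail the run if it exceeds one minute
)
def load_data_configured(records: list) -> int:
    return len(records)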