Apache Airflow is an open-source platform designed to programmatically author, schedule, and monitor workflows. While DAGs (Directed Acyclic Graphs) are the backbone of Airflow, each DAG consists of tasks that represent the individual operations within a workflow. Airflow’s tasks command provides a suite of sub-commands to inspect, test, and manage these tasks. In this article, we’ll take a closer look at the tasks command and walk through its sub-commands with practical examples.
Overview of the tasks Command:
The tasks command is a versatile tool for task-level operations within a DAG. From testing to checking the status, this command offers a comprehensive toolset for task management.
Syntax:
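The general format of the tasks command is shown below. Most sub-commands take a DAG ID, a task ID, and an execution date, while some (such as list) take only the DAG ID: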
airflow tasks <SUBCOMMAND> <DAG_ID> <TASK_ID> <EXECUTION_DATE>
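To make the positional layout concrete, here is a minimal Python sketch that assembles the argument list for a tasks command. The helper name build_tasks_command is hypothetical and not part of Airflow; it only builds the argument list and does not invoke the CLI.

```python
from typing import List, Optional


def build_tasks_command(
    subcommand: str,
    dag_id: str,
    task_id: Optional[str] = None,
    execution_date: Optional[str] = None,
) -> List[str]:
    """Assemble an `airflow tasks ...` argument list (hypothetical helper)."""
    if execution_date is not None and task_id is None:
        # The date follows the task ID positionally, so it cannot stand alone.
        raise ValueError("execution_date requires a task_id")
    argv = ["airflow", "tasks", subcommand, dag_id]
    # Positional arguments are appended only when supplied, because
    # sub-commands such as `list` take just the DAG ID.
    if task_id is not None:
        argv.append(task_id)
    if execution_date is not None:
        argv.append(execution_date)
    return argv


if __name__ == "__main__":
    print(" ".join(build_tasks_command("list", "freshers_in_sample_dag")))
    print(" ".join(build_tasks_command(
        "test", "freshers_in_sample_dag", "start_task", "2023-01-01")))
```

Returning a list rather than a single string means the result can be passed directly to subprocess.run without shell quoting concerns.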
Available Sub-Commands:
test:
This sub-command runs a single task in a DAG for a particular execution date without checking its dependencies or recording its state in the metadata database, which makes it well suited to local debugging.
Example:
airflow tasks test freshers_in_sample_dag start_task 2023-01-01
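run:
This sub-command runs a single task instance for a particular execution date. Unlike test, it records the task's state in the metadata database.
Example: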
airflow tasks run freshers_in_sample_dag start_task 2023-01-01
clear:
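It clears the state of one or more task instances so that they can be re-run. Tasks are selected with -t (a regular expression matched against task IDs), and the date range is bounded with -s (start date) and -e (end date).
Example:
airflow tasks clear freshers_in_sample_dag -t start_task -s 2023-01-01 -e 2023-01-01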
state:
This prints the current state of a task instance (for example, success, failed, or running).
Example:
airflow tasks state freshers_in_sample_dag start_task 2023-01-01
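If you want to read a task's state from a script, one option is to call the CLI through subprocess. The sketch below assumes that the state is the last non-empty line printed to standard output, since log lines may precede it; both helper names are hypothetical and not part of Airflow.

```python
import subprocess
from typing import List


def parse_state_output(stdout: str) -> str:
    """Return the last non-empty line of the output (assumed to be the state)."""
    lines = [line.strip() for line in stdout.splitlines() if line.strip()]
    if not lines:
        raise ValueError("no output to parse")
    return lines[-1]


def get_task_state(dag_id: str, task_id: str, execution_date: str) -> str:
    """Run `airflow tasks state` and return the reported state (hypothetical helper)."""
    argv: List[str] = ["airflow", "tasks", "state", dag_id, task_id, execution_date]
    # check=True raises CalledProcessError if the CLI exits with a non-zero code.
    result = subprocess.run(argv, capture_output=True, text=True, check=True)
    return parse_state_output(result.stdout)
```

With Airflow installed and the DAG loaded, get_task_state("freshers_in_sample_dag", "start_task", "2023-01-01") would issue the same command as the example above.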
list:
Lists all tasks in a DAG.
Example:
airflow tasks list freshers_in_sample_dag
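The output of list can feed other sub-commands. The sketch below assumes the command prints one task ID per line and turns that text into one test command per task; the helper names and the sample output string are hypothetical, used only for illustration.

```python
from typing import List


def parse_task_list(stdout: str) -> List[str]:
    """Split CLI output into task IDs, assuming one ID per non-empty line."""
    return [line.strip() for line in stdout.splitlines() if line.strip()]


def build_test_commands(
    dag_id: str, task_ids: List[str], execution_date: str
) -> List[str]:
    """Build one `airflow tasks test` command line per task ID."""
    return [
        f"airflow tasks test {dag_id} {task_id} {execution_date}"
        for task_id in task_ids
    ]


if __name__ == "__main__":
    # Hypothetical output for the sample DAG used in this article.
    sample_output = "end_task\nstart_task\n"
    task_ids = parse_task_list(sample_output)
    for command in build_test_commands(
        "freshers_in_sample_dag", task_ids, "2023-01-01"
    ):
        print(command)
```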
failed-deps:
Reports the unmet dependencies that prevent a task instance from running for a specific execution date.
Example:
airflow tasks failed-deps freshers_in_sample_dag start_task 2023-01-01
Sample DAG:
To provide context, let’s use the following simple DAG:
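from datetime import datetime, timedelta

from airflow import DAG
# This import path is deprecated; newer Airflow 2 releases provide
# EmptyOperator (airflow.operators.empty) as the replacement.
from airflow.operators.dummy_operator import DummyOperator

# Defaults applied to every task in the DAG.
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'freshers_in_sample_dag',
    default_args=default_args,
    description='A basic tutorial DAG',
    schedule_interval=timedelta(days=1),  # run once a day
    catchup=False,  # do not backfill runs between start_date and now
)

# Two placeholder tasks that do no work.
start = DummyOperator(task_id='start_task', dag=dag)
end = DummyOperator(task_id='end_task', dag=dag)

# end_task runs only after start_task has finished.
start >> end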
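To illustrate what start >> end expresses without needing an Airflow installation, the following sketch models the same dependency with Python's standard-library graphlib module (Python 3.9+). It is only an analogy for how the ordering is derived, not how Airflow schedules tasks internally; the upstream mapping and the execution_order function are hypothetical names.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Map each task ID to the set of tasks that must finish before it.
# `start >> end` means end_task depends on start_task.
upstream: Dict[str, Set[str]] = {
    "start_task": set(),
    "end_task": {"start_task"},
}


def execution_order(upstream_map: Dict[str, Set[str]]) -> List[str]:
    """Return task IDs in an order that respects every dependency."""
    return list(TopologicalSorter(upstream_map).static_order())


if __name__ == "__main__":
    print(execution_order(upstream))  # prints ['start_task', 'end_task']
```

This is the order in which you would typically exercise the tasks with airflow tasks test: start_task first, then end_task.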