If you have ever needed to run a series of scripts in a specific order, say first extracting data, then transforming/cleaning it, and finally loading it into a database, then you already understand the problem Airflow solves. Doing that by hand every day is tedious and error-prone; Airflow automates it.
What is Apache Airflow
Apache Airflow is an open-source tool developed at Airbnb in 2014. It lets you schedule, monitor and manage workflows using Python code.
In short, instead of running scripts manually or stitching together messy cron jobs, you write Python code that defines the workflow. Airflow runs the workflow on schedule, tracks what failed and lets you retry the failed tasks.
The Core Concept: What Is a DAG?
DAG stands for Directed Acyclic Graph. It just means:
- Directed - tasks run in a defined order; task B can't run before task A.
- Acyclic - there are no loops; once task B has run, the flow never circles back to task A.
- Graph - the connected tasks can be drawn as a map of nodes and edges.
A DAG is like a recipe: you can't frost a cake before you bake it. A specific order must be followed.
A Real World Example of a DAG - a daily data pipeline
extract_data → transform_data → load_to_warehouse
A basic DAG code
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def extract():
    print("Extracting data...")

def transform():
    print("Transforming data...")

def load():
    print("Loading data to warehouse...")

with DAG(
    dag_id="daily_pipeline",
    start_date=datetime(2026, 5, 10),
    schedule_interval="@daily",
    catchup=False
) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    transform_task = PythonOperator(task_id="transform", python_callable=transform)
    load_task = PythonOperator(task_id="load", python_callable=load)

    extract_task >> transform_task >> load_task
">>" operator sets the order in which the tasks run, so, transform_task cannot run before extract_task.
What Are Tasks?
Every box in the DAG is a task: extract_task is a task, transform_task is another task, and load_task is another task. Airflow runs and tracks each task independently.
Each task has one of these statuses at runtime:
- queued — waiting to run
- running — currently executing
- success — finished without errors
- failed — something went wrong
- skipped — intentionally bypassed
- retrying — failed but trying again.
If, for instance, load_task fails, Airflow does not rerun extract_task and transform_task; it only retries load_task, which saves time.
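You can also control how many retries a task gets and how long Airflow waits between attempts. A minimal sketch using the load task from the earlier DAG (the retry count and delay are just example values):
from datetime import timedelta

from airflow.operators.python import PythonOperator

load_task = PythonOperator(
    task_id="load",
    python_callable=load,
    retries=3,                          # try up to 3 more times after a failure
    retry_delay=timedelta(minutes=5)    # wait 5 minutes between attempts
)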
What Are Operators?
An operator is the template that defines what a task actually does. Think of an operator as a worker who already knows how to do one specific type of job.
Airflow comes with many built-in operators:
1. PythonOperator
This runs any python code:
from airflow.operators.python import PythonOperator

def my_function():
    print("Hello from Python!")

task = PythonOperator(
    task_id="run_python",
    python_callable=my_function
)
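If the callable needs arguments, PythonOperator can pass them in through op_kwargs. A small sketch (the function and values below are purely illustrative):
from airflow.operators.python import PythonOperator

def greet(name, excited=False):
    message = f"Hello, {name}!"
    print(message.upper() if excited else message)

task = PythonOperator(
    task_id="run_python_with_args",
    python_callable=greet,
    op_kwargs={"name": "Airflow", "excited": True}   # passed to greet() at runtime
)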
2. BashOperator
This runs any bash command or shell script.
from airflow.operators.bash import BashOperator
task = BashOperator(
    task_id="run_bash",
    bash_command="echo 'Pipeline started' && python3 scripts/extract.py"
)
3. EmailOperator
This sends an email, useful for reports.
from airflow.operators.email import EmailOperator
task = EmailOperator(
    task_id="send_email",
    to="exampleemail@gmail.com",
    subject="Daily Report is ready",
    html_content="<p>Your pipeline finished successfully.</p>"
)
4. PostgresOperator
Runs an SQL query against a Postgres database.
from airflow.providers.postgres.operators.postgres import PostgresOperator
task = PostgresOperator(
    task_id="run_sql",
    postgres_conn_id="my_postgres_connection",
    # {{ ds }} is a built-in template that expands to the run's logical date (YYYY-MM-DD)
    sql="INSERT INTO reports SELECT * FROM staging WHERE date = '{{ ds }}';"
)
5. S3ToRedshiftOperator
This copies data from Amazon S3 directly into Redshift. No Python code is needed for the actual move.
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
task = S3ToRedshiftOperator(
task_id="load_to_redshift",
s3_bucket="my-data-bucket",
s3_key="data/2024/sales.csv",
schema="public",
table="sales",
copy_options=["CSV", "IGNOREHEADER 1"]
)
What is Scheduling?
This is the part of the DAG where you define how often, or exactly when, your DAG should run.
Airflow supports two formats:
1. Preset shortcuts:
| Preset | Meaning |
|---|---|
| @once | Run one time only |
| @hourly | Every hour |
| @daily | Once a day at midnight |
| @weekly | Once a week |
| @monthly | Once a month |
2. Cron expressions:
"0 6 * * *" → Every day at 6:00 AM
"0 6 * * 1" → Every Monday at 6:00 AM
"*/15 * * * *" → Every 15 minutes
"0 0 1 * *" → First day of every month at midnight
Cron fields follow the order: minute, hour, day of month, month, day of week.
Example of running a pipeline every day at 7 AM:
from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="morning_pipeline",
    start_date=datetime(2026, 5, 10),
    schedule_interval="0 7 * * *",
    catchup=False
) as dag:
    ...
Everything about multi-step pipelines becomes easier to run with Apache Airflow. We don't have to babysit scripts every day anymore; our workflow is written once in Python, and Airflow does the rest. A DAG is a "full workflow"; a task is "one step" in the workflow; an operator is "the worker" that performs the step; and scheduling is "how or when" you want the workflow to run.