In a classic DWH architecture, data passes through three layers, the operational data store (ODS), the detail data store (DDS) and the data marts (DM), before it reaches end users. Orchestrating this with Airflow takes some care: you want the layers isolated, the dependencies between them enforced, and clear notifications when something fails. Here's a battle-tested pattern.
Source (Oracle / SQL Server) → ODS → DDS → DM
The pattern: one trigger DAG + layer DAGs
Instead of putting everything in one giant DAG, split by layer. Each layer gets its own DAG with schedule=None, so it never starts on its own. A scheduled main_trigger_dag then triggers the layer DAGs in sequence with TriggerDagRunOperator.
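# main_trigger_dag.py
from datetime import datetime

import requests
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Placeholders: in practice, keep these in an Airflow Variable or Connection
TELEGRAM_TOKEN = "<BOT_TOKEN>"
TELEGRAM_CHAT_ID = "<CHAT_ID>"


def notify(status, **ctx):
    """Send a one-line pipeline status message to Telegram."""
    dag_id = ctx["dag"].dag_id
    if status == "SUCCESS":
        msg = f"✅ {dag_id}: {status}"
    else:
        msg = f"❌ {dag_id} failed"
    resp = requests.post(
        f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage",
        json={"chat_id": TELEGRAM_CHAT_ID, "text": msg},
        timeout=10,  # don't let a hung HTTP call block the worker
    )
    resp.raise_for_status()


with DAG(
    dag_id="main_trigger_dag",
    start_date=datetime(2024, 1, 1),
    schedule="0 2 * * 1-5",  # weekdays at 02:00 (Airflow 2.4+; older versions use schedule_interval)
    catchup=False,
) as dag:
    # wait_for_completion=True makes each trigger task block until the child
    # DAG run finishes, and fail if that run fails
    trigger_ods = TriggerDagRunOperator(
        task_id="trigger_ods",
        trigger_dag_id="ods_load_dag",
        wait_for_completion=True,
        poke_interval=30,
    )
    trigger_dds = TriggerDagRunOperator(
        task_id="trigger_dds",
        trigger_dag_id="dds_load_dag",
        wait_for_completion=True,
        poke_interval=30,
    )
    trigger_dm = TriggerDagRunOperator(
        task_id="trigger_dm",
        trigger_dag_id="dm_load_dag",
        wait_for_completion=True,
        poke_interval=30,
    )
    notify_ok = PythonOperator(
        task_id="notify_success",
        python_callable=notify,
        op_kwargs={"status": "SUCCESS"},
    )
    # Runs only if one of the trigger tasks failed; skipped otherwise
    notify_fail = PythonOperator(
        task_id="notify_failure",
        python_callable=notify,
        op_kwargs={"status": "FAILED"},
        trigger_rule="one_failed",
    )

    trigger_ods >> trigger_dds >> trigger_dm >> notify_ok
    [trigger_ods, trigger_dds, trigger_dm] >> notify_fail

Layer DAG structure (same pattern, repeated)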
Each layer DAG reads a config table to get the list of tables it should load, then fans out in parallel. Here's the ODS example:
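# ods_load_dag.py
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


def get_tables():
    """Return (table_name, source_conn, load_type) rows for active ODS loads."""
    hook = PostgresHook(postgres_conn_id="greenplum")
    return hook.get_records("""
        SELECT table_name, source_conn, load_type
        FROM etl.load_config
        WHERE layer = 'ODS' AND is_active = true
    """)


def load_table(table_name, source_conn, load_type, **ctx):
    # your COPY / INSERT logic here
    ...


with DAG(
    dag_id="ods_load_dag",
    start_date=datetime(2024, 1, 1),  # assumed: mirrors the trigger DAG
    schedule=None,  # runs only when main_trigger_dag triggers it
    catchup=False,  # assumed: mirrors the trigger DAG
) as dag:
    # Note: this query runs every time the scheduler parses this file, so keep it
    # cheap; config changes show up as tasks only after the next parse
    for table_name, source_conn, load_type in get_tables():
        # No dependencies between these tasks, so they can run in parallel
        # (subject to pool/concurrency limits)
        PythonOperator(
            task_id=f"load_{table_name}",
            python_callable=load_table,
            op_kwargs={
                "table_name": table_name,
                "source_conn": source_conn,
                "load_type": load_type,
            },
        )

Tip: use wait_for_completion=True in TriggerDagRunOperator. Without it, the trigger task succeeds as soon as the child DAG run is created, so DDS can start before ODS has finished.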
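To try the config-driven fan-out without Airflow or Greenplum, here is a minimal sketch that uses an in-memory SQLite database as a stand-in for etl.load_config. The column names follow the query in get_tables(); the sample rows, the plain table name load_config (there is no etl schema in this SQLite stand-in) and the helpers active_tables and task_specs are hypothetical, chosen only for illustration.

```python
import sqlite3

# Hypothetical stand-in for etl.load_config (no "etl" schema in SQLite here)
SCHEMA = """
CREATE TABLE load_config (
    table_name  TEXT NOT NULL,
    source_conn TEXT NOT NULL,
    load_type   TEXT NOT NULL,
    layer       TEXT NOT NULL,
    is_active   INTEGER NOT NULL
)
"""

# Made-up sample rows for the demo
SAMPLE_ROWS = [
    ("customers", "oracle_crm", "full", "ODS", 1),
    ("orders", "mssql_erp", "incremental", "ODS", 1),
    ("legacy_invoices", "oracle_crm", "full", "ODS", 0),  # disabled via config
    ("dim_customer", "greenplum", "scd2", "DDS", 1),
]


def active_tables(conn, layer):
    """Return (table_name, source_conn, load_type) for the active rows of one layer."""
    cur = conn.execute(
        "SELECT table_name, source_conn, load_type FROM load_config "
        "WHERE layer = ? AND is_active = 1 ORDER BY table_name",
        (layer,),
    )
    return cur.fetchall()


def task_specs(rows):
    """Map config rows to the task_id/op_kwargs pairs the layer DAG would build."""
    return [
        {
            "task_id": f"load_{table_name}",
            "op_kwargs": {
                "table_name": table_name,
                "source_conn": source_conn,
                "load_type": load_type,
            },
        }
        for table_name, source_conn, load_type in rows
    ]


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    conn.executemany("INSERT INTO load_config VALUES (?, ?, ?, ?, ?)", SAMPLE_ROWS)
    for spec in task_specs(active_tables(conn, "ODS")):
        print(spec["task_id"])  # prints load_customers, then load_orders
```

Disabling a load is then a single UPDATE on is_active; the inactive legacy_invoices row and the DDS row never become ODS tasks.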
Key decisions to make upfront
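- Config-driven table lists: don't hardcode tables in DAG code. Use a DB config table so you can enable or disable loads without touching DAGs.
- Failure isolation: ODS tables load in parallel, so one failed table doesn't stop the others, but it does mark the ODS DAG run as failed, which in turn fails trigger_ods. With the default trigger_rule="all_success", trigger_dds then never runs (it is marked upstream_failed); set trigger_rule="all_done" on trigger_dds if DDS should proceed anyway.
- Idempotency: ODS loads should be truncate+insert or watermark-based. DDS/DM transforms should be replayable, because retries and manual reruns will execute them again.
- Retries: set retries=2 and retry_delay=timedelta(minutes=5) in the DAG's default_args. By default, a retried trigger task starts a fresh run of the child DAG, which is one more reason every layer must be idempotent.
- SLA: sla is a task-level argument, so set sla=timedelta(hours=3) on the trigger DAG's tasks (for example via default_args) and Airflow will record an SLA miss, and call the DAG's sla_miss_callback if you set one, when the pipeline runs too long. SLAs are only checked for scheduled runs, which is why they belong on the trigger DAG rather than the triggered layer DAGs; note that SLAs were removed in Airflow 3.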
Result
This structure gives you clean separation of concerns per layer, parallel table loading within each layer, strictly sequential execution across layers, and Telegram notifications on success or failure. The Airflow UI shows each layer as its own DAG, which makes debugging much easier than one monolithic graph.
Happy to share the DDS SCD2 load pattern or the config table schema if anyone's interested.