Airflowdata engineeringETLorchestrationmodern data stack

Airflow для аналитика 2026: DAG patterns + best practices

2026-05-30 11 мин

Apache Airflow — стандарт оркестрации пайплайнов в 2026. Если ты dbt-моделишь данные, Airflow запускает dbt по расписанию + ходит за свежими файлами + рассылает алерты в Slack + сохраняет результаты тестов. Этот гайд — практические DAG-паттерны без воды.

Покажу как писать идемпотентные DAG, использовать TaskFlow API (2.0+), грамотно работать с sensors и динамическими DAG-ами. Всё с примерами из аналитики маркетплейса.


Что узнаешь


Шаг 1: Airflow за 5 минут — концепции

DAG (Directed Acyclic Graph) — расписание задач + их зависимости.

Task — единица работы (запустить SQL, скопировать файл).

Scheduler — следит за расписаниями DAGов и кладёт их в очередь.

Worker — выполняет tasks из очереди.

Executor — стратегия выполнения (Local, Celery, Kubernetes).

Source DBs → Airflow Scheduler (cron) → Workers
                   ↓
            DAGs запускают:
            dbt run + tests
            S3 → Snowflake load
            Notebook → отчёт в Slack

Подборка 10 Airflow-вопросов с собесов — паттерны idempotency, scheduling и sensors.

Шаг 2: TaskFlow API (Airflow 2.0+) — современный синтаксис

Старый PythonOperator:

def extract(ds, **kwargs):
    return {'rows': 1000}

def transform(ti, **kwargs):
    data = ti.xcom_pull(task_ids='extract')
    return {'processed': data['rows'] * 2}

with DAG('old_style', schedule='@daily') as dag:
    e = PythonOperator(task_id='extract', python_callable=extract)
    t = PythonOperator(task_id='transform', python_callable=transform)
    e >> t

TaskFlow API — короче в 3 раза:

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule='@daily', start_date=datetime(2024, 1, 1), catchup=False, tags=['etl'])
def etl_pipeline():
    @task
    def extract():
        return {'rows': 1000}

    @task
    def transform(data: dict):
        return {'processed': data['rows'] * 2}

    @task
    def load(data: dict):
        print(f"Loaded {data['processed']} rows")

    load(transform(extract()))

etl_pipeline()

XCom передаётся автоматически через return value → assignment.

Шаг 3: Idempotency — главный паттерн

Idempotent task: запустить N раз = тот же результат. Безопасный retry без дублирования.

Bad (не идемпотентный):

@task
def load_revenue(ds):
    df = pd.read_csv(f's3://raw/events_{ds}.csv')
    df.to_sql('daily_revenue', engine, if_exists='append')   # ❌ retry дублирует

Good (идемпотентный):

@task
def load_revenue(ds):
    df = pd.read_csv(f's3://raw/events_{ds}.csv')
    with engine.begin() as conn:
        conn.execute(f"DELETE FROM daily_revenue WHERE day = '{ds}'")
        df.to_sql('daily_revenue', conn, if_exists='append', index=False)

Лучший вариант — UPSERT/MERGE:

@task
def load_revenue(ds):
    sql = f"""
    INSERT INTO daily_revenue (day, revenue)
    SELECT '{ds}', SUM(amount)
    FROM events
    WHERE day = '{ds}'
    GROUP BY day
    ON CONFLICT (day) DO UPDATE SET revenue = EXCLUDED.revenue;
    """
    engine.execute(sql)

Использование {{ ds }} (execution_date logical date в YYYY-MM-DD) — Airflow подставит правильную дату при backfill и retry.

Шаг 4: Sensors — ожидание условий

Sensor ждёт пока условие станет True (файл появился, partition готов, API доступен).

Modes:

from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id='wait_for_csv',
    filepath='/data/incoming/sales_{{ ds }}.csv',
    poke_interval=60,           # проверять каждую минуту
    timeout=3600,               # ждать максимум 1 час
    mode='reschedule',          # освобождать worker между проверками
)

ExternalTaskSensor — ждать task из другого DAG:

from airflow.sensors.external_task import ExternalTaskSensor

wait_upstream = ExternalTaskSensor(
    task_id='wait_upstream_dag',
    external_dag_id='upstream_etl',
    external_task_id='load_final',
    timeout=7200,
    mode='reschedule',
)

Smart Sensors deprecated в 2.4+ → используй deferrable operators (новая модель async).

Шаг 5: Dynamic Task Mapping (Airflow 2.3+)

Один task description → N исполнений на runtime данных:

@dag(schedule='@daily', start_date=datetime(2024, 1, 1))
def per_country_etl():
    @task
    def list_countries():
        return ['ru', 'us', 'br', 'de', 'in']  # из DB или config

    @task
    def process_country(country: str):
        # Обработка для конкретной страны
        print(f"Processing {country}")
        return f"done_{country}"

    @task
    def summarize(results: list):
        print(f"Total countries: {len(results)}")

    countries = list_countries()
    results = process_country.expand(country=countries)
    summarize(results)

При runtime Airflow создаст 5 task-инстансов process_country (по одному на страну), параллельно или последовательно в зависимости от max_active_tis_per_dag.

Шаг 6: Error handling + alerts

on_failure_callback — слать в Slack/PagerDuty при падении:

def slack_alert(context):
    task = context['task'].task_id
    dag = context['dag'].dag_id
    msg = f"⚠️ {dag}.{task} FAILED. Log: {context['task_instance'].log_url}"
    send_to_slack(msg)

@dag(
    schedule='@daily',
    on_failure_callback=slack_alert,    # на падение всего DAG
    default_args={
        'on_failure_callback': slack_alert,  # на падение task
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
        'sla': timedelta(hours=2),
        'email_on_failure': False,
    }
)
def critical_etl():
    ...

SLA misses автоматом регистрируются если task не завершился в указанный timeframe — sla_miss_callback шлёт уведомление.

Шаг 7: Best practices в проде

from airflow.utils.task_group import TaskGroup

with TaskGroup('etl_block') as etl_group:
    extract() >> transform() >> load()

heavy_task = PythonOperator(
    task_id='heavy',
    python_callable=my_func,
    pool='snowflake_pool',
    pool_slots=2,
)

FAQ

Airflow vs Prefect vs Dagster?

Airflow — мейн-стрим (большая комьюнити, dbt integration, K8s executor). Prefect — современный API, лучше для ML pipelines. Dagster — software-defined assets, асset-centric model. В РФ в 99% случаев Airflow.

Сколько DAG норм для одного instance?

До 500-1000 DAG на standalone Scheduler без проблем. После — нужны несколько Scheduler-инстансов или HA setup. На очень больших instance (3000+) лучше разделить на logical clusters.

Schedule '@daily' vs '0 0 * * *'?

Эквивалентны: оба = midnight UTC каждый день. Cron-выражения дают точный контроль. @daily короче и понятнее.

catchup=True — кому нужен?

Для bulk-backfill при первом запуске нового DAG. После — переключай на False, иначе любое изменение start_date запустит сотни runs.

Dependence на dbt?

Используй BashOperator (dbt run --select model_name) или Astronomer Cosmos (https://github.com/astronomer/astronomer-cosmos) — генерит Airflow tasks из dbt manifest автоматически.

Что дальше

Источники

Airflow — это контроль над тысячами пайплайнов. Открой SQL-тренажёр и качай SQL под dbt-модели которые Airflow будет крутить.

Прокачай SQL для пайплайнов
480+ SQL-задач, 600+ заданий из реальных собесов, AI-разбор. Бесплатный старт.
Открыть тренажёр →