Trigger Rules в Airflow

Разбираем все 10 trigger rules в Apache Airflow: как задачи реагируют на успех, ошибки и skip upstream-задач

Что такое Trigger Rules

По умолчанию задача в Airflow запускается только если все её upstream-задачи завершились успешно. Это поведение регулируется параметром trigger_rule и по умолчанию имеет значение all_success. Но в реальных пайплайнах часто нужна другая логика: запустить cleanup-задачу даже если что-то упало, выполнить финальное уведомление независимо от результата, или стартовать только когда хотя бы одна из параллельных веток сработала.

Trigger rules позволяют точно контролировать, при каких состояниях upstream-задач должна запуститься текущая задача. Всего в Airflow 10 правил.

Все 10 trigger rules

ПравилоКогда запускается
all_success (default)Все upstream успешны
all_failedВсе upstream упали
all_doneВсе upstream завершены (любым способом)
all_skippedВсе upstream в состоянии skipped
one_successХотя бы одна upstream успешна (не ждёт остальные)
one_failedХотя бы одна upstream упала
one_doneХотя бы одна upstream завершена (success или failed)
none_failedНи одна upstream не упала (success или skipped)
none_failed_min_one_successНи одна не упала + хотя бы одна успешна
none_skippedНи одна upstream не skipped (success или failed)
alwaysВсегда запускается независимо от состояния upstream

Базовый синтаксис

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime

with DAG('trigger_rules_demo', start_date=datetime(2025, 1, 1), schedule='@daily') as dag:

    task = PythonOperator(
        task_id='my_task',
        python_callable=my_func,
        trigger_rule=TriggerRule.ALL_DONE,  # или строкой: 'all_done'
    )

Примеры использования каждого правила

1. all_success (по умолчанию)

Задача запускается только если ВСЕ upstream успешны. Самая строгая логика — если одна из задач упала, все downstream автоматически получат статус upstream_failed.

extract = PythonOperator(task_id='extract', python_callable=extract_data)
transform = PythonOperator(task_id='transform', python_callable=transform_data)
load = PythonOperator(task_id='load', python_callable=load_data)  # all_success по умолчанию

extract >> transform >> load
# Если extract упадёт — transform и load станут upstream_failed

2. all_done — cleanup и финальные задачи

Самый популярный rule после default. Используется для задач, которые должны выполниться всегда после завершения пайплайна: очистка временных файлов, закрытие соединений, финальные уведомления.

etl = PythonOperator(task_id='etl', python_callable=run_etl)
cleanup = PythonOperator(
    task_id='cleanup_temp_files',
    python_callable=cleanup_temp,
    trigger_rule='all_done'  # запустится даже если etl упал
)
etl >> cleanup

3. all_failed — обработка критических ошибок

Запускается когда ВСЕ upstream упали. Полезно для alerting когда весь пайплайн развалился.

source_1 = PythonOperator(task_id='fetch_from_api_1', ...)
source_2 = PythonOperator(task_id='fetch_from_api_2', ...)
source_3 = PythonOperator(task_id='fetch_from_api_3', ...)

critical_alert = EmailOperator(
    task_id='critical_alert',
    to='oncall@company.com',
    subject='ВСЕ источники данных недоступны!',
    trigger_rule='all_failed'
)
[source_1, source_2, source_3] >> critical_alert

4. one_success — работаем с любыми доступными данными

Запускается как только ХОТЯ БЫ ОДНА upstream успешна, не дожидаясь остальных. Идеально для пайплайнов с fallback-источниками.

primary_api = PythonOperator(task_id='fetch_primary', ...)
backup_api = PythonOperator(task_id='fetch_backup', ...)
cache_source = PythonOperator(task_id='fetch_from_cache', ...)

process = PythonOperator(
    task_id='process_data',
    python_callable=process_any,
    trigger_rule='one_success'  # работаем с тем источником, который первым ответил
)
[primary_api, backup_api, cache_source] >> process

5. one_failed — реакция на первую ошибку

Запускается как только хотя бы одна upstream упала. Полезно для раннего alerting не дожидаясь остальных задач.

validation_1 = PythonOperator(task_id='validate_schema', ...)
validation_2 = PythonOperator(task_id='validate_rows', ...)
validation_3 = PythonOperator(task_id='validate_constraints', ...)

early_alert = SlackOperator(
    task_id='validation_failed_alert',
    message='Нашли проблему в данных!',
    trigger_rule='one_failed'
)
[validation_1, validation_2, validation_3] >> early_alert

6. none_failed — для BranchPythonOperator

Запускается если ни одна upstream не упала (но могут быть skipped). Главный use case — задачи после BranchPythonOperator, где часть веток намеренно пропускается.

from airflow.operators.python import BranchPythonOperator

def choose_branch(**context):
    return 'path_a' if context['ds_nodash'].endswith('0') else 'path_b'

branch = BranchPythonOperator(task_id='branch', python_callable=choose_branch)
path_a = PythonOperator(task_id='path_a', ...)
path_b = PythonOperator(task_id='path_b', ...)

# Без none_failed эта задача ВСЕГДА будет upstream_failed
# потому что одна из веток всегда skipped
join = PythonOperator(
    task_id='after_branch',
    trigger_rule='none_failed'
)
branch >> [path_a, path_b] >> join

7. none_failed_min_one_success — строгая проверка после branching

Как none_failed, но требует чтобы хотя бы ОДНА ветка успешно отработала (не только все skipped). Более строгий вариант для join после branch.

branch = BranchPythonOperator(task_id='branch', python_callable=choose)
path_a = PythonOperator(task_id='path_a', ...)
path_b = PythonOperator(task_id='path_b', ...)

# Этот финализатор требует реальный успех, а не просто отсутствие ошибок
finalize = PythonOperator(
    task_id='finalize',
    trigger_rule='none_failed_min_one_success'
)
branch >> [path_a, path_b] >> finalize

8. always — запуск независимо от всего

Задача запускается ВСЕГДА, даже если upstream ещё не завершились (в 2.x требует aware использования). Полезно для задач типа heartbeat или начальной инициализации внутри DAG.

init_logger = PythonOperator(
    task_id='init_run_metadata',
    python_callable=record_run_start,
    trigger_rule='always'  # логируем факт запуска в любой ситуации
)

9. none_skipped — требуем реальное выполнение

Запускается если ни одна upstream не в состоянии skipped. Все должны быть либо success, либо failed.

check_a = PythonOperator(task_id='check_a', ...)
check_b = PythonOperator(task_id='check_b', ...)

audit = PythonOperator(
    task_id='audit',
    trigger_rule='none_skipped'  # аудит нужен только если все проверки реально бежали
)
[check_a, check_b] >> audit

10. all_skipped — обработка когда всё пропущено

Запускается когда ВСЕ upstream skipped. Редкий rule, используется например для сигнала что ни одна ветка branch-оператора не выбрана.

branch = BranchPythonOperator(task_id='router', python_callable=route_to_none_sometimes)
path_a = PythonOperator(task_id='path_a', ...)
path_b = PythonOperator(task_id='path_b', ...)

no_action = PythonOperator(
    task_id='log_no_route',
    trigger_rule='all_skipped'  # если router не выбрал ни одну ветку
)
branch >> [path_a, path_b] >> no_action

Полный практический пример: ETL с cleanup и alerting

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime

with DAG('etl_with_branches', start_date=datetime(2025, 1, 1), schedule='@daily') as dag:

    extract = PythonOperator(task_id='extract', python_callable=extract_data)

    # Ветвление по типу данных
    branch = BranchPythonOperator(
        task_id='detect_data_type',
        python_callable=lambda: 'process_csv' if is_csv() else 'process_json'
    )

    process_csv = PythonOperator(task_id='process_csv', python_callable=handle_csv)
    process_json = PythonOperator(task_id='process_json', python_callable=handle_json)

    # Join после branch — нужен none_failed
    load = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load,
        trigger_rule='none_failed_min_one_success'
    )

    # Cleanup всегда — all_done
    cleanup = PythonOperator(
        task_id='cleanup_tmp',
        python_callable=cleanup_tmp,
        trigger_rule='all_done'
    )

    # Alert только при падении — one_failed
    alert = PythonOperator(
        task_id='send_alert',
        python_callable=send_telegram_alert,
        trigger_rule='one_failed'
    )

    # Финальный notification — all_success
    notify_success = PythonOperator(
        task_id='notify_success',
        python_callable=send_success_message,
        # trigger_rule='all_success' — по умолчанию
    )

    extract >> branch >> [process_csv, process_json] >> load >> [cleanup, notify_success]
    [extract, process_csv, process_json, load] >> alert

Типичные ошибки

1. Забыли none_failed после BranchPythonOperator

Самая частая проблема. После branch одна из веток всегда skipped, поэтому downstream с default all_success всегда в состоянии upstream_failed.

# ❌ НЕПРАВИЛЬНО
branch >> [path_a, path_b] >> join  # join НИКОГДА не запустится

# ✅ ПРАВИЛЬНО
join = PythonOperator(task_id='join', trigger_rule='none_failed')
branch >> [path_a, path_b] >> join

2. Использование all_done без понимания, что задача скроет падения

Если навешать all_done на финальный success_email — он отправится даже если весь пайплайн упал. Для уведомлений об успехе оставляй all_success.

3. one_success не ждёт остальные задачи

Задача с one_success запустится СРАЗУ как только любая upstream успешна, не дожидаясь остальных. Если нужны данные от всех — используй all_success.

Визуализация trigger rules в UI

В Airflow UI (Graph View) задача со специальным trigger rule отмечена специальным значком в углу. При клике на задачу во вкладке Details видно текущий trigger_rule. В логах Scheduler видно почему задача не запустилась — например: "Dependencies not met: upstream_failed".

Краткая шпаргалка

  • all_success — стандарт, менять не надо
  • all_done — для cleanup и финальных задач, которые всегда нужны
  • none_failed — ОБЯЗАТЕЛЬНО после BranchPythonOperator
  • one_failed — для ранних alerts при первой ошибке
  • one_success — для fallback-источников данных
  • none_failed_min_one_success — строгий join после branch
  • all_failed — для критических алертов (весь пайплайн упал)
  • Остальные — редкие, для специфичных сценариев