Что такое Trigger Rules
По умолчанию задача в Airflow запускается только если все её upstream-задачи завершились успешно. Это поведение регулируется параметром trigger_rule и по умолчанию имеет значение all_success. Но в реальных пайплайнах часто нужна другая логика: запустить cleanup-задачу даже если что-то упало, выполнить финальное уведомление независимо от результата, или стартовать только когда хотя бы одна из параллельных веток сработала.
Trigger rules позволяют точно контролировать, при каких состояниях upstream-задач должна запуститься текущая задача. Всего в Airflow 10 правил.
Все 10 trigger rules
| Правило | Когда запускается |
|---|---|
all_success (default) | Все upstream успешны |
all_failed | Все upstream упали |
all_done | Все upstream завершены (любым способом) |
all_skipped | Все upstream в состоянии skipped |
one_success | Хотя бы одна upstream успешна (не ждёт остальные) |
one_failed | Хотя бы одна upstream упала |
one_done | Хотя бы одна upstream завершена (success или failed) |
none_failed | Ни одна upstream не упала (success или skipped) |
none_failed_min_one_success | Ни одна не упала + хотя бы одна успешна |
none_skipped | Ни одна upstream не skipped (success или failed) |
always | Всегда запускается независимо от состояния upstream |
Базовый синтаксис
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime
with DAG('trigger_rules_demo', start_date=datetime(2025, 1, 1), schedule='@daily') as dag:
task = PythonOperator(
task_id='my_task',
python_callable=my_func,
trigger_rule=TriggerRule.ALL_DONE, # или строкой: 'all_done'
)
Примеры использования каждого правила
1. all_success (по умолчанию)
Задача запускается только если ВСЕ upstream успешны. Самая строгая логика — если одна из задач упала, все downstream автоматически получат статус upstream_failed.
extract = PythonOperator(task_id='extract', python_callable=extract_data)
transform = PythonOperator(task_id='transform', python_callable=transform_data)
load = PythonOperator(task_id='load', python_callable=load_data) # all_success по умолчанию
extract >> transform >> load
# Если extract упадёт — transform и load станут upstream_failed
2. all_done — cleanup и финальные задачи
Самый популярный rule после default. Используется для задач, которые должны выполниться всегда после завершения пайплайна: очистка временных файлов, закрытие соединений, финальные уведомления.
etl = PythonOperator(task_id='etl', python_callable=run_etl)
cleanup = PythonOperator(
task_id='cleanup_temp_files',
python_callable=cleanup_temp,
trigger_rule='all_done' # запустится даже если etl упал
)
etl >> cleanup
3. all_failed — обработка критических ошибок
Запускается когда ВСЕ upstream упали. Полезно для alerting когда весь пайплайн развалился.
source_1 = PythonOperator(task_id='fetch_from_api_1', ...)
source_2 = PythonOperator(task_id='fetch_from_api_2', ...)
source_3 = PythonOperator(task_id='fetch_from_api_3', ...)
critical_alert = EmailOperator(
task_id='critical_alert',
to='oncall@company.com',
subject='ВСЕ источники данных недоступны!',
trigger_rule='all_failed'
)
[source_1, source_2, source_3] >> critical_alert
4. one_success — работаем с любыми доступными данными
Запускается как только ХОТЯ БЫ ОДНА upstream успешна, не дожидаясь остальных. Идеально для пайплайнов с fallback-источниками.
primary_api = PythonOperator(task_id='fetch_primary', ...)
backup_api = PythonOperator(task_id='fetch_backup', ...)
cache_source = PythonOperator(task_id='fetch_from_cache', ...)
process = PythonOperator(
task_id='process_data',
python_callable=process_any,
trigger_rule='one_success' # работаем с тем источником, который первым ответил
)
[primary_api, backup_api, cache_source] >> process
5. one_failed — реакция на первую ошибку
Запускается как только хотя бы одна upstream упала. Полезно для раннего alerting не дожидаясь остальных задач.
validation_1 = PythonOperator(task_id='validate_schema', ...)
validation_2 = PythonOperator(task_id='validate_rows', ...)
validation_3 = PythonOperator(task_id='validate_constraints', ...)
early_alert = SlackOperator(
task_id='validation_failed_alert',
message='Нашли проблему в данных!',
trigger_rule='one_failed'
)
[validation_1, validation_2, validation_3] >> early_alert
6. none_failed — для BranchPythonOperator
Запускается если ни одна upstream не упала (но могут быть skipped). Главный use case — задачи после BranchPythonOperator, где часть веток намеренно пропускается.
from airflow.operators.python import BranchPythonOperator
def choose_branch(**context):
return 'path_a' if context['ds_nodash'].endswith('0') else 'path_b'
branch = BranchPythonOperator(task_id='branch', python_callable=choose_branch)
path_a = PythonOperator(task_id='path_a', ...)
path_b = PythonOperator(task_id='path_b', ...)
# Без none_failed эта задача ВСЕГДА будет upstream_failed
# потому что одна из веток всегда skipped
join = PythonOperator(
task_id='after_branch',
trigger_rule='none_failed'
)
branch >> [path_a, path_b] >> join
7. none_failed_min_one_success — строгая проверка после branching
Как none_failed, но требует чтобы хотя бы ОДНА ветка успешно отработала (не только все skipped). Более строгий вариант для join после branch.
branch = BranchPythonOperator(task_id='branch', python_callable=choose)
path_a = PythonOperator(task_id='path_a', ...)
path_b = PythonOperator(task_id='path_b', ...)
# Этот финализатор требует реальный успех, а не просто отсутствие ошибок
finalize = PythonOperator(
task_id='finalize',
trigger_rule='none_failed_min_one_success'
)
branch >> [path_a, path_b] >> finalize
8. always — запуск независимо от всего
Задача запускается ВСЕГДА, даже если upstream ещё не завершились (в 2.x требует aware использования). Полезно для задач типа heartbeat или начальной инициализации внутри DAG.
init_logger = PythonOperator(
task_id='init_run_metadata',
python_callable=record_run_start,
trigger_rule='always' # логируем факт запуска в любой ситуации
)
9. none_skipped — требуем реальное выполнение
Запускается если ни одна upstream не в состоянии skipped. Все должны быть либо success, либо failed.
check_a = PythonOperator(task_id='check_a', ...)
check_b = PythonOperator(task_id='check_b', ...)
audit = PythonOperator(
task_id='audit',
trigger_rule='none_skipped' # аудит нужен только если все проверки реально бежали
)
[check_a, check_b] >> audit
10. all_skipped — обработка когда всё пропущено
Запускается когда ВСЕ upstream skipped. Редкий rule, используется например для сигнала что ни одна ветка branch-оператора не выбрана.
branch = BranchPythonOperator(task_id='router', python_callable=route_to_none_sometimes)
path_a = PythonOperator(task_id='path_a', ...)
path_b = PythonOperator(task_id='path_b', ...)
no_action = PythonOperator(
task_id='log_no_route',
trigger_rule='all_skipped' # если router не выбрал ни одну ветку
)
branch >> [path_a, path_b] >> no_action
Полный практический пример: ETL с cleanup и alerting
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime
with DAG('etl_with_branches', start_date=datetime(2025, 1, 1), schedule='@daily') as dag:
extract = PythonOperator(task_id='extract', python_callable=extract_data)
# Ветвление по типу данных
branch = BranchPythonOperator(
task_id='detect_data_type',
python_callable=lambda: 'process_csv' if is_csv() else 'process_json'
)
process_csv = PythonOperator(task_id='process_csv', python_callable=handle_csv)
process_json = PythonOperator(task_id='process_json', python_callable=handle_json)
# Join после branch — нужен none_failed
load = PythonOperator(
task_id='load_to_warehouse',
python_callable=load,
trigger_rule='none_failed_min_one_success'
)
# Cleanup всегда — all_done
cleanup = PythonOperator(
task_id='cleanup_tmp',
python_callable=cleanup_tmp,
trigger_rule='all_done'
)
# Alert только при падении — one_failed
alert = PythonOperator(
task_id='send_alert',
python_callable=send_telegram_alert,
trigger_rule='one_failed'
)
# Финальный notification — all_success
notify_success = PythonOperator(
task_id='notify_success',
python_callable=send_success_message,
# trigger_rule='all_success' — по умолчанию
)
extract >> branch >> [process_csv, process_json] >> load >> [cleanup, notify_success]
[extract, process_csv, process_json, load] >> alert
Типичные ошибки
1. Забыли none_failed после BranchPythonOperator
Самая частая проблема. После branch одна из веток всегда skipped, поэтому downstream с default all_success всегда в состоянии upstream_failed.
# ❌ НЕПРАВИЛЬНО
branch >> [path_a, path_b] >> join # join НИКОГДА не запустится
# ✅ ПРАВИЛЬНО
join = PythonOperator(task_id='join', trigger_rule='none_failed')
branch >> [path_a, path_b] >> join
2. Использование all_done без понимания, что задача скроет падения
Если навешать all_done на финальный success_email — он отправится даже если весь пайплайн упал. Для уведомлений об успехе оставляй all_success.
3. one_success не ждёт остальные задачи
Задача с one_success запустится СРАЗУ как только любая upstream успешна, не дожидаясь остальных. Если нужны данные от всех — используй all_success.
Визуализация trigger rules в UI
В Airflow UI (Graph View) задача со специальным trigger rule отмечена специальным значком в углу. При клике на задачу во вкладке Details видно текущий trigger_rule. В логах Scheduler видно почему задача не запустилась — например: "Dependencies not met: upstream_failed".
Краткая шпаргалка
- all_success — стандарт, менять не надо
- all_done — для cleanup и финальных задач, которые всегда нужны
- none_failed — ОБЯЗАТЕЛЬНО после BranchPythonOperator
- one_failed — для ранних alerts при первой ошибке
- one_success — для fallback-источников данных
- none_failed_min_one_success — строгий join после branch
- all_failed — для критических алертов (весь пайплайн упал)
- Остальные — редкие, для специфичных сценариев