- 1. Run only a single task
- 2. Run a task together with its downstream tasks
- 3. The Airflow command line
- 4. Retrieving connection configuration parameters in Airflow
- 5. Triggering a DAG run for a specific execution date in the Airflow UI
- 6. The reschedule mode of sensors
- 7. Sensor retries
- 8. Error: The scheduler does not appear to be running. Last heartbeat was received X minutes ago
- 9. Error when running a Hive operator task: airflow.exceptions.AirflowException: /usr/bin/env: 'bash': No such file or directory
- 10. Tuning the parsing_processes, parallelism, and dag_concurrency settings
1. Run only a single task
In the task instance dialog, deselect the Downstream and Recursive buttons, click Clear, then choose Ignore All Deps and click Run.
2. Run a task together with its downstream tasks
Keep the Downstream button selected (and Recursive deselected), click Clear so that the task and its downstream tasks are reset, then choose Ignore Task Deps and click Run.
3. The Airflow command line
Reference: https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#dags
1. Create an airflow user on first login
airflow users create --username airflow --role Admin --password airflow --email airflow@xxx.com --lastname airflow --firstname airflow
2. Delete a DAG by its dag id
airflow dags delete {dag_id}
3. Trigger an Airflow DAG
airflow dags trigger --help
Note that the execution_date must lie between the DAG's start_date and end_date, otherwise the trigger fails with:
ValueError: The execution_date [2022-07-19T08:00:00+00:00] should be >= start_date [2022-07-20T00:00:00+00:00] from DAG's default_args
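For example, assuming a date inside that range (DAG_ID is a placeholder), a run for a specific execution date can be triggered with the -e/--exec-date option:
airflow dags trigger -e 2022-07-21T08:00:00+00:00 DAG_ID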
4. Backfill Airflow runs between a start_date and an end_date
airflow dags backfill -s 2022-01-01 -e 2022-01-10 DAG_ID
This runs the DAG for execution dates from 2022-01-01 through 2022-01-09, i.e. the start_date is included but the end_date is not.
5. Test an Airflow task
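A single task instance can be run in isolation with the test subcommand, which executes the task without recording state in the database (DAG_ID and TASK_ID are placeholders):
airflow tasks test DAG_ID TASK_ID 2022-01-01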
4. Retrieving connection configuration parameters in Airflow
from airflow.hooks.base import BaseHook
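A minimal sketch of reading a connection's fields through BaseHook.get_connection (the connection id my_conn_id is a placeholder):

```python
from airflow.hooks.base import BaseHook

# Look up a connection configured in the Airflow UI or metadata database
conn = BaseHook.get_connection("my_conn_id")  # placeholder conn id

print(conn.host, conn.port, conn.login)
print(conn.password)
print(conn.extra_dejson)  # the Extra JSON field parsed into a dict
```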
5. Triggering a DAG run for a specific execution date in the Airflow UI
Click DAG Runs.
Click Add A New Record to add a DAG run.
Be sure to set state to running.
For the run id, follow the format used by other DAGs' runs, e.g. scheduled__2022-09-18T23:00:00+00:00, then click Save.
This triggers a DAG run for the specific execution date.
If a large number of DAG runs needs to be backfilled, you can instead set the DAG's start_date appropriately and set catchup=True; for a newly created DAG, runs will then be scheduled from start_date onward, as in the sketch below.
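A minimal sketch of a catchup-enabled DAG (the dag id, dates, and schedule are placeholders):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# With catchup=True, the scheduler creates one run per schedule
# interval between start_date and now when the DAG first appears.
with DAG(
    dag_id="backfill_demo",  # placeholder dag id
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
    catchup=True,
) as dag:
    BashOperator(task_id="hello", bash_command="echo hello")
```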
6. The reschedule mode of sensors
For sensors that may have to wait a long time: with mode="reschedule", if the sensor's criteria are not met, the sensor releases its worker slot to other tasks between pokes instead of occupying it the whole time.
Reference: https://stackoverflow.com/questions/66715894/how-does-the-mode-reschedule-in-airflow-sensors-work
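A minimal sketch using FileSensor (the dag id, file path, and timings are placeholders):

```python
from datetime import datetime
from airflow import DAG
from airflow.sensors.filesystem import FileSensor

with DAG(
    dag_id="reschedule_sensor_demo",  # placeholder dag id
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_for_file = FileSensor(
        task_id="wait_for_file",
        filepath="/tmp/data_ready",  # placeholder path
        mode="reschedule",           # free the worker slot between pokes
        poke_interval=300,           # re-check every 5 minutes
        timeout=60 * 60 * 6,         # time out the wait after 6 hours
    )
```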
7. Sensor retries
The number of retries can be specified on a sensor, for example:
timeout=60 * 60,
retries=2,
retry_delay=timedelta(minutes=5),
retries=2 means there will be 3 rounds of attempts in total; when one round times out, the next round runs, so the Airflow log shows:
[2022-09-06 00:02:19,350] {taskinstance.py:1068} INFO - Starting attempt 1 of 3
timeout=60 * 60 means each round times out after 1 hour; within a single round the sensor pokes multiple times. The default timeout is 60*60*24*7 seconds (one week).
retry_delay=timedelta(minutes=5) means the next round starts 5 minutes after a timeout, so the interval between the 3 rounds above is 1 hour and 5 minutes.
poke_interval is the interval between sensor checks; the default is 60 seconds (1 minute).
Reference: https://help.aliyun.com/document_detail/336856.html
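Putting these parameters together, a minimal sketch (the dag id, task id, and path are placeholders):

```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.sensors.filesystem import FileSensor

with DAG(
    dag_id="sensor_retry_demo",  # placeholder dag id
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_with_retries = FileSensor(
        task_id="wait_with_retries",
        filepath="/tmp/data_ready",        # placeholder path
        poke_interval=60,                  # check once a minute within a round
        timeout=60 * 60,                   # each round fails after 1 hour
        retries=2,                         # 3 rounds of attempts in total
        retry_delay=timedelta(minutes=5),  # wait 5 minutes between rounds
    )
```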
8. Error: The scheduler does not appear to be running. Last heartbeat was received X minutes ago
Querying the /health endpoint shows the scheduler status as unhealthy.
A write-up on this error attributes the main cause to parsing_processes being configured higher than the number of CPU cores, so the DAG-parsing processes starve the thread that reports the scheduler heartbeat of CPU.
9. Error when running a Hive operator task: airflow.exceptions.AirflowException: /usr/bin/env: 'bash': No such file or directory
This can happen when Airflow is started via supervisor; in that case the supervisor program config for Airflow needs an extra environment entry, as follows:
environment=PATH="/home/xxx/envs/your_env/bin:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin"
10. Tuning the parsing_processes, parallelism, and dag_concurrency settings
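These options live in airflow.cfg. A sketch of where each one sits, with the Airflow 2.x defaults shown as illustrative values (tune them to your hardware; option names can vary across versions):

```ini
[core]
# Maximum number of task instances running at once across the installation
parallelism = 32
# Maximum number of task instances allowed to run concurrently within one DAG
dag_concurrency = 16

[scheduler]
# Number of processes used to parse DAG files; keeping this at or below the
# CPU core count avoids starving the scheduler heartbeat (see item 8 above)
parsing_processes = 2
```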