1. Installing apache-airflow 1.8.0

The server runs CentOS. Make sure pip and setuptools are installed, and keep both up to date.
The installation below follows these references:
- Airflow 1.8 工作流平台搭建: http://blog.csdn.net/kk185800961/article/details/78431484
- airflow最简安装方法 centos 6.5: http://blog.csdn.net/Excaliburace/article/details/53818530
MySQL is used as the metadata database here; by default Airflow uses SQLite.
1. Create the database and account
```
# Create the airflow database and account
mysql> create database airflow default charset utf8 collate utf8_general_ci;
mysql> create user airflow@'localhost' identified by 'airflow';
mysql> grant all on airflow.* to airflow@'localhost';
mysql> flush privileges;
```
2. Install Airflow. When you need environment isolation, create an isolated environment with virtualenv ./env, as sketched below.
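A minimal sketch of creating and activating that environment (assuming virtualenv is already installed; the directory name ./env is just the one mentioned above):

```
virtualenv ./env            # create the isolated environment
source ./env/bin/activate   # activate it before running the pip install below
```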
```
sudo pip install apache-airflow==1.8.0
```
3. When installed with pip, Airflow is placed in the site-packages folder under your Python installation; running the command above a second time will print the path.
```
~/anaconda2/lib/python2.7/site-packages/airflow
```
4. Add the following to /etc/profile, then source it:
```
# Airflow
export AIRFLOW_HOME=/home/lintong/airflow
```
5. Initialize the database. This step creates the airflow folder at the path above, along with some files inside it.
Then check that the installation succeeded.
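These are presumably the standard Airflow 1.8 CLI calls (initdb is the same command referenced again further below; version simply prints the installed version):

```
airflow initdb     # creates $AIRFLOW_HOME and the metadata tables
airflow version    # prints the version if the installation succeeded
```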
6. Configure the metadata database connection
```
cd /home/lintong/airflow
vim airflow.cfg
```
Change the following setting:
```
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
```
7. Install the Python MySQL driver
```
pip install mysql-python
```
Then initialize the database again.
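That is, re-run the init command so the tables are created in MySQL this time:

```
airflow initdb
```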
8. Start the web UI
```
airflow webserver -p 8080
# http://localhost:8080/admin/
```
9. Create a dags folder under the airflow directory and add a DAG Python script there. Reference: [AirFlow]AirFlow使用指南三 第一个DAG示例
```
# -*- coding: utf-8 -*-
import airflow
import os
import time
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta

# -------------------------------------------------------------------------------
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'email': ['xxxxxxx'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'adhoc':False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': u'all_success'
}

# -------------------------------------------------------------------------------
# dag
dag = DAG(
    'example_print_dag',
    default_args=default_args,
    description='my first DAG',
    schedule_interval=timedelta(days=1))


def each_content(content, path):
    return ("echo \"" + content + " " + time.asctime(time.localtime(time.time())) + "\" >> " + path)


# -------------------------------------------------------------------------------
# first operator
#print1_operator = PythonOperator(
#    task_id='print1_task',
#    python_callable=each_content("1", "/home/lintong/桌面/test.txt"),
#    dag=dag)

print1_operator = BashOperator(
    task_id='print1_task',
    bash_command='echo 1 >> /home/lintong/test.txt',
    dag=dag)

# -------------------------------------------------------------------------------
# second operator
print2_operator = BashOperator(
    task_id='print2_task',
    bash_command='echo 2 >> /home/lintong/test.txt',
    dag=dag)

# -------------------------------------------------------------------------------
# third operator
print3_operator = BashOperator(
    task_id='print3_task',
    bash_command='echo 3 >> /home/lintong/test.txt',
    dag=dag)

# -------------------------------------------------------------------------------
# dependencies
#each_content("1", "/home/lintong/桌面/test.txt")
print2_operator.set_upstream(print1_operator)
print3_operator.set_upstream(print1_operator)

#if __name__ == "__main__":
#    dag.cli()
```
10. Check in the web UI that this DAG shows up.
11. When it shows up, the DAG is in the off state; switch it to on and click the green triangle (run) button next to it.
12. Start the scheduler.
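This is the same command the startup script below wraps, run in the foreground here:

```
airflow scheduler
```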
13. Check the file test.txt; the numbers appear in the order 1 2 3 or 1 3 2, since print2 and print3 both depend only on print1.
After installation, the following shell script can be used to start and stop Airflow (web server on port 8080):
```
#!/bin/bash
#set -x
#set -e
set -u

# Usage: ./start_airflow.sh "stop_all" or "start_all"
if [ $1 == "stop_all" ]; then
    # List all processes, keep the Airflow ones, drop the grep itself, extract the PIDs and kill them
    ps -ef | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}' | xargs kill
fi

if [ $1 == "start_all" ]; then
    cd /home/lintong/airflow/logs
    nohup airflow webserver >>webserver.log 2>&1 &
    nohup airflow worker >>worker.log 2>&1 &
    nohup airflow scheduler >>scheduler.log 2>&1 &
    echo "Airflow started in the background"
fi
```
Python script to add a web UI user:
```
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser

user = PasswordUser(models.User())
user.username = 'lintong'
user.email = 'xxxxx@gmail.com'
user.password = 'XXXXX'

session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()
```
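For the password login to take effect, the web authentication backend also has to be enabled in airflow.cfg. A sketch of the usual 1.8-era settings, to be verified against your own airflow.cfg:

```
[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
```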
2. Installing apache-airflow 1.10.0

To install the 1.10 line, use Python 3.6.
If pip3 can no longer be found:
```
sudo python3 -m pip install --upgrade --force-reinstall pip
```
If pip3 does not exist in the virtual environment:
```
sudo apt-get install python3-venv
```
Install Python 3.6:
```
sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt-get update
sudo apt-get install python3.6
```
Install pip for Python 3.6:
```
wget https://bootstrap.pypa.io/get-pip.py
sudo python3.6 get-pip.py
```
Install python3.6-dev, otherwise the build fails with error: command ‘x86_64-linux-gnu-gcc’ failed with exit status 1:
```
sudo apt-get install python3.6-dev
```
Add AIRFLOW_HOME:
```
# Airflow
export AIRFLOW_HOME=/home/lintong/airflow
```
Install airflow 1.10.0:
```
sudo pip3.6 install --force-reinstall apache-airflow==1.10.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
```
Run airflow version; the airflow.cfg file then appears automatically under AIRFLOW_HOME. Edit airflow.cfg:
```
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
```
Install mysqlclient, since mysql-python is not available for Python 3:
```
pip3.6 install mysqlclient
```
Initialize the DB.
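As in the 1.8 setup, this is the standard init command:

```
airflow initdb
```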
Set up supervisor to manage the Airflow daemons.
airflow_scheduler
```
lintong@master:/etc/supervisor/conf.d$ cat airflow_scheduler.conf
[program:airflow_scheduler]
directory=/home/lintong/airflow
command = airflow scheduler
user = lintong
autostart = false
autorestart = true
stderr_logfile = /var/log/supervisor/airflow_scheduler.stderr.log
stdout_logfile = /var/log/supervisor/airflow_scheduler.stdout.log
```
airflow_worker
The worker daemon only needs to run when Airflow's executor is set to CeleryExecutor.
```
lintong@master:/etc/supervisor/conf.d$ cat airflow_worker.conf
[program:airflow_worker]
directory=/home/lintong/airflow
command = airflow worker
user = lintong
autostart = false
autorestart = true
stderr_logfile = /var/log/supervisor/airflow_worker.stderr.log
stdout_logfile = /var/log/supervisor/airflow_worker.stdout.log
```
airflow_webserver
```
lintong@master:/etc/supervisor/conf.d$ cat airflow_webserver.conf
[program:airflow_webserver]
directory=/home/lintong/airflow
command = airflow webserver -p 10017
user = lintong
autostart = false
autorestart = true
stderr_logfile = /var/log/supervisor/airflow_webserver.stderr.log
stdout_logfile = /var/log/supervisor/airflow_webserver.stdout.log
```
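Since the programs above are declared with autostart = false, they have to be loaded and started explicitly through supervisorctl, roughly like this:

```
sudo supervisorctl reread    # pick up the new .conf files
sudo supervisorctl update    # register the new programs
sudo supervisorctl start airflow_scheduler airflow_webserver airflow_worker
```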
Some configuration changes you may want to make
1. Make the Configuration view visible in the web UI; a sketch of the relevant airflow.cfg setting is shown below.
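This presumably refers to the expose_config option in airflow.cfg, which controls whether the configuration contents are shown on the web page (treat this as an assumption and check your own airflow.cfg):

```
[webserver]
expose_config = True
```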
2. Stop showing the Airflow example DAGs: change the setting below, then run airflow resetdb followed by airflow initdb.
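The relevant option should be load_examples under [core]; setting it to False is the usual way to hide the bundled examples before re-initializing the database:

```
[core]
load_examples = False
```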
3. Change LocalExecutor to CeleryExecutor
```
pip3 install celery
pip3 install redis
```
Modify the configuration:
```
executor = CeleryExecutor
broker_url = redis://127.0.0.1:6379/0
celery_result_backend = redis://127.0.0.1:6379/0
```
When using LocalExecutor, there is no need to start a separate airflow worker.
Reference:
- https://blog.csdn.net/zzq900503/article/details/104537121
4. Concurrency-related configuration
```
[celery]
# Maximum number of task instances a single worker can run at once; if workers run on dedicated machines, match the machine's core count
worker_concurrency = 32

[core]
# Maximum number of task instances running concurrently across the whole installation; with 4 workers at a concurrency of 32 each, this is 128
parallelism = 128

[scheduler]
# Number of processes the scheduler uses to parse DAG files
parsing_processes = 20
```