
Installing apache-airflow on Ubuntu 16.04

1. Installing apache-airflow 1.8.0#

The server here runs CentOS. pip and setuptools need to be installed first; also make sure the installed versions are up to date.

Then install Airflow by following these references:

Airflow 1.8 工作流平台搭建 http://blog.csdn.net/kk185800961/article/details/78431484
airflow最简安装方法 centos 6.5 http://blog.csdn.net/Excaliburace/article/details/53818530

We use MySQL as the metadata database; by default Airflow uses SQLite.

1. Create the database and account

# create the database and account
mysql> create database airflow default charset utf8 collate utf8_general_ci;
mysql> create user airflow@'localhost' identified by 'airflow';
mysql> grant all on airflow.* to airflow@'localhost';
mysql> flush privileges;

2. Install Airflow. If you need an isolated environment, create one with virtualenv ./env first.

sudo pip install apache-airflow==1.8.0

3. When installed with pip, Airflow ends up in the site-packages folder under the Python path; running the command above once more will show the location, for example:

~/anaconda2/lib/python2.7/site-packages/airflow

4. Add the following to /etc/profile, then source it:

#Airflow
export AIRFLOW_HOME=/home/lintong/airflow

5. Initialize the database. This step creates the airflow folder at the path above, along with some files inside it:

airflow initdb

Check whether the installation succeeded:

airflow version

6. Configure the metadata database connection

cd /home/lintong/airflow
vim airflow.cfg

Change the following setting:

sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
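
Optionally, you can confirm that the credentials in this connection string actually work before re-initializing the database. A quick check with the mysql command-line client (assuming it is available on the same machine):

# should print 1 if the account, host and database are all reachable
mysql -u airflow -pairflow -h localhost airflow -e "SELECT 1"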

7. Install the Python MySQL driver

pip install mysql-python

Initialize the database again:

airflow initdb

8. Start the web UI

airflow webserver -p 8080
http://localhost:8080/admin/

9. Create a dags folder under the airflow path and add a DAG Python script in it. Reference: [AirFlow]AirFlow使用指南三 第一个DAG示例

# -*- coding: utf-8 -*-

import airflow
import os
import time
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta

# -------------------------------------------------------------------------------
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'email': ['xxxxxxx'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'adhoc':False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': u'all_success'
}

# -------------------------------------------------------------------------------
# dag

dag = DAG(
    'example_print_dag',
    default_args=default_args,
    description='my first DAG',
    schedule_interval=timedelta(days=1))


def each_content(content, path):
    return ("echo \"" + content + " " + time.asctime(time.localtime(time.time())) + "\" >> " + path)

# -------------------------------------------------------------------------------
# first operator

# print1_operator = PythonOperator(
#     task_id='print1_task',
#     python_callable=each_content("1", "/home/lintong/桌面/test.txt"),
#     dag=dag)

print1_operator = BashOperator(
    task_id='print1_task',
    bash_command='echo 1 >> /home/lintong/test.txt',
    dag=dag)

# -------------------------------------------------------------------------------
# second operator

print2_operator = BashOperator(
    task_id='print2_task',
    bash_command='echo 2 >> /home/lintong/test.txt',
    dag=dag)

# -------------------------------------------------------------------------------
# third operator

print3_operator = BashOperator(
    task_id='print3_task',
    bash_command='echo 3 >> /home/lintong/test.txt',
    dag=dag)

# -------------------------------------------------------------------------------
# dependencies
# each_content("1", "/home/lintong/桌面/test.txt")
print2_operator.set_upstream(print1_operator)
print3_operator.set_upstream(print1_operator)

# if __name__ == "__main__":
#     dag.cli()
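
Before relying on the scheduler, the DAG can also be exercised from the command line. These are standard Airflow 1.x CLI commands; the execution date below is only an example:

# confirm the new DAG file parses and is picked up from the dags folder
airflow list_dags

# run a single task for a given execution date, without the scheduler
airflow test example_print_dag print1_task 2019-01-01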

10. Check the web UI to see whether this DAG shows up.

11. When it shows up, the DAG is in the off state; switch it to on and click the green triangle trigger button next to it.

12. Start the scheduler

airflow scheduler

13. Check the file test.txt: 1 2 3 or 1 3 2 will appear in it, since print2_task and print3_task both depend only on print1_task and can run in either order.

 

Once installation is complete, the following shell script can be used to start and stop Airflow; the web UI listens on port 8080.

#!/bin/bash

#set -x
#set -e
set -u

# Usage: ./start_airflow.sh "stop_all" or "start_all"
if [ "$1" == "stop_all" ]; then
    # list all processes, keep the airflow ones, drop the grep itself, extract the PIDs and kill them
    ps -ef | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}' | xargs kill
fi

if [ "$1" == "start_all" ]; then
    cd /home/lintong/airflow/logs
    nohup airflow webserver >>webserver.log 2>&1 &
    nohup airflow worker >>worker.log 2>&1 &
    nohup airflow scheduler >>scheduler.log 2>&1 &
    echo "Started Airflow in the background"
fi
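
For example:

./start_airflow.sh start_all   # start webserver, worker and scheduler in the background
./start_airflow.sh stop_all    # kill all running airflow processes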

Python script to add a web UI user. Note that the web UI only uses these accounts when password authentication (the airflow.contrib.auth.backends.password_auth backend) is enabled in the [webserver] section of airflow.cfg:

from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser
user = PasswordUser(models.User())
user.username = 'lintong'
user.email = 'xxxxx@gmail.com'
user.password = 'XXXXX'
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()

 

2. Installing apache-airflow 1.10.0#

To install version 1.10, use Python 3.6.

If pip3 can no longer be found:

sudo python3 -m pip install --upgrade --force-reinstall pip

If pip3 does not exist in the virtual environment:

sudo apt-get install python3-venv

Install Python 3.6:

sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt-get update
sudo apt-get install python3.6

Install pip for Python 3.6:

wget https://bootstrap.pypa.io/get-pip.py
sudo python3.6 get-pip.py

Install python-dev, otherwise you will hit: error: command 'x86_64-linux-gnu-gcc' failed with exit status 1

sudo apt-get install python3.6-dev

Add AIRFLOW_HOME to the environment:

#Airflow
export AIRFLOW_HOME=/home/lintong/airflow

Install Airflow 1.10.0:

sudo pip3.6 install --force-reinstall apache-airflow==1.10.0 -i https://pypi.tuna.tsinghua.edu.cn/simple

Run airflow version once; airflow.cfg will then appear automatically in the AIRFLOW_HOME directory. Edit airflow.cfg:

sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow

Install mysqlclient, since mysql-python is not available for Python 3:

pip3.6 install mysqlclient
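
As an optional sanity check (a one-liner that only assumes the install above succeeded), mysqlclient exposes the familiar MySQLdb module under Python 3:

# prints the driver version tuple if the module imports correctly
python3.6 -c "import MySQLdb; print(MySQLdb.version_info)"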

Initialize the DB:

airflow initdb

Set up supervisor to start the Airflow services

airflow_scheduler

lintong@master:/etc/supervisor/conf.d$ cat airflow_scheduler.conf
[program:airflow_scheduler]
directory=/home/lintong/airflow
command = airflow scheduler
user = lintong
autostart = false
autorestart = true
stderr_logfile = /var/log/supervisor/airflow_scheduler.stderr.log
stdout_logfile = /var/log/supervisor/airflow_scheduler.stdout.log

airflow_worker

The worker daemon only needs to be started when Airflow's executor is set to CeleryExecutor.

lintong@master:/etc/supervisor/conf.d$ cat airflow_worker.conf
[program:airflow_worker]
directory=/home/lintong/airflow
command = airflow worker
user = lintong
autostart = false
autorestart = true
stderr_logfile = /var/log/supervisor/airflow_worker.stderr.log
stdout_logfile = /var/log/supervisor/airflow_worker.stdout.log

airflow_webserver

lintong@master:/etc/supervisor/conf.d$ cat airflow_webserver.conf
[program:airflow_webserver]
directory=/home/lintong/airflow
command = airflow webserver -p 10017
user = lintong
autostart = false
autorestart = true
stderr_logfile = /var/log/supervisor/airflow_webserver.stderr.log
stdout_logfile = /var/log/supervisor/airflow_webserver.stdout.log
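
Since all three programs are declared with autostart = false, they have to be loaded and started by hand. A typical sequence (assuming supervisor is installed and reads /etc/supervisor/conf.d) looks like:

# pick up the new program definitions and start the daemons
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start airflow_scheduler airflow_worker airflow_webserver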

  

Some configuration items worth changing

1. Make the configuration visible on the web UI's Configuration view:

expose_config = True

2. Hide the Airflow example DAGs; this requires running airflow resetdb and then airflow initdb to take effect:

load_examples = False

3. Change LocalExecutor to CeleryExecutor:

pip3 install celery
pip3 install redis

Then modify the configuration:

executor = CeleryExecutor
broker_url = redis://127.0.0.1:6379/0
celery_result_backend = redis://127.0.0.1:6379/0
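
The broker_url and celery_result_backend above point at a local Redis server, which this post does not install. Assuming one is running, it can be checked with:

redis-cli -h 127.0.0.1 -p 6379 ping   # should reply PONG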

When using LocalExecutor, there is no need to start a separate airflow worker.

Reference:

https://blog.csdn.net/zzq900503/article/details/104537121

4. Concurrency-related settings

[celery]
# maximum number of task instances a single worker can run at once; if workers
# are deployed on dedicated machines, this can match the machine's core count
worker_concurrency = 32

[core]
# maximum number of task instances running at once across the whole installation;
# with 4 workers at worker_concurrency = 32 each, that is 128
parallelism = 128

[scheduler]
# number of processes the scheduler uses to parse DAG files
parsing_processes = 20