Airflow从入门到实战(万字长文)

这篇具有很好参考价值的文章主要介绍了Airflow从入门到实战(万字长文)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Airflow 基本概念

概述

Airflow 是一个以编程方式编写,安排和监视工作流的平台。

使用 Airflow 将工作流编写任务的有向无环图(DAG)。Airflow 计划程序在遵循指定的依赖项,同时在一组工作线程上执行任务。丰富的命令实用程序使在 DAG 上执行复杂的调度变的轻而易举。丰富的用户界面使查看生产中正在运行的管道,监视进度以及需要时对问题进行故障排除变的容易。

名词

(1)Dynamic:Airflow 配置需要实用 Python,允许动态生产管道。这允许编写可动态。这允许编写可动态实例化管道的代码。
(2)Extensible:轻松定义自己的运算符,执行程序并扩展库,使其适合于您的环境。
(3)Elegant:Airlfow 是精简的,使用功能强大的 Jinja 模板引擎,将脚本参数化内置于 Airflow 的核心中。
(4)Scalable:Airflow 具有模板块架构,并使用消息队列来安排任意数量的工作任务。

Airflow 安装

Airflow 官网

https://airflow.apache.org

安装 Python 环境

Airflow 是由 Python 语言编写的 Web 应用,要求 Python3.8 的环境。

安装 Miniconda

conda 是一个开源的包、环境管理器,可以用于在同一个机器上安装不同 Python 版本的软件包及其依赖,并能够在不同的 Python 环境之间切换,Anaconda 包括 Conda、Python 以及一大堆安装好的工具包,比如:numpy、pandas 等,Miniconda 包括 Conda、Python。

此处,我们不需要如此多的工具包,故选择 MiniConda。

1)下载 Miniconda(Python3 版本)

下载地址:https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh

2)安装 Miniconda
(1)执行以下命令进行安装,并按照提示操作,直到安装完成。

[root@hadoop102 software]# mkdir airflow
[root@hadoop102 software]# cd airflow/
--将安装包放入此目录中
[root@hadoop102 airflow]$ bash Miniconda3-latest-Linux-x86_64.sh

(2)在安装过程中,出现以下提示时,可以指定安装路径

airflow教程,Bigdata技术,python,Airflow

(3)出现以下字样,即为安装完成

airflow教程,Bigdata技术,python,Airflow

3)加载环境变量配置文件,使之生效

[root@hadoop102 airflow]# source ~/.bashrc

4)取消激活 base 环境

Miniconda 安装完成后,每次打开终端都会激活其默认的 base 环境,我们可通过以下命令,禁止激活默认 base 环境。

[root@hadoop102 airflow]# conda config --set auto_activate_base false

创建 Python3.8 环境

1)配置 conda 国内镜像

(base) [root@hadoop102 ~]$ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free
(base) [root@hadoop102 ~]$ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main
(base) [root@hadoop102 ~]$ conda config --set show_channel_urls yes

2)创建 Python3.8 环境

(base) [root@hadoop102 ~]$ conda create --name airflow python=3.8

说明:conda 环境管理常用命令
创建环境:conda create -n env_name
查看所有环境:conda info --envs
删除一个环境:conda remove -n env_name --all

3)激活 airflow 环境

(base) [root@hadoop102 ~]$ conda activate airflow

激活后效果如下图所示

[root@hadoop102 software]$ conda activate airflow
(airflow) [atguigu@hadoop102 software]$ 

说明:退出当前环境。

(airflow) [atguigu@hadoop102 ~]$ conda deactivate

4)执行 python -V 命令查看 python 版本

(airflow) [root@hadoop102 software]$ python -V
Python 3.8.13

安装 Airflow

1)更改 pip 的源

[root@hadoop102 software]$ conda activate airflow
(airflow) [root@hadoop102 software]$ pip install numpy -i https://pypi.tuna.tsinghua.edu.cn/simple
(airflow) [root@hadoop102 software]$ sudo mkdir ~/.pip
(airflow) [root@hadoop102 software]$ sudo vim ~/.pip/pip.conf

添加以下内容

[global]
index-url = https://pypi.tuna.tsinghua.edu.cn/simple
[install]
trusted-host = https://pypi.tuna.tsinghua.edu.cn

2)安装 airflow

(airflow) [root@hadoop102 software]$ pip install "apache-airflow==2.4.3"

3)初始化 airflow

(airflow) [root@hadoop102 software]$ airflow db init

4)查看版本

(airflow) [root@hadoop102 software]$ airflow version 
2.4.3

5)airflow 安装好存放路径

(airflow) [root@hadoop102 airflow]$ pwd
/root/airflow

6)创建账号

(airflow) [root@hadoop102 airflow]$ airflow users create \
--username admin \
--firstname bigdata \
--lastname bigdata \
--role Admin \
--email 1127914080@qq.com

airflow教程,Bigdata技术,python,Airflow

此时会让你输入密码,这里笔者的密码设置为123456

7)启动 airflow 调度

(airflow) [root@hadoop102 airflow]$ airflow scheduler -D

8)启动 airflow web 服务,启动后浏览器访问 http://hadoop102:8080

(airflow) [root@hadoop102 airflow]$ airflow webserver -p 8080 -D

airflow教程,Bigdata技术,python,Airflow

airflow教程,Bigdata技术,python,Airflow

则已经进入了airflow的页面。

启动停止脚本

https://blog.csdn.net/weixin_45417821/article/details/

安装后的一些细节问题

页面中显示了两个问题,第一个问题是希望将元数据存放在MySQL或者PostgresSQL中,第二个问题是不建议用这个执行器,接下来我们进行改进。
airflow教程,Bigdata技术,python,Airflow

修改数据库为 MySQL

1)在 MySQL 中建库

mysql> CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

2)如果报错 Linux error:1425F102:SSL routines:ssl_choose_client_version:unsupported protocol,可以关闭 MySQL 的 SSL 证书

查看 SSL 是否开启 YES 为开启

mysql> SHOW VARIABLES LIKE '%ssl%';
+---------------+-----------------+
| Variable_name | Value           |
+---------------+-----------------+
| have_openssl  | YES             |
| have_ssl      | YES             |
| ssl_ca        | ca.pem          |
| ssl_capath    |                 |
| ssl_cert      | server-cert.pem |
| ssl_cipher    |                 |
| ssl_crl       |                 |
| ssl_crlpath   |                 |
| ssl_key       | server-key.pem  |
+---------------+-----------------+
9 rows in set (0.02 sec)

3)修改配置文件 my.cnf,加入以下内容:

vim /etc/my.cnf
# disable_ssl
skip_ssl

并重启mysql

sudo systemctl restart mysqld

4)添加 python 连接的依赖:官网介绍的方法有两种,这里我们选择下面的连接器。

官网连接器地址:https://airflow.apache.org/docs/apache-airflow/2.4.3/howto/set-up-database.html

airflow教程,Bigdata技术,python,Airflow

(airflow) [root@hadoop102 airflow]$ pip install mysql-connector-python

5)修改 airflow 的配置文件:

(airflow)[root@hadoop102 ~]$ cd /root/airflow
[database]
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines.
# More information here:
# http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri
#sql_alchemy_conn = sqlite:home/atguigu/airflow/airflow.db
sql_alchemy_conn = mysql+mysqlconnector://root:000000@hadoop102:3306/airflow_db

6)关闭 airflow,初始化后重启:

(airflow) [root@hadoop102 ~]$ af.sh stop
(airflow) [root@hadoop102 airflow]$ airflow db init
(airflow) [root@hadoop102 ~]$ af.sh start

7)初始化报错 1067 - Invalid default value for ‘update_at’:
原因:字段 ‘update_at’ 为 timestamp 类型,取值范围是:1970-01-01 00:00:00 到2037-12-31 23:59:59(UTC +8 北京时间从 1970-01-01 08:00:00 开始),而这里默认给了空值,所以导致失败。

推荐修改 mysql 存储时间戳格式:

mysql> set GLOBAL sql_mode ='STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'

重启 MySQL 会造成参数失效,推荐将参数写入到配置文件 my.cnf 中。

vim /etc/my.cnf

sql_mode = STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION

并重启mysql

sudo systemctl restart mysqld

再次初始化查看效果:命令 :airflow db init

8)重新创建账号登录:

(airflow) [root@hadoop102 airflow]$ airflow users create \
--username admin \
--firstname bigdata \
--lastname bigdata \
--role Admin \
--email 1127914080@qq.com

密码依然是123456

启动airflow ,并打开查看

af.sh start

此时发现已经成功进入了 ,并且数据库方面的提示已经消失了

airflow教程,Bigdata技术,python,Airflow

修改执行器

官网不推荐在开发中使用顺序执行器,会造成任务调度阻塞。

airflow教程,Bigdata技术,python,Airflow

关闭airflow,修改配置文件

[root@hadoop102 bin]# af.sh stop
(airflow) [root@hadoop102 airflow]# vim airflow.cfg 

添加如下内容

[core]
# The executor class that airflow should use. Choices include
# ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``,
# ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` or the
# full import path to the class when using a custom executor.
executor = LocalExecutor

可以使用官方推荐的几种执行器,也可以自定义。这里我们选择本地执行器即可。之后再次启动

[root@hadoop102 bin]# af.sh start

hadoop102:8080

可以发现已经没有警告提示了。

airflow教程,Bigdata技术,python,Airflow

部署使用

1)测试环境启动
本次测试使用的是 spark 的官方案例,所有需要启动 hadoop 和 spark 的历史服务器。(这里笔者已经配置好脚本了)

[root@hadoop102 bin]$ hdp.sh start

2)查看 Airflow 配置文件

(alrflow) [root@airflow work-py]# vim ~/airflow/airflow.cfg

airflow教程,Bigdata技术,python,Airflow
代码仓库的目录,别忘了airflow是用代码进行调度的

3)编写.py 脚本,创建 work-py 目录用于存放 python 调度脚本

(airflow) [root@hadoop102 airflow]$ mkdir ~/airflow/dags
(airflow) [root@hadoop102 airflow]$ cd dags/
(airflow) [root@hadoop102 dags]$ vim wordcount.py

添加如下内容

#!/usr/bin/python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
 # 用户 test_owner DAG下的所有者
 'owner': 'luanhao',
 # 是否开启任务依赖
 'depends_on_past': True, 
 # 邮箱
 'email': ['1127914080@qq.com'],
 # 启动时间
 'start_date':datetime(2023,1,19),
 # 出错是否发邮件报警
 'email_on_failure': False,
 # 重试是否发邮件报警
 'email_on_retry': False,
 # 重试次数
 'retries': 1,
 # 重试时间间隔
 'retry_delay': timedelta(minutes=5),
}
# 声明任务图
# test代表任务名称(可以修改其他名称,这里我们用wordcount)
dag = DAG('wordcount', default_args=default_args, schedule_interval=timedelta(days=1))
# 创建单个任务
t1 = BashOperator(
 # 任务 id
 task_id='dwd',
 # 任务命令 使用Spark的wordcount
 bash_command='ssh hadoop102 "/opt/module/spark-3.2-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-3.2-yarn/examples/jars/spark-examples*.jar 10 "',
 # 重试次数
 retries=1,
 # 把任务添加进图中
 dag=dag)

t2 = BashOperator(
 task_id='dws',
 bash_command='ssh hadoop102 "/opt/module/spark-3.2-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-3.2-yarn/examples/jars/spark-examples*.jar 10 "',
 retries=1,
 dag=dag)

t3 = BashOperator(
 task_id='ads',
 bash_command='ssh hadoop102 "/opt/module/spark-3.2-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-3.2-yarn/examples/jars/spark-examples*.jar 10 "',
 retries=1,
 dag=dag)
 
# 设置任务依赖
t2.set_upstream(t1)
t3.set_upstream(t2)

一些重要参数

必须导包

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args 设置默认参数。
depends_on_past 是否开启任务依赖。
schedule_interval 调度频率。
retries 重试次数 。
start_date 开始时间。
BashOperator 具体执行任务,如果为 true 前置任务必须成功完成才会走下一个依赖任务,如果为 false 则忽略是否成功完成。
task_id 任务唯一标识(必填)。
bash_command 具体任务执行命令。
set_upstream 设置依赖 如上图所示 ads 任务依赖 dws 任务依赖 dwd 任务。

出现wordcount,

airflow教程,Bigdata技术,python,Airflow

运行

airflow教程,Bigdata技术,python,Airflow

点击成功任务,查看日志,步骤如下

airflow教程,Bigdata技术,python,Airflow

airflow教程,Bigdata技术,python,Airflow

airflow教程,Bigdata技术,python,Airflow

airflow教程,Bigdata技术,python,Airflow

日志内容如下:

airflow教程,Bigdata技术,python,Airflow

查看 dag 图、甘特图

airflow教程,Bigdata技术,python,Airflow

airflow教程,Bigdata技术,python,Airflow

查看脚本代码

airflow教程,Bigdata技术,python,Airflow

Dag 任务操作

删除 Dag 任务

主要删除 DAG 任务不会删除底层文件,过一会还会自动加载回来。

airflow教程,Bigdata技术,python,Airflow

查看当前所有 dag 任务

# 查看所有任务
(airflow) [root@hadoop102 airflow]$ airflow list_dags
# 查看单个任务
(airflow) [root@hadoop102 airflow]$ airflow tasks list wordcount --tree

配置邮件服务器

1)保证邮箱已开 SMTP 服务(这里我们使用QQ邮箱,当然其他邮箱也可以)

airflow教程,Bigdata技术,python,Airflow

2)修改 airflow 配置文件,用 stmps 服务对应 587 端口

(airflow) [root@hadoop102 airflow]$ vim ~/airflow/airflow.cfg
smtp_host = smtp.qq.com
smtp_starttls = True
smtp_ssl = False
smtp_user = 1127914080@qq.com
# smtp_user =
smtp_password = yyyfjkoqvsnzhhgb
# smtp_password =
smtp_port = 587
smtp_mail_from = 1127914080@qq.com

3)重启 airflow

[root@hadoop102 bin]$ af.sh stop
[root@hadoop102 bin]$ af.sh start

4)新增 workflows.py 脚本,并加入邮箱功能

(airflow) [root@hadoop102 dags]# vim workflows.py

#!/usr/bin/python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.email_operator import EmailOperator
from datetime import datetime, timedelta
default_args = {
 # 用户
 'owner': 'luanhao',
 # 是否开启任务依赖
 'depends_on_past': True, 
 # 邮箱
 'email': ['1127914080@qq.com'],
 # 启动时间
 'start_date':datetime(2023,1,19),
 # 出错是否发邮件报警
 'email_on_failure': True,
 # 重试是否发邮件报警
 'email_on_retry': True,
 # 重试次数
 'retries': 1,
 # 重试时间间隔
 'retry_delay': timedelta(minutes=5),
}

# 声明任务图
dag = DAG('workflows', default_args=default_args, schedule_interval=timedelta(days=1))
# 创建单个任务
t1 = BashOperator(
 # 任务 id
 task_id='dwd',
 # 任务命令
 bash_command='ssh hadoop102 "/opt/module/spark-3.2-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-3.2-yarn/examples/jars/spark-examples*.jar 10 "',
 # 重试次数
 retries=1,
 # 把任务添加进图中
 dag=dag)

t2 = BashOperator(
 task_id='dws',
 bash_command='ssh hadoop102 "/opt/module/spark-3.2-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-3.2-yarn/examples/jars/spark-examples*.jar 10 "',
 retries=3,
 dag=dag)

t3 = BashOperator(
 task_id='ads',
 bash_command='ssh hadoop102 "/opt/module/spark-3.2-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-3.2-yarn/examples/jars/spark-examples*.jar 10 "',
 retries=3,
 dag=dag)

email=EmailOperator(
 # 邮箱id
 task_id="email",
 # 发送给xxx
 to="1063182043@qq.com ",
 # 邮箱主题
 subject="hi,你好啊,我是你来自远方的最亲密的伙伴",
 # 发送内容
 html_content="<h1>后天就要过年了,明天该剪头了撒</h1>",
 # 抄送给xxx
 cc="1127914080@qq.com ",
 dag=dag)

# 任务之间相互依赖
t2.set_upstream(t1)
t3.set_upstream(t2)
email.set_upstream(t3)

参数讲解

task_id=“email” : 邮箱的id
to="1063182043@qq.com " :发送给对方
subject=“hi,你好啊,我是你来自远方的最亲密的伙伴”:邮箱的主题描述
html_content= “< h1 >后天就要过年了,明天该剪头了撒</ h1 >”:邮箱的内容
cc="1127914080@qq.com " : 邮箱的抄送内容,抄送给对方

启动

airflow教程,Bigdata技术,python,Airflow

运行测试

airflow教程,Bigdata技术,python,Airflow

airflow教程,Bigdata技术,python,Airflow文章来源地址https://www.toymoban.com/news/detail-799082.html

到了这里,关于Airflow从入门到实战(万字长文)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • airflow 自定义 operator 开发

    airflow DAG 任务执行的核心就是 operator,airflow 2.6 将很多 operator 从自身的项目中剥离出来,作为外部 provider 提供,比如说我们想要在 DAG 中使用 http operator 就需要先导入第三方包。 Documentation 这样就保证了 airflow 本身作为任务编排调度系统的功能内聚性,单一性,其实上边的第

    2024年02月09日
    浏览(18)
  • DolphinDB +Python Airflow 高效实现数据清洗

    DolphinDB 作为一款高性能时序数据库,其在实际生产环境中常有数据的清洗、装换以及加载等需求,而对于该如何结构化管理好 ETL 作业,Airflow 提供了一种很好的思路。本篇教程为生产环境中 ETL 实践需求提供了一个解决方案,将 Python Airflow 引入到 DolphinDB 的高可用集群中,通

    2023年04月14日
    浏览(18)
  • MLOPS:大数据/服务器下的大规模机器学习技术—流水线处理技术的简介(标准化/自动化/可复用化)、常用框架(Pipeline/TFX、Airflow/Beam/Kubeflow/MLflow、Fli

    MLOPS:大数据/服务器下的大规模机器学习技术—流水线处理技术的简介(标准化/自动化/可复用化)、常用框架(Pipeline/TFX、Airflow/Beam/Kubeflow/MLflow、Flink/Kafka)之详细攻略 目录 流水线处理技术的简介 1、流水线处理技术的概述(标准化/自动化/可复用化)

    2024年02月08日
    浏览(26)
  • Apache Zeppelin结合Apache Airflow使用1

    之前学了Zeppelin的使用,今天开始结合Airflow串任务。 Apache Airflow和Apache Zeppelin是两个不同的工具,各自用于不同的目的。Airflow用于编排和调度工作流,而Zeppelin是一个交互式数据分析和可视化的笔记本工具。虽然它们有不同的主要用途,但可以结合使用以满足一些复杂的数据

    2024年01月23日
    浏览(17)
  • 大数据调度平台oozie、azkaban、dolphinscheduler、AirFlow对比

     Apache Oozie#      Linkedin Azkaban #      Azkaban:最适合shell脚本,当job不多的时候,可以使用。  Apache Airflow #          Airflow 在使用时有一大痛点:使用Python语言来定义工作流的。    Apache DolphinScheduler #    特点:分布式、去中心化、易扩展的可视化工作流任务调度系统    

    2024年02月09日
    浏览(21)
  • Airflow大揭秘:如何让大数据任务调度变得简单高效?

    介绍:Airflow是一个开源的、用于创建、调度和监控数据管道的工作流平台。这个平台使用Python编写,并通过有向无环图(Directed Acyclic Graph, DAG)来管理任务流程,使得用户不需要知道业务数据的具体内容,只需设置任务之间的依赖关系,即可实现任务的自动调度。 在具体应

    2024年01月20日
    浏览(25)
  • 大数据调度最佳实践 | 从Airflow迁移到Apache DolphinScheduler

    有部分用户原来是使用 Airflow 作为调度系统的,但是由于 Airflow 只能通过代码来定义工作流,并且没有对资源、项目的粒度划分,导致在部分需要较强权限控制的场景下不能很好的贴合客户需求, 所以部分用户需要将调度系统从 Airflow 迁移到 Apache Dolphinscheduler。 秉承着解决

    2024年02月08日
    浏览(23)
  • airflow v2.6.0 k8s 部署(Rancher)

    k8s Rancher, 阿里云的 nas 存储 一台物理机(需要挂载PVC: dags plugins 和 logs) mysql 数据库和redis 包含airflow 以及对应依赖库的基础镜像 这里使用 airflow 的 CeleryExecutor 部署在 k8s 上,并不是使用 KubernetesExecutor. 基础镜像构建 Dockerfile 文件 这里使用的是 airflow 官方的V2.6.0 的 python3.

    2024年02月09日
    浏览(43)
  • CVE-2020-11978 Apache Airflow 命令注入漏洞分析与利用

    漏洞软件:Apache Airflow 影响版本:= 1.10.10 Vulhub 漏洞测试靶场 进入 /root/vulhub/airflow/CVE-2020-11978/ 目录 运行以下命令启动环境 在客户端访问 server-ip:8080 找到 example_trigger_target_dag 开启 (有向无环图) 后变为 “On” 状态 在这一列的右侧点击如下按钮 输入以下字符后点击 Trigger 按钮

    2024年02月07日
    浏览(20)
  • 【高危】Apache Airflow Spark Provider 任意文件读取漏洞 (CVE-2023-40272)

    Apache Airflow Spark Provider是Apache Airflow项目的一个插件,用于在Airflow中管理和调度Apache Spark作业。 受影响版本中,在JDBC连接时,由于没有对conn_prefix参数做验证,允许输入\\\"?\\\"来指定参数。攻击者可以通过构造参数?allowLoadLocalInfile=true连接攻击者控制的恶意mysql服务器,读取Airfl

    2024年02月11日
    浏览(18)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包