概述
airflow DAG 任务执行的核心就是 operator,airflow 2.6 将很多 operator 从自身的项目中剥离出来,作为外部 provider 提供,比如说我们想要在 DAG 中使用 http operator 就需要先导入第三方包。
Documentation
这样就保证了 airflow 本身作为任务编排调度系统的功能内聚性,单一性,其实上边的第三方 provider 的 operator 也是遵守了自定义 operator 的开发流程。
但系统的架构设计一般是和公司的组织架构想结合的,需要我们自己去实现对应的 operator 去满足这种架构设计,就拿我所在的部门来说,组织架构主要被分为了三层:
- 应用平台组负责对外产品系统,可以理解为大前端,是需要不断迭代,经常变更的
- 中间件组比较稳定,为各个系统赋能,并不会经常变更,并且需要通用,还有一个重要的作用就是承接应用平台组与基础架构组,在保证前端产品频繁变更的同时,底层基础架构无需变动。同样的,当底层基础架构变更时,也不会直接影响到产品侧。
- 基础架构组就维护着集群的资源,以及对应的任务。
自定义 operator 需求场景
可以发现 airflow 本身的 operator 并不能完全满足部门本身的架构设计:
上边中间组中的离线调度系统就是对 airflow 的进一步封装,数据开发平台会提交离线任务至离线调度系统,但是离线调度系统本身的提交任务至集群功能不能满足所需,就需要用到 ygg 任务提交中间件。
ygg 任务提交中间件的功能也比较单纯,就是将外部传来的任务参数构造成对应的 yaml 文件,然后提交至集群即可,这个时候 airflow 就需要一个 ygg 的 operator 用于将任务参数发送至 ygg 任务提交中间件。
实现步骤
-
自定义 operator 编写
自定义 operator 需要继承 BaseOperator。
注:airflow v2.1 之前在编写自定义 operator 的时候需要使用 apply_defaults 装饰,后续版本不需要。from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from hooks.landsat_ygg_invoke_hook import MyCustomHook class MyCustomOperator(BaseOperator): @apply_defaults def __init__(self, my_param, *args, **kwargs): super(MyCustomOperator, self).__init__(*args, **kwargs) self.my_param = my_param self.hook = None def execute(self, context): """ 自定义 operator 具体的业务逻辑 """ self.hook = MyCustomHook(context) state, log = self.hook.run(context) def on_kill(self): """ 当任务实例被杀死时,会调用该方法 """ state, log = self.hook.kill(context)
- 自定义 operator 需要重写 execute 方法,该方法的 context 参数表示 task instance 的相关信息,可以从中获取对应信息。
- 如果自定义 operator 需要调用外部服务的接口,一种比较好的做法就是通过自定义 hook 去执行业务逻辑,在 operator 的 execute 方法中控制 hook 的执行,并获取对应的执行结果相关信息。
-
on_kill
方法是否继承为可选的,airflow 本身任务实例的杀死并不会影响到外部依赖,比如说根据上边的场景,通过airflow 提交任务信息至 ygg 中间件,ygg 中间件在提交至计算集群(k8s),当 airflow 杀死实例的时候,并不会真正杀死已经提交至集群上的任务实例。
这个时候就需要实现on_kill
方法,通过自定义 hook 去杀死计算集群(k8s)上对应的任务实例。
-
自定义 hook 编写
自定义 operator 需要继承 BaseHook。from airflow.hooks.base import BaseHook class MyCustomHook(BaseHook): def __init__(self, ygg_conn_id='ygg_default', context=None): super(LandsatYggInvokeHook, self).__init__() self.ygg_conn_id = ygg_conn_id self.context = context def get_conn(self, schema=None): """ 从 connection 表中获取外部服务连接信息 """ pass def run(self): """ 具体的执行业务逻辑 """ pass def kill(self): """ 具体的杀死外部服务的任务实例的业务逻辑 """ pass
-
配置 airflow.cfg
EasyAirflow git 地址:https://github.com/itnoobzzy/EasyAirflow注: 在本地开发自定义 operator 的时候,需要将自定义 hooks 和 自定义 operator 的上级目录(plugins)设置为 source root 目录,这样在本地调试的时候就不会报 import 错误:
把自定义 operators 和 自定义 hooks 都放在 plugins 目录下,然后再 airflow.cfg 目录中指定 plugins 的目录,这样在 DAG 中就可以引用自定义的 operator 了。
需要注意的是如果是用 docker 或者 k8s 启动的 airflow 必须将 plugins 挂载至容器内部,并且airflow.cfg 中的目录也是填写的容器内的 plugins 的目录:
文章来源:https://www.toymoban.com/news/detail-490736.html -
DAG 中引用自定义 operator
如果自定义 operator 没有bug, 并且plugins 相关配置正确,在使用的时候和内置的 operator 一样直接引用即可:文章来源地址https://www.toymoban.com/news/detail-490736.htmlfrom my_custom_operator import MyCustomOperator my_task = MyCustomOperator( task_id='my_task', my_param='some_value', ... )
到了这里,关于airflow 自定义 operator 开发的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!