airflow 自定义 operator 开发

这篇具有很好参考价值的文章主要介绍了airflow 自定义 operator 开发。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

概述

airflow DAG 任务执行的核心就是 operator,airflow 2.6 将很多 operator 从自身的项目中剥离出来,作为外部 provider 提供,比如说我们想要在 DAG 中使用 http operator 就需要先导入第三方包。

Documentation

image.png
这样就保证了 airflow 本身作为任务编排调度系统的功能内聚性,单一性,其实上边的第三方 provider 的 operator 也是遵守了自定义 operator 的开发流程。
但系统的架构设计一般是和公司的组织架构想结合的,需要我们自己去实现对应的 operator 去满足这种架构设计,就拿我所在的部门来说,组织架构主要被分为了三层:

  • 应用平台组负责对外产品系统,可以理解为大前端,是需要不断迭代,经常变更的
  • 中间件组比较稳定,为各个系统赋能,并不会经常变更,并且需要通用,还有一个重要的作用就是承接应用平台组与基础架构组,在保证前端产品频繁变更的同时,底层基础架构无需变动。同样的,当底层基础架构变更时,也不会直接影响到产品侧。
  • 基础架构组就维护着集群的资源,以及对应的任务。

image.png
自定义 operator 需求场景
可以发现 airflow 本身的 operator 并不能完全满足部门本身的架构设计:
上边中间组中的离线调度系统就是对 airflow 的进一步封装,数据开发平台会提交离线任务至离线调度系统,但是离线调度系统本身的提交任务至集群功能不能满足所需,就需要用到 ygg 任务提交中间件。
ygg 任务提交中间件的功能也比较单纯,就是将外部传来的任务参数构造成对应的 yaml 文件,然后提交至集群即可,这个时候 airflow 就需要一个 ygg 的 operator 用于将任务参数发送至 ygg 任务提交中间件。

实现步骤

  1. 自定义 operator 编写
    自定义 operator 需要继承 BaseOperator。
    注:airflow v2.1 之前在编写自定义 operator 的时候需要使用 apply_defaults 装饰,后续版本不需要。

    from airflow.models import BaseOperator
    from airflow.utils.decorators import apply_defaults
    
    from hooks.landsat_ygg_invoke_hook import MyCustomHook
    
    
    class MyCustomOperator(BaseOperator):
        @apply_defaults
        def __init__(self, my_param, *args, **kwargs):
            super(MyCustomOperator, self).__init__(*args, **kwargs)
            self.my_param = my_param
            self.hook = None
    
        def execute(self, context):
            """
            自定义 operator 具体的业务逻辑
          	"""
          	self.hook = MyCustomHook(context)
            state, log = self.hook.run(context)
            
        def on_kill(self):
            """
            当任务实例被杀死时,会调用该方法
            """
            state, log = self.hook.kill(context)
    
    • 自定义 operator 需要重写 execute 方法,该方法的 context 参数表示 task instance 的相关信息,可以从中获取对应信息。
    • 如果自定义 operator 需要调用外部服务的接口,一种比较好的做法就是通过自定义 hook 去执行业务逻辑,在 operator 的 execute 方法中控制 hook 的执行,并获取对应的执行结果相关信息。
    • on_kill 方法是否继承为可选的,airflow 本身任务实例的杀死并不会影响到外部依赖,比如说根据上边的场景,通过airflow 提交任务信息至 ygg 中间件,ygg 中间件在提交至计算集群(k8s),当 airflow 杀死实例的时候,并不会真正杀死已经提交至集群上的任务实例。
      这个时候就需要实现 on_kill 方法,通过自定义 hook 去杀死计算集群(k8s)上对应的任务实例。
  2. 自定义 hook 编写
    自定义 operator 需要继承 BaseHook。

    from airflow.hooks.base import BaseHook
    
    
    class MyCustomHook(BaseHook):
        def __init__(self, ygg_conn_id='ygg_default', context=None):
            super(LandsatYggInvokeHook, self).__init__()
            self.ygg_conn_id = ygg_conn_id
            self.context = context
      	
      	def get_conn(self, schema=None):
          	"""
          	从 connection 表中获取外部服务连接信息
          	"""
          	pass
      	
        def run(self):
            """
            具体的执行业务逻辑
          	"""
            pass
            
        def kill(self):
            """
            具体的杀死外部服务的任务实例的业务逻辑
            """
            pass
    
  3. 配置 airflow.cfg
    EasyAirflow git 地址:
    https://github.com/itnoobzzy/EasyAirflowimage.png
    注: 在本地开发自定义 operator 的时候,需要将自定义 hooks 和 自定义 operator 的上级目录(plugins)设置为 source root 目录,这样在本地调试的时候就不会报 import 错误:
    image.png
    把自定义 operators 和 自定义 hooks 都放在 plugins 目录下,然后再 airflow.cfg 目录中指定 plugins 的目录,这样在 DAG 中就可以引用自定义的 operator 了。
    image.png
    需要注意的是如果是用 docker 或者 k8s 启动的 airflow 必须将 plugins 挂载至容器内部,并且airflow.cfg 中的目录也是填写的容器内的 plugins 的目录:
    image.png
    image.png

  4. DAG 中引用自定义 operator
    如果自定义 operator 没有bug, 并且plugins 相关配置正确,在使用的时候和内置的 operator 一样直接引用即可:文章来源地址https://www.toymoban.com/news/detail-490736.html

    from my_custom_operator import MyCustomOperator
    
    my_task = MyCustomOperator(
        task_id='my_task',
        my_param='some_value',
        ...
    )
    

到了这里,关于airflow 自定义 operator 开发的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Operator 开发实践 四 (WebHook)

    1. WebHook介绍 我们知道访问Kubernetes API有好几种方式,比如使用kubectl命令、使用client-go之类的开发库、直接通过REST请求等。不管是一个使用kubectl的真人用户,还是一个Service Account,都可以通过API访问认证,这个过程官网有一张图描述得很直观 当一个访问请求发送到API Server的

    2024年02月07日
    浏览(35)
  • 云原生|详解Kubernetes Operator在项目中的开发应用

    目录 一、使用场景 (一)client-go中处理逻辑 (二)controller-runtime中处理逻辑 二、使用controller-runtime开发operator项目 (一)生成框架代码 (二)定义crd字段 (三)生成crd文件 (四)初始化manager (五)配置controller (六)配置webhook controller-runtime是基于kubernetes控制器模式衍

    2024年02月04日
    浏览(34)
  • Apache Airflow (一) : Airflow架构、术语、工作原理

    🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客  🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。  🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频 目录 1.什么是Airflow 2. Airflow架构 3. 

    2024年01月22日
    浏览(39)
  • FreeRTOS学习笔记——四、任务的定义与任务切换的实现

    本章是我们真正从从0 到1 写FreeRTOS 的第一章, 属于基础中的基础 必须要学会创建任务,并重点掌握任务是如何切换 因为任务的切换是由汇编代码来完成的,所以代码看起来比较难懂,但是会尽力把代码讲得透彻 如果本章内容学不会, 后面的内容根本无从下手 在这章中: 我

    2024年02月07日
    浏览(63)
  • 区块链基础知识1:定义、原理及概述

    随着数字经济的快速发展,区块链技术逐渐崭露头角,成为推动各行各业数字化转型的重要力量。本文旨在介绍区块链的基础知识,包括其定义、原理以及应用领域,以期为读者提供一个全面而深入的了解。 区块链是一种基于去中心化、分布式、不可篡改的数据存储结构。它

    2024年04月27日
    浏览(46)
  • 如何调整 Windows 11 任务栏位置、对齐方式,及自定义任务栏

    更新于:2023-11-22 分类:Windows 阅读(115407) 评论(12) 如果你是 Windows 11 用户中的一员,一定在不断尝试它的新功能。Windows 11 操作系统采用了全新设计的外观,具有重新设计的 Windows 资源管理器、圆润的窗口边缘和默认将应用程序图标居中的任务栏。如果你是刚从 Windows 10 升

    2024年01月21日
    浏览(34)
  • [ZenTao]源码阅读:自定义任务类型

    1、module/custom/control.php  2、module/custom/model.php

    2024年02月10日
    浏览(29)
  • Hadoop3教程(二):HDFS的定义及概述

    随着实际生产环境中的数据越来越大,在一台服务器上无法存储下所有的数据,那么就要把数据分散到多台服务器的磁盘里存放。但是像这样做跨服务器的数据管理和维护是很难的,所以就迫切需要一种方式,来协调管理多台机器上的文件,这就是分布式文件管理系统。 HD

    2024年02月07日
    浏览(46)
  • 云计算概述(发展过程、定义、发展阶段、云计算榜单)(一)

    本文目录:  零、00时光宝盒 一、前言 二、云计算的发展过程 三、云计算的定义 四、云计算发展阶段 五、云计算公司榜单看云计算兴衰 六、参考资料   2022年,嫂子捡到3只小奶猫,幼猫,因要去旅游,曾寄养在我家10天。我没有猫笼子,就把猫放在鱼缸里养,我称之为“养

    2024年01月17日
    浏览(78)
  • 206.Flink(一):flink概述,flink集群搭建,flink中执行任务,单节点、yarn运行模式,三种部署模式的具体实现

    Flink官网地址:Apache Flink® — Stateful Computations over Data Streams | Apache Flink Flink是一个 框架 和 分布式处理引擎 ,用于对 无界 和 有界 数据流进行 有状态计算 。 无界流(流): 有定义流的开始,没有定义结束。会无休止产生数据 无界流数据必须持续处理 有界流(批): 有定

    2024年02月11日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包