基于SPI的插件式开发实现方案之@AutoService+ServiceLoader介绍及Dolphinscheduler中的实际应用

这篇具有很好参考价值的文章主要介绍了基于SPI的插件式开发实现方案之@AutoService+ServiceLoader介绍及Dolphinscheduler中的实际应用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.插件化开发概述

插件化开发模式正在很多编程语言或技术框架中得以广泛的应用实践,比如大家熟悉的jenkins,docker可视化管理平台rancher,以及日常编码使用的编辑器idea,vscode等。

实现服务模块之间解耦的方式有很多,但是插件来说,其解耦的程度似乎更高,而且更灵活,可定制化、个性化更好。

以spring来说,之所以具备如此广泛的生态,与其自身内置的各种可扩展的插件机制是分不开的。spring框架提供了很多基于插件化的扩展点。插件化机制让系统的扩展性得以提升,从而可以丰富系统的周边应用生态

2.插件化开发常见思路

以java为例,这里结合实际经验,整理一些常用的插件化实现思路:

  • spi机制;
  • 约定配置和目录,利用反射配合实现;
  • springboot中的Factories机制;
  • java agent(探针)技术;
  • spring内置扩展点;
  • 第三方插件包,例如:spring-plugin-core;
  • spring aop技术;

3.基于AutoService进行组件化开发

使用AutoService+ServiceLoader,本篇博客主要就是介绍此方案

缺点:使用的反射去实例化对象

优点:易配置,易调试,上手快

3.1. AutoService介绍

Github地址:https://github.com/google/auto/tree/master/service

AutoService是Google开源的用来方便生成符合ServiceLoader规范的开源库,使用非常的简单。官方的介绍是java.util.ServiceLoader 风格的服务提供者的配置/元数据生成器。

翻译成中文就是自动服务,这个程序能自动做什么?Java 注释处理器和其他系统使用 java.util.ServiceLoader 来注册使用 META-INF 元数据的已知类型的实现。但是,开发人员很容易忘记更新或正确指定服务描述符。

人工维护配置/元数据的过程
什么意思, 就是我们手动进行SPI插件开发的时候, 都需要手动在类加载路径classpath目录创建两级目录META-INF/services, 然后创建一个以需要扩展的接口的全限定路径名的名称的文件(javax.annotation.processing.Processor), 然后在文件中写入该接口的实现类的全限定路径名(com.yanyelai.MyProcessor)。

人工维护配置/元数据的弊端
如果类路径更新了或者接口的名称定义改变了,包名修改了等等,开发任务忘记了更新对应的配置文件,是不是就会发生问题,可能你会说这怎么可能,怎么会犯这种低级错误,不一定的, 团队越大,开发人员的水平也是良莠不齐, 不能保证所有人都能不出错。

AutoService就是用来解决以上问题的,使用了这个AutoService就不同手动创建这个配置文件了, 在插件编译打包会自动生成这个配置数据,不用再手动创建和维护这个配置文件了。

3.2. AutoService使用示例

在插件中引入AutoService服务

  <dependency>
            <groupId>com.google.auto.service</groupId>
            <artifactId>auto-service</artifactId>
            <version>1.1.0</version>
            <scope>provided</scope>
        </dependency>

首先定义一个接口

package com.yanyelai;

import javax.annotation.processing.Processor;

@AutoService(Processor.class)
final class CustomProcessor implements Processor {
  // …
}

AutoService 将在输出类文件夹中生成文件
META-INF/services/javax.annotation.processing.Processor 。该文件将包含:

com.yanyelai.CustomProcessor

对于 javax.annotation.processing.Processor,如果此元数据文件包含在 jar 中,并且该 jar 位于 javac 的类路径上,则 javac 将自动加载它,并将其包含在其正常的注解处理环境中。java.util.ServiceLoader 的其他用户可能会出于不同的目的使用基础结构,但此元数据将适当地提供自动加载。

4.Dolphinscheduler中AutoService的实际应用案例

4.1.Task插件中AutoService的实际应用案例解读

海豚调度Dolphinscheduler中使用了插件化开发数据源、注册中心、告警插件及任务插件, 解读一个其他的都是类似的, 我们这里用Task插件为例来进行说明。

首先,dolphinscheduler的源码中肯定引用了AutoService依赖, 验证一下:
基于SPI的插件式开发实现方案之@AutoService+ServiceLoader介绍及Dolphinscheduler中的实际应用,SpringBoot,大数据,Spring全家桶,java,海豚调度,spring
基于SPI的插件式开发实现方案之@AutoService+ServiceLoader介绍及Dolphinscheduler中的实际应用,SpringBoot,大数据,Spring全家桶,java,海豚调度,spring

我们一起来看看dolphinscheduler中是如何实现插件加载的

dolphinscheduler-api的启动类ApiApplicationServer.java中进行了任务插件初始化加载,当Spring容器准备时,会触发任务插件安装的方法执行, 源码如下图:
基于SPI的插件式开发实现方案之@AutoService+ServiceLoader介绍及Dolphinscheduler中的实际应用,SpringBoot,大数据,Spring全家桶,java,海豚调度,spring

    @EventListener
    public void run(ApplicationReadyEvent readyEvent) {
        logger.info("Received spring application context ready event will load taskPlugin and write to DB");
        // install task plugin 安装任务插件,这个方法执行会动态加载所有的任务插件注册到Spring容器
        taskPluginManager.loadPlugin();
        for (Map.Entry<String, TaskChannelFactory> entry : taskPluginManager.getTaskChannelFactoryMap().entrySet()) {
            String taskPluginName = entry.getKey();
            TaskChannelFactory taskChannelFactory = entry.getValue();
            List<PluginParams> params = taskChannelFactory.getParams();
            String paramsJson = PluginParamsTransfer.transferParamsToJson(params);

            PluginDefine pluginDefine = new PluginDefine(taskPluginName, PluginType.TASK.getDesc(), paramsJson);
            pluginDao.addOrUpdatePluginDefine(pluginDefine);
        }
    }

dolphinscheduler-service中的TaskPluginManager类中看看loadPlugin方法是如何实现任务插件加载的

   /**
     * 从classpath加载任务插件
     */
    public void loadPlugin() {
        if (!loadedFlag.compareAndSet(false, true)) {
            logger.warn("The task plugin has already been loaded");
            return;
        }
        // 创建了PrioritySPIFactory工厂类,加载TaskChannelFactory的实现类就在这里进行
        PrioritySPIFactory<TaskChannelFactory> prioritySPIFactory = new PrioritySPIFactory<>(TaskChannelFactory.class);
        for (Map.Entry<String, TaskChannelFactory> entry : prioritySPIFactory.getSPIMap().entrySet()) {
            String factoryName = entry.getKey();
            TaskChannelFactory factory = entry.getValue();

            logger.info("Registering task plugin: {} - {}", factoryName, factory.getClass());

            taskChannelFactoryMap.put(factoryName, factory);
            taskChannelMap.put(factoryName, factory.create());

            logger.info("Registered task plugin: {} - {}", factoryName, factory.getClass());
        }
    }

dolphinscheduler-spi中看看PrioritySPIFactory的有参构造方法

    public PrioritySPIFactory(Class<T> spiClass) {
    	// 这里调用了ServiceLoader.load动态从classpath中加载TaskChannelFactory的实现类,使用反射注入到Spring容器,
    	// 那么肯定所有的TaskChannelFactory的实现类上都加了@AutoService(TaskChannelFactory.class)注解。
        for (T t : ServiceLoader.load(spiClass)) {
            if (map.containsKey(t.getIdentify().getName())) {
                resolveConflict(t);
            } else {
                map.put(t.getIdentify().getName(), t);
            }
        }
    }

看到了吧,这里就是用ServiceLoader类实现TaskChannelFactory接口实现类的加载。

验证一下所有的TaskChannelFactory的实现类上都是用了@AutoService(TaskChannelFactory.class)注解

dolphinsscheduler中当前已经提供的TASK的插件列表如下:

TaskChannelFactory
├─ TaskChannelFactory               // TaskChannelFactory接口
│  └─ SubProcessTaskChannelFactory	// SubProcessTask接口实现工厂类
│  └─ PythonTaskChannelFactory 		// PythonTask接口实现工厂类 
│  └─ SqlTaskChannelFactory			// SqlTask接口实现工厂类 
│  └─ JupyterTaskChannelFactory		// JupyterTask接口实现工厂类 
│  └─ DependentTaskChannelFactory	// DependentTask接口实现工厂类
│  └─ DataxTaskChannelFactory		// DataxTask接口实现工厂类
│  └─ HttpTaskChannelFactory		// HttpTask接口实现工厂类
│  └─ PigeonTaskChannelFactory      // PigeonTask接口实现工厂类
│  └─ ShellTaskChannelFactory       // ShellTask接口实现工厂类
│  └─ ZeppelinTaskChannelFactory    // ZeppelinTask接口实现工厂类
│  └─ MlflowTaskChannelFactory      // MlflowTask接口实现工厂类
│  └─ DinkyTaskChannelFactory       // DinkyTask接口实现工厂类
│  └─ FlinkTaskChannelFactory       // FlinkTask接口实现工厂类
│  └─ SparkTaskChannelFactory       // SparkTask接口实现工厂类
│  └─ SagemakerTaskChannelFactory   // SagemakerTask接口实现工厂类
│  └─ EmrTaskChannelFactory         // EmrTask接口实现工厂类
│  └─ K8sTaskChannelFactory         // K8sTask接口实现工厂类
│  └─ SeatunnelTaskChannelFactory   // SeatunnelTask接口实现工厂类
│  └─ FlinkStreamTaskChannelFactory // FlinkStreamTask接口实现工厂类
│  └─ ConditionsTaskChannelFactory  // ConditionsTask接口实现工厂类
│  └─ DvcTaskChannelFactory         // DvcTask接口实现工厂类
│  └─ OpenmldbTaskChannelFactory    // OpenmldbTask接口实现工厂类
│  └─ ChunJunTaskChannelFactory     // ChunJunTask接口实现工厂类
│  └─ SqoopTaskChannelFactory       // SqoopTask口实现工厂类
│  └─ DataQualityTaskChannelFactory // DataQualityTask接口实现工厂类
│  └─ BlockingTaskChannelFactory    // BlockingTask接口实现工厂类
│  └─ PytorchTaskChannelFactory     // PytorchTask接口实现工厂类
│  └─ SwitchTaskChannelFactory      // SwitchTask接口实现工厂类
│  └─ HiveCliTaskChannelFactory     // HiveCliTask接口实现工厂类
│  └─ MapReduceTaskChannelFactory   // MapReduceTask接口实现工厂类
│  └─ ProcedureTaskChannelFactory   // ProcedureTask接口实现工厂类

我们找一个TaskChannelFactory实现类,这里就拿SqlTaskChannelFactory实现类来解释(其他的实现类都是大同小异)

@AutoService(TaskChannelFactory.class)
public class SqlTaskChannelFactory implements TaskChannelFactory {
    @Override
    public String getName() {
        return "SQL";
    }

    @Override
    public List<PluginParams> getParams() {
        return null;
    }

    @Override
    public TaskChannel create() {
        return new SqlTaskChannel();
    }
}

发现了没,实现类都是用@AutoService进行注解,那么打包之后肯定会自动在classpath的META-INF\services目录下生成一个以接口为名称org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory
的文件,里面的内容就是SqlTaskChannelFactory实现类的全限定路径名称,

基于SPI的插件式开发实现方案之@AutoService+ServiceLoader介绍及Dolphinscheduler中的实际应用,SpringBoot,大数据,Spring全家桶,java,海豚调度,spring
SqlTask的插件打成的jar包中也包含了这个元数据文件,如果此元数据文件包含在 jar 中,并且该 jar 位于 javac 的类路径上,则 javac 将自动加载它,并将其包含在其正常的注解处理环境中。java.util.ServiceLoader 的其他用户可能会出于不同的目的使用基础结构,但此元数据将适当地提供自动加载。
基于SPI的插件式开发实现方案之@AutoService+ServiceLoader介绍及Dolphinscheduler中的实际应用,SpringBoot,大数据,Spring全家桶,java,海豚调度,spring
通过以上方式, 就实实现了任务插件的自动加载,通过源码解读之后, 是不是发现其实插件化开发也挺简单的, 没想象的那么复杂。

4.2.dollphinscheduler的Task插件开发

以上我们讲述了插件加载的所有过程,下面我们再讲讲怎么进行Task的插件开发, 如果不会, 我们可以找一个现有的插件看看它的实现思路,因为所有的插件的主体框架肯定是一样, 去源码里面验证一下:
dolphinscheduler的task插件都在dolphinscheduler-task-plugin模块下
基于SPI的插件式开发实现方案之@AutoService+ServiceLoader介绍及Dolphinscheduler中的实际应用,SpringBoot,大数据,Spring全家桶,java,海豚调度,spring
我们就用SQL的任务插件来看看,插件应该怎么写
基于SPI的插件式开发实现方案之@AutoService+ServiceLoader介绍及Dolphinscheduler中的实际应用,SpringBoot,大数据,Spring全家桶,java,海豚调度,spring
看到没有, 插件的开发是不是很简单, 基本见名知意:

  • SqlTaskChannelFactory
    这个就是实现TaskChannelFactory接口的实现类,最主要的方法肯定包含获取任务类型名称及创建SqlTaskChannel的方法

  • SqlTaskChannel
    这个就是实现TaskChannel接口的实现类,最主要的方法肯定包含创建任务、解析任务参数、获取资源信息(数据源等)

  • SqlTask
    Sql任务,主要就是围绕Sql类型的任务的处理, 包含SQL任务中参数的获取、SQL任务的处理逻辑、任务的取消、查询和修改的SQL语句的处理逻辑等等, 这块应该是整个自定义插件开发的核心, 其中包含了不同于其他任务插件的逻辑处理,这里基本都是可以定制处理业务的。

  • SqlSplitter
    这个应该是进行SQL语句的分割处理的, SQL任务插件应该支持自定义SQL语句执行,所以如果用户自定义输入了多条SQL语句, 肯定需要按照某种特定的规则进行解析处理,然后再执行。

  • SqlBinds
    这个应该也很明显, SQL中语句跟输入参数的关系绑定, 然后在任务处理时再处理这种绑定关系,完成参数变量和参数值的替换, 获取到真正需要执行的SQL语句,在进行下一步处理。

通过SQL任务插件的源码解读, 我们知道如果我需要二开一个自己的任务插件,

首先创建一个插件模块dolphinscheduler-task-demo,

pom文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements.  See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License.  You may obtain a copy of the License at
  ~
  ~     http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.dolphinscheduler</groupId>
        <artifactId>dolphinscheduler-task-plugin</artifactId>
        <version>3.1.5</version>
    </parent>
    <artifactId>dolphinscheduler-task-demo</artifactId>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.dolphinscheduler</groupId>
            <artifactId>dolphinscheduler-spi</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.dolphinscheduler</groupId>
            <artifactId>dolphinscheduler-task-api</artifactId>
            <version>${project.version}</version>
        </dependency>
    </dependencies>
</project>

先定义一个实现了TaskChannelFactoryMyTaskChannelFactory类, 如下:

@AutoService(TaskChannelFactory.class)
public class MyTaskChannelFactory implements TaskChannelFactory {
    @Override
    public String getName() {
        return "DEMO";
    }

    @Override
    public List<PluginParams> getParams() {
        return null;
    }

    @Override
    public TaskChannel create() {
        return new MyTaskChannel();
    }
}

再定义一个实现了TaskChannelMyTaskChannel类, 如下:

public class MyTaskChannel implements TaskChannel {
    @Override
    public void cancelApplication(boolean status) {

    }

    @Override
    public AbstractTask createTask(TaskExecutionContext taskRequest) {
        return new MyTask(taskRequest);
    }

    @Override
    public AbstractParameters parseParameters(ParametersNode parametersNode) {
        // 这里就是从任务定义中获取参数并解析方法, 如果我们的任务插件中有一些定制的参数输入要求
        // 需要自定义一个参数类继承AbstractParameters,然后增加自定义的任务输入参数字段即可
        return JSONUtils.parseObject(parametersNode.getTaskParams(), MyParameters.class);
    }

    @Override
    public ResourceParametersHelper getResources(String parameters) {
    	// 如果你的参数是字符串,可以使用下面的这种方式进行解析,然后在解析出来的对象中获取属性信息
        return JSONUtils.parseObject(parameters, MyParameters.class).getResources();
    }

}

再定义一个实现了org.apache.dolphinscheduler.plugin.task.api.AbstractTaskMyTask类,如果你的自定义任务需要提交给Yarn去进行资源调度,那个则需要实现 org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask。以下三个方法必须重写,其他的集成方法可以按需重写。

public class MyTask implements AbstractTask{

    public abstract void handle(TaskCallBack taskCallBack) throws TaskException {
		  // 任务的逻辑处理
    }

    public abstract void cancel() throws TaskException {
        // 任务的取消
    }

    public abstract AbstractParameters getParameters() {
        // 任务的参数获取
    };

}

然后在任务处理的方法中如果需要一些额外的工具方法, 可以创建一些辅助工具类,这样基本一个组件就开发好了。然后前端根据我们自定义组件的输入参数要求进行前端页面的开发然后对接调试就可以了。文章来源地址https://www.toymoban.com/news/detail-786027.html

到了这里,关于基于SPI的插件式开发实现方案之@AutoService+ServiceLoader介绍及Dolphinscheduler中的实际应用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【开箱即用】开发了一个基于环信IM聊天室的Vue3插件,从而快速实现仿直播间聊天窗功能

    由于看到有部分的需求为在页面层,快速的引入一个包,并且以简单的配置,就可以快速实现一个聊天窗口,因此尝试以 Vue3 插件的形式开发一个轻量的聊天窗口。 这次简单分享一下此插件的实现思路,以及实现过程,并描述一下本次插件发布 npm 的过程。 Vue3 pnpm Typescript

    2024年02月09日
    浏览(31)
  • Flutter 插件开发遇到的问题及解决方案

    本文主要对笔者flutter插件开发过程中如下问题做了解决。 一、Flutter插件android模块中的代码报红问题解决 二、Flutter Plugin 开发中引入本地 aar 包报错的问题。 三、Flutter插件项目中获取到 Activity 1、在开发Flutter插件时,打开插件的android项目,准备编写native端的代码时,发现各

    2024年02月20日
    浏览(33)
  • 【XR806开发板试用】简单点灯-- 基于SPI控制W2812矩阵幻彩动图和字幕显示系统

    使用指南 自己踩过的坑 必须app开头 鸿蒙hb 依赖python 环境。建议使用conda虚拟环境 下载开启硬件校验和烧录重启 不是科普文,自行百度 `/*WS2812B Timing sequence ________ | | T0L | 0 code |------|--------------| | T0H |________________| 1 code |---------|--------- --| | T1H |______________| RET code | Treset | |--

    2024年04月13日
    浏览(18)
  • 海康摄像头前端调用实时画面解决方案(无插件版开发)

    项目中有一个需求,是需要把海康摄像机的实时画面在项目前端的页面中展示出来。本文的技术栈主要用到了 vue3、vite、threejs 等,辅助软件主要有 海康自带的iVMS-4200 3.9.1.4 客户端、VLC media player 等。原先最开始是想使用海康官方提供的WEB无插件开发包,但是在实际开发中发

    2024年02月16日
    浏览(36)
  • 基于STM32实现W25Q16读写操作(spi)

    在之前我们学习了flash闪存,这个更多的是内部数据存储,容量也是会比较小。这次我们来学习一下更多的存储单元w25q16,顺便了解spi———串行外围设备接口。 在我们的核心板子上基本都会有这么一块芯片,只是有的容量会计较大,大家可以查看板子的原理图,如图所示:

    2024年01月19日
    浏览(25)
  • FPGA实现SPI协议基于ADC128S022进行模拟信号采集

    使用vivado联合modelsim实现SPI协议基于ADC128S022进行模拟信号连续采集。 SPI是串行外设接口,是一种同步/全双工/主从式接口。通常由四根信号线构成: CS_N :片选信号,主从式接口,可以有多个从机,用片选信号进行从机选择; SCLK :串行时钟线,由主机提供给从机; MISO :主机

    2024年02月14日
    浏览(34)
  • 【接口协议】FPGA实现SPI协议基于ADC128S022进行模拟信号采集

    使用vivado联合modelsim实现SPI协议基于ADC128S022进行模拟信号连续采集。 SPI是串行外设接口,是一种同步/全双工/主从式接口。通常由四根信号线构成: CS_N :片选信号,主从式接口,可以有多个从机,用片选信号进行从机选择; SCLK :串行时钟线,由主机提供给从机; MISO :主机

    2024年02月07日
    浏览(31)
  • Hadoop 源码中使用ServiceLoader

    java.util.ServiceLoader使用 今天在看hadoop源代的时候发现,在FileSystem中用到了java.util.ServiceLoader这个类来从配置文件中加载子类或者接口的实现类。以前从来没有使用过这个类,进去大概看了一下具体的实现。主要是从META-INF/services这个目录下的配置文件加载给定接口或者基类的

    2024年04月10日
    浏览(22)
  • 基于chatGPT插件开发

    人工智能(Artificial Intelligence,AI)是指通过计算机模拟人类智能的一种技术。AI技术的发展涉及到多个学科领域,包括计算机科学、数学、统计学、心理学、哲学等。目前AI技术最火爆的莫过于chatGPT4.0,它不但在智能方面有很大提升,而且开放了插件开发,今天和大家分享这

    2023年04月10日
    浏览(18)
  • FPGA实现基于SPI协议的Flash驱动控制(全擦除、页擦除、读数据、页写、连续写—地址写)

    本论文使用Verilog HDL硬件描述语言,结合野火可以FPGA征途Pro开发板,实现了SPI通信协议的全擦除,扇区擦除,读数据,页写,连续写的驱动设计。在 Altera Cyclone Ⅳ 芯片上采用“自顶向下”的模块化设计思想及 Verilog HDL 硬件描述语言,设计并实现串行外设接口(SPI)。在 Qu

    2024年02月12日
    浏览(26)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包