Apache SeaTunnel: A Next-Generation High-Performance, Distributed, Massive-Data Integration Tool, from Getting Started to Practice


About Apache SeaTunnel

Apache SeaTunnel, formerly named Waterdrop, was renamed SeaTunnel in October 2021 when it applied to join the Apache Incubator. Apache SeaTunnel has released 40+ versions and is used in production by a large number of companies, including J.P. Morgan, ByteDance, Stey, China Mobile, Foxconn, Tencent Cloud, Gridsum, Zhongke Big Data Research Institute, 360, Shopee, Bilibili, Sina, Sogou, and Vipshop. It is widely used for massive heterogeneous data integration, CDC data synchronization, SaaS data integration, and multi-source data processing.

On December 9, 2021, Apache SeaTunnel formally became an Apache Incubator project after a unanimous vote.

On May 17, 2023, the Apache Board passed the graduation resolution for Apache SeaTunnel, ending an 18-month incubation and officially making Apache SeaTunnel an Apache Top-Level Project.

Apache SeaTunnel is a next-generation high-performance, distributed tool for integrating massive amounts of data. It supports hundreds of data sources (Database/Cloud/SaaS), real-time CDC and batch synchronization of massive data, and can synchronize trillions of records stably and efficiently.

As an easy-to-use, high-performance data integration platform that supports both real-time streaming and offline batch processing, Apache SeaTunnel offers the following features and advantages:

  • Rich and extensible connectors: SeaTunnel provides a connector API that does not depend on a specific execution engine. Connectors (Source, Transform, Sink) developed against this API can run on many different engines, such as the currently supported SeaTunnel Engine, Flink, and Spark.
  • Connector plugins: the plugin design makes it easy to develop your own connectors and integrate them into the SeaTunnel project. SeaTunnel currently supports more than 100 connectors, and the number keeps growing (the full list of supported connectors is in the official documentation).
  • Batch-stream integration: connectors developed against the SeaTunnel Connector API work equally well for offline synchronization, real-time synchronization, full synchronization, incremental synchronization, and similar scenarios, greatly reducing the effort of managing data integration tasks.
  • Distributed snapshot algorithm support to guarantee data consistency.
  • Multi-engine support: SeaTunnel uses the SeaTunnel Engine for data synchronization by default, but also supports Flink or Spark as the connector execution engine to fit an enterprise's existing technology stack, with multiple Spark and Flink versions supported.
  • JDBC multiplexing and multi-table parsing of database logs: SeaTunnel supports multi-table or whole-database synchronization, which solves the problem of having too many JDBC connections; it also supports multi-table or whole-database log reading and parsing, which avoids repeatedly reading and parsing logs in multi-table CDC synchronization scenarios.
  • High throughput and low latency: SeaTunnel supports parallel reading and writing, providing stable and reliable data synchronization with high throughput and low latency.
  • Comprehensive real-time monitoring: SeaTunnel reports detailed monitoring information for every step of the synchronization process, so users can easily see the number of records read and written, the data size, QPS, and other metrics of a synchronization task.
  • Two job development modes: coding and canvas design. The SeaTunnel Web project (https://github.com/apache/seatunnel-web) provides visual management of jobs, scheduling, running, and monitoring.

SeaTunnel System Architecture

[SeaTunnel architecture diagram]

SeaTunnel Installation and Deployment

The SeaTunnel Engine is SeaTunnel's default engine, and the SeaTunnel installation package already includes everything needed to run it.

Getting the SeaTunnel Installation Package

  1. Install from the pre-built binary package, which can be downloaded directly:

https://dlcdn.apache.org/seatunnel/2.3.3/apache-seatunnel-2.3.3-bin.tar.gz

After downloading, upload the package to the server and extract it:

# extract into the /opt/module directory
tar -zxvf apache-seatunnel-2.3.3-bin.tar.gz -C /opt/module
  2. Build the installation package from source
  • Get the source code from https://seatunnel.apache.org/download or https://github.com/apache/seatunnel.git
  • Build the package with Maven: ./mvnw -U -T 1C clean install -DskipTests -D"maven.test.skip"=true -D"maven.javadoc.skip"=true -D"checkstyle.skip"=true -D"license.skipAddThirdParty"
  • The installation package can then be found under ${Your_code_dir}/seatunnel-dist/target, e.g. apache-seatunnel-2.3.3-SNAPSHOT-bin.tar.gz (see the sketch below)
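
Putting those steps together, a minimal end-to-end sketch of the source build could look like this (the repository URL, Maven flags, and output path come from the steps above; everything else is ordinary Git/Maven usage):

# clone the source and build the distribution package
git clone https://github.com/apache/seatunnel.git
cd seatunnel
./mvnw -U -T 1C clean install -DskipTests -D"maven.test.skip"=true -D"maven.javadoc.skip"=true -D"checkstyle.skip"=true -D"license.skipAddThirdParty"

# the binary tarball ends up under seatunnel-dist/target
ls seatunnel-dist/target/apache-seatunnel-*-bin.tar.gz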

Configuring SEATUNNEL_HOME
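
The original post does not show the commands for this step; a minimal sketch, assuming the package was extracted to /opt/module as above, is to export SEATUNNEL_HOME (and optionally add its bin/ to PATH):

# point SEATUNNEL_HOME at the extracted directory and put its bin/ on PATH
export SEATUNNEL_HOME=/opt/module/apache-seatunnel-2.3.3
export PATH=$PATH:$SEATUNNEL_HOME/bin
# to persist this, append the two export lines above to ~/.bashrc or /etc/profile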

Installing Connector Plugins

Starting from 2.2.0-beta, the binary package no longer bundles connector dependencies, so the first time you use SeaTunnel you need to run the following command to install the connectors (alternatively, you can download the connectors manually from the Apache Maven Repository [https://repo.maven.apache.org/maven2/org/apache/seatunnel/] and move them into the connectors/seatunnel directory yourself):

sh bin/install-plugin.sh 2.3.3

If you need to specify the connector version, taking 2.3.3 as an example, run:

sh bin/install-plugin.sh 2.3.3

In most cases you do not need all the connector plugins, so you can specify the plugins you need in config/plugin_config. For example, if you only need the connector-console plugin, edit config/plugin_config like this:

--seatunnel-connectors--
connector-console
--end--

If you want the example job below to work correctly, you need to add the following plugins:

--seatunnel-connectors--
connector-fake
connector-console
--end--

You can find all supported connectors and their corresponding plugin_config names in ${SEATUNNEL_HOME}/connectors/plugin-mapping.properties.

If you want to install V2 connector plugins manually, just download the V2 connector jars you need and put them under ${SEATUNNEL_HOME}/connectors/seatunnel, as sketched below.
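
As a concrete sketch of the manual route (the Maven path follows the repository mentioned above; the connector name and version are only examples):

# download a connector jar from the Apache Maven repository and drop it into connectors/seatunnel
wget https://repo.maven.apache.org/maven2/org/apache/seatunnel/connector-console/2.3.3/connector-console-2.3.3.jar
mv connector-console-2.3.3.jar ${SEATUNNEL_HOME}/connectors/seatunnel/

# verify which connector jars are installed
ls ${SEATUNNEL_HOME}/connectors/seatunnel/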

Running an Example Job

Define a job configuration file

We can use the official template config/v2.batch.config.template directly; it defines three sections: env, source, and sink (a minimal example modeled on it is sketched after the list below).

env: runtime settings, such as parallelism, the job mode (BATCH/STREAMING), checkpointing, and so on

source: the definition of the data source

sink: the definition of the target-side data source
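
For reference, the shipped batch template wires a FakeSource to a Console sink (which is why connector-fake and connector-console were installed earlier). The following is a minimal sketch of such a job config, written to a separate file so the official template stays untouched; the exact fields in the template may differ slightly, and the schema below is only an illustration:

# create a minimal batch job config modeled on v2.batch.config.template
cat > ./config/v2.batch.fake-to-console.conf <<'EOF'
env {
  execution.parallelism = 2
  job.mode = "BATCH"
}

source {
  FakeSource {
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

sink {
  Console {}
}
EOF

A file like this is run exactly the same way as the template shown next.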

Run the command:

./bin/seatunnel.sh --config ./config/v2.batch.config.template -e local

I used version 2.3.3, and the job failed at runtime with a missing class: java.lang.NoClassDefFoundError: com/sun/jersey/client/impl/CopyOnWriteHashMap

Error message:

2023-11-08 17:47:32,243 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Fatal Error,

2023-11-08 17:47:32,243 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Please submit bug report in https://github.com/apache/seatunnel/issues

2023-11-08 17:47:32,243 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Reason:SeaTunnel job executed failed

2023-11-08 17:47:32,244 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: com/sun/jersey/client/impl/CopyOnWriteHashMap
	at com.hazelcast.spi.impl.AbstractInvocationFuture.wrapInCompletionException(AbstractInvocationFuture.java:1347)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.cascadeException(AbstractInvocationFuture.java:1340)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.access$200(AbstractInvocationFuture.java:65)
	at com.hazelcast.spi.impl.AbstractInvocationFuture$ApplyNode.execute(AbstractInvocationFuture.java:1478)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.unblockOtherNode(AbstractInvocationFuture.java:797)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.unblockAll(AbstractInvocationFuture.java:759)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.complete0(AbstractInvocationFuture.java:1235)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionallyInternal(AbstractInvocationFuture.java:1223)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionally(AbstractInvocationFuture.java:709)
	at com.hazelcast.client.impl.spi.impl.ClientInvocation.completeExceptionally(ClientInvocation.java:294)
	at com.hazelcast.client.impl.spi.impl.ClientInvocation.notifyExceptionWithOwnedPermission(ClientInvocation.java:321)
	at com.hazelcast.client.impl.spi.impl.ClientInvocation.notifyException(ClientInvocation.java:304)
	at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.handleResponse(ClientResponseHandlerSupplier.java:164)
	at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.process(ClientResponseHandlerSupplier.java:141)
	at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.access$300(ClientResponseHandlerSupplier.java:60)
	at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier$DynamicResponseHandler.accept(ClientResponseHandlerSupplier.java:251)
	at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier$DynamicResponseHandler.accept(ClientResponseHandlerSupplier.java:243)
	at com.hazelcast.client.impl.connection.tcp.TcpClientConnection.handleClientMessage(TcpClientConnection.java:245)
	at com.hazelcast.client.impl.protocol.util.ClientMessageDecoder.handleMessage(ClientMessageDecoder.java:135)
	at com.hazelcast.client.impl.protocol.util.ClientMessageDecoder.onRead(ClientMessageDecoder.java:89)
	at com.hazelcast.internal.networking.nio.NioInboundPipeline.process(NioInboundPipeline.java:136)
	at com.hazelcast.internal.networking.nio.NioThread.processSelectionKey(NioThread.java:383)
	at com.hazelcast.internal.networking.nio.NioThread.processSelectionKeys(NioThread.java:368)
	at com.hazelcast.internal.networking.nio.NioThread.selectLoop(NioThread.java:294)
	at com.hazelcast.internal.networking.nio.NioThread.executeRun(NioThread.java:249)
	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: java.lang.NoClassDefFoundError: com/sun/jersey/client/impl/CopyOnWriteHashMap
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan$Builder.<init>(CheckpointPlan.java:66)

Download the missing jar

Download URL: https://repo.maven.apache.org/maven2/org/apache/seatunnel/seatunnel-hadoop3-3.1.4-uber/2.3.2/seatunnel-hadoop3-3.1.4-uber-2.3.2-optional.jar

Put the jar into $SEATUNNEL_HOME/lib (see the commands below).
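
Put together as shell commands (the URL is the one above; adjust $SEATUNNEL_HOME to your install path):

# fetch the missing Hadoop uber jar and put it on the engine classpath
wget https://repo.maven.apache.org/maven2/org/apache/seatunnel/seatunnel-hadoop3-3.1.4-uber/2.3.2/seatunnel-hadoop3-3.1.4-uber-2.3.2-optional.jar
cp seatunnel-hadoop3-3.1.4-uber-2.3.2-optional.jar $SEATUNNEL_HOME/lib/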

Run again:

./bin/seatunnel.sh --config ./config/v2.batch.config.template -e local

Output:

2023-11-08 17:55:32,575 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - wait checkpoint completed: 9223372036854775807
2023-11-08 17:55:32,621 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - pending checkpoint(9223372036854775807/1@774572734674894849) notify finished!
2023-11-08 17:55:32,621 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start notify checkpoint completed, checkpoint:org.apache.seatunnel.engine.server.checkpoint.CompletedCheckpoint@7e4b2ce6
2023-11-08 17:55:32,627 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start clean pending checkpoint cause CheckpointCoordinator completed.
2023-11-08 17:55:32,628 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - Turn checkpoint_state_774572734674894849_1 state from null to FINISHED
2023-11-08 17:55:32,672 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 20000, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=1}
2023-11-08 17:55:32,672 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] Task TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=1} complete with state FINISHED
2023-11-08 17:55:32,672 INFO  org.apache.seatunnel.engine.server.CoordinatorService - [localhost]:5801 [seatunnel-610439] [5.1] Received task end from execution TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=1}, state FINISHED
2023-11-08 17:55:32,674 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SplitEnumerator (1/1)] turn to end state FINISHED.
2023-11-08 17:55:32,674 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SplitEnumerator (1/1)] end with state FINISHED
2023-11-08 17:55:32,684 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 50000, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30000}
2023-11-08 17:55:32,684 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 50001, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30001}
2023-11-08 17:55:33,480 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 40001, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30001}
2023-11-08 17:55:33,480 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 40000, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30000}
2023-11-08 17:55:33,480 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] Task TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30001} complete with state FINISHED
2023-11-08 17:55:33,480 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] Task TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30000} complete with state FINISHED
2023-11-08 17:55:33,480 INFO  org.apache.seatunnel.engine.server.CoordinatorService - [localhost]:5801 [seatunnel-610439] [5.1] Received task end from execution TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30001}, state FINISHED
2023-11-08 17:55:33,480 INFO  org.apache.seatunnel.engine.server.CoordinatorService - [localhost]:5801 [seatunnel-610439] [5.1] Received task end from execution TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30000}, state FINISHED
2023-11-08 17:55:33,482 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SourceTask (1/2)] turn to end state FINISHED.
2023-11-08 17:55:33,482 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SourceTask (2/2)] turn to end state FINISHED.
2023-11-08 17:55:33,482 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SourceTask (1/2)] end with state FINISHED
2023-11-08 17:55:33,482 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SourceTask (2/2)] end with state FINISHED
2023-11-08 17:55:33,482 INFO  org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)] end with state FINISHED
2023-11-08 17:55:33,506 INFO  org.apache.seatunnel.engine.server.master.JobMaster - release the pipeline Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)] resource
2023-11-08 17:55:33,507 INFO  org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774572734674894849, slot: SlotProfile{worker=[localhost]:5801, slotID=1, ownerJobID=774572734674894849, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='82bd6da9-17b1-41ba-b745-9f6d4fea5378'}
2023-11-08 17:55:33,507 INFO  org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774572734674894849, slot: SlotProfile{worker=[localhost]:5801, slotID=2, ownerJobID=774572734674894849, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='82bd6da9-17b1-41ba-b745-9f6d4fea5378'}
2023-11-08 17:55:33,507 INFO  org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774572734674894849, slot: SlotProfile{worker=[localhost]:5801, slotID=3, ownerJobID=774572734674894849, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='82bd6da9-17b1-41ba-b745-9f6d4fea5378'}
2023-11-08 17:55:33,510 INFO  org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)] turn to end state FINISHED.
2023-11-08 17:55:33,511 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan - Job SeaTunnel_Job (774572734674894849) end with state FINISHED
2023-11-08 17:55:33,523 INFO  org.apache.seatunnel.engine.client.job.ClientJobProxy - Job (774572734674894849) end with state FINISHED
2023-11-08 17:55:33,546 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand -
***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2023-11-08 17:55:30
End Time                  : 2023-11-08 17:55:33
Total Time(s)             :                   2
Total Read Count          :                  32
Total Write Count         :                  32
Total Failed Count        :                   0
***********************************************

2023-11-08 17:55:33,546 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-610439] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN
2023-11-08 17:55:33,548 INFO  com.hazelcast.internal.server.tcp.TcpServerConnection - [localhost]:5801 [seatunnel-610439] [5.1] Connection[id=1, /127.0.0.1:5801->/127.0.0.1:33602, qualifier=null, endpoint=[127.0.0.1]:33602, remoteUuid=38fbe3e1-876c-48e9-b145-1989888393ab, alive=false, connectionType=JVM, planeIndex=-1] closed. Reason: Connection closed by the other side
2023-11-08 17:55:33,548 INFO  com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 [seatunnel-610439] [5.1] Removed connection to endpoint: [localhost]:5801:6f7f4921-7d71-42af-b26a-6f11a7960118, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/127.0.0.1:33602->localhost/127.0.0.1:5801}, remoteAddress=[localhost]:5801, lastReadTime=2023-11-08 17:55:33.543, lastWriteTime=2023-11-08 17:55:33.523, closedTime=2023-11-08 17:55:33.547, connected server version=5.1}
2023-11-08 17:55:33,548 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-610439] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED
2023-11-08 17:55:33,549 INFO  com.hazelcast.client.impl.ClientEndpointManager - [localhost]:5801 [seatunnel-610439] [5.1] Destroying ClientEndpoint{connection=Connection[id=1, /127.0.0.1:5801->/127.0.0.1:33602, qualifier=null, endpoint=[127.0.0.1]:33602, remoteUuid=38fbe3e1-876c-48e9-b145-1989888393ab, alive=false, connectionType=JVM, planeIndex=-1], clientUuid=38fbe3e1-876c-48e9-b145-1989888393ab, clientName=hz.client_1, authenticated=true, clientVersion=5.1, creationTime=1699437330660, latest clientAttributes=lastStatisticsCollectionTime=1699437330713,enterprise=false,clientType=JVM,clientVersion=5.1,clusterConnectionTimestamp=1699437330653,clientAddress=127.0.0.1,clientName=hz.client_1,credentials.principal=null,os.committedVirtualMemorySize=7048228864,os.freePhysicalMemorySize=15730012160,os.freeSwapSpaceSize=0,os.maxFileDescriptorCount=65535,os.openFileDescriptorCount=51,os.processCpuTime=5290000000,os.systemLoadAverage=0.86,os.totalPhysicalMemorySize=33566306304,os.totalSwapSpaceSize=0,runtime.availableProcessors=8,runtime.freeMemory=994659352,runtime.maxMemory=1029177344,runtime.totalMemory=1029177344,runtime.uptime=2424,runtime.usedMemory=34517992, labels=[]}
2023-11-08 17:55:33,550 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-610439] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
2023-11-08 17:55:33,550 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed SeaTunnel client......
2023-11-08 17:55:33,551 INFO  com.hazelcast.core.LifecycleService - [localhost]:5801 [seatunnel-610439] [5.1] [localhost]:5801 is SHUTTING_DOWN
2023-11-08 17:55:33,553 INFO  com.hazelcast.internal.partition.impl.MigrationManager - [localhost]:5801 [seatunnel-610439] [5.1] Shutdown request of Member [localhost]:5801 - 6f7f4921-7d71-42af-b26a-6f11a7960118 this is handled
2023-11-08 17:55:33,557 INFO  com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-610439] [5.1] Shutting down connection manager...
2023-11-08 17:55:33,558 INFO  com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-610439] [5.1] Shutting down node engine...
2023-11-08 17:55:35,980 INFO  com.hazelcast.instance.impl.NodeExtension - [localhost]:5801 [seatunnel-610439] [5.1] Destroying node NodeExtension.
2023-11-08 17:55:35,980 INFO  com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-610439] [5.1] Hazelcast Shutdown is completed in 2427 ms.
2023-11-08 17:55:35,980 INFO  com.hazelcast.core.LifecycleService - [localhost]:5801 [seatunnel-610439] [5.1] [localhost]:5801 is SHUTDOWN
2023-11-08 17:55:35,980 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed HazelcastInstance ......
2023-11-08 17:55:35,981 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed metrics executor service ......
2023-11-08 17:55:35,981 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - run shutdown hook because get close signal

If the log output ends like the above, the configuration is working.

SeaTunnel Example: MySQL to MySQL

Next, let's test a SeaTunnel configuration that synchronizes data from MySQL to MySQL.

First, we need the MySQL JDBC dependencies. You can either declare connector-jdbc in config/plugin_config and install it with install-plugin.sh, or put the connector-jdbc jar directly under $SEATUNNEL_HOME/connectors/seatunnel (a sketch follows below).
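
As a concrete sketch of that setup (not from the original post: the driver version and download path are illustrative, and the assumption is that the SeaTunnel Engine also needs the MySQL driver itself on its classpath, typically under $SEATUNNEL_HOME/lib):

# 1) after adding connector-jdbc to config/plugin_config, install the connector plugin
sh bin/install-plugin.sh 2.3.3

# 2) put the MySQL driver on the engine classpath (version shown is only an example)
wget https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
cp mysql-connector-java-8.0.28.jar $SEATUNNEL_HOME/lib/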

Create the MySQL databases and tables:

create database test_01;
create table test_01.user(userid int(4) primary key not null auto_increment,username varchar(16) not null);

create database test_02;
create table test_02.user(userid int(4) primary key not null auto_increment,username varchar(16) not null);

Insert test data:

insert into test_01.user (username) values ("zhangsan");
insert into test_01.user (username) values ("lisi");

Create the synchronization config file (./config/mysql-to-mysql.conf):

env {
  job.mode = "BATCH"
}

# source configuration
source {
  jdbc {
    url = "jdbc:mysql://172.1.1.54:3306/test_01"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "admin"
    password = "xxxxxxx"
    generate_sink_sql = true
    database = "test_01"
    table = "user"
    query = "select * from test_01.user"
  }
}

transform {
}

# target (sink) database configuration
sink {
  jdbc {
    url = "jdbc:mysql://172.1.1.54:3306/test_02"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "admin"
    password = "xxxxxx"
    generate_sink_sql = true
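    # generate_sink_sql = true lets the connector build the INSERT statement from the database/table below, so no explicit SQL is needed here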
    database = "test_02"
    table = "user"
  }

}

Run the command:

./bin/seatunnel.sh -e LOCAL -c ./config/mysql-to-mysql.conf

Output like the following indicates the synchronization succeeded:

2023-11-08 20:55:00,468 INFO  org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774617897816293377, slot: SlotProfile{worker=[localhost]:5801, slotID=1, ownerJobID=774617897816293377, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='36c05043-4938-47e0-927c-94e7cac4f749'}
2023-11-08 20:55:00,468 INFO  org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774617897816293377, slot: SlotProfile{worker=[localhost]:5801, slotID=2, ownerJobID=774617897816293377, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='36c05043-4938-47e0-927c-94e7cac4f749'}
2023-11-08 20:55:00,470 INFO  org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (774617897816293377), Pipeline: [(1/1)] turn to end state FINISHED.
2023-11-08 20:55:00,471 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan - Job SeaTunnel_Job (774617897816293377) end with state FINISHED
2023-11-08 20:55:00,483 INFO  org.apache.seatunnel.engine.client.job.ClientJobProxy - Job (774617897816293377) end with state FINISHED
2023-11-08 20:55:00,511 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand -
***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2023-11-08 20:54:58
End Time                  : 2023-11-08 20:55:00
Total Time(s)             :                   1
Total Read Count          :                   2
Total Write Count         :                   2
Total Failed Count        :                   0
***********************************************

2023-11-08 20:55:00,511 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-250845] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN
2023-11-08 20:55:00,514 INFO  com.hazelcast.internal.server.tcp.TcpServerConnection - [localhost]:5801 [seatunnel-250845] [5.1] Connection[id=1, /127.0.0.1:5801->/127.0.0.1:33254, qualifier=null, endpoint=[127.0.0.1]:33254, remoteUuid=332c6983-64e1-4d2a-8d4b-7b40d9677a39, alive=false, connectionType=JVM, planeIndex=-1] closed. Reason: Connection closed by the other side
2023-11-08 20:55:00,514 INFO  com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 [seatunnel-250845] [5.1] Removed connection to endpoint: [localhost]:5801:1373b501-f19f-47a2-9ef2-66d14e5a31c0, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/127.0.0.1:33254->localhost/127.0.0.1:5801}, remoteAddress=[localhost]:5801, lastReadTime=2023-11-08 20:55:00.508, lastWriteTime=2023-11-08 20:55:00.483, closedTime=2023-11-08 20:55:00.512, connected server version=5.1}
2023-11-08 20:55:00,515 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-250845] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED
2023-11-08 20:55:00,516 INFO  com.hazelcast.client.impl.ClientEndpointManager - [localhost]:5801 [seatunnel-250845] [5.1] Destroying ClientEndpoint{connection=Connection[id=1, /127.0.0.1:5801->/127.0.0.1:33254, qualifier=null, endpoint=[127.0.0.1]:33254, remoteUuid=332c6983-64e1-4d2a-8d4b-7b40d9677a39, alive=false, connectionType=JVM, planeIndex=-1], clientUuid=332c6983-64e1-4d2a-8d4b-7b40d9677a39, clientName=hz.client_1, authenticated=true, clientVersion=5.1, creationTime=1699448098390, latest clientAttributes=lastStatisticsCollectionTime=1699448098443,enterprise=false,clientType=JVM,clientVersion=5.1,clusterConnectionTimestamp=1699448098383,clientAddress=127.0.0.1,clientName=hz.client_1,credentials.principal=null,os.committedVirtualMemorySize=7048228864,os.freePhysicalMemorySize=12655243264,os.freeSwapSpaceSize=0,os.maxFileDescriptorCount=65535,os.openFileDescriptorCount=51,os.processCpuTime=5340000000,os.systemLoadAverage=0.24,os.totalPhysicalMemorySize=33566306304,os.totalSwapSpaceSize=0,runtime.availableProcessors=8,runtime.freeMemory=994543624,runtime.maxMemory=1029177344,runtime.totalMemory=1029177344,runtime.uptime=2526,runtime.usedMemory=34633720, labels=[]}
2023-11-08 20:55:00,516 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-250845] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
2023-11-08 20:55:00,516 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed SeaTunnel client......
2023-11-08 20:55:00,517 INFO  com.hazelcast.core.LifecycleService - [localhost]:5801 [seatunnel-250845] [5.1] [localhost]:5801 is SHUTTING_DOWN
2023-11-08 20:55:00,520 INFO  com.hazelcast.internal.partition.impl.MigrationManager - [localhost]:5801 [seatunnel-250845] [5.1] Shutdown request of Member [localhost]:5801 - 1373b501-f19f-47a2-9ef2-66d14e5a31c0 this is handled
2023-11-08 20:55:00,523 INFO  com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-250845] [5.1] Shutting down connection manager...
2023-11-08 20:55:00,525 INFO  com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-250845] [5.1] Shutting down node engine...
2023-11-08 20:55:03,377 INFO  com.hazelcast.instance.impl.NodeExtension - [localhost]:5801 [seatunnel-250845] [5.1] Destroying node NodeExtension.
2023-11-08 20:55:03,377 INFO  com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-250845] [5.1] Hazelcast Shutdown is completed in 2858 ms.
2023-11-08 20:55:03,378 INFO  com.hazelcast.core.LifecycleService - [localhost]:5801 [seatunnel-250845] [5.1] [localhost]:5801 is SHUTDOWN
2023-11-08 20:55:03,378 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed HazelcastInstance ......
2023-11-08 20:55:03,378 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed metrics executor service ......
2023-11-08 20:55:03,379 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - run shutdown hook because get close signal

Check the synchronization result:

MySQL [(none)]>
MySQL [(none)]> select * from test_02.user;
+--------+----------+
| userid | username |
+--------+----------+
|      1 | zhangsan |
|      2 | lisi     |
+--------+----------+
2 rows in set (0.00 sec)

Integrating SeaTunnel with the Flink & Spark Engines

Edit the seatunnel-env.sh file, setting FLINK_HOME to the Flink installation directory and SPARK_HOME to the Spark installation directory:

FLINK_HOME=/data/flink-1.14.5
SPARK_HOME=/data/spark-2.4.6

Start the Flink cluster:

./bin/start-cluster.sh

Run the job, again using mysql-to-mysql.conf as the example.

Note: when synchronizing MySQL, the MySQL JDBC driver jar must be placed under flink/lib. This is really the same as doing data synchronization with Flink directly: all required dependencies have to be made available to Flink (a sketch follows below).
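
A minimal sketch of that step (driver version illustrative, assuming FLINK_HOME is set as in seatunnel-env.sh):

# copy the MySQL JDBC driver into Flink's lib directory and restart the cluster so it is picked up
cp mysql-connector-java-8.0.28.jar $FLINK_HOME/lib/
$FLINK_HOME/bin/stop-cluster.sh
$FLINK_HOME/bin/start-cluster.sh

With the driver in place, submit the job: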

./bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/mysql-to-mysql.conf

The output is as follows:

Execute SeaTunnel Flink Job: ${FLINK_HOME}/bin/flink run -c org.apache.seatunnel.core.starter.flink.SeaTunnelFlink /mnt/apache-seatunnel-2.3.3/starter/seatunnel-flink-13-starter.jar --config ./config/mysql-to-mysql.conf --name SeaTunnel
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt/kmr/flink1/1/flink-1.14.5/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/mnt/kmr/hadoop/1/hadoop-3.1.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Job has been submitted with JobID a643a1990705f817479b1d2880c9f038
Program execution finished
Job with JobID a643a1990705f817479b1d2880c9f038 has finished.
Job Runtime: 2356 ms

Truncate the test_02.user table and import again, this time using Spark:

MySQL [(none)]> truncate table test_02.user;
Query OK, 0 rows affected (0.01 sec)

Run the import command:

./bin/start-seatunnel-spark-2-connector-v2.sh \
--master local[2] \
--deploy-mode client \
--config ./config/mysql-to-mysql.conf

The output is as follows:

23/11/08 21:19:57 INFO executor.FieldNamedPreparedStatement: PrepareStatement sql is:
INSERT INTO `test_02`.`user` (`userid`, `username`) VALUES (?, ?)

23/11/08 21:19:57 INFO v2.DataWritingSparkTask: Commit authorized for partition 0 (task 0, attempt 0, stage 0.0)
23/11/08 21:19:57 INFO v2.DataWritingSparkTask: Committed partition 0 (task 0, attempt 0, stage 0.0)
23/11/08 21:19:57 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 1148 bytes result sent to driver
23/11/08 21:19:57 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1486 ms on localhost (executor driver) (1/1)
23/11/08 21:19:57 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
23/11/08 21:19:57 INFO scheduler.DAGScheduler: ResultStage 0 (save at SinkExecuteProcessor.java:117) finished in 1.568 s
23/11/08 21:19:57 INFO scheduler.DAGScheduler: Job 0 finished: save at SinkExecuteProcessor.java:117, took 1.610054 s
23/11/08 21:19:57 INFO v2.WriteToDataSourceV2Exec: Data source writer org.apache.seatunnel.translation.spark.sink.writer.SparkDataSourceWriter@25ea068e is committing.
23/11/08 21:19:57 INFO v2.WriteToDataSourceV2Exec: Data source writer org.apache.seatunnel.translation.spark.sink.writer.SparkDataSourceWriter@25ea068e committed.
23/11/08 21:19:57 INFO execution.SparkExecution: Spark Execution started
23/11/08 21:19:57 INFO spark.SparkContext: Invoking stop() from shutdown hook
23/11/08 21:19:57 INFO server.AbstractConnector: Stopped Spark@6e8a9c30{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
23/11/08 21:19:57 INFO ui.SparkUI: Stopped Spark web UI at http://kmr-b55b8d33-gn-0a6e9139-az1-master-1-2.ksc.com:4040
23/11/08 21:19:57 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/11/08 21:19:57 INFO memory.MemoryStore: MemoryStore cleared
23/11/08 21:19:57 INFO storage.BlockManager: BlockManager stopped
23/11/08 21:19:57 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
23/11/08 21:19:57 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/11/08 21:19:57 INFO spark.SparkContext: Successfully stopped SparkContext
23/11/08 21:19:57 INFO util.ShutdownHookManager: Shutdown hook called
23/11/08 21:19:57 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-6dfc2825-0da5-4e4c-9623-bddcccab0849

Query the table; the data has been imported successfully:

MySQL [(none)]> select * from test_02.user;
+--------+----------+
| userid | username |
+--------+----------+
|      1 | zhangsan |
|      2 | lisi     |
+--------+----------+
