maxwell 基于zookeeper的高可用方案

这篇具有很好参考价值的文章主要介绍了maxwell 基于zookeeper的高可用方案。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Maxwell版本1.39.2

一: 添加zk的pox文件

<!-- customize HA -->
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-framework</artifactId>
  <version>5.4.0</version>
</dependency>

<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-recipes</artifactId>
  <version>5.4.0</version>
</dependency>

二: 创建zk工具类

在 com.zendesk.maxwell.util 包下创建 CuratorUtil 类,后面会使用此类实现高可用

package com.zendesk.maxwell.util;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class CuratorUtil {

 private final String zookeeperServers;
 private final int sessionTimeoutMs;
 private final int connectionTimeoutMs;
 private final int baseSleepTimeMs;
 private final int maxRetries;
 private CuratorFramework client;


 public CuratorUtil(String zookeeperServers, int sessionTimeoutMs, int connectionTimeoutMs, int baseSleepTimeMs,
   int maxRetries) {
  this.zookeeperServers = zookeeperServers;
  this.sessionTimeoutMs = sessionTimeoutMs;
  this.connectionTimeoutMs = connectionTimeoutMs;
  this.baseSleepTimeMs = baseSleepTimeMs;
  this.maxRetries = maxRetries;
 }

 /*
  * 构造 zookeeper 客户端,并连接 zookeeper 集群
  */
 public void start() {
  ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(this.baseSleepTimeMs, this.maxRetries);
  client = CuratorFrameworkFactory.newClient(
    this.zookeeperServers,
    this.sessionTimeoutMs,
    this.connectionTimeoutMs,
    retryPolicy
  );
  client.start();
 }

 /*
  * 实现分布式锁
  */
 public void highAvailable() {
  // 1.连接 Zookeeper 客户端
  this.start();
  // 2.向 zookeeper 注册自己
  String lockPath = "/maxwell/ha/lock";
  InterProcessMutex lock = new InterProcessMutex(client, lockPath);
  try {
   // 3.获取锁
   lock.acquire();
   // 4.将自己信息注册到 leader 路径
   String leaderPath = "/maxwell/ha/leader";
   client.create()
     .withMode(CreateMode.EPHEMERAL)
     .forPath(leaderPath);
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

三: 修改 com.zendesk.maxwell 包下的MaxwellConfig类

3.1 添加属性

// 类新增属性
public String zookeeperServers;
public int sessionTimeoutMs;
public int connectionTimeoutMs;
public int baseSleepTimeMs;
public int maxRetries;

3.2 buildOptionParser 方法添加代码

parser.accepts( "zookeeper", "zookeeper servers support maxwell high available" )
  .withRequiredArg();
parser.accepts( "session_timeout_ms", "session timeout ms with zookeeper" )
  .withRequiredArg();
parser.accepts( "connection_timeout_ms", "connection timeout ms with zookeeper" )
  .withRequiredArg();
parser.accepts( "base_sleep_time_ms", "base sleep time ms if retry" )
  .withRequiredArg();
parser.accepts( "max_retries", "max retry times" )
  .withRequiredArg();

3.3 setup 方法添加代码

this.haMode = fetchBooleanOption("ha", options, properties, false);
this.zookeeperServers = fetchStringOption("zookeeper",options, properties, null);
this.sessionTimeoutMs = fetchIntegerOption("session_timeout_ms",options, properties, 5000);
this.connectionTimeoutMs = fetchIntegerOption("connection_timeout_ms",options, properties, 5000);
this.baseSleepTimeMs = fetchIntegerOption("base_sleep_time_ms",options, properties, 5000);
this.maxRetries = fetchIntegerOption("max_retries",options, properties, 3);
if (haMode && zookeeperServers == null){
 LOGGER.error("you must specify --zookeeper because you want to use maxwell in ha mode");
}

四:修改 com.zendesk.maxwell.Maxwell 的main函数

将代码段

if ( config.haMode ) {
 new MaxwellHA(maxwell, config.jgroupsConf, config.raftMemberID, config.clientID).startHA();
} else {
 maxwell.start();
}

全部注释掉,修改为

if ( config.haMode ) {
 CuratorUtil curatorUtil = new CuratorUtil(config.zookeeperServers, config.sessionTimeoutMs, config.connectionTimeoutMs, config.baseSleepTimeMs, config.maxRetries);
 curatorUtil.highAvailable();
}
maxwell.start();

然后重新打包就能得到基于zk的高可用版本了,打包时可以将test包删除,防止出现错误

源码下载地址

五: 启动脚本

5.1 创建配置文件 config.properties

log_level=info

# mysql login info
host=localhost
port=3306
user=root
password=root123
schema_database=maxwell
# options to pass into the jdbc connection, given as opt=val&opt2=val2
#jdbc_options=opt1=100&opt2=hello

producer=kafka

#       *** kafka ***
producer=kafka
#kafka.bootstrap.servers=hosta:9092,hostb:9092
kafka.bootstrap.servers=localhost:9092
kafka.max.request.size = 104857600

kafka_topic=mysql.%{database}.%{table}
kafka_version=2.7.0

# alternative kafka topic to write DDL (alter/create/drop) to.  Defaults to kafka_topic
#ddl_kafka_topic=maxwell_ddl

# hash function to use.  "default" is just the JVM's 'hashCode' function.
#kafka_partition_hash=default # [default, murmur3]

# how maxwell writes its kafka key.
#
# 'hash' looks like:
# {"database":"test","table":"tickets","pk.id":10001}
#
# 'array' looks like:
# ["test","tickets",[{"id":10001}]]
#
# default: "hash"
#kafka_key_format=hash # [hash, array]

5.2 启动脚本编写 startup.sh

#!/bin/bash

single(){
  bin/maxwell --filter 'exclude: *.*, include: cp.*' --kafka_version=2.7.0 --config=config.properties --daemon
  echo -e "\033[32m单机版启动成功\n\033[0m"
}

ha(){
  ## zookeeper 多个用,分割
  bin/maxwell --filter 'exclude: *.*, include: cp.*' --kafka_version=2.7.0 --config=config.properties --ha --zookeeper=127.0.0.1:2181 --daemon
  echo -e "\033[32m高可用版启动成功\n\033[0m"
}

case "$1" in
  'ha')
     ha
     ;;
  *)
     single
     ;;
esac

5.2.1 高可用版本启动命令

./startup.sh ha

5.2.2 单机版启动命令

./startup.sh


 文章来源地址https://www.toymoban.com/news/detail-630275.html

到了这里,关于maxwell 基于zookeeper的高可用方案的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • PostgreSQL基于Patroni方案的高可用启动流程分析

    什么是Patroni 在很多生产环境中,分布式数据库以高可用性、数据分布性、负载均衡等特性,被用户广泛应用。而作为高可用数据库的解决方案——Patroni,是专门为PostgreSQL数据库设计的,一款以Python语言实现的高可用架构模板。该架构模板,旨在通过外部共享存储软件(ku

    2024年02月07日
    浏览(28)
  • 基于MySql,Redis,Mq,ES的高可用方案解析

    本文将接着前文 1w5字详细介绍分布式系统的那些技术方案 文章基础上,进行实际的案例解析  高可用对于当下的系统而言,可以说是一个硬指标,常年专注于业务开发的我们,对于高可用最直观的感觉可能就是祈祷应用不要出问题,不要报错;即便有问题,也最好不是我们

    2024年02月03日
    浏览(27)
  • 基于Zookeeper搭建Kafka高可用集群(实践可用)

    目录 一、Zookeeper集群搭建 1.1 下载  解压 1.2 修改配置 1.3 标识节点 1.4 启动集群 1.5 集群验证 二、Kafka集群搭建 2.1 下载解压 2.2 拷贝配置文件 2.3 修改配置 2.4 启动集群 2.5 创建测试主题 2.6 写入数据测试 为保证集群高可用,Zookeeper 集群的节点数最好是奇数,最少有

    2024年02月09日
    浏览(31)
  • 大数据Maxwell(二):使用Maxwell增量和全量同步MySQL数据

    文章目录 使用Maxwell增量和全量同步MySQL数据 一、使用Maxwell同步MySQL数据

    2023年04月09日
    浏览(30)
  • lvs DR模式+基于五台服务器部署keepalived + lvs DR模式架构(前端带路由)负载均衡的高可用集群

    lvs DR 模式+基于五台服务器部署keepalived + lvs DR模式架构(前端带路由)负载均衡的高可用集群 DR 模式一: 客户端:172.20.26.167 LVS服务器:172.20.26.198 后端服务器:172.20.26.218 后端服务器:172.20.26.210 两台后端服务器 yum install nginx tcpdump -y echo \\\"this is 26.218 page\\\" /usr/share/nginx/html/index

    2024年02月19日
    浏览(42)
  • Maxwell安装部署

    database:变更数据所属的数据库 table:变更数据所属的表 type:数据变更类型 ts:数据变更发生的时间 xid:事务id commit:事务提交标志,可用于重新组装事务 data:对于insert类型,表示插入的数据;对于update类型,表示修改之后的数据;对于delete类型,表示删除的数据 old:对

    2024年02月22日
    浏览(19)
  • 配置Maxwell

    链接: maxwells官网 (2)脚本内容如下 [gpb@hadoop102 bin]$ chmod 777 mxw.sh 4.3 增量数据同步 4.4 历史数据全量同步 上一节,我们已经实现了使用Maxwell实时增量同步MySQL变更数据的功能。但有时只有增量数据是不够的,我们可能需要使用到MySQL数据库中从历史至今的一个完整的数据集。

    2024年02月11日
    浏览(20)
  • Maxwell - 增量数据同步工具

            今天来学习一个新的大数据小工具 Maxwell ,它和 Sqoop 很像。Sqoop主要用于在 Hadoop (比如 HDFS、Hive、HBase 等)和关系型数据库之间进行数据的批量导入和导出,而 Maxwell 则主要用于监控数据库的变化(通过监控 binlog ),并将变化的数据以JSON格式发布到消息队列(一

    2024年02月20日
    浏览(29)
  • Maxwell与canal工具对比

    Maxwell和Canal是两种不同的数据同步工具,都是在数据迁移、数据同步、数据分发等领域发挥作用的工具,但是它们之间存在一些差异。 Maxwell是一种开源的MySQL数据库同步工具,它可以将MySQL数据库的binlog转化为JSON格式,并将其发送到消息队列中。Maxwell有以下几个特点: -易于

    2024年02月13日
    浏览(28)
  • Maxwell+RabbitMq实现数据同步

    Maxwell是由美国Zendesk开源,用Java编写的MySQL等关系型数据库的实时抓取软件,能够实时抓取MySQL二进制日志binlog,并生成JSON格式的消息,作为生产者发送给kafaka、RabbitMQ、Redis等系统的应用程序。常用的场景有: ETL、维护缓存、收集表级别的DML指标、增量数据同步到搜索引擎、

    2024年02月20日
    浏览(29)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包