canal server初始化源码分析

这篇具有很好参考价值的文章主要介绍了canal server初始化源码分析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

CanalLauncher类是canal server端启动的入口类,跟随代码进行深入。

在开始之前,我们可以先了解下,

canal 配置方式
  1. ManagerCanalInstanceGenerator: 基于manager管理的配置方式,实时感知配置并进行server重启
  2. SpringCanalInstanceGenerator:基于本地spring xml的配置方式,对于多instance的时候,不便于扩展,一个instance一个xml配置
canal 配置文件
  • canal.properties  (系统根配置文件)
  • instance.properties  (instance级别的配置文件,每个instance一份)
入口代码
   public static void main(String[] args) {
        try {
            logger.info("## set default uncaught exception handler");
            // 设置全局的异常捕获
            setGlobalUncaughtExceptionHandler();

            // 支持rocketmq client 配置日志路径
            System.setProperty("rocketmq.client.logUseSlf4j","true");

            // 加载canal 配置
            logger.info("## load canal configurations");
            String conf = System.getProperty("canal.conf", "classpath:canal.properties");
            Properties properties = new Properties();
            if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
                conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
                properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
            } else {
                properties.load(new FileInputStream(conf));
            }
             // 初始化启动类
            final CanalStarter canalStater = new CanalStarter(properties);
            // canal.admin.manager 配置地址
            String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
            if (StringUtils.isNotEmpty(managerAddress)) {
                // 获取admin 用户名和密码
                String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
                String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
                if (StringUtils.isEmpty(passwd)) {
                    throw new IllegalArgumentException(
                        "canal.admin.passwd is empty , pls check https://github.com/alibaba/canal/issues/4941");
                }
                // 设置默认端口号
                String adminPort = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, "11110");
                // 自否自动注册
                boolean autoRegister = BooleanUtils.toBoolean(CanalController.getProperty(properties,
                    CanalConstants.CANAL_ADMIN_AUTO_REGISTER));
                //集群地址
                String autoCluster = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_AUTO_CLUSTER);
                //注册名称
                String name = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_REGISTER_NAME);
                if (StringUtils.isEmpty(name)) {
                    // 以本地机器为默认名字
                    name = AddressUtils.getHostName();
                }
                //注册到zk 或者admin中server IP 不配置就是本地IP
                String registerIp = CanalController.getProperty(properties, CanalConstants.CANAL_REGISTER_IP);
                if (StringUtils.isEmpty(registerIp)) {
                    registerIp = AddressUtils.getHostIp();
                }
                // 初始化配置客户端
                final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,
                    user,
                    passwd,
                    registerIp,
                    Integer.parseInt(adminPort),
                    autoRegister,
                    autoCluster,
                    name);
                // 通过http方式加载远程配置 通过admin配置的信息
                PlainCanal canalConfig = configClient.findServer(null);
                if (canalConfig == null) {
                    throw new IllegalArgumentException("managerAddress:" + managerAddress
                                                       + " can't not found config for [" + registerIp + ":" + adminPort
                                                       + "]");
                }
                Properties managerProperties = canalConfig.getProperties();
                // merge local  本地配置优先级更高
                managerProperties.putAll(properties);
                // auto.scan.interval  instance自动扫描的间隔时间,单位秒
                int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,
                    CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
                    "5"));
                executor.scheduleWithFixedDelay(new Runnable() {

                    private PlainCanal lastCanalConfig;
                    // 定时异步线程去加载远程配置
                    public void run() {
                        try {
                            if (lastCanalConfig == null) {
                                lastCanalConfig = configClient.findServer(null);
                            } else {
                                //通过md5的方式验证重新验证配置是否有变更,有变更返回最新数据,并重新加载
                                PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());
                                if (newCanalConfig != null) {
                                    // 远程配置canal.properties修改重新加载整个应用
                                    canalStater.stop();
                                    Properties managerProperties = newCanalConfig.getProperties();
                                    // merge local
                                    managerProperties.putAll(properties);
                                    canalStater.setProperties(managerProperties);
                                    canalStater.start();

                                    lastCanalConfig = newCanalConfig;
                                }
                            }

                        } catch (Throwable e) {
                            logger.error("scan failed", e);
                        }
                    }

                }, 0, scanIntervalInSecond, TimeUnit.SECONDS);
                canalStater.setProperties(managerProperties);
            } else {
                canalStater.setProperties(properties);
            }

            canalStater.start();
            runningLatch.await();
            executor.shutdownNow();
        } catch (Throwable e) {
            logger.error("## Something goes wrong when starting up the canal Server:", e);
        }
    }

上面代码中,对于配置的加载有两部分,当配置了canal.admin.manager 后台管理地址,会先去验证配置的用户名和密码,去获取相应的server配置(通过http的方式进行获取)

    public PlainCanal findServer(String md5) {
        if (StringUtils.isEmpty(md5)) {
            md5 = "";
        }
        String url = configURL + "/api/v1/config/server_polling?ip=" + localIp + "&port=" + adminPort + "&md5=" + md5
                     + "&register=" + (autoRegister ? 1 : 0) + "&cluster=" + StringUtils.stripToEmpty(autoCluster) + "&name=" + StringUtils.stripToEmpty(name);
        return queryConfig(url);
    }

当通过admin的方式进行加载配置,会每隔默认5s通过http的方式获取admin端的配置,会对整体配置进行md5值进行比较,有变更会对server进行重启。canal server初始化源码分析,canal源码,canal增量同步

跟随CanalStarter.start() 方法进入到启动方法,在CanalLauncher中已加载到server相关的配置。

   1.  根据配置canal.serverMode获取服务端模式,有tcp、相关的mq( kafka, rocketMQ, rabbitMQ, pulsarMQ)方式,除了tcp模式,需要初始化mq相关配置

        // 服务端模式 可通过配置进行变更 canal.serverMode
        String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
        if (!"tcp".equalsIgnoreCase(serverMode)) {
            ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);
            // /plugin 和 /canal/plugin"
            canalMQProducer = loader
                .getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);
            if (canalMQProducer != null) {
                canalMQProducer =  new ProxyCanalMQProducer(canalMQProducer);
                // 初始化mq信息 由canal.serverMode 决定使用哪种mq
                canalMQProducer.init(properties);
            }
        }

        if (canalMQProducer != null) {
            MQProperties mqProperties = canalMQProducer.getMqProperties();
            // disable netty
            System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
            if (mqProperties.isFlatMessage()) {
                // 设置为raw避免ByteString->Entry的二次解析
                System.setProperty("canal.instance.memory.rawEntry", "false");
            }
        }

2. 初始化canal调度控制器CanalController,并进行start,CanalController主要是初始化各个instance配置

3.  非tcp的方式,进行启动mq。

        if (canalMQProducer != null) {
            canalMQStarter = new CanalMQStarter(canalMQProducer);
            // 当前server上部署的instance列表
            String destinations = CanalController.getDestinations(properties);
            // 启动mq
            canalMQStarter.start(destinations);
            controller.setCanalMQStarter(canalMQStarter);
        }

4. 初始化CanalAdminController:提供canal admin的管理操作

        // start canalAdmin
        String port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);
        if (canalAdmin == null && StringUtils.isNotEmpty(port)) {
            String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
            String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
            CanalAdminController canalAdmin = new CanalAdminController(this);
            canalAdmin.setUser(user);
            canalAdmin.setPasswd(passwd);
            String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP);

            logger.debug("canal admin port:{}, canal admin user:{}, canal admin password: {}, canal ip:{}",
                port,
                user,
                passwd,
                ip);

            CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();
            canalAdminWithNetty.setCanalAdmin(canalAdmin);
            canalAdminWithNetty.setPort(Integer.parseInt(port));
            canalAdminWithNetty.setIp(ip);
            canalAdminWithNetty.start();
            this.canalAdmin = canalAdminWithNetty;
        }

5. jvm停止时,shutdown监听admin配置线程池文章来源地址https://www.toymoban.com/news/detail-803504.html

到了这里,关于canal server初始化源码分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • spring事物初始化过程分析

    1.注入4个bd 2.执行 org.springframework.aop.framework.autoproxy.AbstractAutoProxyCreator#postProcessBeforeInstantiation 逻辑分析:遍历所有的bd: 获取beanname-判断beanname是否有长度并且没有被处理过-是否遍历放入advisedBeans- 是否是基础类-是否该跳过  4大基础类 3.执行org.springframework.aop.framework.autoprox

    2024年02月02日
    浏览(36)
  • UE4 源码解析----引擎初始化流程

       在研究UE4的源码过程中着实不理解的地方有很多,今天给大家分享一下UE4引擎的初始化流程。 C++的函数入口都是Main() 函数入口,UE4也是一样,EngineSourceRuntimeLaunchPrivate Windows函数入口  引擎入口函数为:GuardedMain UE4所有相关的代码都在游戏循环函数中,在Launch.cpp中,写

    2024年02月06日
    浏览(85)
  • 【源码解析】聊聊SpringBean是如何初始化和创建

    我们知道通过类进行修复不同的属性,比如单例、原型等,而具体的流程是怎么样的呢,这一篇我们开始从源码的视角分析以下。 在刷新容器中有一个方法,其实就是 Bean创建的过程。 而BeanFactory中 preInstantiateSingletons是初始化所有的bean对象的核心流程。 而这里通过去遍历所

    2024年02月05日
    浏览(41)
  • 『手撕 Mybatis 源码』06 - Mapper 代理方式初始化

    首先修改一下 SqlSession 获取代理对象方式,即通过 getMapper() 来拿到动态代理对象 修改 sqlMapConfig.xml 引入配置文件的方式 把 UserMapper.xml 放到和 com.itheima.mapper.UserMapper 同一个目录,同时修改一下命名空间,然后就可以学习 MyBatis 的代理方式 问题 package name=“com.itheima.mapper”/

    2024年02月09日
    浏览(46)
  • Android13音频子系统分析(二)---初始化

    目录 一、AudioPolicyService初始化 1.1 AudioPolicyService::onFirstRef()函数 1.2 AudioCommandThread线程 1.3 AudioPolicyManager初始化 1.3.1 解析audio_policy_configuration.xml配置文件 1.3.2 解析audio_policy_engine_configuration.xml配置文件 1.3.3 AudioPolicyManager::onNewAudioModulesAvailableInt()函数 1.3.4 AudioPolicyManager::updateDe

    2024年02月03日
    浏览(43)
  • Spring 填充属性和初始化流程源码剖析及扩展实现

    在上一篇博文 讲解 Spring 实例化的不同方式及相关生命周期源码剖析 介绍了 Spring 实例化的不同方式,本文主要围绕实例化过后对象的填充属性和初始化过程进行详细流程剖析 回顾前言知识,doCreateBean-createBeanInstance,通过 Supplier 接口、FactoryMethod、构造函数反射 invoke,创建

    2024年02月06日
    浏览(42)
  • 从内核源码看 slab 内存池的创建初始化流程

    在上篇文章 《细节拉满,80 张图带你一步一步推演 slab 内存池的设计与实现 》中,笔者从 slab cache 的总体架构演进角度以及 slab cache 的运行原理角度为大家勾勒出了 slab cache 的总体架构视图,基于这个视图详细阐述了 slab cache 的内存分配以及释放原理。 slab cache 机制确实比

    2023年04月12日
    浏览(52)
  • Android 11 Ethernet以太网架构分析(1)——初始化

    android中以太网常被用作共享网络,或者是定制化设备连接网线的需求。 本章将会详细分析该模块对以太网的逻辑实现,是大家对此有更深入认识。 初始化 Systemserver 在安卓系统中有一个关于以太网的服务,在systemserver中启动 frameworks/base/services/java/com/android/server/SystemServer.j

    2024年02月04日
    浏览(75)
  • 【Vue2.0源码学习】生命周期篇-初始化阶段(initState)

    本篇文章介绍生命周期初始化阶段所调用的第五个初始化函数—— initState 。 从函数名字上来看,这个函数是用来初始化实例状态的,那么什么是实例的状态呢?在前面文章中我们略有提及,在我们日常开发中,在 Vue 组件中会写一些如 props 、 data 、 methods 、 computed 、 watc

    2024年02月09日
    浏览(53)
  • 【Vue2.0源码学习】生命周期篇-初始化阶段(initInjections)

    本篇文章介绍生命周期初始化阶段所调用的第四个初始化函数—— initInjections 。从函数名字上来看,该函数是用来初始化实例中的 inject 选项的。说到 inject 选项,那必然离不开 provide 选项,这两个选项都是成对出现的,它们的作用是:允许一个祖先组件向其所有子孙后代注

    2024年02月09日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包