flink的TwoPhaseCommitSinkFunction怎么做才能提供精准一次保证

这篇具有很好参考价值的文章主要介绍了flink的TwoPhaseCommitSinkFunction怎么做才能提供精准一次保证。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

TwoPhaseCommitSinkFunction是flink中基于二阶段事务提交和检查点机制配合使用实现的精准一次的输出数据汇,但是想要实现精准一次的输出,实际使用中需要注意几个方面,否则不仅仅达不到精准一次输出,反而可能导致数据丢失,连至少一次的语义都不能达到

TwoPhaseCommitSinkFunction注意事项

TwoPhaseCommitSinkFunction是通过在两阶段提交协议实现的事务,大概简化为一下步骤:
1 在收到检查点分隔符的时候,开启事务,并把记录都写到开启的事务中,
2. 开始进行状态的保存时,把检查点id对应的事务结束掉,做好准备提交的准备,并开启下一个事务

public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // this is like the pre-commit of a 2-phase-commit transaction
        // we are ready to commit and remember the transaction

        checkState(
                currentTransactionHolder != null,
                "bug: no transaction object when performing state snapshot");

        long checkpointId = context.getCheckpointId();
        LOG.debug(
                "{} - checkpoint {} triggered, flushing transaction '{}'",
                name(),
                context.getCheckpointId(),
                currentTransactionHolder);
		//当前检查点对应的事务做好准备,比如进行stream.flush等,准备好提交事务
        preCommit(currentTransactionHolder.handle);
        // 把当前检查点id对应的事务添加到状态中
        pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
        LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);

        currentTransactionHolder = beginTransactionInternal();
        LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);
        // 把当前检查点id对应的事务添加到状态中
        state.clear();
        state.add(
                new State<>(
                        this.currentTransactionHolder,
                        new ArrayList<>(pendingCommitTransactions.values()),
                        userContext));
    }

  1. 收到检查点完成的通知notify方法,提交第二步中检查点id对应的事务,注意这一步不是每次flink在进行检查点的时候都会通知,这种情况下,某一次的notify方法就需要把前几次的事务一起进行提交了,另外,如果提交某个检查点的事务失败,那么应用会重启,并且在重启后的initSnapshot方法中再次进行事务提交,如果还是失败,这个过程一直持续
    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
        Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator =
                pendingCommitTransactions.entrySet().iterator();
        Throwable firstError = null;

        while (pendingTransactionIterator.hasNext()) {
            Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
            Long pendingTransactionCheckpointId = entry.getKey();
            TransactionHolder<TXN> pendingTransaction = entry.getValue();
            if (pendingTransactionCheckpointId > checkpointId) {
                continue;
            }

            LOG.info(
                    "{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
                    name(),
                    checkpointId,
                    pendingTransaction,
                    pendingTransactionCheckpointId);

            logWarningIfTimeoutAlmostReached(pendingTransaction);
            try {
            // 提交事务
                commit(pendingTransaction.handle);
            } catch (Throwable t) {
            //事务失败时记录异常,后面会把异常抛出导致应用重启
                if (firstError == null) {
                    firstError = t;
                }
            }

            LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);
          // 事务成功后移除当前的事务
            pendingTransactionIterator.remove();
        }

        if (firstError != null) {
        // 事务提交失败会抛出异常,导致job异常中止
            throw new FlinkRuntimeException(
                    "Committing one of transactions failed, logging first encountered failure",
                    firstError);
        }
    }

总结:

1。事务不能提交失败,如果失败会导致作业失败然后重新提交,如果最终没有成功提交,那么数据会丢失
2。数据库服务端的事务超时时间不能设置太短,不能仅仅大于检查点的间隔大小,原因是上面说的,flink有可能丢失检查点完成后的通知消息,所以服务端的事务超时时间要设置的足够大.文章来源地址https://www.toymoban.com/news/detail-715674.html

到了这里,关于flink的TwoPhaseCommitSinkFunction怎么做才能提供精准一次保证的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Win7 您需要Trustedinstaller 提供的权限才能对此文件夹进行更改

    对系统文件进行修改、删除的操作就会出现这样的情况, 根本原因 就是当前自己账户 没有权限 ,那么 解决的办法 就是 给予自己账户权限 步骤一:进入属性页面 步骤二:进入安全页面,点击“高级”按键 步骤三:进入所有者页面,点击编辑 步骤四:进入 高级安全

    2024年02月04日
    浏览(44)
  • 网络诊断“您可能需要与该网络的Internet服务提供商ISP签署协议才能获得访问Internet的权限

    问题描述: 当打开电脑,发现无法连接网络,使用windows的网络诊断,显示“您可能需要与该网络的Internet服务提供商ISP签署协议才能获得访问Internet的权限” 解决方案: (1)同时按“win+R”键,输入“regedit”,点击确定 (2)找到HKET_LOCAL_MACHINE–SYSTEM–CurrentControlSet–servi

    2024年02月11日
    浏览(44)
  • 怎么才能提升自己工作能力?

    表现最好的员工通常是获得加薪和工作晋升的人。您可以采取某些措施来提高您的工作绩效,并帮助您的主管将您视为他们最好的员工之一。在本文中,我们列出了 12 个技巧,可以立即提高您的工作绩效。 什么是工作绩效? 工作绩效是指您的责任历史和出色完成工作的能力

    2024年02月14日
    浏览(42)
  • 码链跳转小程序怎么用?怎么才能接入?

    最近,关注微信站外跳转的小伙伴,可能注意到了一系列的跳转小程序的活跃。这类小程序都叫做码链系列,码链运营服务,码链运营服务助手,码链运营小助手,码链运营助手,码链运营管家,码链运营小管家,码链运营星等小程序。 码链类的小程序是原来腾讯营销通的产

    2024年02月04日
    浏览(44)
  • 路由器怎么才能设置成交换机?

    公司或者学校有很多地方的网线口少的很,所以大家会买交换机或者路由器,WiFi的未来是是趋势所以很多人会选择路由器,所以就会涉及到登陆的问题,一般是每个人都有自己的登录账号或者密码的,这时怎么办呢。 1、将网线的一端连在网络接口上,网线接通,将路由器拆

    2024年02月08日
    浏览(62)
  • 淘宝bd导致人群不精准?怎么做?

    什么是人群标签! 在千人千面规则下,大家不难意识到人群标签的重要性,比如说你今天搜索了男鞋,看了几款不同店的男鞋,等你第二天再打开手淘时,就会发现手淘首页的板块首页栏,和猜你喜欢等,都会给你推荐各种男鞋,以及和男鞋相似的产品,而且给你推荐的这些

    2024年02月05日
    浏览(48)
  • js中setinterval怎么用?怎么才能让setinterval停下来?

    setinterval()是定时调用的函数,可按照指定的周期(以毫秒计)来调用函数或计算表达式。 setinterval()的作用是在播放动画的时,每隔一定时间就调用函数,方法或对象。 setInterval() 方法会不停地调用函数,直到 clearInterval() 被调用或窗口被关闭。 由 setInterval()返回的ID值可用作

    2024年02月02日
    浏览(46)
  • 怎么才能真正理解服务器是什么?

    构成互联网世界的基本节点是一个又一个的计算机和网络设备。 服务器是提供特定服务的计算机,你平时用的计算机叫做终端设备。他们在机器上的本质是一样的,但因为承担不同的角色,所以有一些区别: 0、归属与成本 服务器:属于提供服务者,一般是公司等团体组织需

    2024年02月09日
    浏览(38)
  • 怎么才能远程控制笔记本电脑?

    为什么AnyViewer是远程控制笔记本电脑软件的首选?以下是选择AnyViewer成为笔记本电脑远程控制软件的主要因素。         跨平台能力 AnyViewer作为一款跨平台远程控制软件,不仅可以用于从一台Windows电脑远程控制另一台Windows电脑,还可以从iOS和Android设备远程控制电脑。   

    2024年02月14日
    浏览(57)
  • 路由器ip地址怎么设置才能上网

    在互联网时代,路由器已经成为了我们生活中不可或缺的一部分。而路由器的IP地址则是路由器配置的关键。那么,如何设置路由器的IP地址才能上网呢?虎观代理小二二将为您提供详细的步骤和指导。 一、确认路由器IP地址 在开始设置路由器的IP地址之前,您需要先确定您的

    2024年02月06日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包