背景
在flink中可以通过使用事务性数据汇实现精准一次的保证,本文基于Kakfa的事务处理来看一下在Flink 内部如何实现基于两阶段提交协议的事务性数据汇.
flink kafka事务性数据汇的实现
1。首先在开始进行快照的时候也就是收到checkpoint通知的时候,在snapshot方法中会开启一个新的事务,代码如下:
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// this is like the pre-commit of a 2-phase-commit transaction
// we are ready to commit and remember the transaction
checkState(
currentTransactionHolder != null,
"bug: no transaction object when performing state snapshot");
long checkpointId = context.getCheckpointId();
LOG.debug(
"{} - checkpoint {} triggered, flushing transaction '{}'",
name(),
context.getCheckpointId(),
currentTransactionHolder);
preCommit(currentTransactionHolder.handle);
// 调用kafkaProducer.flush();清理上一个事务的状态(注意不是提交),只是确保前一个事务的所有资源清理完毕
pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);
// 调用producer.beginTransaction();方法开启一个新的kafka事务
currentTransactionHolder = beginTransactionInternal();
LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);
state.clear();
state.add(
new State<>(
this.currentTransactionHolder,
new ArrayList<>(pendingCommitTransactions.values()),
userContext));
}
2.其次在JobManager通知检查点完成的通知方法,也就是notifyCheckpointComplete方法中提交事务文章来源:https://www.toymoban.com/news/detail-745222.html
Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator =
pendingCommitTransactions.entrySet().iterator();
Throwable firstError = null;
while (pendingTransactionIterator.hasNext()) {
Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
Long pendingTransactionCheckpointId = entry.getKey();
TransactionHolder<TXN> pendingTransaction = entry.getValue();
if (pendingTransactionCheckpointId > checkpointId) {
continue;
}
LOG.info(
"{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
name(),
checkpointId,
pendingTransaction,
pendingTransactionCheckpointId);
logWarningIfTimeoutAlmostReached(pendingTransaction);
try {
//调用producer.commitTransaction()方法提交事务
commit(pendingTransaction.handle);
} catch (Throwable t) {
if (firstError == null) {
firstError = t;
}
}
LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);
pendingTransactionIterator.remove();
}
if (firstError != null) {
throw new FlinkRuntimeException(
"Committing one of transactions failed, logging first encountered failure",
firstError);
}
至此,一个两阶段提交的flink事务性数据汇完成了,这个事务性数据汇可以构成端到端一致性的一部分文章来源地址https://www.toymoban.com/news/detail-745222.html
到了这里,关于Flink的基于两阶段提交协议的事务数据汇实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!