【Phoenix】phoenix实现每个Primarykey主键保留N版本数据,CDC数据记录为Changelog格式

这篇具有很好参考价值的文章主要介绍了【Phoenix】phoenix实现每个Primarykey主键保留N版本数据,CDC数据记录为Changelog格式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、背景:

CDC数据中包含了,数据的变更过程。当CDC写入传统数据库最终每一个primary key下会保存一条数据。当然可以使用特殊手段保存多分记录但是显然造成了数据膨胀。
另外数据湖Hudi(0.13.1)是不支持保存所有Changelog其Compaction机制会清除所有旧版本的内容。Iceberg支持TimeTravel,能查到某个时间点的数据状态,但是不能列举的单条记录的Change过程。
所以目前只能手动实现。
其实,实现思路很简单,将原PrimaryKey+Cdc的 ts_ms 一起作为新表的 PrimaryKey就可以了。但需要注意的是一条数据可能变更很多次,但一般需要保存近几次的变更,所以就需要删除部分旧变更记录。ts_ms 就是CDC数据中记录的日志实际产生的时间,具体参见debezium 。如果原表primarykey是联合主键,即有多个字段共同组成,则最好将这些字段拼接为一个字符串,方便后续关联。

本文思路
CDC --写入-> Phoenix + 定期删除旧版本记录

CDC数据写入略过,此处使用SQL模拟写入。

二、Phoenix旧版记录删除(DEMO)

phoenix doc

bin/sqlline.py www.xx.com:2181
-- 直接创建phoenix表
create table TEST.TEST_VERSION(
ID VARCHAR NOT NULL,
TS TIMESTAMP NOT NULL,
NAME VARCHAR,
CONSTRAINT my_pk PRIMARY KEY (ID,TS)
) VERSIONS=5;

再去hbase shell中查看,hbase 关联表已经有phoenix创建了。

hbase(main):032:0> desc "TEST:TEST_VERSION"
Table TEST:TEST_VERSION is ENABLED
TEST:TEST_VERSION, {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.phoenix.coprocessor.ScanRegionObserver|805306366|', coprocessor$2 => '|org.apache.phoenix.coprocessor.UngroupedAggregateRe
gionObserver|805306366|', coprocessor$3 => '|org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver|805306366|', coprocessor$4 => '|org.apache.phoenix.coprocessor.ServerCachingEndpointImpl|80
5306366|', coprocessor$5 => '|org.apache.phoenix.hbase.index.Indexer|805306366|index.builder=org.apache.phoenix.index.PhoenixIndexBuilder,org.apache.hadoop.hbase.index.codec.class=org.apache.phoenix
.index.PhoenixIndexCodec', METADATA => {'OWNER' => 'dcetl'}}
COLUMN FAMILIES DESCRIPTION
{NAME => '0', VERSIONS => '5', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'FAST_DIFF', T
TL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'NONE', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPE
N => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}
-- 在phoenix中向表插入数据
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 10:00:00'),'zhangsan');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 11:00:00'),'lisi');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 12:00:00'),'wangwu');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 13:00:00'),'zhaoliu');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 14:00:00'),'liuqi');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 15:00:00'),'sunba');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 07:00:00'),'sunyang');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 08:00:00'),'chaoyang');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 09:00:00'),'xuri');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 09:30:00'),'chenxi');
-- OK再查询一下数据插入情况
SELECT * FROM TEST.TEST_VERSION;

以下假设每个PrimaryKey需要保留最新的3版本数据。所以红色框内是需要删除的数据。
phoenix 主键,数据库,大数据

现在需要使用row_number的函数给每个primarykey的不通version数据标识。但是phoenix并没有开窗函数。只有agg聚合函数。
phoenix对SQL的限制还是比较多的如:
(1)join 非等值连接不支持,如on a.id>s.id 是不支持的,也不支持数组比较连接,如on a.id = ARRAY[1,2,3]。 会报错:Error: Does not support non-standard or non-equi correlated-subquery conditions. (state=,code=0)
(2)where exists 格式的非等值连接不支持。select ... from A where exists (select 1 from B where A.id>B.id) 是不支持的。会报错:Error: Does not support non-standard or non-equi correlated-subquery conditions. (state=,code=0)
(2)没有开窗window函数
(3)DELETE FROM不支持JOIN

最终发下有一下函数可用
(1)NTH_VALUE 获取分组排序的第N个值。 返回原值的类型。
(2)FIRST_VALUESLAST_VALUES 获取分区排序后的前、后的N个值,返回ARRAY类型。
此三个函数官网doc中,案例是这样的 FIRST_VALUES( name, 3 ) WITHIN GROUP (ORDER BY salary DESC) 是全局分组,而实际使用中是需要搭配 GROUP BY 使用的。

所以可以获取到

-- 方案一:使用NTH_VALUE获取阈值
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,NTH_VALUE(TS,3) WITHIN GROUP (ORDER BY TS DESC) THRES FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < Z.THRES

-- 方案二:使用FIRST_VALUES获取到一个ARRAY 
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,FIRST_VALUES(TS,3) WITHIN GROUP (ORDER BY TS DESC) TSS FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < ALL(Z.TSS);

由于phoenix支持行子查询,以下是官方案例。这样就能绕过不使用DELETE … JOIN了。

Row subqueries
A subquery can return multiple fields in one row, which is considered returning a row constructor. The row constructor on both sides of the operator (IN/NOT IN, EXISTS/NOT EXISTS or comparison operator) must contain the same number of values, like in the below example:
SELECT column1, column2
FROM t1
WHERE (column1, column2) IN
    (SELECT column3, column4
     FROM t2
     WHERE column5 = ‘nowhere’);
This query returns all pairs of (column1, column2) that can match any pair of (column3, column4) in the second table after being filtered by condition: column5 = ‘nowhere’.

最终实现删除 除N个较新的以外的所有旧版本数据, SQL如下:

-- NTH_VALUE方式
DELETE FROM TEST.TEST_VERSION
WHERE (ID,TS) IN (
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,NTH_VALUE(TS,3) WITHIN GROUP (ORDER BY TS DESC) THRES FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < Z.THRES
);

-- FIRST_VALUES方式
DELETE FROM TEST.TEST_VERSION
WHERE (ID,TS) IN (
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,FIRST_VALUES(TS,3) WITHIN GROUP (ORDER BY TS DESC) TSS FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < ALL(Z.TSS)
);

删除后效果:
phoenix 主键,数据库,大数据

三、探索

3.1 Phoenix的Row Timestamp 探索

Phoenix的Row Timestamp是为了在meta中更快检索数据而设置的。不能实现hbase 中的versions 数据在phoenix中展现。
如下测试案例:
phoenix建表,并插入数据:

create table TEST.TEST_ROW_TIMESTAMP(
ID VARCHAR NOT NULL,
TS TIMESTAMP NOT NULL,
NAME VARCHAR,
CONSTRAINT my_pk PRIMARY KEY (ID,TS ROW_TIMESTAMP)
) VERSIONS=5;

UPSERT INTO TEST.TEST_ROW_TIMESTAMP(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 09:30:00'),'windows');
UPSERT INTO TEST.TEST_ROW_TIMESTAMP(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 10:30:00'),'mac');
UPSERT INTO TEST.TEST_ROW_TIMESTAMP(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 11:30:00'),'linux');

在hbase中查询表:

hbase(main):050:0> desc 'TEST:TEST_ROW_TIMESTAMP'
Table TEST:TEST_ROW_TIMESTAMP is ENABLED
TEST:TEST_ROW_TIMESTAMP, {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.phoenix.coprocessor.ScanRegionObserver|805306366|', coprocessor$2 => '|org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver|805306366|', coprocessor$3
=> '|org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver|805306366|', coprocessor$4 => '|org.apache.phoenix.coprocessor.ServerCachingEndpointImpl|805306366|', coprocessor$5 => '|org.apache.phoenix.hbase.index.Indexer|805306366|index.b
uilder=org.apache.phoenix.index.PhoenixIndexBuilder,org.apache.hadoop.hbase.index.codec.class=org.apache.phoenix.index.PhoenixIndexCodec', METADATA => {'OWNER' => 'dcetl'}}
COLUMN FAMILIES DESCRIPTION
{NAME => '0', VERSIONS => '5', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'FAST_DIFF', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICAT
ION_SCOPE => '0', BLOOMFILTER => 'NONE', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}
1 row(s)
Took 0.0235 seconds

hbase(main):049:0> scan 'TEST:TEST_ROW_TIMESTAMP'
ROW                                                            COLUMN+CELL
 rk001\x00\x80\x00\x01o`p\xC1\xC0\x00\x00\x00\x00              column=0:\x00\x00\x00\x00, timestamp=1577871000000, value=x
 rk001\x00\x80\x00\x01o`p\xC1\xC0\x00\x00\x00\x00              column=0:\x80\x0B, timestamp=1577871000000, value=windows
 rk001\x00\x80\x00\x01o`\xA7\xB0@\x00\x00\x00\x00              column=0:\x00\x00\x00\x00, timestamp=1577874600000, value=x
 rk001\x00\x80\x00\x01o`\xA7\xB0@\x00\x00\x00\x00              column=0:\x80\x0B, timestamp=1577874600000, value=mac
 rk001\x00\x80\x00\x01o`\xDE\x9E\xC0\x00\x00\x00\x00           column=0:\x00\x00\x00\x00, timestamp=1577878200000, value=x
 rk001\x00\x80\x00\x01o`\xDE\x9E\xC0\x00\x00\x00\x00           column=0:\x80\x0B, timestamp=1577878200000, value=linux
3 row(s)
Took 0.0072 seconds

如上查询结果,我们希望在hbase中只有一行数据,并保存为对多个版本,但实际查询到了多条数据,timestamp做为hbase表的rowkey的一部分了。phoenix在创建表时候没有使用hbase多版本保存机制。

3.2 phoenix 和 hbase表结构不一致

先创建hbase Table

create 'TEST:TEST_DIF_TS',{NAME => 'COLS', VERSIONS => 3}
put 'TEST:TEST_DIF_TS','001', 'COLS:NAME','zhangsan'
put 'TEST:TEST_DIF_TS','001', 'COLS:TS', 1695189085000
put 'TEST:TEST_DIF_TS','001', 'COLS:NAME','lisi'
put 'TEST:TEST_DIF_TS','001', 'COLS:TS', 1695189090000
put 'TEST:TEST_DIF_TS','001', 'COLS:NAME','wangwu'
put 'TEST:TEST_DIF_TS','001', 'COLS:TS', 1695189095000
put 'TEST:TEST_DIF_TS','001', 'COLS:NAME','zhaoliu'
put 'TEST:TEST_DIF_TS','001', 'COLS:TS', 1695189105000

get 'TEST:TEST_DIF_TS','001',{COLUMN=>'COLS:NAME',VERSIONS=>3}
# 结果:
COLUMN                                             CELL
 COLS:NAME                                         timestamp=1695784642879, value=zhaoliu
 COLS:NAME                                         timestamp=1695784642857, value=wangwu
 COLS:NAME                                         timestamp=1695784642830, value=lisi

创建Phoenix Table

create table TEST.TEST_DIF_TS(
ID VARCHAR NOT NULL,
TS TIMESTAMP NOT NULL,
NAME VARCHAR,
CONSTRAINT my_pk PRIMARY KEY (ID,TS)
);
UPSERT INTO TEST.TEST_DIF_TS(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 11:30:00'),'XXX');

0: jdbc:phoenix:...> select * from  TEST.TEST_DIF_TS;
+--------+--------------------------+-------+
|   ID   |            TS            | NAME  |
+--------+--------------------------+-------+
| rk001  | 2020-01-01 11:30:00.000  | XXX   |
+--------+--------------------------+-------+

再翻查hbase Table数据

hbase(main):004:0> scan 'TEST:TEST_DIF_TS'
ROW                                                COLUMN+CELL
 001                                               column=COLS:NAME, timestamp=1695784642879, value=zhaoliu
 001                                               column=COLS:TS, timestamp=1695784643741, value=1695189105000
 rk001\x00\x80\x00\x01o`\xDE\x9E\xC0\x00\x00\x00\x column=0:\x00\x00\x00\x00, timestamp=1695786568345, value=x
 00
 rk001\x00\x80\x00\x01o`\xDE\x9E\xC0\x00\x00\x00\x column=0:\x80\x0B, timestamp=1695786568345, value=XXX

可以看到Phoenix只能查询到自己插入的数据,但是hbase可以查询到phoenix,所以phoenix会把不符合自己表结构的数据过滤掉。phoenix的会将自己所有的primary key字段拼接后作为hbase 的rowkey存入hbase。

参考文章:

Phoenix实践 —— Phoenix SQL常用基本语法总结小记
Phoenix 对 Hbase 中表的映射
phoenix使用详解
Phoenix 简介及使用方式
phoenix创建映射表和创建索引、删除索引、重建索引文章来源地址https://www.toymoban.com/news/detail-730872.html

到了这里,关于【Phoenix】phoenix实现每个Primarykey主键保留N版本数据,CDC数据记录为Changelog格式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • Phoenix 时区问题

    Phoenix 时区问题

          最近在测试flink从trino查询数据插入到phoenix5的功能,发现一个时间的问题:  明明插入的时间是 \\\'1940-06-01\\\',查询出来的时间会少一天,同样的 Timestamp 也会自动少掉 8个小时,详细原因参考  Phoenix关于时区的处理方式说明。但是文章中阿里的已经处理,但是社区版的仍

    2024年02月15日
    浏览(5)
  • phoenix连接hbase

    phoenix连接hbase

    一、安装phoennix添加配置    1、将phoenix-server-hbase-2.4-5.1.2.jar拷贝至hbase的的lib下   2、配置phoenix可以访问hbase的系统表     (1)将以下配置添加至hbase-site.xml中          (2)将hbase-stie.xml拷贝到phoenix/bin目录下    二、启动phoenix服务      1、启动hbase             2、启

    2024年02月15日
    浏览(6)
  • hbase-phoenix

    phoenix 客户端,

    2024年02月11日
    浏览(20)
  • spark+phoenix读取hbase

    正常来说这个内容应该网上可参考的文章很多,但是我还是捣鼓了好久,现在记录下来,给自己个备忘录。 phoenix是操作hbase的皮肤,他可以轻松的使用sql语句来操作hbase,比直接用hbase的原语操作要友好的多。spark直接操作hbase也是通过hbase的原语操作,操作起来比较繁琐,下

    2024年01月22日
    浏览(6)
  • zookeeper + hadoop + hbase + phoenix

    IP hostname 192.168.23.130 hadoop01 192.168.23.131 hadoop02 192.168.23.132 hadoop03 jdk-1.8 zookeeper-3.8.1 hadoop-3.2.4 hbase-2.4.15 phoenix-2.4.0-5.1.3 1、关闭防火墙 2、设置主机名 3、配置主机hosts 4、设置ssh免密登录 分发JDK安装包到其他节点 配置JDK环境变量(所有节点都需要执行) 修改zookeeper配置 添加z

    2024年02月06日
    浏览(12)
  • 1400*B. Phoenix and Beauty(贪心&构造)

    1400*B. Phoenix and Beauty(贪心&构造)

    Problem - 1348B - Codeforces  解析:         满足题意,会构成循环序列,其中循环节长度为 k,统计数列中不同元素的个数 cnt ,如果cnt k,显然无解。         否则,由于题意中已经证明,答案序列个数小于等于 10000,并且 n和k的范围都为100,则n*k小于等于10000,所以我们直接

    2024年02月08日
    浏览(7)
  • HBase在大数据集群的安装部署及整合Phoenix

    前提:需要保证三台虚拟机hadoop102-104已经部署好基本配置。未完成的可以参考:https://blog.csdn.net/weixin_73195042/article/details/135886619 上传HBase安装包到/opt/software文件夹内 配置环境变量 在末尾添加 使用 source 让配置的环境变量生效 将环境变量分发到其他虚拟机上,并且也要sour

    2024年04月27日
    浏览(17)
  • 关于flink-sql-connector-phoenix的重写逻辑

    目录 重写意义 代码结构  调用链路 POM文件配置 代码解析 一、PhoenixJdbcD

    2024年02月12日
    浏览(10)
  • SanctuaryAI推出Phoenix: 专为工作而设计的人形通用机器人

    SanctuaryAI推出Phoenix: 专为工作而设计的人形通用机器人

    唯一入选《时代》杂志 2023 年最佳发明的通用机器人。 称机器人自主做家务的速度和 灵活度 已达到了和人类相当的水平。 官网链接:https://sanctuary.ai/ Sanctuary AI 由其开创性的 人工智能控制系统 Carbon™ 提供支持,在宣布其技术首次商业部署后不到两个月,Sanctuary AI 公布了其

    2024年03月09日
    浏览(9)
  • Phoenix FD(火凤凰全能流体动力学3Dmax插件)

    Phoenix FD(火凤凰全能流体动力学3Dmax插件)

    Phoenix FD是专为艺术家打造的全能流体动力学插件,可以模拟真实的火焰,烟雾,液体,海洋,泼溅,雾气等等效果。使用方便的参数来完善,调整并渲染各种基于物理的流体效果。与行业最主流的工具兼容,如: OpenVDB, Alembic, Krakatoa 和 thinkingParticles。与 3ds Max 无缝整合,并

    2024年02月06日
    浏览(6)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包