【Phoenix】phoenix实现每个Primarykey主键保留N版本数据,CDC数据记录为Changelog格式

这篇具有很好参考价值的文章主要介绍了【Phoenix】phoenix实现每个Primarykey主键保留N版本数据,CDC数据记录为Changelog格式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、背景:

CDC数据中包含了,数据的变更过程。当CDC写入传统数据库最终每一个primary key下会保存一条数据。当然可以使用特殊手段保存多分记录但是显然造成了数据膨胀。
另外数据湖Hudi(0.13.1)是不支持保存所有Changelog其Compaction机制会清除所有旧版本的内容。Iceberg支持TimeTravel,能查到某个时间点的数据状态,但是不能列举的单条记录的Change过程。
所以目前只能手动实现。
其实,实现思路很简单,将原PrimaryKey+Cdc的 ts_ms 一起作为新表的 PrimaryKey就可以了。但需要注意的是一条数据可能变更很多次,但一般需要保存近几次的变更,所以就需要删除部分旧变更记录。ts_ms 就是CDC数据中记录的日志实际产生的时间,具体参见debezium 。如果原表primarykey是联合主键,即有多个字段共同组成,则最好将这些字段拼接为一个字符串,方便后续关联。

本文思路
CDC --写入-> Phoenix + 定期删除旧版本记录

CDC数据写入略过,此处使用SQL模拟写入。

二、Phoenix旧版记录删除(DEMO)

phoenix doc

bin/sqlline.py www.xx.com:2181
-- 直接创建phoenix表
create table TEST.TEST_VERSION(
ID VARCHAR NOT NULL,
TS TIMESTAMP NOT NULL,
NAME VARCHAR,
CONSTRAINT my_pk PRIMARY KEY (ID,TS)
) VERSIONS=5;

再去hbase shell中查看,hbase 关联表已经有phoenix创建了。

hbase(main):032:0> desc "TEST:TEST_VERSION"
Table TEST:TEST_VERSION is ENABLED
TEST:TEST_VERSION, {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.phoenix.coprocessor.ScanRegionObserver|805306366|', coprocessor$2 => '|org.apache.phoenix.coprocessor.UngroupedAggregateRe
gionObserver|805306366|', coprocessor$3 => '|org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver|805306366|', coprocessor$4 => '|org.apache.phoenix.coprocessor.ServerCachingEndpointImpl|80
5306366|', coprocessor$5 => '|org.apache.phoenix.hbase.index.Indexer|805306366|index.builder=org.apache.phoenix.index.PhoenixIndexBuilder,org.apache.hadoop.hbase.index.codec.class=org.apache.phoenix
.index.PhoenixIndexCodec', METADATA => {'OWNER' => 'dcetl'}}
COLUMN FAMILIES DESCRIPTION
{NAME => '0', VERSIONS => '5', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'FAST_DIFF', T
TL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'NONE', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPE
N => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}
-- 在phoenix中向表插入数据
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 10:00:00'),'zhangsan');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 11:00:00'),'lisi');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 12:00:00'),'wangwu');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 13:00:00'),'zhaoliu');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 14:00:00'),'liuqi');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 15:00:00'),'sunba');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 07:00:00'),'sunyang');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 08:00:00'),'chaoyang');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 09:00:00'),'xuri');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 09:30:00'),'chenxi');
-- OK再查询一下数据插入情况
SELECT * FROM TEST.TEST_VERSION;

以下假设每个PrimaryKey需要保留最新的3版本数据。所以红色框内是需要删除的数据。
phoenix 主键,数据库,大数据

现在需要使用row_number的函数给每个primarykey的不通version数据标识。但是phoenix并没有开窗函数。只有agg聚合函数。
phoenix对SQL的限制还是比较多的如:
(1)join 非等值连接不支持,如on a.id>s.id 是不支持的,也不支持数组比较连接,如on a.id = ARRAY[1,2,3]。 会报错:Error: Does not support non-standard or non-equi correlated-subquery conditions. (state=,code=0)
(2)where exists 格式的非等值连接不支持。select ... from A where exists (select 1 from B where A.id>B.id) 是不支持的。会报错:Error: Does not support non-standard or non-equi correlated-subquery conditions. (state=,code=0)
(2)没有开窗window函数
(3)DELETE FROM不支持JOIN

最终发下有一下函数可用
(1)NTH_VALUE 获取分组排序的第N个值。 返回原值的类型。
(2)FIRST_VALUESLAST_VALUES 获取分区排序后的前、后的N个值,返回ARRAY类型。
此三个函数官网doc中,案例是这样的 FIRST_VALUES( name, 3 ) WITHIN GROUP (ORDER BY salary DESC) 是全局分组,而实际使用中是需要搭配 GROUP BY 使用的。

所以可以获取到

-- 方案一:使用NTH_VALUE获取阈值
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,NTH_VALUE(TS,3) WITHIN GROUP (ORDER BY TS DESC) THRES FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < Z.THRES

-- 方案二:使用FIRST_VALUES获取到一个ARRAY 
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,FIRST_VALUES(TS,3) WITHIN GROUP (ORDER BY TS DESC) TSS FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < ALL(Z.TSS);

由于phoenix支持行子查询,以下是官方案例。这样就能绕过不使用DELETE … JOIN了。

Row subqueries
A subquery can return multiple fields in one row, which is considered returning a row constructor. The row constructor on both sides of the operator (IN/NOT IN, EXISTS/NOT EXISTS or comparison operator) must contain the same number of values, like in the below example:
SELECT column1, column2
FROM t1
WHERE (column1, column2) IN
    (SELECT column3, column4
     FROM t2
     WHERE column5 = ‘nowhere’);
This query returns all pairs of (column1, column2) that can match any pair of (column3, column4) in the second table after being filtered by condition: column5 = ‘nowhere’.

最终实现删除 除N个较新的以外的所有旧版本数据, SQL如下:

-- NTH_VALUE方式
DELETE FROM TEST.TEST_VERSION
WHERE (ID,TS) IN (
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,NTH_VALUE(TS,3) WITHIN GROUP (ORDER BY TS DESC) THRES FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < Z.THRES
);

-- FIRST_VALUES方式
DELETE FROM TEST.TEST_VERSION
WHERE (ID,TS) IN (
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,FIRST_VALUES(TS,3) WITHIN GROUP (ORDER BY TS DESC) TSS FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < ALL(Z.TSS)
);

删除后效果:
phoenix 主键,数据库,大数据

三、探索

3.1 Phoenix的Row Timestamp 探索

Phoenix的Row Timestamp是为了在meta中更快检索数据而设置的。不能实现hbase 中的versions 数据在phoenix中展现。
如下测试案例:
phoenix建表,并插入数据:

create table TEST.TEST_ROW_TIMESTAMP(
ID VARCHAR NOT NULL,
TS TIMESTAMP NOT NULL,
NAME VARCHAR,
CONSTRAINT my_pk PRIMARY KEY (ID,TS ROW_TIMESTAMP)
) VERSIONS=5;

UPSERT INTO TEST.TEST_ROW_TIMESTAMP(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 09:30:00'),'windows');
UPSERT INTO TEST.TEST_ROW_TIMESTAMP(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 10:30:00'),'mac');
UPSERT INTO TEST.TEST_ROW_TIMESTAMP(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 11:30:00'),'linux');

在hbase中查询表:

hbase(main):050:0> desc 'TEST:TEST_ROW_TIMESTAMP'
Table TEST:TEST_ROW_TIMESTAMP is ENABLED
TEST:TEST_ROW_TIMESTAMP, {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.phoenix.coprocessor.ScanRegionObserver|805306366|', coprocessor$2 => '|org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver|805306366|', coprocessor$3
=> '|org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver|805306366|', coprocessor$4 => '|org.apache.phoenix.coprocessor.ServerCachingEndpointImpl|805306366|', coprocessor$5 => '|org.apache.phoenix.hbase.index.Indexer|805306366|index.b
uilder=org.apache.phoenix.index.PhoenixIndexBuilder,org.apache.hadoop.hbase.index.codec.class=org.apache.phoenix.index.PhoenixIndexCodec', METADATA => {'OWNER' => 'dcetl'}}
COLUMN FAMILIES DESCRIPTION
{NAME => '0', VERSIONS => '5', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'FAST_DIFF', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICAT
ION_SCOPE => '0', BLOOMFILTER => 'NONE', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}
1 row(s)
Took 0.0235 seconds

hbase(main):049:0> scan 'TEST:TEST_ROW_TIMESTAMP'
ROW                                                            COLUMN+CELL
 rk001\x00\x80\x00\x01o`p\xC1\xC0\x00\x00\x00\x00              column=0:\x00\x00\x00\x00, timestamp=1577871000000, value=x
 rk001\x00\x80\x00\x01o`p\xC1\xC0\x00\x00\x00\x00              column=0:\x80\x0B, timestamp=1577871000000, value=windows
 rk001\x00\x80\x00\x01o`\xA7\xB0@\x00\x00\x00\x00              column=0:\x00\x00\x00\x00, timestamp=1577874600000, value=x
 rk001\x00\x80\x00\x01o`\xA7\xB0@\x00\x00\x00\x00              column=0:\x80\x0B, timestamp=1577874600000, value=mac
 rk001\x00\x80\x00\x01o`\xDE\x9E\xC0\x00\x00\x00\x00           column=0:\x00\x00\x00\x00, timestamp=1577878200000, value=x
 rk001\x00\x80\x00\x01o`\xDE\x9E\xC0\x00\x00\x00\x00           column=0:\x80\x0B, timestamp=1577878200000, value=linux
3 row(s)
Took 0.0072 seconds

如上查询结果,我们希望在hbase中只有一行数据,并保存为对多个版本,但实际查询到了多条数据,timestamp做为hbase表的rowkey的一部分了。phoenix在创建表时候没有使用hbase多版本保存机制。

3.2 phoenix 和 hbase表结构不一致

先创建hbase Table

create 'TEST:TEST_DIF_TS',{NAME => 'COLS', VERSIONS => 3}
put 'TEST:TEST_DIF_TS','001', 'COLS:NAME','zhangsan'
put 'TEST:TEST_DIF_TS','001', 'COLS:TS', 1695189085000
put 'TEST:TEST_DIF_TS','001', 'COLS:NAME','lisi'
put 'TEST:TEST_DIF_TS','001', 'COLS:TS', 1695189090000
put 'TEST:TEST_DIF_TS','001', 'COLS:NAME','wangwu'
put 'TEST:TEST_DIF_TS','001', 'COLS:TS', 1695189095000
put 'TEST:TEST_DIF_TS','001', 'COLS:NAME','zhaoliu'
put 'TEST:TEST_DIF_TS','001', 'COLS:TS', 1695189105000

get 'TEST:TEST_DIF_TS','001',{COLUMN=>'COLS:NAME',VERSIONS=>3}
# 结果:
COLUMN                                             CELL
 COLS:NAME                                         timestamp=1695784642879, value=zhaoliu
 COLS:NAME                                         timestamp=1695784642857, value=wangwu
 COLS:NAME                                         timestamp=1695784642830, value=lisi

创建Phoenix Table

create table TEST.TEST_DIF_TS(
ID VARCHAR NOT NULL,
TS TIMESTAMP NOT NULL,
NAME VARCHAR,
CONSTRAINT my_pk PRIMARY KEY (ID,TS)
);
UPSERT INTO TEST.TEST_DIF_TS(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 11:30:00'),'XXX');

0: jdbc:phoenix:...> select * from  TEST.TEST_DIF_TS;
+--------+--------------------------+-------+
|   ID   |            TS            | NAME  |
+--------+--------------------------+-------+
| rk001  | 2020-01-01 11:30:00.000  | XXX   |
+--------+--------------------------+-------+

再翻查hbase Table数据

hbase(main):004:0> scan 'TEST:TEST_DIF_TS'
ROW                                                COLUMN+CELL
 001                                               column=COLS:NAME, timestamp=1695784642879, value=zhaoliu
 001                                               column=COLS:TS, timestamp=1695784643741, value=1695189105000
 rk001\x00\x80\x00\x01o`\xDE\x9E\xC0\x00\x00\x00\x column=0:\x00\x00\x00\x00, timestamp=1695786568345, value=x
 00
 rk001\x00\x80\x00\x01o`\xDE\x9E\xC0\x00\x00\x00\x column=0:\x80\x0B, timestamp=1695786568345, value=XXX

可以看到Phoenix只能查询到自己插入的数据,但是hbase可以查询到phoenix,所以phoenix会把不符合自己表结构的数据过滤掉。phoenix的会将自己所有的primary key字段拼接后作为hbase 的rowkey存入hbase。

参考文章:

Phoenix实践 —— Phoenix SQL常用基本语法总结小记
Phoenix 对 Hbase 中表的映射
phoenix使用详解
Phoenix 简介及使用方式
phoenix创建映射表和创建索引、删除索引、重建索引文章来源地址https://www.toymoban.com/news/detail-730872.html

到了这里,关于【Phoenix】phoenix实现每个Primarykey主键保留N版本数据,CDC数据记录为Changelog格式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • hbase-phoenix

    phoenix 客户端,

    2024年02月11日
    浏览(49)
  • Phoenix基本使用

    Phoenix是HBase的开源SQL皮肤。可以使用标准JDBC API代替HBase客户端API来创建表,插入数据和查询HBase数据。 容易集成:如Spark,Hive,Pig,Flume和Map Reduce。 性能好:直接使用HBase API以及协处理器和自定义过滤器,可以为小型查询提供毫秒级的性能,或者为数千万行提供数秒的性能

    2024年01月23日
    浏览(34)
  • Phoenix 时区问题

          最近在测试flink从trino查询数据插入到phoenix5的功能,发现一个时间的问题:  明明插入的时间是 \\\'1940-06-01\\\',查询出来的时间会少一天,同样的 Timestamp 也会自动少掉 8个小时,详细原因参考  Phoenix关于时区的处理方式说明。但是文章中阿里的已经处理,但是社区版的仍

    2024年02月15日
    浏览(35)
  • spark+phoenix读取hbase

    正常来说这个内容应该网上可参考的文章很多,但是我还是捣鼓了好久,现在记录下来,给自己个备忘录。 phoenix是操作hbase的皮肤,他可以轻松的使用sql语句来操作hbase,比直接用hbase的原语操作要友好的多。spark直接操作hbase也是通过hbase的原语操作,操作起来比较繁琐,下

    2024年01月22日
    浏览(40)
  • zookeeper + hadoop + hbase + phoenix

    IP hostname 192.168.23.130 hadoop01 192.168.23.131 hadoop02 192.168.23.132 hadoop03 jdk-1.8 zookeeper-3.8.1 hadoop-3.2.4 hbase-2.4.15 phoenix-2.4.0-5.1.3 1、关闭防火墙 2、设置主机名 3、配置主机hosts 4、设置ssh免密登录 分发JDK安装包到其他节点 配置JDK环境变量(所有节点都需要执行) 修改zookeeper配置 添加z

    2024年02月06日
    浏览(47)
  • 1400*B. Phoenix and Beauty(贪心&构造)

    Problem - 1348B - Codeforces  解析:         满足题意,会构成循环序列,其中循环节长度为 k,统计数列中不同元素的个数 cnt ,如果cnt k,显然无解。         否则,由于题意中已经证明,答案序列个数小于等于 10000,并且 n和k的范围都为100,则n*k小于等于10000,所以我们直接

    2024年02月08日
    浏览(46)
  • HBase在大数据集群的安装部署及整合Phoenix

    前提:需要保证三台虚拟机hadoop102-104已经部署好基本配置。未完成的可以参考:https://blog.csdn.net/weixin_73195042/article/details/135886619 上传HBase安装包到/opt/software文件夹内 配置环境变量 在末尾添加 使用 source 让配置的环境变量生效 将环境变量分发到其他虚拟机上,并且也要sour

    2024年04月27日
    浏览(50)
  • 关于flink-sql-connector-phoenix的重写逻辑

    目录 重写意义 代码结构  调用链路 POM文件配置 代码解析 一、PhoenixJdbcD

    2024年02月12日
    浏览(39)
  • SanctuaryAI推出Phoenix: 专为工作而设计的人形通用机器人

    唯一入选《时代》杂志 2023 年最佳发明的通用机器人。 称机器人自主做家务的速度和 灵活度 已达到了和人类相当的水平。 官网链接:https://sanctuary.ai/ Sanctuary AI 由其开创性的 人工智能控制系统 Carbon™ 提供支持,在宣布其技术首次商业部署后不到两个月,Sanctuary AI 公布了其

    2024年03月09日
    浏览(43)
  • 基于 HBase & Phoenix 构建实时数仓(1)—— Hadoop HA 安装部署

    目录 一、主机规划 二、环境准备 1. 启动 NTP 时钟同步 2. 修改 hosts 文件 3. 配置所有主机间 ssh 免密 4. 修改用户可打开文件数与进程数(可选) 三、安装 JDK 四、安装部署 Zookeeper 集群 1. 解压、配置环境变量 2. 创建配置文件 3. 创建新的空 ZooKeeper 数据目录和事务日志目录 4

    2024年03月10日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包