RisingWave分布式SQL流处理数据库调研

这篇具有很好参考价值的文章主要介绍了RisingWave分布式SQL流处理数据库调研。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

概述

RisingWave是一款分布式SQL流处理数据库,旨在帮助用户降低实时应用的的开发成本。作为专为云上分布式流处理而设计的系统,RisingWave为用户提供了与PostgreSQL类似的使用体验,官方宣称具备比Flink高出10倍的性能(指throughput)以及更低的成本。RisingWave开发只需要关注SQL开发,而不需要像Flink那样去关注

  • RisingWave与Flink不同的是,RisingWave既可以做流处理也可以存储;而Flink只是流处理框架,而不能存储数据,计算后的数据需要存储到外部系统中。官方宣称可以完全替代FlinkSQL。
  • RisingWave与批数据库不同的是,RisingWave可以做流处理,按预定义逻辑实时处理数据,官网宣称可以做到流批一体,批数据库只能处理批数据。

使用场景

RisingWave 的强项是流处理,底层存储为行存,更加适合对已存储的数据高并发点查,而并非全表扫描。RisingWave 的主要使用场景包括了监控、报警、实时动态报表、流式 ETL、机器学习特征工程等。其已经运用到金融交易、制造业、新媒体、物流等领域。
但是,RisingWave 不适合做分析型随机查询。为支持分析型随机查询,用户还需将数据导入到实时分析数据库中进行操作。不少用户将 RisingWave 与 ClickHouse、Apache Doris 等实时分析数据库组合使用:他们使用 RisingWave 做流计算,同时使用实时分析数据库进行分析型随机查询。RisingWave 已经支持到sink ClickHouse、Apache Doris等OLTP中,具体可以参考RisingWave Sink

注意:
RisingWave 不支持读写事务处理,但其支持只读事务。在生产中,使用 RisingWave 的最佳实践是将 RisingWave 放在事务型数据库的下游。RisingWave 通过 CDC 从事务型数据库中读取已经被序列化过的数据。

RisingWave 应用

部署

RisingWave 单机试玩模式

docker run -itd \
-p 4566:4566 \
-p 5691:5691 \
--privileged \
--name=risingwave \
risingwavelabs/risingwave:latest playground

RisingWave 单机 Docker Compose 部署模式(测试推荐这种模式部署,以下测试基于此种模式)

clone the risingwave repository.

git clone https://github.com/risingwavelabs/risingwave.git

进入docker目录

cd docker

启动RisingWave集群

#使用MinIO存储状态后端,standalone模式启动
export RW_IMAGE=risingwavelabs/risingwave:latest
export ENABLE_TELEMETRY=true
docker compose up -d

安装postgresql客户端

由于RisingWave兼容postgresql协议,所以通过postgresql客户端可以直接操作RisingWave
安装postgresql客户端

yum install -y postgresql

使用 psql 连接

psql -h localhost -p 4566 -d dev -U root

启动mysql并开启binlog

  • 启动mysql
# 查看详细默认配置
 docker run -it --rm mysql:5.7 --verbose --help
 #启动mysql server
docker run -d \
--name mysql5.7 \
--restart=always \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=123456 \
-v /data/mysql5.7/data:/var/lib/mysql \#数据文件
-v /data/mysql5.7/conf:/etc/mysql/conf.d \#配置文件
-v /data/mysql5.7/log:/var/log \#日志文件
mysql:5.7 \
--character-set-server=utf8mb4 \
--collation-server=utf8mb4_unicode_ci \
--log-bin=/var/lib/mysql/mysql-bin \#开启binlog配置
--server-id=2 #开启binlog配置

  • 链接mysql

docker exec -it mysql5.7 mysql -h127.0.0.1 -P3306 -p’123456’

  • 验证是否开启 binlog

show variables like ‘%log_bin%’;

  • 授权
--授权RisingWave作为slave访问mysql binlog
grant RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT on *.* to 'root'@'%' IDENTIFIED BY '123456';
--grant ALL PRIVILEGES on db01.* to 'root'@'%' IDENTIFIED BY '123456';
flush  privileges;
--取消授权,如有需要
REVOKE  GRANT OPTION on *.* FROM 'root'@'%';
REVOKE  ALL PRIVILEGES on *.* FROM 'root'@'%';
REVOKE  ALL PRIVILEGES on db01.* FROM 'root'@'%';
flush  privileges;
--查看授权
show grants for root@'%';

部署kafka

  • 启动kafka
# step-1
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper:latest
# step-2
# 启动Kafka,将以下的俩个192.168.1.100换为本身的IP地址bash
docker run  -d \
--name kafka \
--restart=always \
-p 8092:8092 \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.1.100:2181/kafka \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.100:8092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:8092 \
-t wurstmeister/kafka
  • 与kafka交互
#list
docker run -it --rm wurstmeister/kafka kafka-topics.sh --bootstrap-server 192.168.1.100:8092 --list
#create topic
docker run -it --rm wurstmeister/kafka kafka-topics.sh --bootstrap-server 192.168.1.100:8092 --create --replication-factor 1 --partitions 1 --topic test2
#producer
docker run -it --rm wurstmeister/kafka kafka-console-producer.sh --bootstrap-server 192.168.1.100:8092 --topic test1
#consumer
docker run -it --rm wurstmeister/kafka kafka-console-consumer.sh --bootstrap-server 192.168.1.100:8092 --topic test1

  • 或通过kcat与kafka交互
docker pull edenhill/kcat:1.7.1
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t test_sink_topic -C -J
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t test_sink_topic -C

RisingWave 使用demo

  1. 数据导出sink demo
-- create table
CREATE TABLE t1 (v1 int, v2 int) 
WITH (
     connector = 'datagen',

     fields.v1.kind = 'sequence',
     fields.v1.start = '1',
  
     fields.v2.kind = 'random',
     fields.v2.min = '-10',
     fields.v2.max = '10',
     fields.v2.seed = '1',
  
     datagen.rows.per.second = '10'
 ) ROW FORMAT JSON;
-- create sink
CREATE SINK test_sink_1
FROM t1 
WITH (
        properties.bootstrap.server = '192.168.1.100:8092',
        topic = 'test_sink_topic',
        connector = 'kafka',
        primary_key = 'v1'
)
FORMAT UPSERT ENCODE JSON;
 

查看kafka sink 结果

docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t test_sink_topic -C -J

  1. 连接器 source
--source 连接器
CREATE SOURCE IF NOT EXISTS source_1 (
   v1 integer,
   v2 integer,
)
WITH (
   connector='kafka',
   topic='test_sink_topic',
   properties.bootstrap.server='192.168.1.100:8092',
   scan.startup.mode='latest',
) FORMAT PLAIN ENCODE JSON;
-- table连接器
CREATE TABLE IF NOT EXISTS table_1 (
   v1 integer,
   v2 integer,
)
WITH (
   connector='kafka',
   topic='test_sink_topic',
   properties.bootstrap.server='192.168.1.100:8092',
   scan.startup.mode='latest',
) FORMAT PLAIN ENCODE JSON;
  1. Change Data Capture (CDC) 直连 MySQL CDC
    --mysql ddl:
    create database db01;
    use db01;
    CREATE TABLE orders (
       order_id int(11) NOT NULL AUTO_INCREMENT,
       price decimal(11),
       PRIMARY KEY (order_id)
    );
    
   -- risingwave ddl
    CREATE TABLE orders (
       order_id int,
       price decimal,
       PRIMARY KEY (order_id)
    ) WITH (
     connector = 'mysql-cdc',
     hostname = '192.168.1.100',
     port = '3306',
     username = 'root',
     password = '123456',
     database.name = 'db01',
     table.name = 'orders',
    );
    
    --mysql dml
    insert into orders(price) values(12),(10),(23);
    insert into orders(price) values(12),(10);
    update orders set price=100  where order_id=1;
    delete from orders where order_id=3;

	  -- risingwave验证数据
	  select * from orders ;
  1. 直接导出物化视图/表数据 (CREATE SINK FROM)
CREATE TABLE t11 (v1 int, v2 int) 
WITH (
     connector = 'datagen',

     fields.v1.kind = 'sequence',
     fields.v1.start = '1',
  
     fields.v2.kind = 'random',
     fields.v2.min = '-10',
     fields.v2.max = '10',
     fields.v2.seed = '1',
  
     datagen.rows.per.second = '10'
 ) ROW FORMAT JSON;

create materialized view mv_t11 as select count(*) from t11;

CREATE SINK sink1 FROM mv_t11 
WITH (
   connector='kafka',
   properties.bootstrap.server='192.168.1.100:8092',
   topic='t_sink1'
)
FORMAT PLAIN ENCODE JSON(
   force_append_only='true'
);

check结果

docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink1 -C -J

  1. 导出 Query 的数据(CREATE SINK AS)
CREATE TABLE t11 (v1 int, v2 int) 
WITH (
     connector = 'datagen',

     fields.v1.kind = 'sequence',
     fields.v1.start = '1',
  
     fields.v2.kind = 'random',
     fields.v2.min = '-10',
     fields.v2.max = '10',
     fields.v2.seed = '1',
  
     datagen.rows.per.second = '10'
 ) ROW FORMAT JSON;

CREATE SINK sink2 AS 
SELECT 
   avg(v1) as avg_v1, 
   avg(v2) as avg_v2 
FROM t1
WITH (
   connector='kafka',
   properties.bootstrap.server='192.168.1.100:8092',
   topic='t_sink2'
)
FORMAT PLAIN ENCODE JSON(
   force_append_only='true'
);

check结果

docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink1 -C -J

总结

RisingWave 提供与 PostgreSQL 兼容的标准SQL接口。用户可以像使用 PostgreSQL 一样处理数据流。屏蔽了实时处理底层需要遇到的一些技术细节(状态存储,数据一致性,分布式集群扩展等),供应用方快速的开发实时数据流,进行流式ETL。具有以下特性:同步的实时性(可以保证实时的新鲜度,doris等OLAP引擎采用异步实时)、强一致性(doris等OLAP引擎仅提供最终一致性)、高可用、高并发、流处理语义、资源隔离。可以应用在一些数据看版,监控,实时指标等场景。

相关文章

github 仓库
官方文档
中文文档
创始人知乎主页
Slack文章来源地址https://www.toymoban.com/news/detail-832812.html

到了这里,关于RisingWave分布式SQL流处理数据库调研的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 聊聊分布式 SQL 数据库Doris(六)

    此处的负载均衡指的是FE层的负载均衡. 当部署多个 FE 节点时,用户可以在多个 FE 之上部署负载均衡层来实现 Doris 的高可用。官方文档描述: 负载均衡 。 实现方式 实现方式有多种,如下列举。 开发者在应用层自己进行重试与负载均衡。 JDBC Connector 发现一个连接挂掉,就自

    2024年02月05日
    浏览(37)
  • 聊聊分布式 SQL 数据库Doris(七)

    Doris的存储结构是类似LSM-Tree设计的,因此很多方面都是通用的,先阅读了解LSM相关的知识,再看Doris的底层存储与读取流程会清晰透彻很多,LSM基本知识如下: 原理:把各种数据先用log等形式组织在内存中(该数据结构称为MemTable,且有序);到达一定数据量后再批量merge写入磁

    2024年02月05日
    浏览(36)
  • 聊聊分布式 SQL 数据库Doris(五)

    阅读 Doris SQL 原理解析,总结下Doris中SQL解析流程: 词法识别:解析原始SQL文本,拆分token 语法识别:将token转换成AST 单机逻辑查询计划:将AST经过一系列的优化(比如,谓词下推等)成查询计划,提高执行性能与效率。 分布式逻辑查询计划:根据分布式环境(数据分布信息

    2024年02月05日
    浏览(40)
  • 聊聊分布式 SQL 数据库Doris(三)

    在 Doris 的存储引擎规则: 表的数据是以分区为单位存储的,不指定分区创建时,默认就一个分区. 用户数据首先被划分成若干个分区(Partition),划分的规则通常是按照用户指定的分区列进行范围划分,比如按时间划分。 在每个分区内,数据被进一步的按照Hash的方式分桶,分

    2024年02月05日
    浏览(39)
  • 聊聊分布式 SQL 数据库Doris(四)

    FE层的架构都能在网上找到说明. 但BE层的架构模式、一致性保障、与FE层之间的请求逻辑,数据传输逻辑等,我个人暂时没有找到相应的博客说明这些的。当然这些是我个人在学习与使用Doris过程中,对内部交互逻辑与实现感兴趣才有这些疑问. 还好现在有GPT这类大模型,有了

    2024年02月05日
    浏览(39)
  • 解释什么是分布式数据库,列举几种常见的分布式数据库系统

    敏感信息和隐私保护是指在收集、存储和使用个人数据时,需要采取一系列措施来保护这些数据的安全和机密性,防止数据被未经授权的第三方访问、使用或泄露。这些措施包括加密、访问控制、数据脱敏、数据加密、隐私政策等。 在隐私保护的技术手段方面,常用的技术包

    2024年02月08日
    浏览(39)
  • 分布式数据库架构

    对于mysql架构,一定会使用到读写分离,在此基础上有五种常见架构设计:一主一从或多从、主主复制、级联复制、主主与级联复制结合。 1.1、主从复制 这种架构设计是使用的最多的。在读写分离的基础上,会存在一台master作为写机,一个或多个slave作为读机。因为在实际的

    2024年02月10日
    浏览(33)
  • 【大数据】分布式数据库HBase

    目录 1.概述 1.1.前言 1.2.数据模型 1.3.列式存储的优势 2.实现原理 2.1.region 2.2.LSM树 2.3.完整读写过程 2.4.master的作用 本文式作者大数据系列专栏中的一篇文章,按照专栏来阅读,循序渐进能更好的理解,专栏地址: https://blog.csdn.net/joker_zjn/category_12631789.html?spm=1001.2014.3001.5482 当

    2024年04月27日
    浏览(30)
  • 分布式数据库HBase

    HBase是一个高可靠、高性能、 面向列 、可伸缩的分布式数据库,是谷歌BigTable的开源实现,主要用来存储非结构化和把结构化的松散数据。 HBase的目标是处理非常庞大的表,可以通过水平扩展的方式,利用 廉价计算机集群 处理由超过10亿行数据和数百万列元素组成的数据表。

    2024年02月09日
    浏览(43)
  • 分析型数据库:分布式分析型数据库

    分析型数据库的另外一个发展方向就是以分布式技术来代替MPP的并行计算,一方面分布式技术比MPP有更好的可扩展性,对底层的异构软硬件支持度更好,可以解决MPP数据库的几个关键架构问题。本文介绍分布式分析型数据库。 — 背景介绍— 目前在分布式分析型数据库领域,

    2023年04月14日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包