FlinkSQL 的行级权限解决方案及源码

这篇具有很好参考价值的文章主要介绍了FlinkSQL 的行级权限解决方案及源码。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

flink-sql-security

FlinkSQL的行级权限解决方案及源码,支持面向用户级别的行级数据访问控制,即特定用户只能访问授权过的行,隐藏未授权的行数据。此方案是实时领域Flink的解决方案,类似离线数仓Hive中Ranger Row-level Filter方案。

序号 作者 版本 时间 备注
1 HamaWhite 1.0.0 2022-12-15 1. 增加文档和源码

源码地址: https://github.com/HamaWhiteGG/flink-sql-security

一、基础知识

1.1 行级权限

行级权限即横向数据安全保护,可以解决不同人员只允许访问不同数据行的问题。例如针对订单表,用户A只能查看到北京区域的数据,用户B只能查看到杭州区域的数据。

flink ranger,云栖号技术分享,大数据,数据库,hive,云计算,阿里云

1.2 业务流程

1.2.1 设置行级权限

管理员配置用户、表、行级权限条件,例如下面的配置。

序号 用户名 表名 行级权限条件
1 用户A orders region = 'beijing'
2 用户B orders region = 'hangzhou'

1.2.2 用户查询数据

用户在系统上查询orders表的数据时,系统在底层查询时会根据该用户的行级权限条件来自动过滤数据,即让行级权限生效。

当用户A和用户B在执行下面相同的SQL时,会查看到不同的结果数据。

SELECT * FROM orders;

用户A查看到的结果数据是:

order_id order_date customer_name price product_id order_status region
10001 2020-07-30 10:08:22 Jack 50.50 102 false beijing
10002 2020-07-30 10:11:09 Sally 15.00 105 false beijing
注: 系统底层最终执行的SQL是:  SELECT * FROM orders WHERE region = 'beijing'

<br/>

用户B查看到的结果数据是:

order_id order_date customer_name price product_id order_status region
10003 2020-07-30 12:00:30 Edward 25.25 106 false hangzhou
10004 2022-12-15 12:11:09 John 78.00 103 false hangzhou
注: 系统底层最终执行的SQL是:  SELECT * FROM orders WHERE region = 'hangzhou' 。

1.3 组件版本

组件名称 版本 备注
Flink 1.16.0
Flink-connector-mysql-cdc 2.3.0

二、Hive行级权限解决方案

在离线数仓工具Hive领域,由于发展多年已有Ranger来支持表数据的行级权限控制,详见参考文献[[2]](https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.0/authorization-ranger/content/row_level_filtering_in_hive_with_ranger_policies.html)。下图是在Ranger里配置Hive表行级过滤条件的页面,供参考。

flink ranger,云栖号技术分享,大数据,数据库,hive,云计算,阿里云

但由于Flink实时数仓领域发展相对较短,Ranger还不支持FlinkSQL,以及要依赖Ranger会导致系统部署和运维过重,因此开始自研实时数仓的行级权限解决工具

三、FlinkSQL行级权限解决方案

3.1 解决方案

3.1.1 FlinkSQL执行流程

可以参考作者文章[[FlinkSQL字段血缘解决方案及源码]](https://github.com/HamaWhiteGG/flink-sql-lineage/blob/main/README_CN.md),本文根据Flink1.16修正和简化后的执行流程如下图所示。

flink ranger,云栖号技术分享,大数据,数据库,hive,云计算,阿里云

在CalciteParser.parse()处理后会得到一个SqlNode类型的抽象语法树(Abstract Syntax Tree,简称AST),本文会在Parse阶段,通过组装行级过滤条件生成新的AST来实现行级权限控制。

3.1.2 Calcite对象继承关系

下面章节要用到Calcite中的SqlNode、SqlCall、SqlIdentifier、SqlJoin、SqlBasicCall和SqlSelect等类,此处进行简单介绍以及展示它们间继承关系,以便读者阅读本文源码。

序号 介绍
1 SqlNode A SqlNode is a SQL parse tree.
2 SqlCall A SqlCall is a call to an SqlOperator operator.
3 SqlIdentifier A SqlIdentifier is an identifier, possibly compound.
4 SqlJoin Parse tree node representing a JOIN clause.
5 SqlBasicCall Implementation of SqlCall that keeps its operands in an array.
6 SqlSelect A SqlSelect is a node of a parse tree which represents a select statement.

flink ranger,云栖号技术分享,大数据,数据库,hive,云计算,阿里云

3.1.3 解决思路

在Parser阶段,如果执行的SQL包含对表的查询操作,则一定会构建Calcite SqlSelect对象。因此限制表的行级权限,只要在构建Calcite SqlSelect对象时对Where条件进行拦截即可,而不需要解析用户执行的各种SQL来查找配置过行级权限条件约束的表。

在SqlSelect对象构造Where条件时,要通过执行用户和表名来查找配置的行级权限条件,系统会把此条件用CalciteParser提供的parseExpression(String sqlExpression)方法解析生成一个SqlBacicCall再返回。然后结合用户执行的SQL和配置的行级权限条件重新组装Where条件,即生成新的带行级过滤条件Abstract Syntax Tree,最后基于新的AST再执行后续的Validate、Convert、Optimize和Execute阶段。

flink ranger,云栖号技术分享,大数据,数据库,hive,云计算,阿里云

以上整个过程对执行SQL的用户都是透明和无感知的,还是调用Flink自带的TableEnvironment.executeSql(String statement)方法即可。

注: 要通过技术手段把执行用户传递到Calcite SqlSelect中。

3.2 重写SQL

主要在org.apache.calcite.sql.SqlSelect的构造方法中完成。

3.2.1 主要流程

主流程如下图所示,根据From的类型进行不同的操作,例如针对SqlJoin类型,要分别遍历其left和right节点,而且要支持递归操作以便支持三张表及以上JOIN;针对SqlIdentifier类型,要额外判断下是否来自JOIN,如果是的话且JOIN时且未定义表别名,则用表名作为别名;针对SqlBasicCall类型,如果来自于子查询,说明已在子查询中组装过行级权限条件,则直接返回当前Where即可,否则分别取出表名和别名。

然后再获取行级权限条件解析后生成SqlBacicCall类型的Permissions,并给Permissions增加别名,最后把已有Where和Permissions进行组装生成新的Where,来作为SqlSelect对象的Where约束。

flink ranger,云栖号技术分享,大数据,数据库,hive,云计算,阿里云

上述流程图的各个分支,都会在下面的用例测试章节中会举例说明。

3.2.2 核心源码

核心源码位于SqlSelect中新增的addCondition()addPermission()buildWhereClause()三个方法,下面只给出控制主流程addCondition()的源码。

/**
 * The main process of controlling row-level permissions
 */
private SqlNode addCondition(SqlNode from, SqlNode where, boolean fromJoin) {
    if (from instanceof SqlIdentifier) {
        String tableName = from.toString();
        // the table name is used as an alias for join
        String tableAlias = fromJoin ? tableName : null;
        return addPermission(where, tableName, tableAlias);
    } else if (from instanceof SqlJoin) {
        SqlJoin sqlJoin = (SqlJoin) from;
        // support recursive processing, such as join for three tables, process left sqlNode
        where = addCondition(sqlJoin.getLeft(), where, true);
        // process right sqlNode
        return addCondition(sqlJoin.getRight(), where, true);
    } else if (from instanceof SqlBasicCall) {
        // Table has an alias or comes from a subquery
        SqlNode[] tableNodes = ((SqlBasicCall) from).getOperands();
        /**
         * If there is a subquery in the Join, row-level filtering has been appended to the subquery.
         * What is returned here is the SqlSelect type, just return the original where directly
         */
        if (!(tableNodes[0] instanceof SqlIdentifier)) {
            return where;
        }
        String tableName = tableNodes[0].toString();
        String tableAlias = tableNodes[1].toString();
        return addPermission(where, tableName, tableAlias);
    }
    return where;
}

四、用例测试

用例测试数据来自于CDC Connectors for Apache Flink
[[6]](https://ververica.github.io/flink-cdc-connectors/master/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/mysql-postgres-tutorial-zh.html)官网,在此表示感谢。下载本文源码后,可通过Maven运行单元测试。

$ cd flink-sql-security
$ mvn test

4.1 新建Mysql表及初始化数据

Mysql新建表语句及初始化数据SQL详见源码[[flink-sql-security/data/database]](https://github.com/HamaWhiteGG/flink-sql-security/tree/main/data/database)里面的mysql_ddl.sql和mysql_init.sql文件,本文给orders表增加一个region字段。

4.2 新建Flink表

4.2.1 新建mysql cdc类型的orders表

DROP TABLE IF EXISTS orders;

CREATE TABLE IF NOT EXISTS orders (
    order_id INT PRIMARY KEY NOT ENFORCED,
    order_date TIMESTAMP(0),
    customer_name STRING,
    product_id INT,
    price DECIMAL(10, 5),
    order_status BOOLEAN,
    region STRING
) WITH (
    'connector'='mysql-cdc',
    'hostname'='xxx.xxx.xxx.xxx',
    'port'='3306',
    'username'='root',
    'password'='xxx',
    'server-time-zone'='Asia/Shanghai',
    'database-name'='demo',
    'table-name'='orders'
);

4.2.2 新建mysql cdc类型的products表

DROP TABLE IF EXISTS products;

CREATE TABLE IF NOT EXISTS products (
    id INT PRIMARY KEY NOT ENFORCED,
    name STRING,
    description STRING
) WITH (
    'connector'='mysql-cdc',
    'hostname'='xxx.xxx.xxx.xxx',
    'port'='3306',
    'username'='root',
    'password'='xxx',
    'server-time-zone'='Asia/Shanghai',
    'database-name'='demo',
    'table-name'='products'
);

4.2.3 新建mysql cdc类型shipments表

DROP TABLE IF EXISTS shipments;

CREATE TABLE IF NOT EXISTS shipments (
    shipment_id INT PRIMARY KEY NOT ENFORCED,
    order_id INT,
    origin STRING,
    destination STRING,
    is_arrived BOOLEAN
) WITH (
    'connector'='mysql-cdc',
    'hostname'='xxx.xxx.xxx.xxx',
    'port'='3306',
    'username'='root',
    'password'='xxx',
    'server-time-zone'='Asia/Shanghai',
    'database-name'='demo',
    'table-name'='shipments'
);

4.2.4 新建print类型print_sink表

DROP TABLE IF EXISTS print_sink;

CREATE TABLE IF NOT EXISTS print_sink (
    order_id INT PRIMARY KEY NOT ENFORCED,
    order_date TIMESTAMP(0),
    customer_name STRING,
    product_id INT,
    price DECIMAL(10, 5),
    order_status BOOLEAN,
    region STRING
) WITH (
    'connector'='print'
);

4.3 测试用例

详细测试用例可查看源码中的单测,下面只描述部分测试点。

4.3.1 简单SELECT

4.3.1.1 行级权限条件

序号 用户名 表名 行级权限条件
1 用户A orders region = 'beijing'

4.3.1.2 输入SQL

SELECT * FROM orders;

4.3.1.3 输出SQL

SELECT * FROM orders WHERE region = 'beijing';

4.3.1.4 测试小结

输入SQL中没有WHERE条件,只需要把行级过滤条件region = 'beijing'追加到WHERE后即可。

4.3.2 SELECT带复杂WHERE约束

4.3.2.1 行级权限条件

序号 用户名 表名 行级权限条件
1 用户A orders region = 'beijing'

4.3.2.2 输入SQL

SELECT * FROM orders WHERE price > 45.0 OR customer_name = 'John';

4.3.2.3 输出SQL

SELECT * FROM orders WHERE (price > 45.0 OR customer_name = 'John') AND region = 'beijing';

4.3.2.4 测试小结

输入SQL中有两个约束条件,中间用的是OR,因此在组装region = 'beijing'时,要给已有的price > 45.0 OR customer_name = 'John'增加括号。

4.3.3 两表JOIN且含子查询

4.3.3.1 行级权限条件

序号 用户名 表名 行级权限条件
1 用户A orders region = 'beijing'

4.3.3.2 输入SQL

SELECT
    o.*,
    p.name,
    p.description
FROM 
    (SELECT
        *
     FROM 
        orders
     WHERE 
        order_status = FALSE
    ) AS o
LEFT JOIN products AS p ON o.product_id = p.id
WHERE
    o.price > 45.0 OR o.customer_name = 'John'

4.3.3.3 输出SQL

SELECT
    o.*,
    p.name,
    p.description
FROM 
    (SELECT
        *
     FROM 
        orders
     WHERE 
        order_status = FALSE AND region = 'beijing'
    ) AS o
LEFT JOIN products AS p ON o.product_id = p.id
WHERE
    o.price > 45.0 OR o.customer_name = 'John'

4.3.3.4 测试小结

针对比较复杂的SQL,例如两表在JOIN时且其中左表来自于子查询SELECT * FROM orders WHERE order_status = FALSE,行级过滤条件region = 'beijing'只会追加到子查询的里面。

4.3.4 三表JOIN

4.3.4.1 行级权限条件

序号 用户名 表名 行级权限条件
1 用户A orders region = 'beijing'
2 用户A products name = 'hammer'
3 用户A shipments is_arrived = FALSE

4.3.4.2 输入SQL

SELECT
  o.*,
  p.name,
  p.description,
  s.shipment_id,
  s.origin,
  s.destination,
  s.is_arrived
FROM
  orders AS o
  LEFT JOIN products AS p ON o.product_id=p.id
  LEFT JOIN shipments AS s ON o.order_id=s.order_id;

4.3.4.3 输出SQL

SELECT
  o.*,
  p.name,
  p.description,
  s.shipment_id,
  s.origin,
  s.destination,
  s.is_arrived
FROM
  orders AS o
  LEFT JOIN products AS p ON o.product_id=p.id
  LEFT JOIN shipments AS s ON o.order_id=s.order_id
WHERE
  o.region='beijing'
  AND p.name='hammer'
  AND s.is_arrived=FALSE;

4.3.4.4 测试小结

三张表进行JOIN时,会分别获取ordersproductsshipments三张表的行级权限条件: region = 'beijing'name = 'hammer'is_arrived = FALSE,然后增加orders表的别名o、products表的别名p、shipments表的别名s,最后组装到WHERE子句后面。

4.3.5 INSERT来自带子查询的SELECT

4.3.5.1 行级权限条件

序号 用户名 表名 行级权限条件
1 用户A orders region = 'beijing'

4.3.5.2 输入SQL

INSERT INTO print_sink SELECT * FROM (SELECT * FROM orders);

4.3.5.3 输出SQL

INSERT INTO print_sink (SELECT * FROM (SELECT * FROM orders WHERE region = 'beijing'));

4.3.5.4 测试小结

无论运行SQL类型是INSERT、SELECT或者其他,只会找到查询oders表的子句,然后对其组装行级权限条件。

4.3.6 运行SQL

测试两个不同用户执行相同的SQL,两个用户的行级权限条件不一样。

4.3.6.1 行级权限条件

序号 用户名 表名 行级权限条件
1 用户A orders region = 'beijing'
2 用户B orders region = 'hangzhou'

4.3.6.2 输入SQL

SELECT * FROM orders;

4.3.6.3 执行SQL

用户A的真实执行SQL:

SELECT * FROM orders WHERE region = 'beijing';

用户B的真实执行SQL:

SELECT * FROM orders WHERE region = 'hangzhou';

4.3.6.4 测试小结

用户调用下面的执行方法,除传递要执行的SQL参数外,只需要额外指定执行的用户即可,便能自动按照行级权限限制来执行。

/**
 * Execute the single sql with user permissions
 */
public TableResult execute(String username, String singleSql) {
    System.setProperty(EXECUTE_USERNAME, username);
    return tableEnv.executeSql(singleSql);
}

五、源码修改步骤

注: Flink版本1.16.0依赖的Calcite是1.26.0版本。

5.1 新增Parser和ParserImpl类

复制Flink源码中的org.apache.flink.table.delegation.Parser和org.apache.flink.table.planner.delegation.ParserImpl到项目下,新增下面两个方法及实现。

/**
 * Parses a SQL expression into a {@link SqlNode}. The {@link SqlNode} is not yet validated.
 *
 * @param sqlExpression a SQL expression string to parse
 * @return a parsed SQL node
 * @throws SqlParserException if an exception is thrown when parsing the statement
 */
@Override
public SqlNode parseExpression(String sqlExpression) {
    CalciteParser parser = calciteParserSupplier.get();
    return parser.parseExpression(sqlExpression);
}


/**
 * Entry point for parsing SQL queries and return the abstract syntax tree
 *
 * @param statement the SQL statement to evaluate
 * @return abstract syntax tree
 * @throws org.apache.flink.table.api.SqlParserException when failed to parse the statement
 */
@Override
public SqlNode parseSql(String statement) {
    CalciteParser parser = calciteParserSupplier.get();

    // use parseSqlList here because we need to support statement end with ';' in sql client.
    SqlNodeList sqlNodeList = parser.parseSqlList(statement);
    List<SqlNode> parsed = sqlNodeList.getList();
    Preconditions.checkArgument(parsed.size() == 1, "only single statement supported");
    return parsed.get(0);
}

5.2 新增SqlSelect类

复制Calcite源码中的org.apache.calcite.sql.SqlSelect到项目下,新增上文提到的addCondition()addPermission()buildWhereClause()三个方法。
并且在构造方法中注释掉原有的this.where = where行,并添加如下代码:

// add row level filter condition for where clause
SqlNode rowFilterWhere = addCondition(from, where, false);
if (rowFilterWhere != where) {
    LOG.info("Rewritten SQL based on row-level privilege filtering for user [{}]", System.getProperty(EXECUTE_USERNAME));
}
this.where = rowFilterWhere;

5.3 封装SecurityContext类

新建SecurityContext类,主要添加下面三个方法:

/**
 * Add row-level filter conditions and return new SQL
 */
public String addRowFilter(String username, String singleSql) {
    System.setProperty(EXECUTE_USERNAME, username);

    // in the modified SqlSelect, filter conditions will be added to the where clause
    SqlNode parsedTree = tableEnv.getParser().parseSql(singleSql);
    return parsedTree.toString();
}


/**
 * Query the configured permission point according to the user name and table name, and return
 * it to SqlBasicCall
 */
public SqlBasicCall queryPermissions(String username, String tableName) {
    String permissions = rowLevelPermissions.get(username, tableName);
    LOG.info("username: {}, tableName: {}, permissions: {}", username, tableName, permissions);
    if (permissions != null) {
        return (SqlBasicCall) tableEnv.getParser().parseExpression(permissions);
    }
    return null;
}


/**
 * Execute the single sql with user permissions
 */
public TableResult execute(String username, String singleSql) {
    System.setProperty(EXECUTE_USERNAME, username);
    return tableEnv.executeSql(singleSql);
}

六、下一步计划

  1. 支持数据脱敏(Data Masking)
  2. 开发ranger-flink-plugin

原文链接

本文为阿里云原创内容,未经允许不得转载。文章来源地址https://www.toymoban.com/news/detail-758167.html

到了这里,关于FlinkSQL 的行级权限解决方案及源码的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • IIS由于出现权限不足而无法读取配置文件解决方案

    今天来谈一谈关于iis配置上的问题,在启动网站时,提示由于权限不足而无法读取配置,查看本地所有账户均有读写操作的权限 图上显示由于权限不足由于权限不足而无法读取配置文件,所以就从权限入手: 1、右击文件夹-属性-安全,点击编辑,添加 Everyone用户 赋予它所有

    2024年02月15日
    浏览(46)
  • win10共享磁盘/硬盘提示“您没有权限访问,请与网络管理员联系请求访问权限”解决方案

    百度上一大堆方法很多都是没用的,没有解决到我的实际问题。 1、在“网络和共享中心”关闭“密码保护的共享” 2、在“启用和关闭windows功能”中开启SMB文件共享支持。 3、在磁盘安全选项中添加“everyone”用户(重点!) 桌面右下角打开网络和共享中心 选择“更改高级

    2023年04月10日
    浏览(45)
  • MacOS 为指定应用添加指定权限(浏览器无法使用摄像头、麦克风终极解决方案)

    起因:需要浏览器在线做一些测评,但我的 Chrome 没有摄像头/麦克风权限,并且在设置中是没有手动添加按钮的。 我尝试了重装软件,更新系统(上面的 13.5 就是这么来的,我本来都半年懒得更新系统了),都没有任何用。 系统版本:MacOS 13.5.1(需要开启 sip,可参考 macOS

    2024年02月07日
    浏览(80)
  • unity 以管理员权限运行 与 无法读取D3DCompiler DLL文件 解决方案

    在unity hub下载后,用编辑器无法打开项目,可能出现的问题。   Unity 以管理员权限运行,这不受支持。Unity在您的项目中执行脚本和二进制库,这些脚本和二进制库可能来自第三方来源,并可能对您的计算机有害。Unity 还可以执行仍在开发中且尚未完全测试的脚本和二进制库

    2024年04月13日
    浏览(50)
  • layui手机端上传文件时返回404 Not Found的解决方案(client_body_temp权限设置)

    client_body_temp是一个指令指定保存客户端请求体临时文件的目录路径,以及是否进行缓存的配置指令。 在Web服务器中,当客户端向服务器发送请求时,请求体中包含了请求的主体部分,比如表单数据、上传的文件等。当服务器需要读取和处理这些数据时,会将请求体保存到一

    2024年02月15日
    浏览(48)
  • 非常全面的数字人解决方案(含源码)

    TheRamU/Fay: 语音互动,直播自动带货 虚拟数字人 (github.com) fay: 这是一个数字人项目,包含python内核及ue数字人模型,可以用于做数字助理及自动直播,又或者作为你的应用入口也很帅 (gitee.com) 补充mac上的安装办法:(34条消息) Fay数字人开源项目在mac 上的安装办法_郭泽斌之心的

    2024年01月25日
    浏览(40)
  • vue3 一个基于pinia简单易懂的系统权限管理实现方案,vue-router动态路由异步问题解决

    作为项目经验稀少的vue开发者来说,在关键技术点上的经验不多,我希望通过我的思想和实践,把好的东西分享在这里,目的是进一步促进技术交流。项目即将完成,权限是最后的收尾工作,好的权限实现方案,可以让我们没有后顾之忧,也可以提升项目的运行速度。 在开发

    2023年04月08日
    浏览(57)
  • java智慧工地 人脸识别终端,智慧工地解决方案源码

    智慧工地 即施工现场全面数字化过程,使用IOT、云、移动、大数据、AI等关键技术,进行生产要素、管理过程、建筑物实体的数据采集、数据治理,最终通过大数据和人工智能帮助项目实现精益管理。 智慧工地 围绕工程现场人、机、料、法、环及施工过程中质量、安全、进度

    2024年02月04日
    浏览(52)
  • 【CAS6.6源码解析】源码构建时-默认service配置不生效解决方案

    CAS6的源码提供了默认的HTTPSandIMAPS-10000001.json配置用于授权所有的https和imaps服务,但是当添加JsonServiceRegistry模块启动后,会发现service是没有被注册的,是由于json路径引起的错误,可以把路径修改为绝对路径以解决此问题。 CAS默认采用json的方式去文件中读取服务的定义从而注

    2024年02月15日
    浏览(50)
  • 进口跨境商城源码:高效、安全、可扩展的电商平台解决方案

    电子商务的兴起为跨境贸易提供了前所未有的机会和挑战。在这个全球化的时代,跨境电商平台成为许多企业进军国际市场的首选。然而,搭建一个高效、安全、可扩展的进口跨境商城并非易事。 1. 解决方案概述 我们推出的 \\\"进口跨境商城源码\\\" 提供了一个完整的电商平台解

    2024年02月06日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包