Doris学习笔记-Java自定义UDAF

这篇具有很好参考价值的文章主要介绍了Doris学习笔记-Java自定义UDAF。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

项目最近需要做一些数据统计方面的东西,发现数据字段都是很长一串数字的字符串,Doris自带的函数无法对其进行相应的运算操作,需要扩展实现相关的操作运算。
主要参考官方的文档资料完成相关的自定义扩展。需要注意的是在使用Java代码编写UDAF时,有一些必须实现的函数(标记required)和一个内部类State。
  • SUM求和运算函数
public class Sum {

    //Need an inner class to store data
    /*required*/
    public static class State {
        /*some variables if you need */
        public BigDecimal sum;
    }

    /*required*/
    public State create() {
        /* here could do some init work if needed */
        State state = new State();
        state.sum = new BigDecimal(0);
        return state;
    }

    /*required*/
    public void destroy(State state) {
        /* here could do some destroy work if needed */
    }

    /*Not Required*/
    public void reset(State state) {
        /*if you want this udaf function can work with window function.*/
        /*Must impl this, it will be reset to init state after calculate every window frame*/
        /**
        state.sum = new BigDecimal(0);
        **/
    }

    /*required*/
    //first argument is State, then other types your input
    public void add(State state, String value) throws Exception {
        try {
            /* here doing update work when input data*/
            if (null != value && !"".equals(value)) {
                state.sum = state.sum.add(new BigDecimal(value));
            }
        } catch (Exception e) {
            log.info(e.getMessage());
        }
    }

    /*required*/
    public void serialize(State state, DataOutputStream out)  {
        /* serialize some data into buffer */
        try {
            out.writeUTF(state.sum.toString());
        } catch (Exception e) {
            /* Do not throw exceptions */
            log.info(e.getMessage());
        }
    }

    /*required*/
    public void deserialize(State state, DataInputStream in)  {
        /* deserialize get data from buffer before you put */
        String value = "0";
        try {
            value = in.readUTF();
        } catch (Exception e) {
            /* Do not throw exceptions */
            log.info(e.getMessage());
        }
        state.sum = new BigDecimal(value);
    }

    /*required*/
    public void merge(State state, State rhs) throws Exception {
        /* merge data from state */
        if (null == rhs || null == rhs.sum || null == state) {
            return;
        }
        try {
            BigDecimal sum = state.sum;
            if (null == sum) {
                sum = new BigDecimal(0);
            }
            state.sum = sum.add(rhs.sum);
        } catch (Exception e) {
            log.info(e.getMessage());
        }
    }

    /*required*/
    //return Type you defined
    public String getValue(State state) throws Exception {
        /* return finally result */
        return null == state || null == state.sum ? "0" : state.sum.toString();
    }

}
  • AVG平均数运算函数
public class Avg {

    //Need an inner class to store data
    /*required*/
    public static class State {
        /*some variables if you need */
        public BigDecimal sum;
        public Integer count;
    }

    /*required*/
    public State create() {
        /* here could do some init work if needed */
        State state = new State();
        state.sum = new BigDecimal(0);
        state.count = 0;
        return state;
    }

    /*required*/
    public void destroy(State state) {
        /* here could do some destroy work if needed */
    }

    /*Not Required*/
    public void reset(State state) {
        /*if you want this udaf function can work with window function.*/
        /*Must impl this, it will be reset to init state after calculate every window frame*/
        /**
        state.sum = new BigDecimal(0);
        **/
    }

    /*required*/
    //first argument is State, then other types your input
    public void add(State state, String value) throws Exception {
        try {
            /* here doing update work when input data*/
            if (null != value && !"".equals(value)) {
                state.sum = state.sum.add(new BigDecimal(value));
                state.count += 1;
            }
        } catch (Exception e) {
            log.info(e.getMessage());
        }
    }

    /*required*/
    public void serialize(State state, DataOutputStream out)  {
        /* serialize some data into buffer */
        try {
            out.writeUTF(state.sum.toString());
            out.writeInt(state.count);
        } catch (Exception e) {
            /* Do not throw exceptions */
            log.info(e.getMessage());
        }
    }

    /*required*/
    public void deserialize(State state, DataInputStream in)  {
        /* deserialize get data from buffer before you put */
        String value = "0";
        Integer count = 0;
        try {
            value = in.readUTF();
            count = in.readInt();
        } catch (Exception e) {
            /* Do not throw exceptions */
            log.info(e.getMessage());
        }
        state.sum = new BigDecimal(value);
        state.count = count;
    }

    /*required*/
    public void merge(State state, State rhs) throws Exception {
        /* merge data from state */
        if (null == rhs || null == rhs.sum || null == state) {
            return;
        }
        try {
            BigDecimal sum = state.sum;
            sum = null == sum ? new BigDecimal(0) : sum;
            state.sum = sum.add(rhs.sum);
            Integer count = state.count;
            count = null == count ? 0 : count;
            state.count = count + rhs.count;
        } catch (Exception e) {
            log.info(e.getMessage());
        }
    }

    /*required*/
    //return Type you defined
    public String getValue(State state) throws Exception {
        /* return finally result */
        return null == state || null == state.sum || null == state.count ? "0" :
            state.sum.divide(new BigDecimal(state.count)).toString();
    }

}
  • MAX最大值运算函数
public class Max {

    //Need an inner class to store data
    /*required*/
    public static class State {
        /*some variables if you need */
        public BigDecimal max;
    }

    /*required*/
    public State create() {
        /* here could do some init work if needed */
        State state = new State();
        state.max = new BigDecimal(0);
        return state;
    }

    /*required*/
    public void destroy(State state) {
        /* here could do some destroy work if needed */
    }

    /*Not Required*/
    public void reset(State state) {
        /*if you want this udaf function can work with window function.*/
        /*Must impl this, it will be reset to init state after calculate every window frame*/
        /**
        state.sum = new BigDecimal(0);
        **/
    }

    /*required*/
    //first argument is State, then other types your input
    public void add(State state, String value) throws Exception {
        try {
            /* here doing update work when input data*/
            if (null != value && !"".equals(value)) {
                BigDecimal valueBd = new BigDecimal(value);
                state.max = state.max.compareTo(valueBd) == -1 ? valueBd : state.max;
            }
        } catch (Exception e) {
            log.info(e.getMessage());
        }
    }

    /*required*/
    public void serialize(State state, DataOutputStream out)  {
        /* serialize some data into buffer */
        try {
            out.writeUTF(state.max.toString());
        } catch (Exception e) {
            /* Do not throw exceptions */
            log.info(e.getMessage());
        }
    }

    /*required*/
    public void deserialize(State state, DataInputStream in)  {
        /* deserialize get data from buffer before you put */
        String value = "0";
        try {
            value = in.readUTF();
        } catch (Exception e) {
            /* Do not throw exceptions */
            log.info(e.getMessage());
        }
        state.max = new BigDecimal(value);
    }

    /*required*/
    public void merge(State state, State rhs) throws Exception {
        /* merge data from state */
        if (null == rhs || null == rhs.max || null == state) {
            return;
        }
        try {
            BigDecimal sum = state.max;
            if (null == sum) {
                sum = new BigDecimal(0);
            }
            state.max = state.max.compareTo(rhs.max) == -1 ? rhs.max : state.max;
        } catch (Exception e) {
            log.info(e.getMessage());
        }
    }

    /*required*/
    //return Type you defined
    public String getValue(State state) throws Exception {
        /* return finally result */
        return null == state || null == state.max ? "0" : state.max.toString();
    }

}
  • 项目打成jar包,上传至每个FE、BE节点。
  • 创建函数命令
CREATE FUNCTION 
name ([,...])
[RETURNS] rettype
PROPERTIES (["key"="value"][,...])  

CREATE AGGREGATE FUNCTION
name ([,...])
[RETURNS] rettype
PROPERTIES (["key"="value"][,...])

PROPERTIES中symbol表示的是包含UDF类的类名,这个参数是必须设定的。
PROPERTIES中file表示的包含用户UDF的jar包,这个参数是必须设定的。
PROPERTIES中type表示的 UDF 调用类型,默认为 Native,使用 Java UDF时传 JAVA_UDF。
PROPERTIES中always_nullable表示的 UDF 返回结果中是否有可能出现NULL值,是可选参数,默认值为true。
name: 一个function是要归属于某个DB的,name的形式为dbName.funcName。当dbName没有明确指定的时候,就是使用当前session所在的db作为dbName。

CREATE GLOBAL AGGREGATE FUNCTION udaf_sum(string) RETURNS string PROPERTIES (
"file"="file:///home/doris/udf/doris-udaf-1.0.0.jar",
"symbol"="com.doris.udaf.func.Sum",
"always_nullable"="true",
"type"="JAVA_UDF"
);

"file"="http://ip:port/xxxxxx.jar", 当在多机环境时也可以使用http的方式下载jar包。必须让每个BE节点都能获取到jar包; 否则将会返回错误状态信息"Couldn't open file ......""always_nullable"可选属性, 如果在计算中对出现的NULL值有特殊处理,确定结果中不会返回NULL,可以设为false,这样在整个查询计算过程中性能可能更好些。

DROP GLOBAL FUNCTION udaf_sum(string);

SHOW CREATE FUNCTION udaf_sum(string);
  • 官方提供的使用须知
    1、不支持复杂数据类型(HLL,Bitmap)。
    2、当前允许用户自己指定JVM最大堆大小,配置项是jvm_max_heap_size。配置项在BE安装目录下的be.conf全局配置中,默认512M,如果需要聚合数据,建议调大一些,增加性能,减少内存溢出风险。
    3、char类型的udf在create function时需要使用String类型。
    4、由于jvm加载同名类的问题,不要同时使用多个同名类作为udf实现,如果想更新某个同名类的udf,需要重启be重新加载classpath。
  • 官方还提供远程UDAF的文档资料,有需求的话可以参考示例。
  • 还有一点就是发现自定义的UDAF无法在物料视图里面使用。

文章来源地址https://www.toymoban.com/news/detail-792647.html

到了这里,关于Doris学习笔记-Java自定义UDAF的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Apache Doris (八) :Doris分布式部署(五) Broker部署及Doris集群启动脚本

    目录 1.Broker部署及扩缩容 1.1 BROKER 部署 1.2 BROKER 扩缩容 2. Apache Doris集群启停脚本

    2024年02月11日
    浏览(40)
  • 大数据Doris(三十三):Doris高级设置

    文章目录 Doris高级设置 一、增大内存

    2024年02月04日
    浏览(50)
  • doris安装部署-通过docker部署doris集群

    配置一个FE+三个BE的集群,使用版本1.1.5,并且指定固定IP和网络。 下载FE和BE包 准备FE和BE环境 配置FE 配置BE 在FE中添加BE 开始使用doris 从官方下载已经编译好的包: doris下载 1.1 doris官方下载 配置FE和BE的目录结构; 把第一步的压缩包解压后放在对应的FE和BE; 运行命令: 各

    2024年02月03日
    浏览(45)
  • Apache Doris (四十八): Doris表结构变更-替换表

     🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客  🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。  🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频 目录

    2024年02月07日
    浏览(42)
  • 大数据Doris(十四):Doris表中的数据基本概念

    文章目录 Doris表中的数据基本概念 一、​​​​​​​Row Column

    2024年02月06日
    浏览(48)
  • Doris:MySQL数据同步到Doris的N种方式

    目录 1.CSV文件方式 1.1 导出mysql数据 1.2 导入数据 2.JDBC 编码方式 3.JDBC Catalog 方式 3.1 上传mysql驱动包 3.2 创建mysql catalog 3.3. 插入数据 4.Binlog Load 方式         当mysql与doris服务之间无法通过网络互联时,可以通过将mysql数据导出成csv文件,然后再在doris服务器导入csv文件的方

    2024年02月04日
    浏览(47)
  • Apache Doris 系列: 基础篇-Flink SQL写入Doris

    本文介绍 Flink SQL如何流式写入 Apache Doris,分为一下几个部分: Flink Doris connector Doris FE 节点配置 Flink SQL 写 Doris Flink Doris connector 本质是通过Stream Load来时实现数据的查询和写入功能。 支持二阶段提交,可实现Exatly Once的写入。 1)需在 apache-doris/fe/fe.conf 配置文件添加如下配置

    2023年04月08日
    浏览(46)
  • 【Doris实战】Apache-doris-2.0.2部署帮助手册

    校验时间:2023年10月11日 版权声明:本文为CSDN博主「顧棟」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。 原文链接:https://blog.csdn.net/weixin_43820556/article/details/133754689 安装包 apache-doris-2.0.2-bin-x64.tar.gz jdk1.8.0_131.tgz mysql-5.7.43-linux-glibc2.12-x86_64.tar.g

    2024年02月07日
    浏览(51)
  • Doris注意事项,Doris部署在阿里云,写不进去数据

    Doris官网 https://doris.apache.org/ 本地idea访问FE,FE会返回BE的地址,但是在服务器上通过ip addr查看,发现只有局域网IP,所以FE返回了局域网的IP,导致idea连接不上BE 重写BackendV2类,返回公网IP即可。 在项目下新建包名 然后放入 BackendV2类 然后重写修改toBackendString()方法,将公网

    2024年02月14日
    浏览(37)
  • Apache Doris (二十三) :Doris 数据导入(一)Insert Into

    目录 1. 语法及参数 2. 案例 ​​​​3. 注意事项 3.1. 关于插入数据量

    2024年02月13日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包