项目最近需要做一些数据统计方面的工作,发现数据字段都是由很长一串数字组成的字符串,Doris 自带的函数无法直接对其进行运算,因此需要通过自定义函数扩展实现相关的运算操作。
主要参考官方的文档资料完成相关的自定义扩展。需要注意的是在使用Java代码编写UDAF时,有一些必须实现的函数(标记required)和一个内部类State。
- SUM求和运算函数
/**
 * Doris Java UDAF that sums decimal numbers supplied as strings.
 *
 * <p>Values are accumulated in a {@link BigDecimal}, so arbitrarily long numeric strings are
 * summed exactly. {@code null}, empty and blank inputs are ignored; malformed inputs are skipped
 * and logged instead of failing the query.
 */
public class Sum {
    /**
     * Logger for best-effort error reporting. java.util.logging is used so the UDAF jar needs no
     * third-party logging dependency inside the BE JVM.
     */
    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(Sum.class.getName());

    /** Aggregation state required by Doris: the running total of all accepted inputs. */
    public static class State {
        /** Running total; initialized to zero by {@link Sum#create()}. */
        public BigDecimal sum;
    }

    /**
     * Creates a fresh state with a zero total (required).
     *
     * @return a new state whose sum is {@code 0}
     */
    public State create() {
        State state = new State();
        state.sum = BigDecimal.ZERO;
        return state;
    }

    /**
     * Releases resources held by the state (required). The state holds none, so this is a no-op.
     *
     * @param state the state being discarded
     */
    public void destroy(State state) {
        // Nothing to release: the state only references an immutable BigDecimal.
    }

    /**
     * Restores the state to its initial value. This is needed when the function is used as a
     * window function, where the state must be cleared after each window frame.
     *
     * @param state the state to reset
     */
    public void reset(State state) {
        state.sum = BigDecimal.ZERO;
    }

    /**
     * Adds one input value to the running total (required).
     *
     * @param state the state to update
     * @param value a decimal string; {@code null}/blank values are ignored, surrounding whitespace
     *     is trimmed, and non-numeric values are skipped and logged
     */
    public void add(State state, String value) throws Exception {
        BigDecimal parsed = parse(value);
        if (parsed != null) {
            state.sum = orZero(state.sum).add(parsed);
        }
    }

    /**
     * Writes the running total to the intermediate buffer as a UTF string (required).
     * Errors are logged, not thrown, because this callback must not throw.
     *
     * @param state the state to serialize
     * @param out the destination stream
     */
    public void serialize(State state, DataOutputStream out) {
        try {
            // Write 0 for a missing sum so the stream always contains exactly one value.
            out.writeUTF(orZero(state.sum).toString());
        } catch (java.io.IOException e) {
            warn("Failed to serialize SUM state", e);
        }
    }

    /**
     * Restores the running total written by {@link #serialize} (required). If the buffer cannot
     * be read or parsed, the total falls back to zero and the failure is logged.
     *
     * @param state the state to populate
     * @param in the source stream
     */
    public void deserialize(State state, DataInputStream in) {
        BigDecimal restored = BigDecimal.ZERO;
        try {
            restored = new BigDecimal(in.readUTF());
        } catch (java.io.IOException | NumberFormatException e) {
            warn("Failed to deserialize SUM state; falling back to 0", e);
        }
        state.sum = restored;
    }

    /**
     * Folds another partial state into {@code state} (required). Null arguments are ignored.
     *
     * @param state the accumulating state
     * @param rhs the partial state to merge in
     */
    public void merge(State state, State rhs) throws Exception {
        if (state == null || rhs == null || rhs.sum == null) {
            return;
        }
        state.sum = orZero(state.sum).add(rhs.sum);
    }

    /**
     * Returns the final total as a string (required).
     *
     * @param state the final state
     * @return the total, or {@code "0"} when no state or total is present
     */
    public String getValue(State state) throws Exception {
        return state == null || state.sum == null ? "0" : state.sum.toString();
    }

    /** Parses a trimmed decimal string; returns null for null/blank or malformed input. */
    private static BigDecimal parse(String value) {
        if (value == null) {
            return null;
        }
        String trimmed = value.trim();
        if (trimmed.isEmpty()) {
            return null;
        }
        try {
            return new BigDecimal(trimmed);
        } catch (NumberFormatException e) {
            // Best effort: one bad row must not fail the whole aggregation.
            warn("Skipping non-numeric SUM input: " + value, e);
            return null;
        }
    }

    /** Treats a missing total as zero. */
    private static BigDecimal orZero(BigDecimal value) {
        return value == null ? BigDecimal.ZERO : value;
    }

    /** Logs a warning together with its cause. */
    private static void warn(String message, Throwable cause) {
        LOG.log(java.util.logging.Level.WARNING, message, cause);
    }
}
- AVG平均数运算函数
public class Avg {
//Need an inner class to store data
/*required*/
public static class State {
/*some variables if you need */
public BigDecimal sum;
public Integer count;
}
/*required*/
public State create() {
/* here could do some init work if needed */
State state = new State();
state.sum = new BigDecimal(0);
state.count = 0;
return state;
}
/*required*/
public void destroy(State state) {
/* here could do some destroy work if needed */
}
/*Not Required*/
public void reset(State state) {
/*if you want this udaf function can work with window function.*/
/*Must impl this, it will be reset to init state after calculate every window frame*/
/**
state.sum = new BigDecimal(0);
**/
}
/*required*/
//first argument is State, then other types your input
public void add(State state, String value) throws Exception {
try {
/* here doing update work when input data*/
if (null != value && !"".equals(value)) {
state.sum = state.sum.add(new BigDecimal(value));
state.count += 1;
}
} catch (Exception e) {
log.info(e.getMessage());
}
}
/*required*/
public void serialize(State state, DataOutputStream out) {
/* serialize some data into buffer */
try {
out.writeUTF(state.sum.toString());
out.writeInt(state.count);
} catch (Exception e) {
/* Do not throw exceptions */
log.info(e.getMessage());
}
}
/*required*/
public void deserialize(State state, DataInputStream in) {
/* deserialize get data from buffer before you put */
String value = "0";
Integer count = 0;
try {
value = in.readUTF();
count = in.readInt();
} catch (Exception e) {
/* Do not throw exceptions */
log.info(e.getMessage());
}
state.sum = new BigDecimal(value);
state.count = count;
}
/*required*/
public void merge(State state, State rhs) throws Exception {
/* merge data from state */
if (null == rhs || null == rhs.sum || null == state) {
return;
}
try {
BigDecimal sum = state.sum;
sum = null == sum ? new BigDecimal(0) : sum;
state.sum = sum.add(rhs.sum);
Integer count = state.count;
count = null == count ? 0 : count;
state.count = count + rhs.count;
} catch (Exception e) {
log.info(e.getMessage());
}
}
/*required*/
//return Type you defined
public String getValue(State state) throws Exception {
/* return finally result */
return null == state || null == state.sum || null == state.count ? "0" :
state.sum.divide(new BigDecimal(state.count)).toString();
}
}
- MAX最大值运算函数
/**
 * Doris Java UDAF that returns the largest decimal number among inputs supplied as strings.
 *
 * <p>The current maximum is {@code null} until the first numeric input arrives. This lets
 * all-negative groups report their real maximum instead of a seeded 0. {@code null}, empty and
 * blank inputs are ignored; malformed inputs are skipped and logged.
 */
public class Max {
    /**
     * Logger for best-effort error reporting. java.util.logging is used so the UDAF jar needs no
     * third-party logging dependency inside the BE JVM.
     */
    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(Max.class.getName());

    /** Aggregation state required by Doris: the largest value seen so far. */
    public static class State {
        /** Largest value seen so far, or {@code null} while no numeric input has been seen. */
        public BigDecimal max;
    }

    /**
     * Creates a fresh state with no maximum yet (required).
     *
     * @return a new empty state
     */
    public State create() {
        State state = new State();
        state.max = null;
        return state;
    }

    /**
     * Releases resources held by the state (required). The state holds none, so this is a no-op.
     *
     * @param state the state being discarded
     */
    public void destroy(State state) {
        // Nothing to release: the state only references an immutable BigDecimal.
    }

    /**
     * Restores the state to its initial, empty value. This is needed when the function is used
     * as a window function, where the state must be cleared after each window frame.
     *
     * @param state the state to reset
     */
    public void reset(State state) {
        state.max = null;
    }

    /**
     * Offers one input value as a maximum candidate (required).
     *
     * @param state the state to update
     * @param value a decimal string; {@code null}/blank values are ignored, surrounding whitespace
     *     is trimmed, and non-numeric values are skipped and logged
     */
    public void add(State state, String value) throws Exception {
        BigDecimal parsed = parse(value);
        if (parsed != null) {
            state.max = larger(state.max, parsed);
        }
    }

    /**
     * Writes the current maximum to the intermediate buffer as a UTF string (required). An empty
     * state is written as {@code ""}. A real maximum is never written as an empty string, so
     * previously written payloads still deserialize the same way. Errors are logged, not thrown.
     *
     * @param state the state to serialize
     * @param out the destination stream
     */
    public void serialize(State state, DataOutputStream out) {
        try {
            out.writeUTF(state.max == null ? "" : state.max.toString());
        } catch (java.io.IOException e) {
            warn("Failed to serialize MAX state", e);
        }
    }

    /**
     * Restores the maximum written by {@link #serialize} (required). An empty string, or an
     * unreadable buffer, yields an empty state. Read failures are logged.
     *
     * @param state the state to populate
     * @param in the source stream
     */
    public void deserialize(State state, DataInputStream in) {
        BigDecimal restored = null;
        try {
            String text = in.readUTF();
            restored = text.isEmpty() ? null : new BigDecimal(text);
        } catch (java.io.IOException | NumberFormatException e) {
            warn("Failed to deserialize MAX state; treating it as empty", e);
        }
        state.max = restored;
    }

    /**
     * Folds another partial state into {@code state} (required). Null arguments and empty
     * partial states are ignored.
     *
     * @param state the accumulating state
     * @param rhs the partial state to merge in
     */
    public void merge(State state, State rhs) throws Exception {
        if (state == null || rhs == null || rhs.max == null) {
            return;
        }
        state.max = larger(state.max, rhs.max);
    }

    /**
     * Returns the maximum as a string (required).
     *
     * @param state the final state
     * @return the maximum, or {@code "0"} when the state is missing or saw no numeric input
     */
    public String getValue(State state) throws Exception {
        return state == null || state.max == null ? "0" : state.max.toString();
    }

    /**
     * Returns the larger of the current maximum and a candidate. A null current maximum means
     * "no value yet". On a tie the current value is kept, matching {@link BigDecimal#max}.
     */
    private static BigDecimal larger(BigDecimal current, BigDecimal candidate) {
        return current == null ? candidate : current.max(candidate);
    }

    /** Parses a trimmed decimal string; returns null for null/blank or malformed input. */
    private static BigDecimal parse(String value) {
        if (value == null) {
            return null;
        }
        String trimmed = value.trim();
        if (trimmed.isEmpty()) {
            return null;
        }
        try {
            return new BigDecimal(trimmed);
        } catch (NumberFormatException e) {
            // Best effort: one bad row must not fail the whole aggregation.
            warn("Skipping non-numeric MAX input: " + value, e);
            return null;
        }
    }

    /** Logs a warning together with its cause. */
    private static void warn(String message, Throwable cause) {
        LOG.log(java.util.logging.Level.WARNING, message, cause);
    }
}
- 项目打成jar包,上传至每个FE、BE节点。
- 创建函数命令
CREATE FUNCTION
name ([argtype][,...])
[RETURNS] rettype
PROPERTIES (["key"="value"][,...])
CREATE AGGREGATE FUNCTION
name ([argtype][,...])
[RETURNS] rettype
PROPERTIES (["key"="value"][,...])
PROPERTIES中symbol表示的是包含UDF类的类名,这个参数是必须设定的。
PROPERTIES中file表示的包含用户UDF的jar包,这个参数是必须设定的。
PROPERTIES中type表示的是 UDF 的调用类型,默认为 Native,使用 Java UDF 时需设置为 JAVA_UDF。
PROPERTIES中always_nullable表示 UDF 的返回结果中是否可能出现 NULL 值,是可选参数,默认值为true。
name: 一个function是要归属于某个DB的,name的形式为dbName.funcName。当dbName没有明确指定的时候,就是使用当前session所在的db作为dbName。
CREATE GLOBAL AGGREGATE FUNCTION udaf_sum(string) RETURNS string PROPERTIES (
"file"="file:///home/doris/udf/doris-udaf-1.0.0.jar",
"symbol"="com.doris.udaf.func.Sum",
"always_nullable"="true",
"type"="JAVA_UDF"
);
"file"="http://ip:port/xxxxxx.jar", 在多机环境中也可以通过 http 方式下载 jar 包。必须保证每个 BE 节点都能获取到该 jar 包,否则将会返回错误状态信息"Couldn't open file ......"。
"always_nullable"可选属性, 如果在计算中对出现的NULL值有特殊处理,确定结果中不会返回NULL,可以设为false,这样在整个查询计算过程中性能可能更好些。
DROP GLOBAL FUNCTION udaf_sum(string);
SHOW CREATE FUNCTION udaf_sum(string);
- 官方提供的使用须知
1、不支持复杂数据类型(HLL,Bitmap)。
2、当前允许用户自己指定JVM最大堆大小,配置项是jvm_max_heap_size。配置项在BE安装目录下的be.conf全局配置中,默认512M,如果需要聚合数据,建议调大一些,增加性能,减少内存溢出风险。
3、char类型的udf在create function时需要使用String类型。
4、由于jvm加载同名类的问题,不要同时使用多个同名类作为udf实现,如果想更新某个同名类的udf,需要重启be重新加载classpath。
- 官方还提供了远程UDAF的文档资料,有需求的话可以参考其中的示例。
- 还有一点:实践中发现自定义的UDAF无法在物化视图中使用。
文章来源地址https://www.toymoban.com/news/detail-792647.html
文章来源:https://www.toymoban.com/news/detail-792647.html
到了这里,关于Doris学习笔记-Java自定义UDAF的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!