一、概述
工作中遇到通过sparksession解析csv、parquet文件并预览top100的需求。
二、实现过程
1. 业务流程
2. 业务逻辑
为了便于测试,下面在单元测试中使用模拟数据(模拟Dataset.toJSON()返回值
)来说明
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class GroupingByDataTest
{
static List<String> result = new ArrayList<>();
@BeforeAll
public static void init()
{
result.add("{\"student_no\":\"0204006\",\"student_name\":\"学生6\",\"field\":\"项目6\",\"value2\":\"6\",\"sex\":\"女\"}");
result.add("{\"student_no\":\"0204006\",\"student_name\":\"学生6\",\"field\":\"项目6\",\"value2\":\"6\",\"sex\":\"女\"}");
result.add("{\"student_no\":\"0204006\",\"student_name\":\"学生6\",\"field\":\"项目6\",\"value2\":\"6\",\"sex\":\"女\"}");
result.add("{\"student_no\":\"0204006\",\"student_name\":\"学生6\",\"field\":\"项目6\",\"value2\":\"6\",\"sex\":\"女\"}");
}
@Test
public void test002()
{
Map<Object, List<Object>> r = result.stream()
.map(s -> JSONObject.parseObject(s).entrySet()) // map
.flatMap(m -> m.stream()) // flatMap
.collect(Collectors.groupingBy(mp -> mp.getKey(), Collectors.mapping(x -> x.getValue(), Collectors.toList())));
log.info("{}", r);
}
}
3. 运行结果
com.fly.lambda.GroupingByDataTest - {
student_name=[学生6, 学生6, 学生6, 学生6],
student_no=[0204006, 0204006, 0204006, 0204006],
value2=[6, 6, 6, 6],
field=[项目6, 项目6, 项目6, 项目6],
sex=[女, 女, 女, 女]}
目前看来似乎一切正常。
三、bug现象
实际测试过程中发现,hive数据仓库中的字段由于各种原因并不一定都有值,从而导致保存的结果文件csv、parquet时字段值为空
1. 单元测试
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class GroupingByDataTest
{
static List<String> result = new ArrayList<>();
@BeforeAll
public static void init()
{
result.add("{\"student_name\":\"学生1\",\"student_no\":\"0204001\",\"field\":\"项目1\", \"sex\":\"男\"}");
result.add("{\"student_name\":\"学生2\",\"student_no\":\"0204002\",\"field\":\"项目2\",\"value2\":\"2\" }");
result.add("{\"student_name\":\"学生3\", \"field\":\"项目3\",\"value2\":\"3\",\"sex\":\"女\"}");
result.add("{ \"student_no\":\"0204004\",\"field\":\"项目4\",\"value2\":\"4\",\"sex\":\"男\"}");
result.add("{\"student_name\":\"学生5\",\"student_no\":\"0204005\",\"field\":\"项目5\",\"value2\":\"5\",\"sex\":\"女\"}");
result.add("{\"student_no\":\"0204006\",\"student_name\":\"学生6\",\"field\":\"项目6\",\"value2\":\"6\",\"sex\":\"女\"}");
}
@Test
public void test002()
{
Map<Object, List<Object>> r = result.stream()
.map(s -> JSONObject.parseObject(s).entrySet()) // map
.flatMap(m -> m.stream()) // flatMap
.collect(Collectors.groupingBy(mp -> mp.getKey(), Collectors.mapping(x -> x.getValue(), Collectors.toList())));
log.info("{}", r);
}
}
2.运行结果
com.fly.lambda.GroupingByDataTest - {
student_name=[学生1, 学生2, 学生3, 学生5, 学生6],
student_no=[0204001, 0204002, 0204004, 0204005, 0204006],
value2=[2, 3, 4, 5, 6],
field=[项目1, 项目2, 项目3, 项目4, 项目5, 项目6],
sex=[男, 女, 男, 女, 女]}
期望的结果为
com.fly.lambda.GroupingByDataTest - before : {
student_name=[学生1, 学生2, 学生3, null, 学生5, 学生6],
student_no=[0204001, 0204002, null, 0204004, 0204005, 0204006],
value2=[null, 2, 3, 4, 5, 6],
field=[项目1, 项目2, 项目3, 项目4, 项目5, 项目6],
sex=[男, null, 女, 男, 女, 女]}
三、解决思路
解决这个问题有2个思路
1. 思路一
从数据来源解决,也就是 hiveSQL读取数据
使用 coalsce 函数进行空值处理,实际去解决的过程中发现2个问题。
- )强制业务用户编辑hiveSQL时显式调用(
用户体验太差,增加使用难度
) - )不强制业务用户编辑hiveSQL时显式调用,后台接受到SQL后自动添加coalsce 函数(
后台业务逻辑复杂,eg: 使用了条件语句、多表关联查询、字段命名特殊、添加 as 别名等各种情况,不一而足,几乎没法妥善处理
)
2. 思路二
hiveSQL读取数据、数据写入csv或parquet文件正常进行,不用特殊处理, 修改步骤3
分为2步骤,步骤1,遍历获取全部的key去重,步骤2,自动对缺失数据的key补充空值
核心代码如下:
@Test
public void test003()
throws IOException
{
// 取keys
List<String> keys = result.stream().map(s -> JSONObject.parseObject(s).entrySet()).flatMap(m -> m.stream()).map(r -> r.getKey()).distinct().collect(Collectors.toList());
keys.stream().forEach(log::info);
Map<String, List<Object>> r = result.stream()
.map(s -> parse(s, keys))
.flatMap(m -> m.stream()) // flatMap
.collect(Collectors.groupingBy(mp -> mp.getKey(), Collectors.mapping(x -> x.getValue(), Collectors.toList())));
log.info("before : {}", r);
log.info("sorted : {}", new TreeMap<>(r));
}
/**
* 设置value, 根据需要补充空值
*/
private Set<Entry<String, Object>> parse(String s, List<String> keys)
{
JSONObject jsonObject = JSONObject.parseObject(s);
keys.stream().forEach(key -> {
if (!jsonObject.containsKey(key))
{
jsonObject.put(key, null);
}
});
return jsonObject.entrySet();
}
运行结果
2024-01-14 13:57:53.211 [main] INFO com.fly.lambda.GroupingByDataTest - student_name
2024-01-14 13:57:53.217 [main] INFO com.fly.lambda.GroupingByDataTest - student_no
2024-01-14 13:57:53.217 [main] INFO com.fly.lambda.GroupingByDataTest - field
2024-01-14 13:57:53.217 [main] INFO com.fly.lambda.GroupingByDataTest - sex
2024-01-14 13:57:53.217 [main] INFO com.fly.lambda.GroupingByDataTest - value2
2024-01-14 13:57:53.232 [main] INFO com.fly.lambda.GroupingByDataTest - before : {student_name=[学生1, 学生2, 学生3, null, 学生5, 学生6], student_no=[0204001, 0204002, null, 0204004, 0204005, 0204006], value2=[null, 2, 3, 4, 5, 6], field=[项目1, 项目2, 项目3, 项目4, 项目5, 项目6], sex=[男, null, 女, 男, 女, 女]}
2024-01-14 13:57:53.237 [main] INFO com.fly.lambda.GroupingByDataTest - sorted : {field=[项目1, 项目2, 项目3, 项目4, 项目5, 项目6], sex=[男, null, 女, 男, 女, 女], student_name=[学生1, 学生2, 学生3, null, 学生5, 学生6], student_no=[0204001, 0204002, null, 0204004, 0204005, 0204006], value2=[null, 2, 3, 4, 5, 6]}
可以说,花比较小的成本,以比较少的代码变动,相对稳妥的解决了问题。
有任何问题和建议,都可以向我提问讨论,大家一起进步,谢谢!
文章来源:https://www.toymoban.com/news/detail-803528.html
-over-文章来源地址https://www.toymoban.com/news/detail-803528.html
到了这里,关于记csv、parquet数据预览一个bug的解决的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!