1. Dependencies for exporting JDBC or Hive data read by spark-sql in local mode
<dependency>
    <groupId>org.apache.poi</groupId>
    <artifactId>poi-ooxml</artifactId>
    <version>5.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.poi</groupId>
    <artifactId>poi</artifactId>
    <version>5.2.1</version>
</dependency>
<dependency>
    <groupId>jxl</groupId>
    <artifactId>jxl</artifactId>
    <version>1.0</version>
</dependency>
<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.11.0</version>
</dependency>
Note: the POI and commons-io versions above are relatively new, while Spark's jars directory also ships its own commons-io jar. If the versions conflict, org\apache\commons\io\output\ByteArrayOutputStream.class cannot be found; on Spark 2.4 or lower it is org\apache\commons\io\output\UnsynchronizedByteArrayOutputStream.class that goes missing. Simply align the commons-io version packaged with the application with the one bundled in your Spark distribution.
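If it is not obvious which commons-io jar the driver JVM actually loads, a small probe helps. The following is only a minimal diagnostic sketch (the class name CommonsIoProbe is hypothetical and not part of the export code); run it on the same classpath as the Spark application.

public class CommonsIoProbe {
    public static void main(String[] args) {
        // Print the jar that supplies the class POI 5.x needs; if this points at an old
        // commons-io under Spark's jars directory, the versions are in conflict.
        java.net.URL location = org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream.class
                .getProtectionDomain().getCodeSource().getLocation();
        System.out.println("commons-io loaded from: " + location);
    }
}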
2. Exporting Excel or CSV from spark-sql
This approach uses Spark's local collection method, Iterator<Row> localIterator = dataset.toLocalIterator(), to pull the data back to the driver and then writes the file entirely by hand. It is flexible and easy to adjust, but it is not suitable for bulk export of large datasets; use it only when the result set is small. For reference only.
2.1 Export entry point
/**
 * Generate and export the file.
 *
 * @param sparkSession sparkSession
 * @param paramsMap    parameters
 */
public static void sqlAccessFileExport(SparkSession sparkSession, Map<String, Object> paramsMap) {
    Long downloadTaskId = MapParamUtil.getLongValue(paramsMap, "downloadTaskId");
    String taskName = MapParamUtil.getStringValue(paramsMap, "taskName");
    String fileType = MapParamUtil.getStringValue(paramsMap, "fileType");
    String waterMark = MapParamUtil.getStringValue(paramsMap, "waterMark");
    String tmpDir = MapParamUtil.getStringValue(paramsMap, "tmpDir");
    String receiveClient = MapParamUtil.getStringValue(paramsMap, "receiveClient");
    Map<String, Object> metaData = (Map<String, Object>) paramsMap.get("metaData");
    // File name
    String fileName = UUID.randomUUID().toString().concat(".").concat(fileType);
    if (!tmpDir.endsWith("/")) {
        tmpDir = tmpDir.concat("/");
    }
    String fileLocation = tmpDir.concat(fileName);
    Map<String, Object> returnMap = new HashMap<String, Object>(10);
    returnMap.put("downloadTaskId", downloadTaskId);
    returnMap.put("method", "sqlAccessFileExportResult");
    try {
        long start = System.currentTimeMillis();
        Dataset<Row> dataset = SqlAccessFile.getDataset(sparkSession, paramsMap);
        // Excel files are created differently from plain-text files
        if ("xlsx".equalsIgnoreCase(fileType) || "xls".equalsIgnoreCase(fileType)) {
            SqlAccessFile.createExcel(dataset, fileLocation, taskName, waterMark, metaData);
        }
        else {
            // Create a CSV or TXT file
            SqlAccessFile.createCsvOrTxt(dataset, fileLocation, metaData);
        }
        long end = System.currentTimeMillis();
        returnMap.put("resultCode", KeyValues.SUCCESS);
        returnMap.put("resultDesc", "PTO label run success:" + (end - start) + "ms");
        // Note: count() launches another job over the same dataset
        returnMap.put("total", dataset.count());
        returnMap.put("fileLocation", fileLocation);
        SendMessageUtil.sendMessage(receiveClient, returnMap);
    }
    catch (Exception e) {
        logger.error(e.getMessage(), e);
        returnMap.put("method", "sqlAccessFileExportResult");
        returnMap.put("resultCode", KeyValues.FAIL);
        returnMap.put("resultDesc", e.getMessage());
        SendMessageUtil.sendMessage(receiveClient, returnMap);
    }
}
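For context, a driver program could invoke this entry point roughly as follows. This is a hedged usage sketch: it assumes the method lives in a SqlAccessFile class, uses a local-mode SparkSession as in the article title, and fills the map with placeholder values (demo_db.demo_table, the localhost URL, and /tmp/export are all hypothetical).

SparkSession sparkSession = SparkSession.builder()
        .appName("sql-access-file-export")
        // local mode, matching the scenario in the title
        .master("local[*]")
        // only needed when datasourceType is "hive"
        .enableHiveSupport()
        .getOrCreate();

Map<String, Object> paramsMap = new HashMap<>();
paramsMap.put("downloadTaskId", 42L);
paramsMap.put("taskName", "demo_export");
paramsMap.put("fileType", "xlsx");
paramsMap.put("tmpDir", "/tmp/export");
paramsMap.put("datasourceType", "hive");
paramsMap.put("sql", "select * from demo_db.demo_table limit 100");
paramsMap.put("metaData", new HashMap<String, Object>());
paramsMap.put("receiveClient", "http://localhost:8080/dataservice/");
paramsMap.put("waterMark", "");

SqlAccessFile.sqlAccessFileExport(sparkSession, paramsMap);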
2.2 Obtaining the Dataset from Hive or JDBC
/**
 * Obtain the dataset.
 *
 * @param sparkSession sparkSession
 * @param paramsMap    parameters
 * @return Dataset<Row>
 */
private static Dataset<Row> getDataset(SparkSession sparkSession, Map<String, Object> paramsMap) {
    String datasourceType = MapParamUtil.getStringValue(paramsMap, "datasourceType");
    String url = MapParamUtil.getStringValue(paramsMap, "url");
    String username = MapParamUtil.getStringValue(paramsMap, "username");
    String password = MapParamUtil.getStringValue(paramsMap, "password");
    String driver = MapParamUtil.getStringValue(paramsMap, "driver");
    String sql = MapParamUtil.getStringValue(paramsMap, "sql");
    Long downloadTaskId = MapParamUtil.getLongValue(paramsMap, "downloadTaskId");
    if ("hive".equalsIgnoreCase(datasourceType)) {
        return sparkSession.sql(sql);
    }
    else {
        return sparkSession.read().format("jdbc")
                // JDBC URL
                .option("url", url)
                // user name
                .option("user", username)
                // password (stored encrypted, decrypted here)
                .option("password", new DesCryptUtil(DesCryptUtil.simpleKey).decrypt(password))
                // JDBC driver class
                .option("driver", driver)
                // wrap the query as a derived table so the database executes the original SQL
                .option("dbtable", "(" + sql + ") as temp_" + downloadTaskId)
                .load();
    }
}
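The dbtable option wraps the user's SQL as a derived table with a per-task alias, so the database runs the original query and Spark only reads the result set. For what it's worth, a hedged equivalent using the Properties-based jdbc() overload would look like the sketch below; the variable names mirror the method above, and plainPassword stands in for the already-decrypted password.

java.util.Properties connectionProperties = new java.util.Properties();
connectionProperties.put("user", username);
connectionProperties.put("password", plainPassword);
connectionProperties.put("driver", driver);
// the derived-table alias must be unique per query; downloadTaskId serves that purpose here
Dataset<Row> dataset = sparkSession.read()
        .jdbc(url, "(" + sql + ") as temp_" + downloadTaskId, connectionProperties);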
2.3 Excel export example
/**
 * Create the Excel file.
 *
 * @param dataset      dataset
 * @param fileLocation output file path
 * @param taskName     task name, used as the sheet name
 * @param waterMark    watermark text, optional
 * @param metaData     column-name translation map
 */
private static void createExcel(Dataset<Row> dataset, String fileLocation, String taskName, String waterMark,
        Map<String, Object> metaData) {
    // try-with-resources closes the streams automatically
    try (Workbook workbook = new XSSFWorkbook();
            FileOutputStream fileOutputStream = new FileOutputStream(fileLocation)) {
        CellStyle headStyle = workbook.createCellStyle();
        headStyle.setAlignment(HorizontalAlignment.CENTER);
        headStyle.setVerticalAlignment(VerticalAlignment.CENTER);
        headStyle.setFillPattern(FillPatternType.SOLID_FOREGROUND);
        headStyle.setFillForegroundColor(IndexedColors.GREY_25_PERCENT.getIndex());
        // Create the sheet and write the header row
        Sheet sheet = workbook.createSheet(taskName);
        org.apache.poi.ss.usermodel.Row excelRow = sheet.createRow(0);
        String[] headersNames = dataset.columns();
        for (int i = 0; i < headersNames.length; i++) {
            Cell cell = excelRow.createCell(i);
            String headersName = headersNames[i];
            cell.setCellValue(MapParamUtil.getStringValue(metaData, headersName, headersName));
        }
        // Pull the data to the driver, one partition at a time
        Iterator<Row> localIterator = dataset.toLocalIterator();
        while (localIterator.hasNext()) {
            Row row = localIterator.next();
            excelRow = sheet.createRow(sheet.getLastRowNum() + 1);
            for (int i = 0; i < headersNames.length; i++) {
                Cell cell = excelRow.createCell(i);
                cell.setCellValue(String.valueOf(row.get(i)));
            }
        }
        // Apply the watermark if one was supplied
        if (StringUtil.isNotEmpty(waterMark)) {
            ExcelWaterRemarkUtils.painWaterMarkToWorld((XSSFWorkbook) workbook, waterMark);
        }
        // Write the file
        workbook.write(fileOutputStream);
        fileOutputStream.flush();
    }
    catch (Exception e) {
        logger.error(e.getMessage(), e);
        throw new SparkException(e.getMessage(), e);
    }
}
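XSSFWorkbook keeps the entire sheet in memory, which is another reason this path only suits small result sets. If larger exports are ever needed, POI's streaming SXSSFWorkbook (org.apache.poi.xssf.streaming) is a near drop-in alternative; the sketch below is only a hedged outline of that swap, not part of the original code, and note the watermark utility above casts to XSSFWorkbook and would need adjusting.

try (SXSSFWorkbook workbook = new SXSSFWorkbook(100);            // keep at most 100 rows in memory
        FileOutputStream fileOutputStream = new FileOutputStream(fileLocation)) {
    Sheet sheet = workbook.createSheet(taskName);
    // ... build the header and data rows exactly as in createExcel above ...
    workbook.write(fileOutputStream);
    // remove the temporary files SXSSF spills rows into
    workbook.dispose();
}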
2.4 CSV/TXT export example
/**
 * Create the CSV or TXT file.
 *
 * @param dataset      dataset
 * @param fileLocation output file path
 * @param metaData     column-name translation map
 */
private static void createCsvOrTxt(Dataset<Row> dataset, String fileLocation, Map<String, Object> metaData) {
    try (FileOutputStream fileOutputStream = new FileOutputStream(fileLocation);
            OutputStreamWriter outputStreamWriter = new OutputStreamWriter(fileOutputStream, "UTF-8");
            BufferedWriter bufferedWriter = new BufferedWriter(outputStreamWriter)) {
        // Translate the column headers through the metadata map
        String[] columns = dataset.columns();
        List<String> headersNames = new ArrayList<>(20);
        for (int i = 0; i < columns.length; i++) {
            String columnCode = columns[i];
            headersNames.add(MapParamUtil.getStringValue(metaData, columnCode, columnCode));
        }
        // Write the comma-separated header row
        bufferedWriter.write(StringUtil.join(headersNames, ","));
        bufferedWriter.newLine();
        // Pull the data to the driver, one partition at a time
        Iterator<Row> localIterator = dataset.toLocalIterator();
        while (localIterator.hasNext()) {
            Row row = localIterator.next();
            // Join the row values with commas (no escaping of embedded commas or quotes)
            String mkString = row.mkString(",");
            bufferedWriter.write(mkString);
            bufferedWriter.newLine();
        }
        bufferedWriter.flush();
    }
    catch (Exception e) {
        logger.error(e.getMessage(), e);
        throw new SparkException(e.getMessage(), e);
    }
}
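One caveat of row.mkString(",") is that field values containing commas, quotes, or newlines are written as-is and will break the column layout. A minimal quoting helper along the following lines could replace the mkString call; the method name toCsvLine is hypothetical and simply applies the usual RFC 4180 convention.

private static String toCsvLine(Row row) {
    StringBuilder line = new StringBuilder();
    for (int i = 0; i < row.size(); i++) {
        String value = String.valueOf(row.get(i));
        // quote every field and double any embedded quotes
        line.append('"').append(value.replace("\"", "\"\"")).append('"');
        if (i < row.size() - 1) {
            line.append(',');
        }
    }
    return line.toString();
}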
2.5 Sample paramsMap input for reference
{
    "method": "sqlAccessFileExport",
    "appName": "xxx分析_develop_6924_2_20230420",
    "receiveClient": "http://xxx.xx.xx.xxx:8559/dataservice/",
    "url": "jdbc:postgresql://xxx.xx.xx.xx:5432/bigdata?gssEncMode=disable&reWriteBatchedInserts=true",
    "sql": "select product_name,charge_zy_sum,calculate979,product_name_count from smart_test.app_1681284022118 limit 100",
    "downloadTaskId": 42,
    "datasourceType": "gp",
    "tmpDir": "/home/xxxxx/xxxx/xxxxx/shuxkaiftp",
    "metaData": {
        "charge_zy_sum": "主营收入(万)",
        "product_name_count": "产品名称",
        "calculate979": "计算字段979",
        "product_name": "产品名称(产品维度)"
    },
    "password": "2047d192e697a909",
    "driver": "org.postgresql.Driver",
    "taskName": "xxx分析_develop_6924_2_20230420",
    "waterMark": "xxxx分析-(xxxuser) 2023-04-29 23:06:02",
    "fileType": "xlsx",
    "username": "gpxxxx"
}
That concludes this walkthrough of exporting CSV or Excel files from spark-sql (JDBC) in local mode.