<!-- Maven dependency on the com.qcloud:cos_api artifact, version 5.6.133.
     The Java class below imports com.qcloud.cos.* types; presumably they are
     provided by this artifact (confirm against the project's pom.xml). -->
<dependency>
<groupId>com.qcloud</groupId>
<artifactId>cos_api</artifactId>
<version>5.6.133</version>
</dependency>
package gaei.cn.x5l.x5lhive2cos.utils;
import com.qcloud.cos.COSClient;
import com.qcloud.cos.ClientConfig;
import com.qcloud.cos.auth.BasicCOSCredentials;
import com.qcloud.cos.auth.COSCredentials;
import com.qcloud.cos.exception.CosClientException;
import com.qcloud.cos.exception.CosServiceException;
import com.qcloud.cos.http.HttpProtocol;
import com.qcloud.cos.model.ListObjectsRequest;
import com.qcloud.cos.model.ObjectListing;
import com.qcloud.cos.region.Region;
import org.apache.commons.lang3.SystemUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static java.util.stream.Collectors.toList;
/**
 * Command-line job that compares the partition directories of a Hive table stored in a COS
 * bucket with the partition dates already recorded in a MySQL table, and inserts the missing
 * ones.
 *
 * <p>Flow: list the sub-directories under {@code /user/x5l/hive/<db>/<table>/} in COS, extract
 * the value of each {@code key=value} directory name, drop the values MySQL already holds for
 * that database/table, keep only the configured target months, and insert one row per remaining
 * value.
 *
 * <p>Usage (non Windows 10 hosts): {@code CosSampleDate2Mysql <dbName> <tableName>}.
 */
public class CosSampleDate2Mysql {
    // MySQL connection settings.
    // NOTE(review): credentials are hard-coded in source; consider loading them from configuration.
    private static final String MYSQL_URL = "jdbc:mysql://10.1.1.1:3316/test?useSSL=false";
    private static final String MYSQL_USERNAME = "root";
    private static final String MYSQL_PASSWORD = "123456";
    // Target MySQL table.
    private static final String MYSQL_TABLE = "`test`.`test`";

    // COS bucket and region that hold the Hive warehouse directories.
    private static final String BUCKET_NAME = "pro-x5l-1111111111";
    private static final String COS_REGION = "ap-guangzhou";
    // A single list-objects call supports at most 1000 keys, so larger listings are paged.
    private static final int LIST_PAGE_SIZE = 1000;
    // Only partition values containing one of these month strings are back-filled.
    private static final String[] TARGET_MONTHS = {"202302", "202303", "202304", "202305"};

    /**
     * Entry point.
     *
     * @param args {@code args[0]} is the Hive database name and {@code args[1]} the Hive table
     *     name; ignored on Windows 10, where fixed development defaults are used instead
     * @throws IllegalArgumentException if fewer than two arguments are supplied on a host that
     *     is not Windows 10
     */
    public static void main(String[] args) {
        // Hive database and table whose partitions are inspected.
        String dbName;
        String tableName;
        if (SystemUtils.IS_OS_WINDOWS_10) {
            // Development defaults: database/table names as they appear in COS.
            dbName = "database";
            tableName = "table_name";
        } else {
            if (args == null || args.length < 2) {
                throw new IllegalArgumentException("Usage: CosSampleDate2Mysql <dbName> <tableName>");
            }
            dbName = args[0];
            tableName = args[1];
        }

        // 1. Initialise the user credentials (secretId, secretKey).
        String secretId = "*******************************";
        String secretKey = "*******************************";
        COSCredentials cred = new BasicCOSCredentials(secretId, secretKey);
        // 2. Set the bucket region.
        ClientConfig clientConfig = new ClientConfig(new Region(COS_REGION));
        // Explicitly use https.
        clientConfig.setHttpProtocol(HttpProtocol.https);
        // 3. Create the COS client; it is always shut down, including on early exits.
        COSClient cosClient = new COSClient(cred, clientConfig);
        try {
            run(cosClient, dbName, tableName);
        } finally {
            cosClient.shutdown();
        }
    }

    /** Runs the list / compare / insert pipeline for one Hive table. */
    private static void run(COSClient cosClient, String dbName, String tableName) {
        // 4. List the partition directories of the table.
        List<String> commonPrefixes;
        try {
            commonPrefixes = listCommonPrefixes(
                    cosClient, BUCKET_NAME, "/user/x5l/hive/" + dbName + "/" + tableName + "/");
        } catch (CosServiceException e) {
            e.printStackTrace();
            return;
        } catch (CosClientException e) {
            e.printStackTrace();
            return;
        }

        // Turn the directory paths into partition values.
        List<String> sampleDates = new ArrayList<>();
        for (String commonPrefix : commonPrefixes) {
            String value = extractPartitionValue(commonPrefix);
            if (value != null) {
                sampleDates.add(value);
            }
        }

        try (Connection connection = DBConn.conn(MYSQL_URL, MYSQL_USERNAME, MYSQL_PASSWORD)) {
            if (connection == null) {
                System.err.println("Could not open a MySQL connection to " + MYSQL_URL);
                return;
            }
            // If this lookup fails we abort (exception propagates to the catch below) instead of
            // treating every partition as missing and inserting duplicates.
            List<String> mysqlSampleDates = loadExistingSampleDates(connection, dbName, tableName);

            // Partition values missing from MySQL, restricted to the target months.
            List<String> result = sampleDates.stream()
                    .filter(item -> !mysqlSampleDates.contains(item))
                    .filter(CosSampleDate2Mysql::isTargetMonth)
                    .collect(toList());
            // Reverse the listing order before inserting.
            Collections.reverse(result);
            System.out.println("Mysql中需要补充的时间分区:");
            for (String element : result) {
                System.out.println(element);
            }

            // NOTE(review): rows for dwd_hive_tbox_period_* tables are written under a different
            // database/table name than the one used for the "already present" lookup above, so a
            // re-run may not see them as existing — confirm this is intended.
            if (tableName.contains("dwd_hive_tbox_period_")) {
                dbName = "dwd";
                tableName = tableName.replace("dwd_hive_tbox_period_", "dwd_tsp_tbox_period_");
            }

            int written = insertSampleDates(connection, dbName, tableName, result);
            System.out.println(tableName + "数据写入完成,成功写入的数据条数为:" + written);
        } catch (Exception e) {
            // Boundary catch: DBConn.conn's checked exceptions are not visible from this file.
            e.printStackTrace();
        }
    }

    /**
     * Lists every "sub-directory" (common prefix) directly under {@code prefix}, following the
     * listing marker across pages so truncated listings are read completely.
     */
    private static List<String> listCommonPrefixes(COSClient cosClient, String bucketName, String prefix) {
        ListObjectsRequest request = new ListObjectsRequest();
        request.setBucketName(bucketName);
        // Only keys starting with this prefix are listed.
        request.setPrefix(prefix);
        // "/" as delimiter lists only the current directory level.
        request.setDelimiter("/");
        request.setMaxKeys(LIST_PAGE_SIZE);

        List<String> prefixes = new ArrayList<>();
        ObjectListing listing;
        do {
            listing = cosClient.listObjects(request);
            // Common prefixes are the paths cut off by the delimiter, i.e. the sub-directories.
            prefixes.addAll(listing.getCommonPrefixes());
            // Advance to the next page; without this a truncated listing would repeat forever.
            request.setMarker(listing.getNextMarker());
        } while (listing.isTruncated());
        return prefixes;
    }

    /**
     * Extracts the partition value from a directory path ending in {@code .../key=value/}.
     *
     * @return the text between the first and second {@code '='} of the last path segment, or
     *     {@code null} when the last segment is not of the form {@code key=value}
     */
    private static String extractPartitionValue(String commonPrefix) {
        // String.split drops trailing empty strings, so the last element is the directory name.
        String[] segments = commonPrefix.split("/");
        if (segments.length == 0) {
            return null;
        }
        String[] keyValue = segments[segments.length - 1].split("=");
        if (keyValue.length < 2) {
            // Not a key=value directory; skip it rather than fail the whole run.
            return null;
        }
        return keyValue[1];
    }

    /** Returns true when the partition value contains one of the configured target months. */
    private static boolean isTargetMonth(String sampleDate) {
        for (String month : TARGET_MONTHS) {
            if (sampleDate.contains(month)) {
                return true;
            }
        }
        return false;
    }

    /** Reads the sample_date values already stored in MySQL for the given database/table. */
    private static List<String> loadExistingSampleDates(Connection connection, String dbName, String tableName)
            throws Exception {
        String query = "select sample_date from " + MYSQL_TABLE + " where `database` = ? and table_name = ?";
        List<String> existing = new ArrayList<>();
        try (PreparedStatement statement = connection.prepareStatement(query)) {
            statement.setString(1, dbName);
            statement.setString(2, tableName);
            try (ResultSet rows = statement.executeQuery()) {
                while (rows.next()) {
                    existing.add(rows.getString("sample_date"));
                }
            }
        }
        return existing;
    }

    /**
     * Inserts one row per sample date. A failing row is logged and skipped so the remaining
     * rows are still attempted.
     *
     * @return the number of rows that were inserted successfully
     */
    private static int insertSampleDates(
            Connection connection, String dbName, String tableName, List<String> sampleDates) throws Exception {
        // sample_date is bound as a string parameter; the original inlined it unquoted.
        String sql = "insert into " + MYSQL_TABLE
                + " (`database`, `table_name`, `sample_date`, `state`, `update_time`) values(?, ?, ?, 0, now())";
        int written = 0;
        int count = 1;
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            statement.setString(1, dbName);
            statement.setString(2, tableName);
            for (String sampleDate : sampleDates) {
                try {
                    System.out.println("正在写入第" + count++ + "条数据");
                    statement.setString(3, sampleDate);
                    statement.executeUpdate();
                    written++;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        return written;
    }
}
文章来源地址:https://www.toymoban.com/news/detail-534812.html
文章来源:https://www.toymoban.com/news/detail-534812.html
到了这里,关于【Hadoop-Cos】存储对象Cos通过Java-SDK获取目录结构的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!