📚专栏
「Java数据集成」专栏
- 《Java发起HTTP请求并解析JSON返回数据》:下图简称为《请求和解析》
- 《基于MyBatis实现依次、批量、分页增删改查操作》:下图简称为《依批分增删改查》
- 《用Python根据JSON生成Java类代码和数据库建表SQL语句》:下图简称为《生成代码脚本》
- 《基于SpringBoot+MyBatis的数据增删改查模板》:下图简称为《增删改查模板》
- 《Java发起同异步HTTP请求和处理数据》:下图简称为《同异步请求和处理》
- 《基于SpringBoot+MyBatis的数据集成模板》:下图简称为《数据集成模板》
- 《JavaHTTP请求工具类HTTPUtils》:下图简称为《HTTP请求工具类》
- 《JavaJSON处理工具类JSONUtils》:下图简称为《JSON处理工具类》
- 《JavaXML处理工具类XMLUtils》:下图简称为《XML处理工具类》
- 《用Python生成随机JSON数据》:下图简称为《生成随机数据脚本》
同步与异步概念辨析
同步(synchronous)和异步(asynchronous)通常用于描述在执行操作时是否需要等待某个操作完成,以及如何处理返回结果
- 类比生活
- 同步是接力跑:前一个人跑完了再轮到下一个人跑
- 异步是齐跑:大家在起跑线同时起跑
- 编程
- 同步编程(synchronous programming):大多数编程都是同步编程。在同步编程中,任务是按顺序执行的,一个任务必须等待另一个任务完成后才能开始执行
- 异步编程(asynchronous programming):同时处理多个任务
- 请求
- 同步请求(synchronous request):每个请求都必须按顺序进行,并且程序必须等待每个请求完成后才能继续执行下一个请求
- 异步请求(asynchronous request):同时发起多个请求
此处再提两个相关但不同的概念,并发(concurrency)和并行(parallelism)
- 并发是指看起来貌似在同时执行多个任务(如时间片轮转),但在微观上他们都是按一定顺序被处理的,也可以算是某种意义上的“同时”、“异步”,看你怎么理解
- 并行则是指多个任务执行就在同一时刻发生,算是真正意义上的“同时”、“异步”
Java 中的 HTTP 请求
对于 Java 中发起 HTTP 请求而言,一样有同步异步之分。看本文剩余部分之前首先需要学习用 Java 发起 HTTP 请求,对应后文的 requestHTTPContent()
,意为请求后获取响应内容,并在此篇博客文章中有介绍,可以说是本文的基础
💬相关
本文前置基础博客文章《Java发起HTTP请求并解析JSON返回数据》
https://blog.csdn.net/weixin_42077074/article/details/128672130
笔者做了个简单的测试,测试方案如下,一个 JSON 对象对应数据表中的一条记录,对象含有两个键,取值分别为长度为 10 的和长度为 1000 的随机字符串,记录不同方案、不同请求量级下请求获取数据的耗时。
以下每项耗时数据至少测试三次并取平均值。 此外,测试数据是在较为理想的环境下测试获取的,而在实际使用场景中的数据应该会比测试数据略高。
请求数 | 依次同步请求耗时 | 批量异步请求耗时 | 分页异步请求耗时 |
---|---|---|---|
10 | 115ms | 25ms | 31ms |
1000 | 10.19s | 1073ms | 1106ms |
100000 | 17m53.28s | 2m55.20s | 2m55.80s |
为了更好描述同步与异步,以下给出一个示例场景,我想发起 10 次 HTTP 请求,每次请求都带有参数开始时间 startTime
和 endTime
,为 2023 年 1 月 1 日至 2023 年 1 月 10 日十天中每日零点和下一日的零点,如第一次请求的参数为 2023-01-01 00:00:00
和 2023-01-02 00:00:00
依次同步请求
同步发请求无非就是直接调用或在 for
里调用 requestHTTPContent()
// 设置请求头
Map<String, String> headers = new HashMap<String, String>(){{
// 设置接收内容类型
put("Accept","application/json");
// 设置发送内容类型
put("Content-Type","application/json;charset=UTF-8");
// 设置字符集
put("charset", "UTF-8");
// 设置访问者系统引擎版本、浏览器信息的字段信息,此处伪装成用户通过浏览器访问
put("User-Agent", "Mozilla/4.0 (compatible; MSIE 5.0; Windows NT; DigExt)");
}};
// 设置开始时间和结束时间
LocalDateTime startTime = LocalDateTime.of(2023, 1, 1, 0, 0, 0);
LocalDateTime endTime = LocalDateTime.of(2023, 1, 2, 0, 0, 0);
// 创建一个数组来保存结果
String[] results = new String[10];
for (int i = 0; i < 10; i++) {
// 格式化开始时间和结束时间
String formattedTime1 = startTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
String formattedTime2 = endTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
// 设置参数
Map<String, String> params = new HashMap<String, String>(){{
put("startTime", formattedTime1);
put("endTime", formattedTime2);
}};
// 发送请求并获取响应
results[i] = requestHTTPContent("http://www.example.com","GET", headers, params);
// 更新开始时间和结束时间
startTime = startTime.plusDays(1);
endTime = endTime.plusDays(1);
}
// 输出结果
for (int i = 0; i < 10; i++) {
System.out.println(results[i]);
}
批量异步请求
倘若请求量特别大,还按同步的方式进行,综合程序处理、网络延迟等因素,会非常地慢。
而对于异步而言,实现方式就比较多了,如 Java 11 中可以直接使用 java.net.http.HttpClient
类来创建异步 HTTP 客户端并使用 API,网上方法很多,此处不再赘述了
然而笔者处于兼容等各种考虑还是 Java8,Java8 并没有内置的异步 HTTP 客户端,要么调用第三方库来实现,如 Apache 的 HttpAsyncClient
库
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1.4</version>
</dependency>
要么借助 Java 8 的标准库提供的基于 CompletableFuture
的异步编程来实现,笔者也是使用这个方案
注意, CompletableFuture
是并发而不是并行意义上的异步
// 创建 1000 个异步任务
CompletableFuture<String>[] futures = new CompletableFuture[1000];
LocalDateTime startTime = LocalDateTime.of(2023, 1, 1, 0, 0, 0);
LocalDateTime endTime = LocalDateTime.of(2023, 1, 2, 0, 0, 0);
for (int i = 0; i < 1000; i++) {
// 格式化开始时间和结束时间
String formattedTime1 = startTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
String formattedTime2 = endTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
// 设置参数
Map<String, String> params = new HashMap<String, String>(){{
put("startTime", formattedTime1);
put("endTime", formattedTime2);
}};
// 设置异步请求
futures[i] = asyncHTTPRequest("http://www.example.com","GET", headers, params);
// 更新开始时间和结束时间
startTime = startTime.plusDays(1);
endTime = endTime.plusDays(1);
}
// 等待异步任务完成,超时时间为5秒
try {
CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
System.out.println("请求超时");
e.printStackTrace();
return;
}
// 输出每个异步任务的结果
for (int i = 0; i < 1000; i++) {
try {
System.out.println(futures[i].get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
其中的异步请求函数 asyncHTTPRequest()
public static CompletableFuture<String> asyncHTTPRequest(String strURL, String method, Map<String, ?> headers, Map<String, ?> params) {
return CompletableFuture.supplyAsync(() -> {
try {
return requestHTTPContent(strURL, method, headers, params);
} catch (Exception e) {
e.printStackTrace();
return null;
}
});
}
批量异步请求 + 批量数据处理
为了更好说明,现在我们不单单是请求数据,还要请求完后处理数据,以下给出一个示例场景
- HTTP 请求返回的 JSON 数据含有键
id
、key1
、key2
,如
{
"code":"200",
"msg":"success",
"data":[
{"id":"000001","key1":"5WoFrZxFR5ZXi6tA","key2":"0afba4s6HATkE9N4"},
{"id":"000002","key1":"aKeHAyL10oGXYcB1","key2":"cG5SlzRavO2zMLkW"},
{"id":"000003","key1":"O7zdMpEilsatFHRo","key2":"rKsqN0nOfU06vQ8E"},
{"id":"000004","key1":"xD6s7KlaUQ9zY5pR","key2":"8oe1RTbDu8gH30Fn"},
{"id":"000005","key1":"lkpnmv47rybG3hw2","key2":"rht3MhVvDOuaB9cQ"}
]
}
- 建立对应的 Java 类
Data
,含有属性id
、attr1
、attr2
让我们来批量异步请求 + 批量数据处理
💬相关
依次、批量、分页进行数据增删改查请查看
博客文章《基于Spring Boot + MyBatis的数据增删改查模板》,下文的
dataMapper
的 Mapper 层函数均出自于此https://blog.csdn.net/weixin_42077074/article/details/128868655
博客文章《基于MyBatis实现依次、批量、分页增删改查操作》,下文的
dataMapper
的 Mapper 层函数的 MyBatis 实现部分出自于此https://blog.csdn.net/weixin_42077074/article/details/129405833
public void batchRequestAndHandle() throws Exception {
// 异步请求列表
List<CompletableFuture<String>> futures = new ArrayList<>();
// 设置开始时间和结束时间
LocalDateTime startTime = LocalDateTime.of(2023, 1, 1, 0, 0, 0);
LocalDateTime endTime = LocalDateTime.of(2023, 1, 2, 0, 0, 0);
for (int i = 0; i < 1000; i++) {
// 格式化开始时间和结束时间
String formattedTime1 = startTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
String formattedTime2 = endTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
// 设置参数
Map<String, String> params = new HashMap<String, String>(){{
put("startTime", formattedTime1);
put("endTime", formattedTime2);
}};
// 设置异步请求
futures.add(batchHandleAsyncResult("http://www.example.com","GET", headers, params));
// 更新开始时间和结束时间
startTime = startTime.plusDays(1);
endTime = endTime.plusDays(1);
}
// 批量处理异步请求结果
batchHandleAsyncResult(futures);
}
其中批量处理异步请求结果函数 batchHandleAsyncResult()
public void batchHandleAsyncResult(List<CompletableFuture<String>> futures){
// 等待异步任务完成,超时时间为 30 分钟
try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1800, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
System.out.println("请求超时");
e.printStackTrace();
return;
}
// 批量处理记录列表
List<Data> dataList = new ArrayList<>();
// 处理每个异步任务的结果
for (CompletableFuture<String> future : futures) {
try {
JSONObject jsonObj = JSON.parseObject(future.get());// 将 JSON 字符串解析成 JSON 对象
if (jsonObj != null){
JSONArray jsonInfo = jsonObj.getJSONArray("data");//解析成 JSON 数组
if (jsonInfo != null) for (int i = 0; i < jsonInfo.size(); i++) {// 遍历 JSON 数组依次取出 JSON 对象
JSONObject jsonDetailInfo = jsonInfo.getJSONObject(i);
dataList.add(new Data(
jsonDetailInfo.getString("id"),
jsonDetailInfo.getString("key1"),
jsonDetailInfo.getString("key2")
));
}
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
// 批量插入或更新
dataMapper.batchInsertOrUpdateData(dataList);
futures.clear();
return ;
}
分页异步请求
可以看到按照上文的异步请求的逻辑,是先将所有请求异步一次性发起,再将结果存进数组,随后统一处理
但是在统一处理之前,放进数组的返回结果会占着内存空间,倘若请求量大,响应信息多,那很容易内存溢出(笔者已经经历过几次了,因为返回结果对应在数据库的行数是百万及以上量级的)
这就需要将大量的请求进行分页,每次仅将一部分的异步请求发起并立即处理结果,随后再进行下一部分的
对于分页处理数据的思路而言,有一些常见的量需要先了解一下
- 总量
-
totalRecords
:总记录数,即数据表中的行数 -
totalFields
:总字段数,即数据表中的列数 -
totalData
:总数据量,即数据表中的行数 × 列数
-
- 分页
-
pageSize
:页面大小,即每页记录数 -
pageDataSize
:页面数据量大小,即每页记录数 × 字段数 -
currentPage
:当前页码 -
totalPages
:总页数
-
一般是将众多记录拆成若干页,每页再挨个处理,一般很少见到拆字段的,所以也就很少见到“每页字段大小”(因为“每页字段大小”就是“总字段数”)
至于这个页大小多少合适,我觉得没有统一的答案,自己根据实际情况设定即可,要看客户机和服务器的硬件条件(CPU 处理速度,内存大小等)、网络延迟等诸多因素,笔者均经历过页太小速度极慢,页太大内存溢出等各种情况……
不过一般情况下,在进行处理完数据之前,我们可能只知道总字段数 totalFields
,不知道总记录数 totalRecords
,所以要计数
- 计数
-
currentCount
:当前记录数,当达到页面大小后就重置为 0 -
totalCount
:总记录数,即数据表中的行数
-
分页异步请求 + 分页数据处理
其实分页和批量的实现是非常像的,只是多了将大量的请求及其结果进行分页的过程
异步请求分页的页面大小较大取决于服务器的硬件条件,而插入或更新数据分页的页面大小较大取决于客户机的硬件条件
一重分页——一次请求对应一条记录
如果恰好一次请求对应一条记录,那么二者的分页是相同的,可以说仅需一重分页
每次仅将一部分的异步请求发起并处理结果,一旦当前记录数达到页面大小就立即异步请求,随后将当前页所有异步请求的结果直接批量处理
一重分页涉及下面的变量
- 总量
-
totalFields
:总字段数,即数据表中的列数
-
- 分页
-
pageSize
:页面大小,即每页记录数 -
pageDataSize
:页面数据量大小,即每页记录数 × 字段数
-
- 计数
-
currentCount
:当前记录数,当达到页面大小后就重置为 0 -
totalCount
:总记录数,即数据表中的行数
-
public void pageRequestAndHandleData() throws Exception {
int totalRequests = 1000; // 总请求数
int requestPageSize = 300; // 请求页面大小,即每页请求数
int currentRequestCount = 0; // 当前记录数,当达到数据页面大小后就重置为 0
// 异步请求列表
List<CompletableFuture<String>> futures = new ArrayList<>();
// 设置开始时间和结束时间
LocalDateTime startTime = LocalDateTime.of(2023, 1, 1, 0, 0, 0);
LocalDateTime endTime = LocalDateTime.of(2023, 1, 2, 0, 0, 0);
for (int i = 0; i < totalRequests; i++) {
// 格式化开始时间和结束时间
String formattedTime1 = startTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
String formattedTime2 = endTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
// 设置参数
Map<String, String> params = new HashMap<String, String>(){{
put("startTime", formattedTime1);
put("endTime", formattedTime2);
}};
// 设置异步请求
futures.add(asyncHTTPRequest("http://www.example.com","GET", headers, params));
currentRequestCount ++;
// 当记录数达到页面大小就进行处理,即分页处理
if(currentRequestCount == requestPageSize){
pageHandleAsyncResult(futures);
currentRequestCount = 0;
}
// 更新开始时间和结束时间
startTime = startTime.plusDays(1);
endTime = endTime.plusDays(1);
}
// 处理最后一页
if(!futures.isEmpty()){
pageHandleAsyncResult(futures);
}
}
其中批量处理异步请求结果函数 batchHandleAsyncResult()
和前文批量集成数据中的是一样的
二重分页——一次请求对应多条记录
然而, 大多数情况下,一次请求是对应多条记录的,返回内容中会有数组让你遍历,而且在你处理异步请求结果之前,你并不知道一个请求对应多少记录,也就未知总记录数
因此异步请求的分页和插入或更新数据的分页应该是分开的,可以说是需要二重分页
每次仅将一部分的异步请求发起并处理结果,一旦当前记录数达到页面大小就立即异步请求,随后将当前页所有异步请求的结果再分页处理
二重分页涉及下面的变量文章来源:https://www.toymoban.com/news/detail-420581.html
- 请求总量
-
totalRequests
:总请求数
-
- 请求分页
-
requestPageSize
:请求页面大小,即每页请求数
-
- 请求计数
-
currentRequestCount
:当前请求数,当达到请求页面大小后就重置为 0
-
- 数据总量
-
totalFields
:总字段数,即数据表中的列数
-
- 数据分页
-
recordPageSize
:记录页面大小,即每页记录数
-
- 数据计数
-
currentRecordCount
:当前记录数,当达到数据页面大小后就重置为 0 -
totalRecordCount
:总记录数,即数据表中的行数
-
public void pageRequestAndHandleData() throws Exception {
int totalRequests = 1000; // 总请求数
int requestPageSize = 300; // 请求页面大小,即每页请求数
int currentRequestCount = 0; // 当前记录数,当达到数据页面大小后就重置为 0
// 异步请求列表
List<CompletableFuture<String>> futures = new ArrayList<>();
// 设置开始时间和结束时间
LocalDateTime startTime = LocalDateTime.of(2023, 1, 1, 0, 0, 0);
LocalDateTime endTime = LocalDateTime.of(2023, 1, 2, 0, 0, 0);
for (int i = 0; i < totalRequests; i++) {
// 格式化开始时间和结束时间
String formattedTime1 = startTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
String formattedTime2 = endTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
// 设置参数
Map<String, String> params = new HashMap<String, String>(){{
put("startTime", formattedTime1);
put("endTime", formattedTime2);
}};
// 设置异步请求
futures.add(asyncHTTPRequest("http://www.example.com","GET", headers, params));
currentRequestCount ++;
// 当记录数达到页面大小就进行处理,即分页处理
if(currentRequestCount == requestPageSize){
pageHandleAsyncResult(futures);
currentRequestCount = 0;
}
// 更新开始时间和结束时间
startTime = startTime.plusDays(1);
endTime = endTime.plusDays(1);
}
// 处理最后一页
if(!futures.isEmpty()){
pageHandleAsyncResult(futures);
}
}
其中分页处理异步请求结果函数 pageHandleAsyncResult()
文章来源地址https://www.toymoban.com/news/detail-420581.html
public void pageHandleAsyncResult(List<CompletableFuture<String>> futures){
int pageDataSize = 12000; // 页面数据量大小,即每页记录数 × 字段数
int totalFields = Data.class.getDeclaredFields().length; // 总字段数,即数据表中的列数
int pageRecordSize = pageDataSize / totalFields; // 页面大小,即每页记录数
int currentRecordCount = 0; // 当前记录数,当达到页面大小后就重置为 0
int totalRecordCount = 0; // 累积记录数,一直累积,不进行重置,最终就是总记录数
// 等待异步任务完成,超时时间为 30 分钟
try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1800, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
System.out.println("请求超时");
e.printStackTrace();
return;
}
// 批量处理记录列表
List<Data> dataList = new ArrayList<>();
// 处理每个异步任务的结果
for (CompletableFuture<String> future : futures) {
try {
JSONObject jsonObj = JSON.parseObject(future.get());// 将 JSON 字符串解析成 JSON 对象
if (jsonObj != null){
JSONArray jsonInfo = jsonObj.getJSONArray("data");//解析成 JSON 数组
if (jsonInfo != null) for (int i = 0; i < jsonInfo.size(); i++) {// 遍历 JSON 数组依次取出 JSON 对象
JSONObject jsonDetailInfo = jsonInfo.getJSONObject(i);
dataList.add(new Data(
jsonDetailInfo.getString("id"),
jsonDetailInfo.getString("key1"),
jsonDetailInfo.getString("key2")
));
currentRecordCount ++;
totalRecordCount ++;
if(currentRecordCount == pageRecordSize){
dataMapper.batchInsertOrUpdateData(dataList);
dataList.clear();
currentRecordCount = 0;
}
}
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
if(totalRecordCount > 0) dataMapper.batchInsertOrUpdateData(dataList);
futures.clear();
return ;
}
到了这里,关于Java发起同异步HTTP请求和处理数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!