Reactor 第十篇 定制一个生产的WebClient

这篇具有很好参考价值的文章主要介绍了Reactor 第十篇 定制一个生产的WebClient。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1 为什么要用 WebClient

刚开始尝试使用 Spring WebFlux 的时候,很多人都会使用 Mono.fromFuture() 将异步请求转成 Mono 对象,或者 Mono.fromSupplier() 将请求转成 MOno 对象,这两种方式在响应式编程中都是不建议的,都会阻塞当前线程。

1.1 Mono.fromFuture() VS WebClient

Mono.fromFuture()方法和使用 WebClient 调用第三方接口之间存在以下区别:

  • 异步 vs. 非阻塞

Mono.fromFuture()方法适用于接收一个 java.util.concurrent.Future 对象,并将其转换为响应式的 Mono。这是一个阻塞操作,因为它会等待 Future 对象完成。而使用 WebClient 调用第三方接口是异步和非阻塞的,它不会直接阻塞应用程序的执行,而是使用事件驱动的方式处理响应。

可扩展性和灵活性:使用 WebClient 可以更灵活地进行配置和处理,例如设置超时时间、请求头、重试机制等。WebClient 还可以与许多其他 Spring WebFlux 组件集成,如 WebSockets、Server-Sent Events 等。而 Mono.fromFuture() 是适用于单个 Future 对象转化为 Mono 的情况,可扩展性较差。

  • 错误处理

WebClient 提供了更丰富的错误处理机制,可以通过 onStatus、onError 等方法来处理不同的 HTTP 状态码或异常。同时,WebClient 还提供了更灵活的重试和回退策略。Mono.fromFuture() 方法只能将 Future 对象的结果包装在 Mono 中,不提供特定的错误处理机制。

  • 阻塞操作

Mono.fromFuture() 会阻塞。当调用 Mono.fromFuture() 方法将 Future 转换为 Mono 时,它会等待 Future 对象的结果返回。在这个等待的过程中,Mono.fromFuture()方法会阻塞当前的线程。这意味着,如果 Future 的结果在运行过程中没有返回,则当前线程会一直阻塞,直到 Future 对象返回结果或者超时。因此,在使用 Mono.fromFuture() 时需要注意潜在的阻塞风险。另外,需要确保F uture 的任务在后台线程中执行,以免阻塞应用程序的主线程。

1.2 Mono.fromFuture VS Mono.fromSupplier

Mono.fromSupplier() 和 Mono.fromFuture() 都是用于将异步执行的操作转换为响应式的 Mono 对象,但它们的区别在于:

Mono.fromSupplier() 适用于一个提供者/生产者,可以用来表示某个操作的结果,该操作是一些纯计算并且没有阻塞的方法。也就是说,Mono.fromSupplier() 将其参数 (Supplier) 所提供的操作异步执行,并将其结果打包成一个 Mono 对象。

Mono.fromFuture() 适用于一个 java.util.concurrent.Future 对象,将其封装成 Mono 对象。这意味着调用 Mono.fromFuture() 方法将阻塞当前线程,直到异步操作完成返回一个 Future 对象。

因此,Mono.fromSupplier() 与 Mono.fromFuture() 的主要区别在于:

Mono.fromSupplier() 是一个非阻塞的操作,不会阻塞当前线程。这个方法用于执行计算型的任务,返回一个封装了计算结果的 Mono 对象。
Mono.fromFuture() 是阻塞操作,会阻塞当前线程,直到异步操作完毕并返回看,它适用于处理 java.util.concurrent.Future 对象。

需要注意的是,如果 Supplier 提供的操作是阻塞的,则 Mono.fromSupplier() 方法本身也会阻塞线程。但通常情况下,Supplier 提供的操作是纯计算型的,不会阻塞线程。

因此,可以使用 Mono.fromSupplier() 方法将一个纯计算型的操作转换为 Mono 对象,而将一个异步返回结果的操作转换为 Mono 对象时,可以使用 Mono.fromFuture() 方法。

2 定制化自己的 WebClient

2.1 初始化 WebClient

WebClient 支持建造者模式,使用 WebClient 建造者模式支持开发自己的个性化 WebClient,比如支持设置接口调用统一耗时、自定义底层 Http 客户端、调用链路、打印接口返回日志、监控接口耗时等等。

WebClient builder 支持以下方法

interface Builder {

		/**
		 * 配置请求基础的url,如:baseUrl = "https://abc.go.com/v1";和 uriBuilderFactory 冲突,如果有 uriBuilderFactory ,则忽略 baseUrl
		 */
		Builder baseUrl(String baseUrl);

		/**
		 * URI 请求的默认变量。也和 uriBuilderFactory 冲突,如果有 uriBuilderFactory ,则忽略 defaultUriVariables
		 */
		Builder defaultUriVariables(Map<String, ?> defaultUriVariables);

		/**
		 * 提供一个预配置的UriBuilderFactory实例
		 */
		Builder uriBuilderFactory(UriBuilderFactory uriBuilderFactory);

		/**
		 * 默认 header
		 */
		Builder defaultHeader(String header, String... values);

		/**
		 * 默认cookie
		 */
		Builder defaultCookie(String cookie, String... values);

		/**
		 * 提供一个 consumer 来定制每个请求
		 */
		Builder defaultRequest(Consumer<RequestHeadersSpec<?>> defaultRequest);

		/**
		 * 添加一个filter,可以添加多个
		 */
		Builder filter(ExchangeFilterFunction filter);

	
		/**
		 * 配置要使用的 ClientHttpConnector。这对于插入或自定义底层HTTP 客户端库(例如SSL)的选项非常有用。
		 */
		Builder clientConnector(ClientHttpConnector connector);

		/**
		 * Configure the codecs for the {@code WebClient} in the
		 * {@link #exchangeStrategies(ExchangeStrategies) underlying}
		 * {@code ExchangeStrategies}.
		 * @param configurer the configurer to apply
		 * @since 5.1.13
		 */
		Builder codecs(Consumer<ClientCodecConfigurer> configurer);



		/**
		 * 提供一个预先配置了ClientHttpConnector和ExchangeStrategies的ExchangeFunction。
这是对 clientConnector 的一种替代,并且有效地覆盖了它们。
		 */
		Builder exchangeFunction(ExchangeFunction exchangeFunction);

		/**
		 * Builder the {@link WebClient} instance.
		 */
		WebClient build();
        
  // 其他方法
	}

2.2 日志打印及监控

  • 打印参数、url、返回
  • 参数和返回需要转成json
  • 需要打印正常返回日志和异常
  • 正常监控、异常监控、总监控以及响应时间
.doOnSuccess(response-> {
    log.info("get.success, url={}, response={}, param={}", url, response);
})
.doOnError(error-> {
    log.info("get.error, url={}", url, error);
    // 监控
})
.doFinally(res-> {
  //监控
})

2.3 返回处理

retrieve() // 声明如何提取响应。例如,提取一个ResponseEntity的状态,头部和身体:

.bodyToMono(clazz) 将返回body内容转成clazz对象,clazz 对象可以自己指定类型。如果碰到有问题的无法转化的,也可以先转成String,然后自己实现一个工具类,将String转成 class 对象。

2.3.1 get
public <T> Mono<T> get(String url, Class<T> clazz, T defaultClass) {
long start = System.currentTimeMillis();
return webClient.get()
        .uri(url)
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .bodyToMono(clazz)
        .doOnSuccess(response-> {
            log.info("get.success, url={}, response={}, param={}", url, response);
        })
        .doOnError(error-> {
            log.info("get.param.error, url={}", url, error);
        })
        .onErrorReturn(defaultClass)
        .doFinally(res-> {
        })
        .publishOn(customScheduler);
}
2.3.2 get param 请求
public <T> Mono<T> getParam(String url, MultiValueMap<String, String> param, Class<T> clazz, T defaultClass) {
long start = System.currentTimeMillis();
URI uri = UriComponentsBuilder.fromUriString(url)
        .queryParams(param)
        .build()
        .toUri();

return webClient.get()
        .uri(uri)
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .bodyToMono(clazz)
        .doOnSuccess(response-> {
            log.info("get.param.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(param));
        })
        .doOnError(error-> {
            log.error("get.param.error, url={}, param={}", url, JsonUtil.toJson(param), error);
        })
        .onErrorReturn(defaultClass)
        .doFinally(res-> {
        // 监控 or 打印日志 or 耗时
        })
        .publishOn(customScheduler);
}
2.3.3 post json 请求
public <T> Mono<T> postJson(String url, final HttpParameter4Json parameter, Class<T> clazz, T defaultClass) {
final long start = System.currentTimeMillis();
return webClient.post()
        .uri(url)
        .contentType(MediaType.APPLICATION_JSON)
        .cookies(cookies -> cookies.setAll(parameter.getCookies()))
        .body(Mono.just(parameter.getJsonBody()), ParameterizedTypeReference.forType(parameter.getBodyType()))
        .headers(headers -> headers.setAll(parameter.getHeaders()))
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .bodyToMono(clazz)
        .doOnSuccess(response-> {
            log.info("post.json.success, url={}, response={}, param={}", url, response, parameter.getJsonBody());
        })
        .doOnError(error-> {
            log.error("get.param.error, url={}, param={}", url, parameter.getJsonBody(), error);
        })
        .onErrorReturn(defaultClass)
        .doFinally(res-> {
        })
        .publishOn(customScheduler);

}
2.3.4 post form Data 请求
public <T> Mono<T> postFormData(String url, HttpParameter parameter, Class<T> clazz, T defaultClass) {
    final long start = System.currentTimeMillis();
    return webClient.post()
            .uri(url)
            .contentType(MediaType.APPLICATION_FORM_URLENCODED)
            .cookies(cookies -> cookies.setAll(parameter.getCookies()))
            .body(BodyInserters.fromFormData(parameter.getMultiValueMapParam()))
            .headers(headers -> headers.setAll(parameter.getMapHeaders()))
            .accept(MediaType.APPLICATION_JSON)
            .retrieve()
            .bodyToMono(clazz)
            .doOnSuccess(response-> {
                log.info("post.fromData.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(parameter));
            })
            .doOnError(error-> {
                log.info("get.param.error, url={}, param={}", url, JsonUtil.toJson(parameter), error);
            })
            .onErrorReturn(defaultClass)
            .doFinally(res-> {
            })
            .publishOn(customScheduler);
}

2.4 异常处理

2.4.1 异常返回兜底

onErrorReturn 发现异常返回兜底数据

2.4.2 异常处理

状态码转成异常抛出

.onStatus(HttpStatus::isError, response -> Mono.error(new RuntimeException("Request failed with status code: " + response.statusCode())))

监控异常文章来源地址https://www.toymoban.com/news/detail-663868.html

.doOnError(error -> {
    // log and monitor
})

3 完整的 WebClient


package com.geniu.reactor.webclient;

import com.geniu.utils.JsonUtil;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.SslProvider;
import reactor.netty.tcp.TcpClient;

import java.net.URI;
import java.time.Duration;
import java.util.function.Function;

/**
 * @Author: prepared
 * @Date: 2023/8/15 11:05
 */
@Slf4j
public class CustomerWebClient {

	public static final CustomerWebClient instance = new CustomerWebClient();

	/**
	 * 限制并发数 100
	 */
	Scheduler customScheduler = Schedulers.newParallel("CustomScheduler", 100);


	private final WebClient webClient;

	private CustomerWebClient() {

		final SslContextBuilder sslBuilder = SslContextBuilder.forClient()
				.trustManager(InsecureTrustManagerFactory.INSTANCE);

		final SslProvider ssl = SslProvider.builder().sslContext(sslBuilder)
				.defaultConfiguration(SslProvider.DefaultConfigurationType.TCP).build();

		final int cpuCores = Runtime.getRuntime().availableProcessors();
		final int selectorCount = Math.max(cpuCores / 2, 4);
		final int workerCount = Math.max(cpuCores * 2, 8);
		final LoopResources pool = LoopResources.create("HCofSWC", selectorCount, workerCount, true);

		final Function<? super TcpClient, ? extends TcpClient> tcpMapper = tcp -> tcp
				.option(ChannelOption.TCP_NODELAY, true)
				.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
				.option(ChannelOption.SO_TIMEOUT, 10000)
				.secure(ssl)
				.runOn(pool);

		ConnectionProvider.Builder httpClientOfSWC = ConnectionProvider
				.builder("HttpClientOfSWC")
				.maxConnections(100_000)
				.pendingAcquireTimeout(Duration.ofSeconds(6));
		final ConnectionProvider connectionProvider = httpClientOfSWC.build();

		final HttpClient hc = HttpClient.create(connectionProvider)
				.tcpConfiguration(tcpMapper);

		final Function<HttpClient, HttpClient> hcMapper = rhc -> rhc
				.compress(true);

		final WebClient.Builder wcb = WebClient.builder()
				.clientConnector(new ReactorClientHttpConnector(hcMapper.apply(hc)));
//				.filter(new TraceRequestFilter()); 可以通过Filter 增加trace追踪

		this.webClient = wcb.build();
	}

	public <T> Mono<T> get(String url, Class<T> clazz, T defaultClass) {
		long start = System.currentTimeMillis();
		return webClient.get()
				.uri(url)
				.accept(MediaType.APPLICATION_JSON)
				.retrieve()
				.onStatus(HttpStatus::isError, response -> Mono.error(new RuntimeException("Request failed with status code: " + response.statusCode())))
				.bodyToMono(clazz)
				.doOnSuccess(response-> {
					log.info("get.success, url={}, response={}, param={}", url, response);
				})
				.doOnError(error-> {
					log.info("get.param.error, url={}", url, error);
				})
				.onErrorReturn(defaultClass)
				.doFinally(res-> {
				})
				.publishOn(customScheduler);
	}

	public <T> Mono<T> getParam(String url, MultiValueMap<String, String> param, Class<T> clazz, T defaultClass) {
		long start = System.currentTimeMillis();
		URI uri = UriComponentsBuilder.fromUriString(url)
				.queryParams(param)
				.build()
				.toUri();

		return webClient.get()
				.uri(uri)
				.accept(MediaType.APPLICATION_JSON)
				.retrieve()
				.bodyToMono(clazz)
				.doOnSuccess(response-> {
					log.info("get.param.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(param));
				})
				.doOnError(error-> {
					log.error("get.param.error, url={}, param={}", url, JsonUtil.toJson(param), error);
				})
				.onErrorReturn(defaultClass)
				.doFinally(res-> {
				})
				.publishOn(customScheduler);
	}



	public <T> Mono<T> postJson(String url, final HttpParameter4Json parameter, Class<T> clazz, T defaultClass) {
		final long start = System.currentTimeMillis();
		return webClient.post()
				.uri(url)
				.contentType(MediaType.APPLICATION_JSON)
				.cookies(cookies -> cookies.setAll(parameter.getCookies()))
				.body(Mono.just(parameter.getJsonBody()), ParameterizedTypeReference.forType(parameter.getBodyType()))
				.headers(headers -> headers.setAll(parameter.getHeaders()))
				.accept(MediaType.APPLICATION_JSON)
				.retrieve()
				.bodyToMono(clazz)
				.doOnSuccess(response-> {
					log.info("post.json.success, url={}, response={}, param={}", url, response, parameter.getJsonBody());
				})
				.doOnError(error-> {
					log.error("get.param.error, url={}, param={}", url, parameter.getJsonBody(), error);
				})
				.onErrorReturn(defaultClass)
				.doFinally(res-> {
				})
				.publishOn(customScheduler);

	}


	public <T> Mono<T> postFormData(String url, HttpParameter parameter, Class<T> clazz, T defaultClass) {
		final long start = System.currentTimeMillis();
		return webClient.post()
				.uri(url)
				.contentType(MediaType.APPLICATION_FORM_URLENCODED)
				.cookies(cookies -> cookies.setAll(parameter.getCookies()))
				.body(BodyInserters.fromFormData(parameter.getMultiValueMapParam()))
				.headers(headers -> headers.setAll(parameter.getMapHeaders()))
				.accept(MediaType.APPLICATION_JSON)
				.retrieve()
				.bodyToMono(clazz)
				.doOnSuccess(response-> {
					log.info("post.fromData.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(parameter));
				})
				.doOnError(error-> {
					log.info("get.param.error, url={}, param={}", url, JsonUtil.toJson(parameter), error);
				})
				.onErrorReturn(defaultClass)
				.doFinally(res-> {
				})
				.publishOn(customScheduler);
	}

}

到了这里,关于Reactor 第十篇 定制一个生产的WebClient的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Kubernetes】第十篇 - 灰度发布的介绍与实现

    前几篇,已经介绍了环境搭建、Deployment 部署对象、Service 服务、Ingress 路由转发; 本篇,介绍灰度发布的实现; 灰度发布,也叫金丝雀发布;是一种应用的发布方式; 金丝雀发布的命名:金丝雀对瓦斯气体非常敏感,矿工在下井前会先向井里放一只金丝雀,如果金丝雀不叫

    2024年02月16日
    浏览(38)
  • 第十篇:如何通过开源框架搭建自己的区块链网络?

    作者:禅与计算机程序设计艺术 区块链是由分布式系统技术所构建、管理、维护、保障的数据共享、共识机制、透明可验证等特性的新型计算机分布式基础设施技术。从某种程度上说,区块链可以看做一种去中心化的数据库,记录着所有参与者间的历史交易记录。每一条数据

    2024年02月08日
    浏览(36)
  • 【夜深人静学数据结构与算法 | 第十篇】动态规划

    目录 前言: 动态规划: 常见应用: 解题步骤:  动态规划的简化步骤: 案例: 509. 斐波那契数 - 力扣(LeetCode) 70. 爬楼梯 - 力扣(LeetCode) 62. 不同路径 - 力扣(LeetCode) 总结:         本文我们将为大家讲解一下动态规划的理论知识,并且会讲解几道力扣的经典例题。

    2024年02月11日
    浏览(53)
  • 【Python入门系列】第十篇:Python图像处理和计算机视觉

    图像处理和计算机视觉是计算机科学中非常重要的领域之一。Python作为一种功能强大且易于学习的编程语言,提供了许多用于图像处理和计算机视觉的库和工具。本文将介绍一些常用的Python库,并提供一些示例代码。 Python中有几个流行的图像处理库,其中最常用的是OpenCV和

    2024年02月12日
    浏览(39)
  • 【Git技巧】第十篇 解决每次git clone都需要输入账号密码

    目录 1、遇到问题 2、问题解决 每次 git clone 时都需要输入账号、密码。 配置全局开机认证信息保存: 可以永久保存。除非用命令解除。 然后在输入账号密码之后,后面每次克隆不需要输入。 谨记:密码是gitHub或gitlab上的访问令牌。

    2024年02月11日
    浏览(44)
  • 【Spring进阶系列丨第十篇】基于注解的面向切面编程(AOP)详解

    ​ 注意,该类的两个细节: a、@Component注解向容器中注册一个Bean。 b、@Aspect注解表示这个是一个切面类。 c、@Before注解表示的是这个是前置增强/前置通知。 ​ 注意:对于业务Bean,我们也需要通过@Service注解来向容器中注册。 ​ 问题:我们看到对于切面类中定义的通知,有

    2024年04月23日
    浏览(50)
  • 第十篇博文:An Overview of HumanCentered AI and Crowdsourcing

    作者:禅与计算机程序设计艺术 随着人类活动规模的扩大、信息化程度的提高、社会经济生产力水平的提升,以及人类对资源的需求日益增长,我们已经可以进行多种多样的应用,包括从金融到医疗、教育、娱乐、科技等,都离不开计算机算法的帮助。同时,在这个过程中,

    2024年02月07日
    浏览(36)
  • 第十篇【传奇开心果系列】Ant Design Mobile of React 开发移动应用:涉及到的相关基础知识介绍和示例

    第一篇【传奇开心果系列】Ant Design Mobile of React 开发移动应用:从helloworld开始 第二篇【传奇开心果系列】Ant Design Mobile of React 开发移动应用:天气应用 第三篇【传奇开心果系列】Ant Design Mobile of React 开发移动应用:健身追踪 第四篇【传奇开心果系列】Ant Design Mobile of React 开发移

    2024年01月20日
    浏览(52)
  • GPT生产实践之定制化翻译

    GPT除了能用来聊天以外,其实功能非常强大,但是我们如何把它运用到生产实践中去,为公司带来价值呢?下面一个使用案例–使用gpt做专业领域定制化翻译 思路: 定制化:有些公司词条的翻译我们想要定制翻译成公司的slogn简称,比如运去哪翻译成YQN,这在其他翻译引擎里

    2024年02月11日
    浏览(40)
  • 三防平板定制服务:亿道信息与个性化生产的紧密结合

    在当今数字化时代,个性化定制已经成为了市场的一大趋势,而三防平板定制服务作为其中的一部分,展现了数字化技术与个性化需求之间的紧密结合。这种服务是通过亿道信息所提供的技术支持,为用户提供了满足特定需求的定制化三防平板,从而使得产品更符合用户的个

    2024年04月09日
    浏览(55)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包