java 连接google cloud pubsub做消息发布和消费

这篇具有很好参考价值的文章主要介绍了java 连接google cloud pubsub做消息发布和消费。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

pom依赖

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
            <version>1.2.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.1-jre</version>
        </dependency>

yml配置

pubsub-secret-key.json 为谷歌云服务账号密钥,密钥生成看谷歌云文档

spring:
  cloud:
    gcp:
      pubsub:
        enabled: true
      project-id: project-ID
      credentials:
        location: classpath:pubsub-secret-key.json

java代码文章来源地址https://www.toymoban.com/news/detail-636231.html

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.PostConstruct;
import org.springframework.stereotype.Service;
import com.allsaints.reco.service.PubSubService;
import com.google.api.core.ApiFuture;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import org.threeten.bp.Duration;

@Service
@Slf4j
public class PubSubServiceImpl implements PubSubService {

	String projectId = "project-ID";
	String subscriptionId = "订阅id";

	@PostConstruct
	public void startSubscriber() throws IOException {
		ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId);

		MessageReceiver receiver = (message, consumer) -> {
			// 处理接收到的消息
			log.info("Received message------------" + message.getData().toStringUtf8());

			// 确认消息已被处理
			consumer.ack();
		};
		GoogleCredentials credentials = GoogleCredentials
				.fromStream(new FileInputStream("src/main/resources/pubsub-secret-key.json"));
		Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build();
		subscriber.startAsync().awaitRunning();
		
		subscriber.stopAsync();
	}

	@Override
	public void publishMessage(String message) {
		Publisher publisher = null;
		try {
			GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream("src/main/resources/pubsub-secret-key.json"));
			ProjectTopicName topicName = ProjectTopicName.of(projectId, "topid-ID");
			FixedCredentialsProvider credentialsProvider = FixedCredentialsProvider.create(credentials);
			 Duration initialRetryDelay = Duration.ofMillis(100); // default: 100 ms
		      double retryDelayMultiplier = 2.0; // back off for repeated failures, default: 1.3
		      Duration maxRetryDelay = Duration.ofSeconds(60); // default : 60 seconds
		      Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 5 seconds
		      double rpcTimeoutMultiplier = 1.0; // default: 1.0
		      Duration maxRpcTimeout = Duration.ofSeconds(600); // default: 600 seconds
		      Duration totalTimeout = Duration.ofSeconds(600); // default: 600 seconds

		      RetrySettings retrySettings =
		          RetrySettings.newBuilder()
		              .setInitialRetryDelay(initialRetryDelay)
		              .setRetryDelayMultiplier(retryDelayMultiplier)
		              .setMaxRetryDelay(maxRetryDelay)
		              .setInitialRpcTimeout(initialRpcTimeout)
		              .setRpcTimeoutMultiplier(rpcTimeoutMultiplier)
		              .setMaxRpcTimeout(maxRpcTimeout)
		              .setTotalTimeout(totalTimeout)
		              .build();
			publisher = Publisher.newBuilder(topicName).setCredentialsProvider(credentialsProvider).setRetrySettings(retrySettings).build();

			ByteString data = ByteString.copyFromUtf8(message);
			PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
			ApiFuture<String> res = publisher.publish(pubsubMessage);
			log.info("pubsub 发布结果 = {}",res);
			log.info("pubsub 发布结果 = {}",res.get());
//			publisher.shutdown();
//			publisher.awaitTermination(30, TimeUnit.SECONDS);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (publisher != null) {
				// When finished with the publisher, shutdown to free up resources.
				publisher.shutdown();
				try {
					publisher.awaitTermination(1, TimeUnit.MINUTES);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		 }
		
	}

	@Override
	public void pullMessages() {
		// 创建订阅名称
		ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId);
		PullRequest pullRequest = PullRequest.newBuilder().setSubscription(subscriptionName.toString()).setMaxMessages(1).build();
		try {
			GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream("src/main/resources/pubsub-secret-key.json"));
			SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder().setCredentialsProvider(() -> credentials).build();
			GrpcSubscriberStub subscriberStub = GrpcSubscriberStub.create(subscriberStubSettings);
			// 发送拉取请求并处理接收到的消息
			List<ReceivedMessage> list = subscriberStub.pullCallable().call(pullRequest).getReceivedMessagesList();
			list.forEach(message -> {
				// 处理消息逻辑
				log.info("Received message: " + message.getMessage().getData().toStringUtf8());
			});
			subscriberStub.shutdown();
		} catch (IOException e) {
			log.error("拉取pubsub消息异常 = {}",e);
		}
//		String projectId = "asofone-composer2-project";
//		String subscriptionId = "java-test-sub";
//		Integer numOfMessages = 10;
//		try {
//			subscribeSyncExample(projectId, subscriptionId, numOfMessages);
//		}catch (Exception e) {
//			log.error("{}",e);
//		}
		
	}
	

	/***************************************************/
//	public static void main(String... args) throws Exception {
//		String projectId = "asofone-composer2-project";
//		String subscriptionId = "java-test-sub";
//		Integer numOfMessages = 10;
//
//		subscribeSyncExample(projectId, subscriptionId, numOfMessages);
//	}

	public static void subscribeSyncExample(String projectId, String subscriptionId, Integer numOfMessages)
			throws IOException {
		GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream("src/main/resources/pubsub-secret-key.json"));
		// 创建 SubscriberStubSettings.Builder 对象
		SubscriberStubSettings.Builder subscriberSettingsBuilder = SubscriberStubSettings.newBuilder();

		// 将 GoogleCredentials 对象设置到 SubscriberStubSettings.Builder 中
		subscriberSettingsBuilder.setCredentialsProvider(FixedCredentialsProvider.create(credentials));

		// 构建 SubscriberStubSettings 对象
//		SubscriberStubSettings subscriberSettings = subscriberSettingsBuilder.build();
		SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder()
				.setTransportChannelProvider(
					SubscriberStubSettings.defaultGrpcTransportProviderBuilder().setMaxInboundMessageSize(20 * 1024 * 1024).build()) // 20MB (maximum message size))
				.build();

		try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
			String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
			PullRequest pullRequest = PullRequest.newBuilder().setMaxMessages(numOfMessages).setSubscription(subscriptionName).build();
			// Use pullCallable().futureCall to asynchronously perform this operation.
			PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);

			// Stop the program if the pull response is empty to avoid acknowledging
			// an empty list of ack IDs.
			if (pullResponse.getReceivedMessagesList().isEmpty()) {
				log.info("消息列表 = {}",pullResponse.getReceivedMessagesList());
				return;
			}

			List<String> ackIds = new ArrayList<>();
			for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
				// Handle received message
				// ...
				ackIds.add(message.getAckId());
			}

			// Acknowledge received messages.
			AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder().setSubscription(subscriptionName)
					.addAllAckIds(ackIds).build();

			// Use acknowledgeCallable().futureCall to asynchronously perform this
			// operation.
			subscriber.acknowledgeCallable().call(acknowledgeRequest);
			System.out.println(pullResponse.getReceivedMessagesList());
		}
	}
}

到了这里,关于java 连接google cloud pubsub做消息发布和消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ(二) - RabbitMQ与消息发布确认与返回、消费确认

    SpringBoot与RabbitMQ整合后,对RabbitClient的“确认”进行了封装、使用方式与RabbitMQ官网不一致; 生产者给交换机发送消息后、若是不管了,则会出现消息丢失; 解决方案1: 交换机接受到消息、给生产者一个答复ack, 若生产者没有收到ack, 可能出现消息丢失,因此重新发送消息;

    2024年02月14日
    浏览(36)
  • 用postman 推送消息到GCP的pubsub

    我们可以用terraform 去创建1个topic 和 2个subscriptions 获得 这个Topic 的publish Rest url 其实GCP products 的 rest url 有规律的 POST https://pubsub.googleapis.com/v1/projects/ {project_id} topics/ {topic_name} :publish 基于某个gcp 账号获得1个临时的token 用个人账号 登录 gcloud 或者 用service account 登录 gcloud 然后

    2024年02月09日
    浏览(34)
  • Sprint Cloud Stream整合RocketMq和websocket实现消息发布订阅

    1. 引入RocketMQ依赖 :首先,在 pom.xml 文件中添加RocketMQ的依赖: 2. 配置RocketMQ连接信息 :在 application.properties 或 application.yml 中配置RocketMQ的连接信息,包括Name Server地址等: 3.消息发布组件 4.消息发布控制器 项目结构: 接下来是websocket模块的搭建 1. 依赖添加 2.application.yml配

    2024年02月08日
    浏览(30)
  • 【RabbitMQ 实战】09 客户端连接集群生产和消费消息

    下面的链接是最快最简单的一种集群部署方法 3分钟部署一个RabbitMQ集群 上的的例子中,没有映射端口,所以没法从宿主机外部连接容器,下面的yml文件中,暴露了端口。 每个容器应用都映射了宿主机的端口,分别是5602,5612,5622 docker compse文件如下 通过docker-compose up命令,

    2024年02月07日
    浏览(31)
  • RabbitMQ学习笔记(消息发布确认,死信队列,集群,交换机,持久化,生产者、消费者)

    MQ(message queue):本质上是个队列,遵循FIFO原则,队列中存放的是message,是一种跨进程的通信机制,用于上下游传递消息。MQ提供“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后消息发送上游只需要依赖MQ,不需要依赖其它服务。 功能1:流量消峰 功能2:应用解耦 功

    2024年02月07日
    浏览(34)
  • EMQX Cloud BYOC 版本发布:在您的云上体验全托管的 MQTT 消息服务

    近日,全球领先的物联网数据基础设施软件供应商 EMQ 映云科技为其旗下全托管 MQTT 消息云服务产品 EMQX Cloud 推出了 BYOC(Bring Your Own Cloud)版本,该版本将允许用户在其现有的云基础架构环境中部署 MQTT 消息服务。用户不仅可以通过 EMQ 团队提供的专业运维管理服务 享受云计

    2023年04月15日
    浏览(23)
  • springboot整合rabbitmq的发布确认,消费者手动返回ack,设置备用队列,以及面试题:rabbitmq确保消息不丢失

    目录 1.生产者发消息到交换机时候的消息确认 2.交换机给队列发消息时候的消息确认 3.备用队列 3.消费者手动ack   rabbitmq的发布确认方式,可以有效的保证我们的数据不丢失。   消息正常发送的流程是:生产者发送消息到交换机,然后交换机通过路由键把消息发送给对应的队

    2024年02月09日
    浏览(60)
  • 使用Node.js连接和发布/订阅MQTT消息

    Node.js是一种基于事件驱动的异步I/O服务器端JavaScript运行环境,因为其非阻塞I/O和事件驱动模型,使得它非常适合处理大量并发请求的场景。MQTT是一种轻量级的消息传递协议,它是基于发布/订阅模式的,适用于传输小量数据,且具有低带宽、低电量消耗和可靠性高等特点。这

    2024年02月06日
    浏览(30)
  • Java实现Kafka消费者及消息异步回调方式

    Kafka 在创建消费者进行消费数据时,由于可以理解成为是一个kafka 的单独线程,所以在Kafka消费数据时想要在外部对消费到的数据进行业务处理时是获取不到的,所以就需要实现一个消息回调的接口来进行数据的保存及使用。 消息回调接口实现代码如下 Kafka消费者代码实现如

    2024年02月06日
    浏览(44)
  • JAVA实时获取kafka各个主题下分区消息的消费情况

    通过指定 主题 和 消费者组 调用方法,实时查看主题下分区消息的消费情况(消息总数量、消费消息数量、未消费的消息数量)。

    2024年02月13日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包