Springboot Kafka整合(开发实例、连接、配置TOPICS、发送消息)—官方原版

这篇具有很好参考价值的文章主要介绍了Springboot Kafka整合(开发实例、连接、配置TOPICS、发送消息)—官方原版。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、概念

Spring for Apache Kafka项目将Spring的核心概念应用于基于Kafka的消息传递解决方案的开发。我们提供了一个“模板”作为发送消息的高级抽象。

二、开发环境准备

1、Kafka客户端版本

本快速教程适用于以下版本:

  • Apache Kafka 客户端 3.3.x

  • Spring Framework 6.0.x

  • 最低 Java 版本:17

Springboot Kafka整合(开发实例、连接、配置TOPICS、发送消息)—官方原版,kafka,kafka

 2、引入依赖

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>3.0.5</version>
</dependency>

 3、配置application.yml

spring:
  kafka:
    # kafka连接地址
    bootstrap-servers: 192.168.1.1:9092
    producer:
      # 发生错误后,消息重发的次数。
      retries: 0
      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: 1
    consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
      auto-offset-reset: earliest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 5
      #listner负责ack,每调用一次,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false
# 自定义的kafka配置项
kafka:
  topic:
    name: TEST_TOPIC
  group:
    id: TEST_GROUP

三、Spring Boot Consumer App

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }

    @KafkaListener(id = "myId", topics = "topic1")
    public void listen(String in) {
        System.out.println(in);
    }

}

 四、Spring Boot Producer App

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("topic1", "test");
        };
    }

}

五、使用 Java 配置(无 Spring boot)

以下是一个不使用Spring Boot的应用程序示例;它既有消费者也有生产者。

public class Sender {

	public static void main(String[] args) {
		AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
		context.getBean(Sender.class).send("test", 42);
	}

	private final KafkaTemplate<Integer, String> template;

	public Sender(KafkaTemplate<Integer, String> template) {
		this.template = template;
	}

	public void send(String toSend, int key) {
		this.template.send("topic1", key, toSend);
	}

}

public class Listener {

    @KafkaListener(id = "listen1", topics = "topic1")
    public void listen1(String in) {
        System.out.println(in);
    }

}

@Configuration
@EnableKafka
public class Config {

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps());
    }

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // ...
        return props;
    }

    @Bean
    public Sender sender(KafkaTemplate<Integer, String> template) {
        return new Sender(template);
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(senderProps());
    }

    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //...
        return props;
    }

    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
        return new KafkaTemplate<Integer, String>(producerFactory);
    }

}

正如您所看到的,当不使用Spring Boot时,您必须定义几个基础设施bean。

六、Springg整合Apache Kafka

1、连接Kafka

从2.5版本开始,每一个都扩展了KafkaResourceFactory。这允许在运行时更改引导服务器,方法是在其配置中添加一个Supplier<String>:setBootstrapServersSupplier(()→ …​). 这将对所有新连接调用,以获取服务器列表。消费者和生产者通常寿命较长。要关闭现有的Producers,请在DefaultKafkaProducerFactory上调用reset()。要关闭现有的Consumers,请在KafkaListenerEndpointRegistry上调用stop()(然后调用start()),和/或在任何其他侦听器容器bean上调用stop()和start()。
为了方便起见,该框架还提供了一个ABSwitchCluster,它支持两组引导服务器;其中一个在任何时候都是活动的。通过调用setBootstrapServersSupplier(),配置ABSwitchCluster并将其添加到生产者和消费者工厂以及KafkaAdmin。当你想切换时,在生产者工厂上调用primary()或secondary()并调用reset()来建立新的连接;对于消费者,停止()和启动()所有侦听器容器。当使用@KafkaListener时,停止()和启动()KafkaListenerEndpointRegistry bean。

Factory Listeners

从2.5版本开始,DefaultKafkaProducerFactory和DefaultKafka ConsumerFactory可以配置一个监听器,以便在创建或关闭生产者或消费者时接收通知。

Producer Factory Listener        

interface Listener<K, V> {

    default void producerAdded(String id, Producer<K, V> producer) {
    }

    default void producerRemoved(String id, Producer<K, V> producer) {
    }

}

Consumer Factory Listener

interface Listener<K, V> {

    default void consumerAdded(String id, Consumer<K, V> consumer) {
    }

    default void consumerRemoved(String id, Consumer<K, V> consumer) {
    }

}

在每种情况下,id都是通过将客户端id属性(创建后从metrics()获得)附加到工厂beanName属性来创建的,用..分隔。。
例如,当创建新客户端时,可以使用这些监听器来创建和绑定Micrometer KafkaClientMetrics实例(并在客户端关闭时关闭它)。

2、配置Topics

如果您在应用程序上下文中定义了一个KafkaAdminbean,它可以自动向代理添加主题。为此,您可以为应用程序上下文中的每个主题添加一个NewTopic@Bean。2.3版引入了一个新的类TopicBuilder,使创建此类bean更加方便。以下示例显示了如何执行此操作:

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic1() {
    return TopicBuilder.name("thing1")
            .partitions(10)
            .replicas(3)
            .compact()
            .build();
}

@Bean
public NewTopic topic2() {
    return TopicBuilder.name("thing2")
            .partitions(10)
            .replicas(3)
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}

@Bean
public NewTopic topic3() {
    return TopicBuilder.name("thing3")
            .assignReplicas(0, Arrays.asList(0, 1))
            .assignReplicas(1, Arrays.asList(1, 2))
            .assignReplicas(2, Arrays.asList(2, 0))
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}

从2.6版开始,您可以省略.partitions()和/或replicas(),代理默认值将应用于这些财产。代理版本必须至少为2.4.0才能支持此功能-请参阅KIP-464。

@Bean
public NewTopic topic4() {
    return TopicBuilder.name("defaultBoth")
            .build();
}

@Bean
public NewTopic topic5() {
    return TopicBuilder.name("defaultPart")
            .replicas(1)
            .build();
}

@Bean
public NewTopic topic6() {
    return TopicBuilder.name("defaultRepl")
            .partitions(3)
            .build();
}

从2.7版本开始,您可以在一个KafkaAdmin.NewTopics bean定义中声明多个NewTopic:

@Bean
public KafkaAdmin.NewTopics topics456() {
    return new NewTopics(
            TopicBuilder.name("defaultBoth")
                .build(),
            TopicBuilder.name("defaultPart")
                .replicas(1)
                .build(),
            TopicBuilder.name("defaultRepl")
                .partitions(3)
                .build());
}

默认情况下,如果代理不可用,则会记录一条消息,但上下文会继续加载。您可以通过编程方式调用admin的initialize()方法,稍后再试。如果您希望将此情况视为致命,请将管理员的fatalIfBrokerNotAvailable属性设置为true。然后上下文无法初始化。

从2.7版本开始,KafkaAdmin提供了在运行时创建和检查主题的方法

  • 创建或修改主题
  • 描述主题

对于更高级的功能,您可以直接使用AdminClient。以下示例显示了如何执行此操作:

@Autowired
private KafkaAdmin admin;

...

    AdminClient client = AdminClient.create(admin.getConfigurationProperties());
    ...
    client.close();

3、发送消息 

3.1 用KafkaTemplate

KafkaTemplate包装了一个生产者,并提供了向Kafka主题发送数据的方便方法。以下列表显示了KafkaTemplate中的相关方法:

CompletableFuture<SendResult<K, V>> sendDefault(V data);

CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, V data);

CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

CompletableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

// Flush the producer.

void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}

sendDefault API要求为模板提供一个默认主题。
API将时间戳作为参数,并将此时间戳存储在记录中。如何存储用户提供的时间戳取决于在Kafka主题上配置的时间戳类型。如果将主题配置为使用CREATE_TIME,则会记录用户指定的时间戳(如果未指定,则会生成时间戳)。如果将主题配置为使用LOG_APPEND_TIME,则会忽略用户指定的时间戳,并且代理会添加本地代理时间。
方法的metrics和partitionsFor委托给基础Producer上的相同方法。execute方法提供对基础Producer的直接访问。
要使用该模板,您可以配置一个生产者工厂,并在模板的构造函数中提供它。以下示例显示了如何执行此操作:

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}

从2.5版开始,您现在可以覆盖工厂的ProducerConfig财产,以使用同一工厂的不同生产商配置创建模板。

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
}

@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
    return new KafkaTemplate<>(pf,
            Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

请注意,ProducerFactory<?,?>类型的bean(例如Spring Boot自动配置的)可以用不同的缩小的泛型类型引用。
您还可以使用标准的<bean/>定义来配置模板。
然后,要使用该模板,您可以调用它的一个方法。
当您将方法与消息一起使用时<?>参数、主题、分区和密钥信息在消息头中提供,该消息头包括以下项目:

  • KafkaHeaders.TOPIC(卡夫卡标题主题)
  • KafkaHeaders.PARTITION(Kafka标头分区)
  • KafkaHeaders.KEY
  • KafkaHeaders.TIMESTAMP(卡夫卡标题时间戳)

消息有效载荷是数据。
或者,您可以使用ProducerListener配置KafkaTemplate,以获得带有发送结果(成功或失败)的异步回调,而不是等待Future完成。以下列表显示了ProducerListener接口的定义:

public interface ProducerListener<K, V> {

    void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);

    void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
            Exception exception);

}

默认情况下,该模板配置有LoggingProducerListener,它会记录错误,并且在发送成功时不执行任何操作。
为了方便起见,在您只想实现其中一个方法的情况下,提供了默认的方法实现。
请注意,send方法返回一个CompletableFuture<SendResult>。您可以向侦听器注册回调,以异步接收发送的结果。以下示例显示了如何执行此操作:

CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
    ...
});

SendResult有两个财产,即ProducerRecord和RecordMetadata。有关这些对象的信息,请参阅KafkaAPI文档。
Throwable可以强制转换为KafkaProducerException;其failedProducerRecord属性包含失败的记录。
如果您希望阻止发送线程等待结果,您可以调用future的get()方法;建议使用带有超时的方法。如果您设置了linger.ms,您可能希望在等待之前调用flush(),或者为了方便起见,模板有一个带有autoFlush参数的构造函数,该参数会导致模板在每次发送时flush(()。只有在设置了linger.ms生产者属性并希望立即发送部分批次时,才需要刷新。

3.2 例子

本节显示了向 Kafka 发送消息的示例:

例 5.非阻塞(异步)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    CompletableFuture<SendResult<Integer, String>> future = template.send(record);
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            handleSuccess(data);
        }
        else {
            handleFailure(data, record, ex);
        }
    });
}

阻止(同步)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

请注意,ExecutionException的原因是具有failedProducerRecord属性的KafkaProducerException。

3.3 使用RoutingKafkaTemplate

从2.5版本开始,您可以使用RoutingKafkaTemplate在运行时根据目标主题名称选择生产者。

该模板需要java.util.regex.Pattern到ProducerFactory<Object,Object>实例的映射。应该对该映射进行排序(例如LinkedHashMap),因为它是按顺序遍历的;您应该在开始时添加更具体的模式。
以下简单的Spring Boot应用程序提供了一个示例,说明如何使用相同的模板发送到不同的主题,每个主题使用不同的值序列化程序。

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
            ProducerFactory<Object, Object> pf) {

        // Clone the PF with a different Serializer, register with Spring for shutdown
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
        context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);

        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile("two"), bytesPF);
        map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
        return new RoutingKafkaTemplate(map);
    }

    @Bean
    public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
        return args -> {
            routingTemplate.send("one", "thing1");
            routingTemplate.send("two", "thing2".getBytes());
        };
    }

}

3.4 使用 DefaultKafkaProducerFactory

如使用KafkaTemplate中所示,ProducerFactory用于创建生产者。
默认情况下,当不使用Transactions时,DefaultKafkaProducerFactory会创建一个供所有客户端使用的单例生产者,如Kafka生产者javadocs中所建议的那样。但是,如果您在模板上调用flush(),这可能会导致使用同一生成器的其他线程延迟。从2.3版本开始,DefaultKafkaProducerFactory有了一个新的属性producerPerThread。当设置为true时,工厂将为每个线程创建(并缓存)一个单独的生产者,以避免此问题。

另请参阅KafkaTemplate事务性和非事务性发布。
创建DefaultKafkaProducerFactory时,可以通过调用只接受财产映射的构造函数从配置中获取键和/或值Serializer类(请参阅使用KafkaTemplate中的示例),也可以将Serializers实例传递给DefaultKaf kaProducer Factory构造函数(在这种情况下,所有Producer共享相同的实例)。或者,您可以提供供应商<Serializer>(从版本2.3开始),用于为每个生产商获取单独的Serializer实例:

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

从2.5.10版开始,现在可以在创建工厂后更新生产者财产。例如,如果在凭据更改后必须更新SSL密钥/信任存储位置,这可能会很有用。这些变化不会影响现有的生产者实例;调用reset()关闭所有现有生产者,以便使用新的财产创建新生产者。注意:不能将事务性生产者工厂更改为非事务性,反之亦然。
现在提供了两种新方法:

void updateConfigs(Map<String, Object> updates);

void removeConfig(String configKey);

从2.8版开始,如果您将序列化程序作为对象提供(在构造函数中或通过setter),工厂将调用configure()方法以使用配置财产配置它们。

3.5 Reply Type Message<?>

当@KafkaListener返回消息<?>时,对于2.5之前的版本,有必要填充回复主题和关联id头。在本例中,我们使用请求中的回复主题标头:

@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .build();
}

这也显示了如何在回复记录上设置密钥。
从2.5版本开始,框架将检测这些头是否丢失,并用主题填充它们——根据@SendTo值确定的主题或传入的KafkaHeaders.REPLY_topic头(如果存在)。它还将回显传入的KafkaHeaders.CORRELATION_ID和KafkaHeaders.REPLY_PARTITION(如果存在)。文章来源地址https://www.toymoban.com/news/detail-740944.html

@KafkaListener(id = "requestor", topics = "request")
@SendTo  // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.KEY, 42)
            .build();
}

3.6 使用 ReplyingKafkaTemplate

到了这里,关于Springboot Kafka整合(开发实例、连接、配置TOPICS、发送消息)—官方原版的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • HarmonyOS鸿蒙开发指南:UI开发 基于ArkTS的声明式开发范式 声明式UI开发实例 页面布局与连接

    目录 构建食物数据模型 构建食物列表List布局 构建食物分类Grid布局  页面跳转与数据传递

    2024年02月03日
    浏览(54)
  • 微信小程序 | 微信公众平台SpringBoot开发实例 │ 模板消息的应用开发

     在手机微信公众号中输入文本(如“你好”),公众号发送两条模板消息,如下图所示。 模板消息用来帮助公众号进行业务通知,是在模板内容中设定参数(参数必须以{ {开头,且以.DATA} }结尾)并在调用时为这些参数赋值并发送的消息。模板消息仅用于向用户发送重要的服务

    2024年02月03日
    浏览(51)
  • 如何使用平板连接服务器实现cpolar开发?【内网穿透实例】

    在入手iPad Pro后,为了防止“买前生产力,买后爱奇艺”,我们可以在Linux服务器上搭建code server,然后在iPad Pro上通过浏览器或者APP,来远程linux服务器,使用服务器的资源来编译代码,而iPad Pro前端只需要负责写代码编程就可以啦。 以及在实现局域网下的远程访问后,我们还

    2023年04月09日
    浏览(42)
  • 很合适新手入门使用的Python游戏开发包pygame实例教程-01[开发环境配置与第一个界面]

    我们假定你已经安装好了我们开发python程序的sublime text,如果不知道怎么安装的可以参照我前面的博文。这里只需要解决的是配置好Pygame的问题。本篇博文主要解决开发环境配置以及第一个游戏界面的显示问题。 文章原出处: https://blog.csdn.net/haigear/article/details/130173836 没有

    2024年01月25日
    浏览(92)
  • FPGA开发之Vivado安装及HLS环境配置,并实现流水灯实例

    HLS(High-Level Synthesis)高层综合,就是将 C/C++的功能用 RTL 来实现,将 FPGA 的组件在一个软件环境中来开发,这个模块的功能验证在软件环境中来实现,无缝的将硬件仿真环境集合在一起,使用软件为中心的工具、报告以及优化设计,很容易的在 FPGA 传统的设计工具中生成 IP。

    2024年02月05日
    浏览(49)
  • 智能合约入门开发实例

    web3开发:前端使用ethers.js调用Hello智能合约。 hello.sol 智能合约文件: dapp.html  前端文件:

    2024年02月02日
    浏览(47)
  • SpringBoot整合Kafka简单配置实现生产消费

    *本文基于SpringBoot整合Kafka,通过简单配置实现生产及消费,包括生产消费的配置说明、消费者偏移设置方式等。更多功能细节可参考 spring kafka 文档:https://docs.spring.io/spring-kafka/docs/current/reference/html 搭建Kafka环境,参考Kafka集群环境搭建及使用 Java环境:JDK1.8 Maven版本:apach

    2024年02月16日
    浏览(46)
  • Kafka动态认证SASL/SCRAM配置+整合springboot配置

    zookeeper启动命令: [root@master-yzjgxh2571705819-1651919082731-99-0727183 bin]# ./zkServer.sh start [root@master-yzjgxh2571705819-1651919082731-99-0727183 bin]# ./zkServer.sh stop kafka启动命令: /data/program/kafka2.12/bin/kafka-server-start.sh /data/program/kafka2.12/config/server.properties 1)创建broker建通信用户:admin(在使用sasl之

    2024年01月16日
    浏览(37)
  • 鸿蒙开发实例 | 分布式涂鸦

    CSDN话题挑战赛第2期 参赛话题:学习笔记  本篇文章介绍分布式设备间如何共享涂鸦画板的核心功能。 在涂鸦画板中有3个核心功能:     (1) 涂鸦者选择好希望连接的设备后,可以直接把涂鸦成果流转给对应的设备。     (2) 其他设备接收流转的涂鸦后,可以在涂鸦的基础上添

    2024年02月09日
    浏览(50)
  • 安卓开发实例:方向传感器

    调用手机的方向传感器,X轴,Y轴,Z轴的数值 activity_sensor.xml Sensor.java

    2024年02月06日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包