构建消息
构建消息,即创建 ProduceRecord 对象。
(1) kafka发送消息,最常见的构造方法是:
public ProducerRecord(String topic,V value)
topic 表示主题, value 表示值。
(2) kafka发送消息指定key,ProducerRecord 的 key ,既可以作为消息的唯一id,也可以用来决定消息该被写到主题的哪个分区。拥有相同key 的消息,将被写到同一个分区。
public ProducerRecord(String topic,K key,V value)
(3) kafka发送消息指定分区,如下:
public ProducerRecord(String topic,Integer partition,K key,V value)
发送消息的模式
创建生产者实例和构建消息之后,就可以开始发送消息了。
发送消息主要有三种模式:发后即忘、同步、异步。
发后即忘:
就是直接调用 生产者的 send方法发送。
发后即完,只管往 kafka中发送消息,而不关心消息是否正确到达。
这种发送方式的性能最高,可靠性也最差。
producer.send(record);
具体代码如下:
public class KafkaDemoProducer {
public static final String BROKER_LIST = "localhost:9092";
public static final String TOPIC = "myTopic1";
public static void main(String[] args) {
//属性配置
Properties properties = getProperties(BROKER_LIST);
//生产者初始化
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "hello kafka");
//发送消息
try {
producer.send(record);
System.out.println("========>producer.send(record).");
} catch (Exception e) {
System.out.println("send error." + e);
}
producer.close();
}
private static Properties getProperties(String brokerList) {
Properties properties = new Properties();
properties.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
properties.put("bootstrap.servers", brokerList);
return properties;
}
}
同步发送:
try {
producer.send(record).get();
} catch (ExecutionException | InterruptedException e) {
log.error("send record get error", e);
}
同步发送的方式可靠性最高,要么消息发送成功,要么发生异常。如果发生异常,会catch并处理异常。
同步发送的性能会差一些,需要阻塞等待一条消息发送完,才能发送下一条。
异步发送:
异步发送,就是在 send 方法里指定一下 Callback 的回调函数。
消息发送成功后,会收到成功的回调。参数 metadata ,为发送成功的消息,相关的信息
如果发送失败,也会收到回调,包含失败的异常信息 exception。
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
log.error("send onCompletion error." , exception);
} else {
log.info(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
}
}
});
kafka入门文章
https://blog.csdn.net/sinat_32502451/category_12465196.html文章来源:https://www.toymoban.com/news/detail-798348.html
kafka入门文章
https://blog.csdn.net/sinat_32502451/category_12465196.html
参考资料:
《深入理解Kafka 核心设计与实践原理》文章来源地址https://www.toymoban.com/news/detail-798348.html
到了这里,关于kafka入门(五):kafka生产者发送消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!