What is Kafka?
Kafka is a high-throughput, distributed publish-subscribe messaging system.
1. Add the Maven dependency
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
2. Write the producer
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        // Address of the Kafka broker
        prop.put("bootstrap.servers", "localhost:9092");
        prop.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("acks", "all");
        prop.put("retries", 0);
        prop.put("batch.size", 16384);
        prop.put("linger.ms", 1);
        prop.put("buffer.memory", 33554432);
        String topic = "hello"; // topic
        // Fully qualified name, because this class is also called KafkaProducer
        org.apache.kafka.clients.producer.KafkaProducer<String, String> producer =
                new org.apache.kafka.clients.producer.KafkaProducer<>(prop);
        producer.send(new ProducerRecord<>(topic, Integer.toString(2), "hello kafka"));
        // close() waits for buffered records to be sent before returning
        producer.close();
    }
}
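The numeric producer settings are easier to read with their units spelled out. The sketch below rebuilds the same Properties using only the JDK; the class name ProducerSettingsSketch and its build method are hypothetical helpers for illustration, not part of kafka-clients.

```java
import java.util.Properties;

/** Hypothetical helper (not part of kafka-clients) that rebuilds the producer settings. */
class ProducerSettingsSketch {

    static Properties build(String bootstrapServers) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", bootstrapServers);
        prop.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("acks", "all");                      // wait for all in-sync replicas to acknowledge
        prop.put("retries", 0);                       // do not resend a record whose send failed
        prop.put("batch.size", 16 * 1024);            // 16384 bytes: upper bound of one per-partition batch
        prop.put("linger.ms", 1);                     // wait up to 1 ms for more records to fill a batch
        prop.put("buffer.memory", 32 * 1024 * 1024);  // 33554432 bytes: total memory for unsent records
        return prop;
    }

    public static void main(String[] args) {
        Properties prop = build("localhost:9092");
        System.out.println(prop.get("batch.size"));    // prints 16384
        System.out.println(prop.get("buffer.memory")); // prints 33554432
    }
}
```

The returned Properties object can be passed to the producer constructor in place of the inline prop.put calls.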
3. Write the consumer
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class KafkaConsumer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        // Broker address; use the same broker the producer sends to
        prop.put("bootstrap.servers", "192.168.8.166:9092");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("group.id", "con-1");
        prop.put("auto.offset.reset", "latest");
        // Commit offsets automatically
        prop.put("enable.auto.commit", "true");
        // Interval between automatic commits, in milliseconds
        prop.put("auto.commit.interval.ms", "1000");
        // Fully qualified name, because this class is also called KafkaConsumer
        org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer =
                new org.apache.kafka.clients.consumer.KafkaConsumer<>(prop);
        ArrayList<String> topics = new ArrayList<>();
        // More than one topic can be subscribed to
        topics.add("hello");
        consumer.subscribe(topics);
        while (true) {
            // Wait up to 20 seconds for new records
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(20));
            for (ConsumerRecord<String, String> consumerRecord : poll) {
                System.out.println(consumerRecord);
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
                System.out.println(consumerRecord.topic());
            }
        }
    }
}
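The consumer sets auto.offset.reset to latest, which decides where a group with no committed offset begins reading. The sketch below is a toy in-memory model of a single partition, not the Kafka client API; the class OffsetResetSketch and its methods are hypothetical, and only the earliest and latest policies are modelled.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of one partition; hypothetical, not the Kafka client API. */
class OffsetResetSketch {

    /** Returns the offset at which a group with no committed offset starts reading. */
    static int startingOffset(List<String> log, String autoOffsetReset) {
        switch (autoOffsetReset) {
            case "earliest":
                return 0;          // start from the oldest record still in the log
            case "latest":
                return log.size(); // start after the newest record
            default:
                throw new IllegalArgumentException("unsupported policy: " + autoOffsetReset);
        }
    }

    /** Returns every record from the given offset to the end of the log. */
    static List<String> readFrom(List<String> log, int offset) {
        return new ArrayList<>(log.subList(offset, log.size()));
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        log.add("sent before the consumer started");
        int position = startingOffset(log, "latest"); // the consumer joins here
        log.add("hello kafka");                       // the producer sends afterwards
        System.out.println(readFrom(log, position));  // prints [hello kafka]
    }
}
```

With latest, the record that was already in the log when the consumer joined is never delivered to it; with earliest, reading would begin at offset 0 and include it.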
4. Download Kafka
Download it from the official website: Apache Kafka
After extracting the archive, go to the config directory
Edit zookeeper.properties
dataDir=D:/kafka_2.13-3.5.1/tmp/zookeeper
Change the log storage path in server.properties (log.dirs is where Kafka stores message data)
log.dirs=D:/kafka_2.13-3.5.1/tmp/kafka-logs
Start the ZooKeeper service (run from the bin\windows directory)
zookeeper-server-start.bat ../../config/zookeeper.properties
Start the Kafka service (in a separate terminal window, once ZooKeeper is running)
kafka-server-start.bat ../../config/server.properties
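Before running the clients, it can help to confirm that both services are accepting connections. The sketch below uses only the JDK; the class PortCheckSketch is a hypothetical helper, and it assumes the default ports: 9092 for the broker (as in the producer code) and 2181 for ZooKeeper (the default clientPort in zookeeper.properties).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper that checks whether a TCP port accepts connections. */
class PortCheckSketch {

    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // connect() throws an IOException if the port is closed or the timeout expires
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed default ports; adjust them if the configuration was changed
        System.out.println("ZooKeeper listening: " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka listening: " + isListening("localhost", 9092, 1000));
    }
}
```

An open port only shows that a process is listening there; it does not prove that the broker has finished starting up.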
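5. Start the consumer first, then the producer, and check the published message. The order matters: with auto.offset.reset set to latest, a new consumer group only receives messages published after it has started.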