JAVA连接Kafka
1、Maven驱动(注意一定要对应自己的Kafka版本)
<!-- NOTE(review): presumably the Kafka core artifact for Scala 2.12. The Java examples in this
     document only import org.apache.kafka.clients.* classes, so this dependency may be
     unnecessary for them - confirm before removing. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.5.0</version>
</dependency>
<!-- Client library; the producer and consumer examples in this document import
     org.apache.kafka.clients.producer.* and org.apache.kafka.clients.consumer.*.
     Keep the version in line with the Kafka cluster being used. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
<!-- NOTE(review): none of the examples in this document import Kafka Streams classes;
     this dependency looks optional here - confirm before removing. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.5.0</version>
</dependency>
2、生产者生产数据
2.1 普通方式创建Producer
package com.tyhh.test;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.Future;
/**
 * Minimal Kafka producer example without authentication.
 *
 * <p>Sends ten String key/value records to {@code my-topic}, waits for each send to be
 * acknowledged, and prints the topic, partition and offset reported for it.
 */
public class KafkaProducerTest {
    /**
     * Builds the producer configuration, sends ten records synchronously and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        String topic = "my-topic";
        // Broker addresses used to connect to the cluster.
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("acks", "all"); // a send only succeeds once all replicas have written the record
        props.put("retries", 0); // 0 = never retry a failed send; use a large value to keep retrying
        props.put("batch.size", 16384); // batch size in bytes
        props.put("linger.ms", 1); // delay sends by 1 ms so records can be batched
        props.put("buffer.memory", 33554432); // size of the send buffer in bytes
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer even if send() itself throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(topic, "topic_key_" + i, "topic_value_" + i);
                Future<RecordMetadata> metadataFuture = producer.send(record);
                try {
                    // Block until this record has been acknowledged (or has failed).
                    RecordMetadata recordMetadata = metadataFuture.get();
                    System.out.println("发送成功!");
                    System.out.println("topic:" + recordMetadata.topic());
                    System.out.println("partition:" + recordMetadata.partition());
                    System.out.println("offset:" + recordMetadata.offset());
                } catch (Exception e) {
                    // Best effort: report the failure and continue with the next record.
                    System.out.println("发送失败!");
                    e.printStackTrace();
                }
            }
            producer.flush();
        }
    }
}
2.2 SASL认证创建Producer(PLAIN机制;SASL_PLAINTEXT只做认证,不启用SSL加密)
package com.tyhh.test;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.Future;
/**
 * Kafka producer example using SASL/PLAIN authentication configured in code.
 *
 * <p>Sends ten String key/value records to {@code my-topic}, waits for each send to be
 * acknowledged, and prints the topic, partition and offset reported for it.
 */
public class KafkaProducerTest {
    /**
     * Builds the producer configuration (including the inline JAAS login entry), sends ten
     * records synchronously and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // Demo credentials only; do not hard-code real credentials in source.
        String user = "admin";
        String password = "admin";
        String topic = "my-topic";
        // Broker addresses used to connect to the cluster.
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("acks", "all"); // a send only succeeds once all replicas have written the record
        props.put("retries", 0); // 0 = never retry a failed send; use a large value to keep retrying
        props.put("batch.size", 16384); // batch size in bytes
        props.put("linger.ms", 1); // delay sends by 1 ms so records can be batched
        props.put("buffer.memory", 33554432); // size of the send buffer in bytes
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // SASL authentication settings. These must be stored in `props`, the object that is
        // handed to the KafkaProducer constructor below.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule "
                        + "required username=\"" + user + "\" password=\"" + password + "\";");

        // try-with-resources closes the producer even if send() itself throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(topic, "topic_key_" + i, "topic_value_" + i);
                Future<RecordMetadata> metadataFuture = producer.send(record);
                try {
                    // Block until this record has been acknowledged (or has failed).
                    RecordMetadata recordMetadata = metadataFuture.get();
                    System.out.println("发送成功!");
                    System.out.println("topic:" + recordMetadata.topic());
                    System.out.println("partition:" + recordMetadata.partition());
                    System.out.println("offset:" + recordMetadata.offset());
                } catch (Exception e) {
                    // Best effort: report the failure and continue with the next record.
                    System.out.println("发送失败!");
                    e.printStackTrace();
                }
            }
            producer.flush();
        }
    }
}
2.3 SASL认证创建Producer(PLAIN机制,使用JAAS配置文件方式)
kafka_client_jaas_plain配置文件信息:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";
};
具体代码实现:
package com.tyhh.test;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.Future;
/**
 * Kafka producer example using SASL/PLAIN authentication with credentials read from an
 * external JAAS configuration file.
 *
 * <p>Sends ten String key/value records to {@code my-topic}, waits for each send to be
 * acknowledged, and prints the topic, partition and offset reported for it.
 */
public class KafkaProducerTest {
    /**
     * Registers the JAAS file, builds the producer configuration, sends ten records
     * synchronously and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Point the JVM at the JAAS file that holds the KafkaClient login entry.
        System.setProperty("java.security.auth.login.config","./kafka_ssl_conf/kafka_client_jaas_plain.conf");
        Properties props = new Properties();
        String topic = "my-topic";
        // Broker addresses used to connect to the cluster.
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("acks", "all"); // a send only succeeds once all replicas have written the record
        props.put("retries", 0); // 0 = never retry a failed send; use a large value to keep retrying
        props.put("batch.size", 16384); // batch size in bytes
        props.put("linger.ms", 1); // delay sends by 1 ms so records can be batched
        props.put("buffer.memory", 33554432); // size of the send buffer in bytes
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The JAAS file only supplies credentials. The client must still be told to use SASL
        // and which mechanism; otherwise it connects without authentication and the JAAS
        // entry is never consulted.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        // try-with-resources closes the producer even if send() itself throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(topic, "topic_key_" + i, "topic_value_" + i);
                Future<RecordMetadata> metadataFuture = producer.send(record);
                try {
                    // Block until this record has been acknowledged (or has failed).
                    RecordMetadata recordMetadata = metadataFuture.get();
                    System.out.println("发送成功!");
                    System.out.println("topic:" + recordMetadata.topic());
                    System.out.println("partition:" + recordMetadata.partition());
                    System.out.println("offset:" + recordMetadata.offset());
                } catch (Exception e) {
                    // Best effort: report the failure and continue with the next record.
                    System.out.println("发送失败!");
                    e.printStackTrace();
                }
            }
            producer.flush();
        }
    }
}
2.4 SASL认证创建Producer(SCRAM-SHA-512机制)
package com.tyhh.test;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.Future;
/**
 * Kafka producer example using SASL/SCRAM-SHA-512 authentication configured in code.
 *
 * <p>Sends ten String key/value records to {@code my-topic}, waits for each send to be
 * acknowledged, and prints the topic, partition and offset reported for it.
 */
public class KafkaProducerTest {
    /**
     * Builds the producer configuration (including the inline JAAS login entry), sends ten
     * records synchronously and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // Demo credentials only; do not hard-code real credentials in source.
        String user = "admin";
        String password = "admin";
        String topic = "my-topic";
        // Broker addresses used to connect to the cluster.
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("acks", "all"); // a send only succeeds once all replicas have written the record
        props.put("retries", 0); // 0 = never retry a failed send; use a large value to keep retrying
        props.put("batch.size", 16384); // batch size in bytes
        props.put("linger.ms", 1); // delay sends by 1 ms so records can be batched
        props.put("buffer.memory", 33554432); // size of the send buffer in bytes
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // SASL authentication (SCRAM-SHA-512) over a connection without SSL encryption.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule "
                        + "required username=\"" + user + "\" password=\"" + password + "\";");

        // try-with-resources closes the producer even if send() itself throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(topic, "topic_key_" + i, "topic_value_" + i);
                Future<RecordMetadata> metadataFuture = producer.send(record);
                try {
                    // Block until this record has been acknowledged (or has failed).
                    RecordMetadata recordMetadata = metadataFuture.get();
                    System.out.println("发送成功!");
                    System.out.println("topic:" + recordMetadata.topic());
                    System.out.println("partition:" + recordMetadata.partition());
                    System.out.println("offset:" + recordMetadata.offset());
                } catch (Exception e) {
                    // Best effort: report the failure and continue with the next record.
                    System.out.println("发送失败!");
                    e.printStackTrace();
                }
            }
            producer.flush();
        }
    }
}
2.5 SASL认证创建Producer(SCRAM-SHA-512机制,使用JAAS配置文件方式)
kafka_client_jaas_scram配置文件信息:
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};
具体代码实现:
package com.tyhh.test;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.Future;
/**
 * Kafka producer example using SASL/SCRAM-SHA-512 authentication with credentials read from
 * an external JAAS configuration file.
 *
 * <p>Sends ten String key/value records to {@code my-topic}, waits for each send to be
 * acknowledged, and prints the topic, partition and offset reported for it.
 */
public class KafkaProducerTest {
    /**
     * Registers the JAAS file, builds the producer configuration, sends ten records
     * synchronously and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Point the JVM at the JAAS file that holds the KafkaClient login entry.
        System.setProperty("java.security.auth.login.config","./kafka_ssl_conf/kafka_client_jaas_scram.conf");
        Properties props = new Properties();
        String topic = "my-topic";
        // Broker addresses used to connect to the cluster.
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("acks", "all"); // a send only succeeds once all replicas have written the record
        props.put("retries", 0); // 0 = never retry a failed send; use a large value to keep retrying
        props.put("batch.size", 16384); // batch size in bytes
        props.put("linger.ms", 1); // delay sends by 1 ms so records can be batched
        props.put("buffer.memory", 33554432); // size of the send buffer in bytes
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The JAAS file only supplies credentials. The client must still be told to use SASL
        // and which mechanism; otherwise it connects without authentication and the JAAS
        // entry is never consulted.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");

        // try-with-resources closes the producer even if send() itself throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(topic, "topic_key_" + i, "topic_value_" + i);
                Future<RecordMetadata> metadataFuture = producer.send(record);
                try {
                    // Block until this record has been acknowledged (or has failed).
                    RecordMetadata recordMetadata = metadataFuture.get();
                    System.out.println("发送成功!");
                    System.out.println("topic:" + recordMetadata.topic());
                    System.out.println("partition:" + recordMetadata.partition());
                    System.out.println("offset:" + recordMetadata.offset());
                } catch (Exception e) {
                    // Best effort: report the failure and continue with the next record.
                    System.out.println("发送失败!");
                    e.printStackTrace();
                }
            }
            producer.flush();
        }
    }
}
3、消费者消费数据
3.1 普通方式创建消费者
package com.tyhh.test;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
/**
 * Minimal Kafka consumer example without authentication.
 *
 * <p>Subscribes to {@code my-topic} as part of consumer group {@code my-group} and prints the
 * partition, offset, key and value of every record it receives, forever.
 */
public class KafkaConsumerTest {
    /**
     * Builds the consumer configuration, subscribes to the topic and polls in an endless loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String topic = "my-topic";
        String groupId = "my-group";
        String autoCommit = "true";
        String offsetReset = "earliest";
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", groupId);
        // Whether offsets are committed automatically.
        props.put("enable.auto.commit", autoCommit);
        props.put("auto.offset.reset", offsetReset);
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // Deserializers for record keys and values. Both are required: the consumer cannot be
        // constructed without a value deserializer.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources closes the consumer if the poll loop ends with an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long); wait up to one second.
                ConsumerRecords<String, String> records =
                        consumer.poll(java.time.Duration.ofMillis(1000L));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition= %d, offset = %d, key = %s, value = %s\n", record.partition(),
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
3.2 SASL认证创建Consumer(PLAIN机制)
package com.tyhh.test;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
/**
 * Kafka consumer example using SASL/PLAIN authentication configured in code.
 *
 * <p>Subscribes to {@code my-topic} as part of consumer group {@code my-group} and prints the
 * partition, offset, key and value of every record it receives, forever.
 */
public class KafkaConsumerTest {
    /**
     * Builds the consumer configuration (including the inline JAAS login entry), subscribes to
     * the topic and polls in an endless loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Demo credentials only; do not hard-code real credentials in source.
        String user = "admin";
        String password = "admin";
        String topic = "my-topic";
        String groupId = "my-group";
        String autoCommit = "true";
        String offsetReset = "earliest";
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", groupId);
        // Whether offsets are committed automatically.
        props.put("enable.auto.commit", autoCommit);
        props.put("auto.offset.reset", offsetReset);
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // Deserializers for record keys and values. Both are required: the consumer cannot be
        // constructed without a value deserializer.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // SASL authentication settings. These must be stored in `props`, the object that is
        // handed to the KafkaConsumer constructor below.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule "
                        + "required username=\"" + user + "\" password=\"" + password + "\";");

        // try-with-resources closes the consumer if the poll loop ends with an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long); wait up to one second.
                ConsumerRecords<String, String> records =
                        consumer.poll(java.time.Duration.ofMillis(1000L));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition= %d, offset = %d, key = %s, value = %s\n", record.partition(),
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
3.3 SASL认证创建Consumer(PLAIN机制,使用JAAS配置文件方式)
kafka_client_jaas_plain配置文件信息:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";
};
具体代码实现:
package com.tyhh.test;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
/**
 * Kafka consumer example using SASL/PLAIN authentication with credentials read from an
 * external JAAS configuration file.
 *
 * <p>Subscribes to {@code my-topic} as part of consumer group {@code my-group} and prints the
 * partition, offset, key and value of every record it receives, forever.
 */
public class KafkaConsumerTest {
    /**
     * Registers the JAAS file, builds the consumer configuration, subscribes to the topic and
     * polls in an endless loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Point the JVM at the JAAS file that holds the KafkaClient login entry.
        System.setProperty("java.security.auth.login.config","./kafka_ssl_conf/kafka_client_jaas_plain.conf");
        String topic = "my-topic";
        String groupId = "my-group";
        String autoCommit = "true";
        String offsetReset = "earliest";
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", groupId);
        // Whether offsets are committed automatically.
        props.put("enable.auto.commit", autoCommit);
        props.put("auto.offset.reset", offsetReset);
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // Deserializers for record keys and values. Both are required: the consumer cannot be
        // constructed without a value deserializer.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The JAAS file only supplies credentials. The client must still be told to use SASL
        // and which mechanism; otherwise it connects without authentication and the JAAS
        // entry is never consulted.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        // try-with-resources closes the consumer if the poll loop ends with an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long); wait up to one second.
                ConsumerRecords<String, String> records =
                        consumer.poll(java.time.Duration.ofMillis(1000L));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition= %d, offset = %d, key = %s, value = %s\n", record.partition(),
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
3.4 SASL认证创建Consumer(SCRAM-SHA-512机制)
package com.tyhh.test;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
/**
 * Kafka consumer example using SASL/SCRAM-SHA-512 authentication configured in code.
 *
 * <p>Subscribes to {@code my-topic} as part of consumer group {@code my-group} and prints the
 * partition, offset, key and value of every record it receives, forever.
 */
public class KafkaConsumerTest {
    /**
     * Builds the consumer configuration (including the inline JAAS login entry), subscribes to
     * the topic and polls in an endless loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Demo credentials only; do not hard-code real credentials in source.
        String user = "admin";
        String password = "admin";
        String topic = "my-topic";
        String groupId = "my-group";
        String autoCommit = "true";
        String offsetReset = "earliest";
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", groupId);
        // Whether offsets are committed automatically.
        props.put("enable.auto.commit", autoCommit);
        props.put("auto.offset.reset", offsetReset);
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // Deserializers for record keys and values.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // SASL authentication (SCRAM-SHA-512) over a connection without SSL encryption.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required username='"+user+"' password='"+password+"';");

        // try-with-resources closes the consumer if the poll loop ends with an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long); wait up to one second.
                ConsumerRecords<String, String> records =
                        consumer.poll(java.time.Duration.ofMillis(1000L));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition= %d, offset = %d, key = %s, value = %s\n", record.partition(),
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
3.5 SASL认证创建Consumer(SCRAM-SHA-512机制,使用JAAS配置文件方式)
kafka_client_jaas_scram配置文件信息:
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};
具体代码实现:
package com.tyhh.test;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
/**
 * Kafka consumer example using SASL/SCRAM-SHA-512 authentication with credentials read from
 * an external JAAS configuration file.
 *
 * <p>Subscribes to {@code my-topic} as part of consumer group {@code my-group} and prints the
 * partition, offset, key and value of every record it receives, forever.
 */
public class KafkaConsumerTest {
    /**
     * Registers the JAAS file, builds the consumer configuration, subscribes to the topic and
     * polls in an endless loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Point the JVM at the JAAS file that holds the KafkaClient login entry.
        System.setProperty("java.security.auth.login.config","./kafka_ssl_conf/kafka_client_jaas_scram.conf");
        String topic = "my-topic";
        String groupId = "my-group";
        String autoCommit = "true";
        String offsetReset = "earliest";
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", groupId);
        // Whether offsets are committed automatically.
        props.put("enable.auto.commit", autoCommit);
        props.put("auto.offset.reset", offsetReset);
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // Deserializers for record keys and values. Both are required: the consumer cannot be
        // constructed without a value deserializer.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The JAAS file only supplies credentials. The client must still be told to use SASL
        // and which mechanism; otherwise it connects without authentication and the JAAS
        // entry is never consulted.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");

        // try-with-resources closes the consumer if the poll loop ends with an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long); wait up to one second.
                ConsumerRecords<String, String> records =
                        consumer.poll(java.time.Duration.ofMillis(1000L));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition= %d, offset = %d, key = %s, value = %s\n", record.partition(),
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
到了这里,关于JAVA连接Kafka及SSL认证的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!