Big Data: Adapting Ranger 2.1.0 to Kafka 3.4.0

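According to the official Kafka notes, Kafka 3.0 and later replace the previous authorization mechanism, including changes to the classes and methods involved. The Ranger code that extends Kafka's authorizer therefore has to be updated to match.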

Official notes

From version 3.0 onward, Kafka replaces the old authorization mechanism, including its classes and methods.

Github PR https://github.com/apache/kafka/pull/10450

POM

apache-ranger\pom.xml: this file mainly sets the Kafka version and the Scala version.

<?xml version="1.0" encoding="UTF-8"?>
<project>
...
    <properties>
        ...
        <kafka.version>3.4.0</kafka.version>
        ...
        <scala.version>2.12.10</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        ...
    </properties>

</project>

apache-ranger\plugin-schema-registry\pom.xml: this file mainly sets the Kafka version and the Kafka artifact name, which carries the Scala binary version.

<?xml version="1.0" encoding="UTF-8"?>
<project>
...
    <properties>
        ...
        <kafka.version>3.4.0</kafka.version>
        <kafkaArtifact>kafka_2.12</kafkaArtifact>
        ...
    </properties>
</project>

Code

apache-ranger\plugin-kafka\src\main\java\org\apache\ranger\authorization\kafka\authorizer\RangerKafkaAuthorizer.java

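This class is where most of the changes are made, covering classes, methods and so on, so the work amounts to refactoring the original logic to fit the new Kafka API. The main changes are the authorization method authorize and the checks on the operation and resource type.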

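Note:
In Kafka, Authorizer has one sub-interface (ClusterMetadataAuthorizer) and two implementing classes (AclAuthorizer and StandardAuthorizer); these two classes provide the concrete behaviour. According to the official notes, older Kafka versions (below 3.0) used SimpleAclAuthorizer, while newer versions (3.0 and above) use AclAuthorizer instead. The plugin therefore extends that class, which is how Ranger 2.1.0 is adapted to Kafka 3.4.0.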
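One detail of the new interface is worth keeping in mind while reading the class below: authorize receives a list of actions and is expected to return one result per action, in the same order. The following is a minimal sketch of that contract in plain Java. The Decision enum and the string actions are hypothetical stand-ins for Kafka's AuthorizationResult and Action types, used only so the example runs without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class AuthorizeContractSketch {

    // Hypothetical stand-in for org.apache.kafka.server.authorizer.AuthorizationResult.
    enum Decision { ALLOWED, DENIED }

    /**
     * Returns exactly one decision per action, in input order.
     *
     * @param actions   requested actions (plain strings here for brevity)
     * @param isAllowed policy check; null models a policy engine that is not initialised yet
     */
    static List<Decision> authorizeAll(List<String> actions, Predicate<String> isAllowed) {
        List<Decision> results = new ArrayList<>(actions.size());
        for (String action : actions) {
            Decision decision = Decision.DENIED; // deny by default
            try {
                if (isAllowed != null && isAllowed.test(action)) {
                    decision = Decision.ALLOWED;
                }
            } catch (RuntimeException e) {
                // A failing policy check denies this action but must not shorten the list.
                decision = Decision.DENIED;
            }
            results.add(decision);
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> actions = Arrays.asList("READ:topicA", "WRITE:topicA", "DESCRIBE:topicB");
        System.out.println(authorizeAll(actions, a -> a.startsWith("READ")));
        System.out.println(authorizeAll(actions, null));
    }
}
```

The point of the sketch is the shape of the return value: even when the policy engine is missing or throws, the caller still gets a list of the same length as the request.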

...

package org.apache.ranger.authorization.kafka.authorizer;

import java.util.*;
import java.util.concurrent.CompletionStage;

import com.google.gson.Gson;
import kafka.security.authorizer.AclAuthorizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.authorizer.*;
import org.apache.kafka.common.acl.*;
import org.apache.kafka.common.resource.*;

import org.apache.commons.lang.StringUtils;
import org.apache.ranger.audit.provider.MiscUtil;
import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
import org.apache.ranger.plugin.policyengine.RangerAccessResult;
import org.apache.ranger.plugin.service.RangerBasePlugin;

import org.apache.ranger.plugin.util.RangerPerfTracer;

public class RangerKafkaAuthorizer extends AclAuthorizer {
	private static final Log logger = LogFactory
			.getLog(RangerKafkaAuthorizer.class);
	private static final Log PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");

	public static final String KEY_TOPIC = "topic";
	public static final String KEY_CLUSTER = "cluster";
	public static final String KEY_CONSUMER_GROUP = "consumergroup";
	public static final String KEY_TRANSACTIONALID = "transactionalid";
	public static final String KEY_DELEGATIONTOKEN = "delegationtoken";

	public static final String ACCESS_TYPE_READ = "consume";
	public static final String ACCESS_TYPE_WRITE = "publish";
	public static final String ACCESS_TYPE_CREATE = "create";
	public static final String ACCESS_TYPE_DELETE = "delete";
	public static final String ACCESS_TYPE_CONFIGURE = "configure";
	public static final String ACCESS_TYPE_DESCRIBE = "describe";
	public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
	public static final String ACCESS_TYPE_ALTER_CONFIGS    = "alter_configs";
	public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
	public static final String ACCESS_TYPE_CLUSTER_ACTION   = "cluster_action";

	private static volatile RangerBasePlugin rangerPlugin = null;
	RangerKafkaAuditHandler auditHandler = null;

	public RangerKafkaAuthorizer() {
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see org.apache.kafka.common.Configurable#configure(Map)
	 */
	@Override
	public void configure(Map<String, ?> configs) {
		RangerBasePlugin me = rangerPlugin;
		if (me == null) {
			synchronized(RangerKafkaAuthorizer.class) {
				me = rangerPlugin;
				if (me == null) {
					try {
						// Possible to override JAAS configuration which is used by Ranger, otherwise
						// SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
						// if it's not defined, then it reverts to 'KafkaServer' configuration.
						final Object jaasContext = configs.get("ranger.jaas.context");
						final String listenerName = (jaasContext instanceof String
										&& StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
										: SecurityProtocol.SASL_PLAINTEXT.name();
						final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
						JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
						MiscUtil.setUGIFromJAASConfig(context.name());
						logger.info("LoginUser=" + MiscUtil.getUGILoginUser());
					} catch (Throwable t) {
						logger.error("Error getting principal.", t);
					}
					me = rangerPlugin = new RangerBasePlugin("kafka", "kafka");
				}
			}
		}
		logger.info("Calling plugin.init()");
		rangerPlugin.init();
		auditHandler = new RangerKafkaAuditHandler();
		rangerPlugin.setResultProcessor(auditHandler);
	}

	@Override
	public void close() {
		logger.info("close() called on authorizer.");
		try {
			if (rangerPlugin != null) {
				rangerPlugin.cleanup();
			}
		} catch (Throwable t) {
			logger.error("Error closing RangerPlugin.", t);
		}
	}

	@Override
	public List<AuthorizationResult> authorize(AuthorizableRequestContext authorizableRequestContext, List<Action> list) {

		List<AuthorizationResult> authResults = new ArrayList<>();
		AuthorizableRequestContext session = authorizableRequestContext;

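		// The Authorizer contract expects one result per action, so an
		// uninitialised plugin denies every action instead of returning an empty list.
		if (rangerPlugin == null) {
			MiscUtil.logErrorMessageByInterval(logger,
					"Authorizer is still not initialized");
			return Collections.nCopies(list.size(), AuthorizationResult.DENIED);
		}

		if (logger.isDebugEnabled()) {
			// Serialising the actions is costly, so only do it at debug level.
			logger.debug("List<Action> list -> " + new Gson().toJson(list));
		}
		for (Action actionData : list) {
			ResourcePattern resource = actionData.resourcePattern();
			AclOperation operation = actionData.operation();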

			RangerPerfTracer perf = null;

			if (RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
				perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(resource=" + resource + ")");
			}
			String userName = null;
			if (session.principal() != null) {
				userName = session.principal().getName();
			}
			java.util.Set<String> userGroups = MiscUtil
					.getGroupsForRequestUser(userName);
			String ip = session.clientAddress().getHostAddress();

			// skip leading slash
			if (StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') {
				ip = ip.substring(1);
			}

			Date eventTime = new Date();
			String accessType = mapToRangerAccessType(operation);
			boolean validationFailed = false;
			String validationStr = "";

			if (accessType == null) {
				if (MiscUtil.logErrorMessageByInterval(logger,
						"Unsupported access type. operation=" + operation)) {
					logger.error("Unsupported access type. session=" + session
							+ ", operation=" + operation + ", resource=" + resource);
				}
				validationFailed = true;
				validationStr += "Unsupported access type. operation=" + operation;
			}
			String action = accessType;

			RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
			rangerRequest.setUser(userName);
			rangerRequest.setUserGroups(userGroups);
			rangerRequest.setClientIPAddress(ip);
			rangerRequest.setAccessTime(eventTime);

			RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
			rangerRequest.setResource(rangerResource);
			rangerRequest.setAccessType(accessType);
			rangerRequest.setAction(action);
			rangerRequest.setRequestData(resource.name());

			if (resource.resourceType().equals(ResourceType.TOPIC)) {
				rangerResource.setValue(KEY_TOPIC, resource.name());
			} else if (resource.resourceType().equals(ResourceType.CLUSTER)) {
				rangerResource.setValue(KEY_CLUSTER, resource.name());
			} else if (resource.resourceType().equals(ResourceType.GROUP)) {
				rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
			} else if (resource.resourceType().equals(ResourceType.TRANSACTIONAL_ID)) {
				rangerResource.setValue(KEY_TRANSACTIONALID, resource.name());
			} else if (resource.resourceType().equals(ResourceType.DELEGATION_TOKEN)) {
				rangerResource.setValue(KEY_DELEGATIONTOKEN, resource.name());
			} else {
				logger.error("Unsupported resourceType=" + resource.resourceType());
				validationFailed = true;
			}

			AuthorizationResult authResult = AuthorizationResult.DENIED;
			boolean returnValue = false;
			if (validationFailed) {
				MiscUtil.logErrorMessageByInterval(logger, validationStr
						+ ", request=" + rangerRequest);
			} else {

				try {
					RangerAccessResult result = rangerPlugin
							.isAccessAllowed(rangerRequest);
					if (result == null) {
						logger.error("Ranger Plugin returned null. Returning false");
					} else {
						returnValue = result.getIsAllowed();
						authResult = returnValue ? AuthorizationResult.ALLOWED : authResult;
					}
				} catch (Throwable t) {
					logger.error("Error while calling isAccessAllowed(). request="
							+ rangerRequest, t);
				} finally {
					auditHandler.flushAudit();
				}
			}
			RangerPerfTracer.log(perf);

			if (logger.isDebugEnabled()) {
				logger.debug("rangerRequest=" + rangerRequest + ", return="
							+ returnValue);
			}

			authResults.add(authResult);
		}

		logger.info("end authorize");
		return authResults;
	}

	@Override
	public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBinding> list) {
		logger.error("createAcls(AuthorizableRequestContext, List<AclBinding>) is not supported by Ranger for Kafka");
		return new ArrayList<>();
	}

	@Override
	public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBindingFilter> list) {
		logger.error("deleteAcls(AuthorizableRequestContext, List<AclBindingFilter>) is not supported by Ranger for Kafka");
		return new ArrayList<>();
	}

	@Override
	public Iterable<AclBinding> acls(AclBindingFilter filter) {
		logger.error("acls(AclBindingFilter) is not supported by Ranger for Kafka");
		return new ArrayList<>();
	}

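	/**
	 * Maps a Kafka ACL operation to the corresponding Ranger access type.
	 *
	 * @param operation the Kafka ACL operation being authorized
	 * @return the Ranger access type name, or null if the operation is not supported
	 */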
	private String mapToRangerAccessType(AclOperation operation) {
		if (operation.equals(AclOperation.READ)) {
			return ACCESS_TYPE_READ;
		} else if (operation.equals(AclOperation.WRITE)) {
			return ACCESS_TYPE_WRITE;
		} else if (operation.equals(AclOperation.ALTER)) {
			return ACCESS_TYPE_CONFIGURE;
		} else if (operation.equals(AclOperation.DESCRIBE)) {
			return ACCESS_TYPE_DESCRIBE;
		} else if (operation.equals(AclOperation.CLUSTER_ACTION)) {
			return ACCESS_TYPE_CLUSTER_ACTION;
		} else if (operation.equals(AclOperation.CREATE)) {
			return ACCESS_TYPE_CREATE;
		} else if (operation.equals(AclOperation.DELETE)) {
			return ACCESS_TYPE_DELETE;
		} else if (operation.equals(AclOperation.DESCRIBE_CONFIGS)) {
			return ACCESS_TYPE_DESCRIBE_CONFIGS;
		} else if (operation.equals(AclOperation.ALTER_CONFIGS)) {
			return ACCESS_TYPE_ALTER_CONFIGS;
		} else if (operation.equals(AclOperation.IDEMPOTENT_WRITE)) {
			return ACCESS_TYPE_IDEMPOTENT_WRITE;
		}
		return null;
	}

	@Override
	public AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
		return super.authorizeByResourceType(requestContext, op, resourceType);
	}
}
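The if/else chain in mapToRangerAccessType could also be written as a lookup table. The sketch below shows that alternative with an EnumMap. It is not part of the original change: the Op enum is a hypothetical stand-in for Kafka's AclOperation (with ALL included only as an example of an unmapped value), and the string values are copied from the ACCESS_TYPE_* constants above.

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;

final class AccessTypeMappingSketch {

    // Hypothetical stand-in for org.apache.kafka.common.acl.AclOperation.
    enum Op {
        READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, CLUSTER_ACTION,
        DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, ALL
    }

    private static final Map<Op, String> ACCESS_TYPES;

    static {
        Map<Op, String> m = new EnumMap<>(Op.class);
        m.put(Op.READ, "consume");
        m.put(Op.WRITE, "publish");
        m.put(Op.ALTER, "configure");
        m.put(Op.DESCRIBE, "describe");
        m.put(Op.CLUSTER_ACTION, "cluster_action");
        m.put(Op.CREATE, "create");
        m.put(Op.DELETE, "delete");
        m.put(Op.DESCRIBE_CONFIGS, "describe_configs");
        m.put(Op.ALTER_CONFIGS, "alter_configs");
        m.put(Op.IDEMPOTENT_WRITE, "idempotent_write");
        ACCESS_TYPES = Collections.unmodifiableMap(m);
    }

    /** Returns the Ranger access type for the operation, or null if it is not mapped. */
    static String mapToRangerAccessType(Op operation) {
        return ACCESS_TYPES.get(operation);
    }

    public static void main(String[] args) {
        System.out.println(mapToRangerAccessType(Op.READ));
        System.out.println(mapToRangerAccessType(Op.ALL));
    }
}
```

A table keeps the mapping in one place and makes it easy to see which operations are not covered; the behaviour for unmapped operations (returning null) is the same as in the if/else version.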

apache-ranger\ranger-kafka-plugin-shim\src\main\java\org\apache\ranger\authorization\kafka\authorizer\RangerKafkaAuthorizer.java

This class needs no special changes; the method parameters are simply updated to match the signatures of the new interface.

...

package org.apache.ranger.authorization.kafka.authorizer;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;

import org.apache.kafka.common.Endpoint;
import org.apache.ranger.plugin.classloader.RangerPluginClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.kafka.server.authorizer.*;
import org.apache.kafka.common.acl.*;
import org.apache.kafka.common.resource.*;


//public class RangerKafkaAuthorizer extends Authorizer {
public class RangerKafkaAuthorizer implements Authorizer {
	private static final Logger LOG  = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);

	private static final String   RANGER_PLUGIN_TYPE                      = "kafka";
	private static final String   RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME  = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer";

	private Authorizer  rangerKakfaAuthorizerImpl 						  = null;
	private static		RangerPluginClassLoader rangerPluginClassLoader   = null;
	
	public RangerKafkaAuthorizer() {
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
		}

		this.init();

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
		}
	}
	
	private void init(){
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.init()");
		}

		try {
			
			rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass());
			
			@SuppressWarnings("unchecked")
			Class<Authorizer> cls = (Class<Authorizer>) Class.forName(RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME, true, rangerPluginClassLoader);

			activatePluginClassLoader();

			rangerKakfaAuthorizerImpl = cls.newInstance();
		} catch (Exception e) {
			// check what need to be done
			LOG.error("Error Enabling RangerKafkaPlugin", e);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.init()");
		}
	}

	@Override
	public void configure(Map<String, ?> configs) {
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.configure(Map<String, ?>)");
		}

		try {
			activatePluginClassLoader();

			rangerKakfaAuthorizerImpl.configure(configs);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.configure(Map<String, ?>)");
		}
	}

	@Override
	public void close() {
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.close()");
		}

		try {
			activatePluginClassLoader();
			
			rangerKakfaAuthorizerImpl.close();
		} catch (Throwable t) {
			LOG.error("Error closing RangerPlugin.", t);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.close()");
		}
		
	}

	@Override
	public List<AuthorizationResult> authorize(AuthorizableRequestContext authorizableRequestContext, List<Action> list) {
		if(LOG.isDebugEnabled()) {
			LOG.debug(String.format("==> RangerKafkaAuthorizer.authorize(AuthorizableRequestContext=%s, List<Action>=%s)", authorizableRequestContext, list));
		}

		List<AuthorizationResult> ret = new ArrayList<>();

		try {
			activatePluginClassLoader();

			ret = rangerKakfaAuthorizerImpl.authorize(authorizableRequestContext, list);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.authorize: " + ret);
		}
		
		return ret;
	}

	@Override
	public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBinding> list) {
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.createAcls(AuthorizableRequestContext, List<AclBinding>)");
		}

		List<? extends CompletionStage<AclCreateResult>> createAcls = new ArrayList<>();
		try {
			activatePluginClassLoader();

			createAcls = rangerKakfaAuthorizerImpl.createAcls(authorizableRequestContext, list);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.createAcls(AuthorizableRequestContext, List<AclBinding>)");
		}
		return createAcls;
	}

	@Override
	public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBindingFilter> list) {
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.deleteAcls(AuthorizableRequestContext, List<AclBindingFilter>)");
		}
		List<? extends CompletionStage<AclDeleteResult>> ret = new ArrayList<>();
		try {
			activatePluginClassLoader();

			ret = rangerKakfaAuthorizerImpl.deleteAcls(authorizableRequestContext, list);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.deleteAcls(AuthorizableRequestContext, List<AclBindingFilter>)");
		}
		
		return ret;
	}

	@Override
	public Iterable<AclBinding> acls(AclBindingFilter aclBindingFilter) {
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.acls(AclBindingFilter)");
		}

		Iterable<AclBinding> ret = new ArrayList<>();

		try {
			activatePluginClassLoader();

			ret = rangerKakfaAuthorizerImpl.acls(aclBindingFilter);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.acls(AclBindingFilter)");
		}

		return ret;
	}

	@Override
	public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo authorizerServerInfo) {
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.start(AuthorizerServerInfo authorizerServerInfo)");
		}

		Map<Endpoint, ? extends CompletionStage<Void>> map;
		try {
			activatePluginClassLoader();
			map = rangerKakfaAuthorizerImpl.start(authorizerServerInfo);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.start(AuthorizerServerInfo authorizerServerInfo)");
		}

		return map;
	}

	@Override
	public AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
		return Authorizer.super.authorizeByResourceType(requestContext, op, resourceType);
	}
	
	private void activatePluginClassLoader() {
		if(rangerPluginClassLoader != null) {
			rangerPluginClassLoader.activate();
		}
	}

	private void deactivatePluginClassLoader() {
		if(rangerPluginClassLoader != null) {
			rangerPluginClassLoader.deactivate();
		}
	}
		
}

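Notes

Two RangerKafkaAuthorizer classes are involved here:

  1. The class under ranger-kafka-plugin-shim exposes the methods that Kafka calls as its authorizer plugin to check permissions and obtain the results.

  2. The class under plugin-kafka implements the actual authorization logic.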
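The shim's pattern of activating the plugin class loader, delegating, and deactivating in a finally block can be illustrated in isolation. The sketch below assumes that activate() and deactivate() swap the thread context class loader and restore it afterwards; it uses a plain URLClassLoader rather than RangerPluginClassLoader, and the helper name callWith is hypothetical.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

final class ClassLoaderSwitchSketch {

    /**
     * Runs the delegate call with pluginLoader as the thread context class loader
     * and restores the previous loader afterwards, even if the call throws.
     */
    static <T> T callWith(ClassLoader pluginLoader, Supplier<T> delegateCall) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(pluginLoader); // "activate"
        try {
            return delegateCall.get();
        } finally {
            current.setContextClassLoader(previous); // "deactivate"
        }
    }

    public static void main(String[] args) {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        // An empty URLClassLoader stands in for the isolated plugin class loader.
        ClassLoader pluginLoader = new URLClassLoader(new URL[0], original);

        ClassLoader seenInside = callWith(pluginLoader,
                () -> Thread.currentThread().getContextClassLoader());

        System.out.println(seenInside == pluginLoader);
        System.out.println(Thread.currentThread().getContextClassLoader() == original);
    }
}
```

Keeping the restore step in finally is what lets the shim delegate safely: the broker thread gets its own class loader back whether the plugin call succeeds or fails.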

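Other

Related commands

The author works in a Kerberos environment, so the commands need the related parameters. Here they are supplied through a configuration file; add the following settings to the corresponding file: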

security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
sasl.mechanism=GSSAPI
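Before running the commands, it can help to check that a client configuration file actually contains these three settings. The following is a small sketch using only java.util.Properties; the class and method names are hypothetical, and the file content is passed in as a string so the example runs without touching the file system.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class ClientPropertiesSketch {

    // The three settings listed in the article for the Kerberos environment.
    static final String[] REQUIRED_KEYS = {
        "security.protocol", "sasl.kerberos.service.name", "sasl.mechanism"
    };

    /** Parses the text of a .properties file. */
    static Properties parse(String fileContent) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(fileContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns the required keys that are absent from the given properties. */
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            if (props.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String content = "security.protocol=SASL_PLAINTEXT\n"
                + "sasl.kerberos.service.name=kafka\n"
                + "sasl.mechanism=GSSAPI\n";
        System.out.println(missingKeys(parse(content)));
    }
}
```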

List topics

bin/kafka-topics.sh --list --bootstrap-server node248:6667 --command-config conf/client.properties

Producer command

bin/kafka-console-producer.sh --topic test1 --broker-list node248:6667 --producer.config conf/producer.properties

Consumer command

bin/kafka-console-consumer.sh --topic test1 --bootstrap-server node248:6667 --consumer.config conf/consumer.properties
