Big Data: Adapting Ranger 2.1.0 to Kafka 3.4.0


According to the official notes, Kafka 3.0 and above replace the authorization mechanism, including changes to the relevant classes and methods, so the logic in Ranger's Kafka authorizer implementation has to be updated accordingly.

Official notes

Kafka 3.0 and above replace the authorization mechanism, including changes to the relevant classes and methods.

GitHub PR: https://github.com/apache/kafka/pull/10450
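
For orientation, here is a hedged sketch of the signature change (the old shape is the pre-3.0 Scala trait kafka.security.auth.Authorizer; the new one is the Java interface org.apache.kafka.server.authorizer.Authorizer; see the PR above for the authoritative definitions):

// Pre-3.0: one resource and operation per call, boolean result
// def authorize(session: Session, operation: Operation, resource: Resource): Boolean

// 3.0 and above: a batch of Actions in, one AuthorizationResult out per Action
List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions);

This is why the Ranger authorizer below loops over a List<Action> and accumulates a List<AuthorizationResult> instead of returning a single boolean.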


POM

apache-ranger\pom.xml, where the kafka and scala versions are pinned:

<?xml version="1.0" encoding="UTF-8"?>
<project>
...
    <properties>
        ...
        <kafka.version>3.4.0</kafka.version>
        ...
        <scala.version>2.12.10</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        ...
    </properties>

</project>

apache-ranger\plugin-schema-registry\pom.xml, where the kafka version and the kafka artifact (scala binary suffix) are pinned:

<?xml version="1.0" encoding="UTF-8"?>
<project>
...
    <properties>
        ...
        <kafka.version>3.4.0</kafka.version>
        <kafkaArtifact>kafka_2.12</kafkaArtifact>
        ...
    </properties>
</project>
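
For reference, a sketch of how such properties are typically consumed by a dependency declaration (illustrative only, not copied from the Ranger source; the exact declarations in the Ranger poms may differ):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>${kafkaArtifact}</artifactId>
    <version>${kafka.version}</version>
</dependency>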

Code

apache-ranger\plugin-kafka\src\main\java\org\apache\ranger\authorization\kafka\authorizer\RangerKafkaAuthorizer.java

This class is where most of the changes land (classes, methods, and so on), so the work amounts to refactoring the original logic onto Kafka's new authorizer API: chiefly the authorize method, plus the mapping of operations and resource types.

Note:
In Kafka, Authorizer has one sub-interface (ClusterMetadataAuthorizer) and two concrete implementations (AclAuthorizer and StandardAuthorizer); the two implementations are where the methods are actually realized. Per the official guidance, the SimpleAclAuthorizer used by Kafka below 3.0 is replaced by AclAuthorizer in 3.0 and above, so we extend that class, which is how Ranger 2.1.0 is adapted to Kafka 3.4.0.

...

package org.apache.ranger.authorization.kafka.authorizer;

import java.util.*;
import java.util.concurrent.CompletionStage;

import com.google.gson.Gson;
import kafka.security.authorizer.AclAuthorizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.authorizer.*;
import org.apache.kafka.common.acl.*;
import org.apache.kafka.common.resource.*;

import org.apache.commons.lang.StringUtils;
import org.apache.ranger.audit.provider.MiscUtil;
import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
import org.apache.ranger.plugin.policyengine.RangerAccessResult;
import org.apache.ranger.plugin.service.RangerBasePlugin;

import org.apache.ranger.plugin.util.RangerPerfTracer;

public class RangerKafkaAuthorizer extends AclAuthorizer {
	private static final Log logger = LogFactory
			.getLog(RangerKafkaAuthorizer.class);
	private static final Log PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request");

	public static final String KEY_TOPIC = "topic";
	public static final String KEY_CLUSTER = "cluster";
	public static final String KEY_CONSUMER_GROUP = "consumergroup";
	public static final String KEY_TRANSACTIONALID = "transactionalid";
	public static final String KEY_DELEGATIONTOKEN = "delegationtoken";

	public static final String ACCESS_TYPE_READ = "consume";
	public static final String ACCESS_TYPE_WRITE = "publish";
	public static final String ACCESS_TYPE_CREATE = "create";
	public static final String ACCESS_TYPE_DELETE = "delete";
	public static final String ACCESS_TYPE_CONFIGURE = "configure";
	public static final String ACCESS_TYPE_DESCRIBE = "describe";
	public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
	public static final String ACCESS_TYPE_ALTER_CONFIGS    = "alter_configs";
	public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
	public static final String ACCESS_TYPE_CLUSTER_ACTION   = "cluster_action";

	private static volatile RangerBasePlugin rangerPlugin = null;
	RangerKafkaAuditHandler auditHandler = null;

	public RangerKafkaAuthorizer() {
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see org.apache.kafka.common.Configurable#configure(Map)
	 */
	@Override
	public void configure(Map<String, ?> configs) {
		RangerBasePlugin me = rangerPlugin;
		if (me == null) {
			synchronized(RangerKafkaAuthorizer.class) {
				me = rangerPlugin;
				if (me == null) {
					try {
						// Possible to override JAAS configuration which is used by Ranger, otherwise
						// SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer',
						// if it's not defined, then it reverts to 'KafkaServer' configuration.
						final Object jaasContext = configs.get("ranger.jaas.context");
						final String listenerName = (jaasContext instanceof String
										&& StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
										: SecurityProtocol.SASL_PLAINTEXT.name();
						final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
						JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs);
						MiscUtil.setUGIFromJAASConfig(context.name());
						logger.info("LoginUser=" + MiscUtil.getUGILoginUser());
					} catch (Throwable t) {
						logger.error("Error getting principal.", t);
					}
					me = rangerPlugin = new RangerBasePlugin("kafka", "kafka");
				}
			}
		}
		logger.info("Calling plugin.init()");
		rangerPlugin.init();
		auditHandler = new RangerKafkaAuditHandler();
		rangerPlugin.setResultProcessor(auditHandler);
	}

	@Override
	public void close() {
		logger.info("close() called on authorizer.");
		try {
			if (rangerPlugin != null) {
				rangerPlugin.cleanup();
			}
		} catch (Throwable t) {
			logger.error("Error closing RangerPlugin.", t);
		}
	}

	@Override
	public List<AuthorizationResult> authorize(AuthorizableRequestContext authorizableRequestContext, List<Action> list) {

		List<AuthorizationResult> authResults = new ArrayList<>();
		AuthorizableRequestContext session = authorizableRequestContext;

		Gson gson = new Gson();
		logger.info("List<Action> list -> " + gson.toJson(list));
		for (Action actionData : list) {
			ResourcePattern resource = actionData.resourcePattern();
			AclOperation operation = actionData.operation();
			if (rangerPlugin == null) {
				MiscUtil.logErrorMessageByInterval(logger,
						"Authorizer is still not initialized");
				return new ArrayList<>();
			}

			RangerPerfTracer perf = null;

			if (RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
				perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(resource=" + resource + ")");
			}
			String userName = null;
			if (session.principal() != null) {
				userName = session.principal().getName();
			}
			java.util.Set<String> userGroups = MiscUtil
					.getGroupsForRequestUser(userName);
			String ip = session.clientAddress().getHostAddress();

			// skip leading slash
			if (StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') {
				ip = ip.substring(1);
			}

			Date eventTime = new Date();
			String accessType = mapToRangerAccessType(operation);
			boolean validationFailed = false;
			String validationStr = "";

			if (accessType == null) {
				if (MiscUtil.logErrorMessageByInterval(logger,
						"Unsupported access type. operation=" + operation)) {
					logger.error("Unsupported access type. session=" + session
							+ ", operation=" + operation + ", resource=" + resource);
				}
				validationFailed = true;
				validationStr += "Unsupported access type. operation=" + operation;
			}
			String action = accessType;

			RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
			rangerRequest.setUser(userName);
			rangerRequest.setUserGroups(userGroups);
			rangerRequest.setClientIPAddress(ip);
			rangerRequest.setAccessTime(eventTime);

			RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
			rangerRequest.setResource(rangerResource);
			rangerRequest.setAccessType(accessType);
			rangerRequest.setAction(action);
			rangerRequest.setRequestData(resource.name());

			if (resource.resourceType().equals(ResourceType.TOPIC)) {
				rangerResource.setValue(KEY_TOPIC, resource.name());
			} else if (resource.resourceType().equals(ResourceType.CLUSTER)) {
				rangerResource.setValue(KEY_CLUSTER, resource.name());
			} else if (resource.resourceType().equals(ResourceType.GROUP)) {
				rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
			} else if (resource.resourceType().equals(ResourceType.TRANSACTIONAL_ID)) {
				rangerResource.setValue(KEY_TRANSACTIONALID, resource.name());
			} else if (resource.resourceType().equals(ResourceType.DELEGATION_TOKEN)) {
				rangerResource.setValue(KEY_DELEGATIONTOKEN, resource.name());
			} else {
				logger.error("Unsupported resourceType=" + resource.resourceType());
				validationFailed = true;
			}

			AuthorizationResult authResult = AuthorizationResult.DENIED;
			boolean returnValue = false;
			if (validationFailed) {
				MiscUtil.logErrorMessageByInterval(logger, validationStr
						+ ", request=" + rangerRequest);
			} else {

				try {
					RangerAccessResult result = rangerPlugin
							.isAccessAllowed(rangerRequest);
					if (result == null) {
						logger.error("Ranger Plugin returned null. Returning false");
					} else {
						returnValue = result.getIsAllowed();
						authResult = returnValue ? AuthorizationResult.ALLOWED : authResult;
					}
				} catch (Throwable t) {
					logger.error("Error while calling isAccessAllowed(). request="
							+ rangerRequest, t);
				} finally {
					auditHandler.flushAudit();
				}
			}
			RangerPerfTracer.log(perf);

			if (logger.isDebugEnabled()) {
				logger.debug("rangerRequest=" + rangerRequest + ", return="
							+ returnValue);
			}

			authResults.add(authResult);
		}

		logger.info("end authorize");
		return authResults;
	}

	@Override
	public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBinding> list) {
		logger.error("createAcls(AuthorizableRequestContext, List<AclBinding>) is not supported by Ranger for Kafka");
		return new ArrayList<>();
	}

	@Override
	public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBindingFilter> list) {
		logger.error("deleteAcls(AuthorizableRequestContext, List<AclBindingFilter>) is not supported by Ranger for Kafka");
		return new ArrayList<>();
	}

	@Override
	public Iterable<AclBinding> acls(AclBindingFilter filter) {
		logger.error("getAcls(AclBindingFilter) is not supported by Ranger for Kafka");
		return new ArrayList<>();
	}

	/**
	 * Maps a Kafka {@link AclOperation} to the corresponding Ranger access
	 * type name, or returns null if the operation is not supported.
	 *
	 * @param operation the Kafka ACL operation being checked
	 * @return the Ranger access type, or null if unsupported
	 */
	private String mapToRangerAccessType(AclOperation operation) {
		if (operation.equals(AclOperation.READ)) {
			return ACCESS_TYPE_READ;
		} else if (operation.equals(AclOperation.WRITE)) {
			return ACCESS_TYPE_WRITE;
		} else if (operation.equals(AclOperation.ALTER)) {
			return ACCESS_TYPE_CONFIGURE;
		} else if (operation.equals(AclOperation.DESCRIBE)) {
			return ACCESS_TYPE_DESCRIBE;
		} else if (operation.equals(AclOperation.CLUSTER_ACTION)) {
			return ACCESS_TYPE_CLUSTER_ACTION;
		} else if (operation.equals(AclOperation.CREATE)) {
			return ACCESS_TYPE_CREATE;
		} else if (operation.equals(AclOperation.DELETE)) {
			return ACCESS_TYPE_DELETE;
		} else if (operation.equals(AclOperation.DESCRIBE_CONFIGS)) {
			return ACCESS_TYPE_DESCRIBE_CONFIGS;
		} else if (operation.equals(AclOperation.ALTER_CONFIGS)) {
			return ACCESS_TYPE_ALTER_CONFIGS;
		} else if (operation.equals(AclOperation.IDEMPOTENT_WRITE)) {
			return ACCESS_TYPE_IDEMPOTENT_WRITE;
		}
		return null;
	}

	@Override
	public AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
		return super.authorizeByResourceType(requestContext, op, resourceType);
	}
}
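
To see how the new entry point is exercised, here is a minimal, hypothetical smoke-test sketch. The anonymous AuthorizableRequestContext, the user "testuser" and the topic "test1" are illustrative only; on a real broker Kafka supplies this context, and configure() must first have run against a live Ranger admin (without it, the authorizer above logs "Authorizer is still not initialized" and returns an empty list):

import java.net.InetAddress;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;

public class RangerKafkaAuthorizerSmokeTest {
	public static void main(String[] args) {
		RangerKafkaAuthorizer authorizer = new RangerKafkaAuthorizer();
		// configure(configs) is omitted here; see the configure() method above.

		// A stand-in for the request context that the broker normally provides.
		AuthorizableRequestContext ctx = new AuthorizableRequestContext() {
			@Override public String listenerName() { return "SASL_PLAINTEXT"; }
			@Override public SecurityProtocol securityProtocol() { return SecurityProtocol.SASL_PLAINTEXT; }
			@Override public KafkaPrincipal principal() { return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "testuser"); }
			@Override public InetAddress clientAddress() {
				try { return InetAddress.getLocalHost(); } catch (Exception e) { throw new RuntimeException(e); }
			}
			@Override public int requestType() { return 0; }
			@Override public int requestVersion() { return 0; }
			@Override public String clientId() { return "smoke-test"; }
			@Override public int correlationId() { return 0; }
		};

		// READ on topic "test1" maps to the Ranger access type "consume" above.
		Action read = new Action(
				AclOperation.READ,
				new ResourcePattern(ResourceType.TOPIC, "test1", PatternType.LITERAL),
				1,     // resourceReferenceCount
				true,  // logIfAllowed
				true); // logIfDenied

		List<AuthorizationResult> results = authorizer.authorize(ctx, Collections.singletonList(read));
		System.out.println("results=" + results);
		authorizer.close();
	}
}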

apache-ranger\ranger-kafka-plugin-shim\src\main\java\org\apache\ranger\authorization\kafka\authorizer\RangerKafkaAuthorizer.java

This class needs no particularly special changes; the work is mainly updating method signatures so they correctly match the new interface.

...

package org.apache.ranger.authorization.kafka.authorizer;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;

import org.apache.kafka.common.Endpoint;
import org.apache.ranger.plugin.classloader.RangerPluginClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.kafka.server.authorizer.*;
import org.apache.kafka.common.acl.*;
import org.apache.kafka.common.resource.*;


//public class RangerKafkaAuthorizer extends Authorizer {
public class RangerKafkaAuthorizer implements Authorizer {
	private static final Logger LOG  = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);

	private static final String   RANGER_PLUGIN_TYPE                      = "kafka";
	private static final String   RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME  = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer";

	private Authorizer  rangerKakfaAuthorizerImpl 						  = null;
	private static		RangerPluginClassLoader rangerPluginClassLoader   = null;
	
	public RangerKafkaAuthorizer() {
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
		}

		this.init();

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
		}
	}
	
	private void init(){
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.init()");
		}

		try {
			
			rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass());
			
			@SuppressWarnings("unchecked")
			Class<Authorizer> cls = (Class<Authorizer>) Class.forName(RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME, true, rangerPluginClassLoader);

			activatePluginClassLoader();

			rangerKakfaAuthorizerImpl = cls.newInstance();
		} catch (Exception e) {
			// check what need to be done
			LOG.error("Error Enabling RangerKafkaPlugin", e);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.init()");
		}
	}

	@Override
	public void configure(Map<String, ?> configs) {
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.configure(Map<String, ?>)");
		}

		try {
			activatePluginClassLoader();

			rangerKakfaAuthorizerImpl.configure(configs);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.configure(Map<String, ?>)");
		}
	}

	@Override
	public void close() {
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.close()");
		}

		try {
			activatePluginClassLoader();
			
			rangerKakfaAuthorizerImpl.close();
		} catch (Throwable t) {
			LOG.error("Error closing RangerPlugin.", t);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.close()");
		}
		
	}

	@Override
	public List<AuthorizationResult> authorize(AuthorizableRequestContext authorizableRequestContext, List<Action> list) {
		if(LOG.isDebugEnabled()) {
			LOG.debug(String.format("==> RangerKafkaAuthorizer.authorize(AuthorizableRequestContext=%s, List<Action>=%s)", authorizableRequestContext, list));
		}

		List<AuthorizationResult> ret = new ArrayList<>();

		try {
			activatePluginClassLoader();

			ret = rangerKakfaAuthorizerImpl.authorize(authorizableRequestContext, list);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.authorize: " + ret);
		}
		
		return ret;
	}

	@Override
	public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBinding> list) {
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)");
		}

		List<? extends CompletionStage<AclCreateResult>> createAcls = new ArrayList<>();
		try {
			activatePluginClassLoader();

			createAcls = rangerKakfaAuthorizerImpl.createAcls(authorizableRequestContext, list);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)");
		}
		return createAcls;
	}

	@Override
	public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext authorizableRequestContext, List<AclBindingFilter> list) {
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
		}
		List<? extends CompletionStage<AclDeleteResult>> ret = new ArrayList<>();
		try {
			activatePluginClassLoader();

			ret = rangerKakfaAuthorizerImpl.deleteAcls(authorizableRequestContext, list);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
		}
		
		return ret;
	}

	@Override
	public Iterable<AclBinding> acls(AclBindingFilter aclBindingFilter) {
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.getAcls(Resource)");
		}

		Iterable<AclBinding> ret = new ArrayList<>();

		try {
			activatePluginClassLoader();

			ret = rangerKakfaAuthorizerImpl.acls(aclBindingFilter);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.getAcls(Resource)");
		}

		return ret;
	}

	@Override
	public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo authorizerServerInfo) {
		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerKafkaAuthorizer.start(AuthorizerServerInfo authorizerServerInfo)");
		}

		Map<Endpoint, ? extends CompletionStage<Void>> map;
		try {
			activatePluginClassLoader();
			map = rangerKakfaAuthorizerImpl.start(authorizerServerInfo);
		} finally {
			deactivatePluginClassLoader();
		}

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerKafkaAuthorizer.configure(Map<String, ?>)");
		}

		return map;
	}

	@Override
	public AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
		return Authorizer.super.authorizeByResourceType(requestContext, op, resourceType);
	}
	
	private void activatePluginClassLoader() {
		if(rangerPluginClassLoader != null) {
			rangerPluginClassLoader.activate();
		}
	}

	private void deactivatePluginClassLoader() {
		if(rangerPluginClassLoader != null) {
			rangerPluginClassLoader.deactivate();
		}
	}
		
}

Notes

Two RangerKafkaAuthorizer classes are involved here:

  1. The class under ranger-kafka-plugin-shim exposes the authorizer methods that Kafka invokes for permission checks and returns the results.

  2. The class under plugin-kafka implements the actual authorization logic.
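
To activate the plugin, the broker's server.properties points Kafka at the shim class via the standard authorizer setting:

authorizer.class.name=org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer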

Miscellaneous

Related commands

The commands here were run in a Kerberos-secured environment, so each one needs the matching security parameters; they are supplied through a config file. Add the following properties to the relevant client config file:

security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
sasl.mechanism=GSSAPI
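
Beyond the three properties above, a GSSAPI client also needs Kerberos credentials, typically supplied through JAAS in the same file. A hedged example follows; the keytab path and principal are placeholders for your environment:

sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
    useKeyTab=true \
    storeKey=true \
    keyTab="/etc/security/keytabs/kafka-client.keytab" \
    principal="client/node248@EXAMPLE.COM";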

List topics:

bin/kafka-topics.sh --list --bootstrap-server node248:6667 --command-config conf/client.properties

Console producer:

bin/kafka-console-producer.sh --topic test1 --bootstrap-server node248:6667 --producer.config conf/producer.properties

Console consumer:

bin/kafka-console-consumer.sh --topic test1 --bootstrap-server node248:6667 --consumer.config conf/consumer.properties
