Kafka+Fink 实战+工具类

这篇具有很好参考价值的文章主要介绍了Kafka+Fink 实战+工具类。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

  • LogServiceImpl
@Service
@Slf4j
public class LogServiceImpl implements LogService {

    private static final String TOPIC_NAME = "ods_link_visit_topic";

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 记录日志
     *
     * @param request
     * @param shortLinkCode
     * @param accountNo
     * @return
     */
    @Override
    public void recodeShortLinkLog(HttpServletRequest request, String shortLinkCode, Long accountNo) {
        // ip、 浏览器信息
        String ip = CommonUtil.getIpAddr(request);
        // 全部请求头
        Map<String, String> headerMap = CommonUtil.getAllRequestHeader(request);

        Map<String,String> availableMap = new HashMap<>();
        availableMap.put("user-agent",headerMap.get("user-agent"));
        availableMap.put("referer",headerMap.get("referer"));
        availableMap.put("accountNo",accountNo.toString());

        LogRecord logRecord = LogRecord.builder()
                //日志类型
                .event(LogTypeEnum.SHORT_LINK_TYPE.name())
                //日志内容
                .data(availableMap)
                //客户端ip
                .ip(ip)
                // 时间
                .ts(CommonUtil.getCurrentTimestamp())
                //业务唯一标识(短链码)
                .bizId(shortLinkCode).build();

        String jsonLog = JsonUtil.obj2Json(logRecord);

        //打印日志 in 控制台
        log.info(jsonLog);

        // 发送kafka
        kafkaTemplate.send(TOPIC_NAME,jsonLog);


    }
}

  • DwdShortLinkLogApp
@Slf4j
public class DwdShortLinkLogApp {
    //定义 topic
    public static final String SOURCE_TOPIC = "ods_link_visit_topic";

    //定义 消费组
    public static final String SINK_TOPIC = "dwd_link_visit_topic";

    //定义 消费组
    public static final String GROUP_ID = "dwd_short_link_group";


    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

//        DataStream<String> ds = env.socketTextStream("192.168.75.146", 8888);

        FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(SOURCE_TOPIC, GROUP_ID);

        DataStreamSource<String> ds = env.addSource(kafkaConsumer);

        ds.print();

        SingleOutputStreamOperator<JSONObject> jsonDs = ds.flatMap(new FlatMapFunction<String, JSONObject>() {

            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
                // 生成web端设备唯一标识
                String udid = getDeviceId(jsonObject);
                jsonObject.put("udid",udid);

                String referer = getReferer(jsonObject);
                jsonObject.put("referer",referer);

                out.collect(jsonObject);

            }
        });

        // 分组
        KeyedStream<JSONObject, String> keyedStream = jsonDs.keyBy(new KeySelector<JSONObject, String>() {

            @Override
            public String getKey(JSONObject value) throws Exception {
                return value.getString("udid");

            }
        });


        // 识别新老访客    richMap open函数,对状态以及日期格式进行初始化

        SingleOutputStreamOperator<String> jsonDSWithVisitorState = keyedStream.map(new VisitorMapFunction());

        jsonDSWithVisitorState.print("ods新老访客");

        // 存储到dwd
        FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(SINK_TOPIC);

        jsonDSWithVisitorState.addSink(kafkaProducer);


        env.execute();
    }

    /**
     * 获取referer
     * @param jsonObject
     * @return
     */
    public static String getReferer(JSONObject jsonObject){
        JSONObject dataJsonObj = jsonObject.getJSONObject("data");
        if(dataJsonObj.containsKey("referer")){

            String referer = dataJsonObj.getString("referer");
            if(StringUtils.isNotBlank(referer)){
                try {
                    URL url = new URL(referer);
                    return url.getHost();
                } catch (MalformedURLException e) {
                    log.error("提取referer失败:{}",e.toString());
                }
            }
        }

        return "";

    }

    /**
     * 生成设备唯一标识
     *
     * @param jsonObject
     * @return
     */
    public static String getDeviceId(JSONObject jsonObject){
        Map<String,String> map= new TreeMap<>();

        try{
            map.put("ip",jsonObject.getString("ip"));
            map.put("event",jsonObject.getString("event"));
            map.put("bizId",jsonObject.getString("bizId"));
            map.put("userAgent",jsonObject.getJSONObject("data").getString("userAgent"));

            return DeviceUtil.geneWebUniqueDeviceId(map);

        }catch (Exception e){
            log.error("生产唯一deviceId异常:{}", jsonObject);
            return null;
        }


    }


}

  • KafkaUtil

    @Slf4j
    public class KafkaUtil {
    
        /**
         * kafka 的 broker 地址
         */
        private static String KAFKA_SERVER = null;
    
        static {
            Properties properties = new Properties();
    
            InputStream in = KafkaUtil.class.getClassLoader().getResourceAsStream("application.properties");
    
            try {
                properties.load(in);
            } catch (IOException e) {
                e.printStackTrace();
                log.error("加载Kafka配置文件失败:{}",e.getMessage());
            }
    
            //获取配置文件中的value
            KAFKA_SERVER = properties.getProperty("kafka.servers");
    
        }
    
        /**
         * 获取flink的kafka消费者
         * @param topic
         * @param groupId
         * @return
         */
        public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic,String groupId){
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
    
            return new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),properties);
        }
    
        /**
         * 获取flink的kafka生产者
         * @param topic
         * @return
         */
        public static FlinkKafkaProducer<String> getKafkaProducer(String topic){
            return new FlinkKafkaProducer<String>(KAFKA_SERVER,topic,new SimpleStringSchema());
        }
    }
    
    
  • TimeUtil文章来源地址https://www.toymoban.com/news/detail-658286.html

    public class TimeUtil {
    
        /**
         * 默认日期格式
         */
        private static final String DEFAULT_PATTERN = "yyyy-MM-dd";
    
        /**
         * 默认日期格式
         */
        private static final DateTimeFormatter DEFAULT_DATE_TIME_FORMATTER  = DateTimeFormatter.ofPattern(DEFAULT_PATTERN);
    
        private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault();
    
    
        /**
         * LocalDateTime 转 字符串,指定日期格式
         * @param localDateTime
         * @param pattern
         * @return
         */
        public static String format(LocalDateTime localDateTime, String pattern){
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
            String timeStr = formatter.format(localDateTime.atZone(DEFAULT_ZONE_ID));
            return timeStr;
        }
    
    
        /**
         * Date 转 字符串, 指定日期格式
         * @param time
         * @param pattern
         * @return
         */
        public static String format(Date time, String pattern){
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
            String timeStr = formatter.format(time.toInstant().atZone(DEFAULT_ZONE_ID));
            return timeStr;
        }
    
        /**
         *  Date 转 字符串,默认日期格式
         * @param time
         * @return
         */
        public static String format(Date time){
    
            String timeStr = DEFAULT_DATE_TIME_FORMATTER.format(time.toInstant().atZone(DEFAULT_ZONE_ID));
            return timeStr;
        }
    
        /**
         * timestamp 转 字符串,默认日期格式
         *
         * @param timestamp
         * @return
         */
        public static String format(long timestamp) {
            String timeStr = DEFAULT_DATE_TIME_FORMATTER.format(new Date(timestamp).toInstant().atZone(DEFAULT_ZONE_ID));
            return timeStr;
        }
    
    
        /**
         * 字符串 转 Date
         *
         * @param time
         * @return
         */
        public static Date strToDate(String time) {
            LocalDateTime localDateTime = LocalDateTime.parse(time, DEFAULT_DATE_TIME_FORMATTER);
            return Date.from(localDateTime.atZone(DEFAULT_ZONE_ID).toInstant());
    
        }
    
    }
    

到了这里,关于Kafka+Fink 实战+工具类的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka实战进阶:一篇详解与互联网实战PDF指南,带你深入Apache Kafka的世界

    Apache Kafka 是由Apache软件基金会开发的一款开源消息系统项目,主要使用Scala语言编写。该项目旨在为处理实时数据提供一个统一、高通量、低等待的平台。Kafka作为一种分布式的、分区的、多复本的日志提交服务,凭借其独特的设计提供了丰富的消息系统功能。 特点 高吞吐量

    2024年01月19日
    浏览(47)
  • Spring Boot+Kafka实战生产级Kafka消费组

    作者:禅与计算机程序设计艺术 Kafka是一个开源分布式消息系统,最初由LinkedIn开发,之后成为Apache项目的一部分。Kafka主要用于大数据实时流处理,具有低延迟、高吞吐量等特点。本文将会从基本概念、术语说明、原理及应用场景三个方面对Kafka进行详细介绍。 Kafka作为一个

    2024年02月10日
    浏览(35)
  • 【Kafka】Kafka监控工具Kafka-eagle简介

    Kafka-eagle是一种基于Web的开源管理工具,可以用来监控、管理多个Kafka集群。 下面是使用Docker部署Kafka-eagle的步骤: 下载并安装Docker和Docker Compose。 创建文件夹,例如kafka-eagle,并在其中创建docker-compose.yml文件,将以下配置写入: 在命令行中转到kafka-eagle文件夹中,运行以下命

    2024年02月14日
    浏览(37)
  • kafka map kafka可视化工具

    kafka-map是使用Java17和React开发的一款kafka可视化工具。 目前支持的功能有: 多集群管理 集群状态监控(分区数量、副本数量、存储大小、offset) 主题创建、删除、扩容(删除需配置delete.topic.enable = true) broker状态监控 消费者组查看、删除 重置offset 消息查询(支持String和j

    2024年03月28日
    浏览(72)
  • Kafka 可视化工具 Kafka Tool

    使用Kafka的小伙伴,有没有为无法直观地查看 Kafka 的 Topic 里的内容而发过愁呢? 下面推荐给大家一款带有可视化页面的Kafka工具: Kafka Tool (目前最新版本是 2.0.4 ) 下载地址 http://www.kafkatool.com/download.html 下载界面 不同版本的Kafka对应不同版本的工具,个人使用的是0.11,所

    2024年02月12日
    浏览(42)
  • 【面试实战】Kafka面试题

    是一个分布式流式处理平台,流平台一个关键的功能就是消息队列。 项目中使用Kafka消息队列,对评论、点赞、关注功能发布通知,封装为Event实体类。 消费者负责将消息队列中的Event取出,并将其封装为Message对象,并持久化到数据库中保存。 了解到除了Kafka还有其他的消息

    2024年02月07日
    浏览(35)
  • kafka消息系统实战

    kafka是什么?         是一种高吞吐量的、分布式、发布、订阅、消息系统  1.导入maven坐标 2.编写提供者 3.编写消费者  4.下载kafka 点此去官网下载——Apache Kafka 解压后进入config目录  修改zookeeper.properties dataDir=D:/kafka_2.13-3.5.1/tmp/zookeeper 修改日志存放的路径server.propertie

    2024年02月10日
    浏览(25)
  • KafKa 分区,副本实战

    5个broker (1主4从) 安装目路/config/server.properties, 额外复制4份为 server-2.properties,server-3.properties,server-4.properties,server-5.properties 主要配置不同 server.properties server-2.properties server-3.properties server-4.properties server-5.properties 运行这5个broker 创建一个主题test,8个分区,3个副本 bootstrap

    2024年02月11日
    浏览(39)
  • 【kafka】Kafka 可视化工具Kafka Eagle安装和使用

    Kafka产线环境需要管理的Topic和Consumser越来越多,使用命令行工具进行管理会非常繁杂。因此,大数据平台上需要一套Kafka的管理监控系统,Kafka-Eagle。 Kafka Eagle是一个用于监控和管理kafka的开源组件,可以同时监控多个kafka集群。 Kafka Eagle提供了完善的监控页面和kafka常用操作

    2023年04月15日
    浏览(53)
  • SpringBoot集成kafka全面实战

    本文是SpringBoot+Kafka的实战讲解,如果对kafka的架构原理还不了解的读者,建议先看一下《大白话kafka架构原理》、《秒懂kafka HA(高可用)》两篇文章。 一、生产者实践 普通生产者 带回调的生产者 自定义分区器 kafka事务提交 二、消费者实践 简单消费 指定topic、partition、of

    2024年02月15日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包