kafak消费数据,webSocket实时推送数据到前端

这篇具有很好参考价值的文章主要介绍了kafak消费数据,webSocket实时推送数据到前端。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

kafak消费数据,webSocket实时推送数据到前端,微服务,大数据,kafka,websocket文章来源地址https://www.toymoban.com/news/detail-673353.html

1.导入webSocket依赖

 <!--websocket依赖包-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

2.编写webSocket类

package com.skyable.device.config.webSocket;


import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/**
 * @author Administrator
 */
@ServerEndpoint("/vehicle/{domainId}")
@Component
@Slf4j
public class WebSocketServer {
    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
     */
    private static final Set<Session> SESSIONS = new HashSet<>();


    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        log.info("webSocket link close");
    }

    /**
     * @param error
     */
    @OnError
    public void onError(Throwable error) {
        error.printStackTrace();
    }

    /**
     * 接收数据
     *
     * @param data
     */
    public static void sendDataToClients(String data) {
        for (Session session : SESSIONS) {
            try {
                session.getBasicRemote().sendText(data);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @OnOpen
    public void onOpen(Session session) {
        /**
         * 与某个客户端的连接会话,需要通过它来给客户端发送数据
         */
        /**
         * 接收domainId
         */
        SESSIONS.add(session);
        sendDataToClients();
    }


    public void sendDataToClients() {
        for (Session session : SESSIONS) {
            try {
                session.getBasicRemote().sendText("webSocket link succeed");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
package com.skyable.device.config.webSocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author Administrator
 */
@EnableWebSocket
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3.kafak消费数据后调用webSocket方法

  /**
     * 获取kafka数据
     *
     * @param
     */
    @Override
    public void saveBatch(String jsonValue) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            //位置
            JsonNode jsonNode = objectMapper.readTree(jsonValue);
            if (jsonNode.has(VehicleConstant.LOCATION)) {
                RealTimePosition realTimePosition = new RealTimePosition();
                JsonNode locationNode = jsonNode.get("location");
                String vehicleId = locationNode.get("vehicleId").asText();
                double longitude = Double.parseDouble(locationNode.get("longitude").asText());
                double latitude = Double.parseDouble(locationNode.get("latitude").asText());
                long timeStamp = locationNode.get("timestamp").asLong();
                realTimePosition.setTimeStamp(timeStamp);
                realTimePosition.setLatitude(String.valueOf(latitude));
                realTimePosition.setLongitude(String.valueOf(longitude));
                realTimePosition.setVehicleId(vehicleId);
                VehicleLocationVo locationVo = deviceMapMapper.selectLonLat(vehicleId);
                if (!Objects.isNull(locationVo)) {
                    //计算距离
                    RedisUtil.addLocation(vehicleId, Double.parseDouble(locationVo.getLongitude()), Double.parseDouble(locationVo.getLatitude()), "l1");
                    RedisUtil.addLocation(vehicleId, longitude, latitude, "l2");
                    Double result = RedisUtil.calculateDistance(vehicleId, "l1", "l2");
                    Double meters = RedisUtil.convertMilesToKilometers(result);
                    DecimalFormat decimalFormat = new DecimalFormat("#.###");
                    String distance = decimalFormat.format(meters);
                    realTimePosition.setDistance(Double.parseDouble(distance));
                } else {
                    realTimePosition.setDistance(0);
                }
                //获取省份
                Map<String, Object> position = addressUtil.getPosition(longitude, latitude, null, null, null);
                Map data = (Map) position.get("data");
                String provinceName = data.get("shortname").toString().replaceAll("\"", "");
                realTimePosition.setArea(provinceName);
                deviceMapMapper.insertRealTimePosition(realTimePosition);
                RedisUtil.addZSetValue(VehicleConstant.VEHICLE_LOCATION, String.valueOf(vehicleId), timeStamp);
            }
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        try {
            //报警
            JsonNode jsonNode = objectMapper.readTree(jsonValue);
            if (jsonNode.has(VehicleConstant.ALERT)) {
                JsonNode alertNode = jsonNode.get("alert");
                String vehicleId = alertNode.get("vehicleId").asText();
                Integer alertType = alertNode.get("alertType").asInt();
                long timeStamp = alertNode.get("timestamp").asLong();
                Alerts alerts = new Alerts();
                alerts.setAlertType(alertType);
                alerts.setTimeStamp(timeStamp);
                alerts.setVehicleId(vehicleId);
                deviceMapMapper.insertAlerts(alerts);
                RedisUtil.addZSetValue(VehicleConstant.VEHICLE_ALERT, String.valueOf(vehicleId), timeStamp);
            }
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        //webSocket发送消息
        VehicleAllVo vehicles = vehicles();
        WebSocketServer.sendDataToClients(vehicles.toString());
    }

4.发送消息内容

VehicleAllVo vehicles = vehicles();
该方法就是发送的具体内容

5.kafak消费者

package com.skyable.device.listener.Vehicle;

import com.alibaba.fastjson.JSON;
import com.skyable.common.config.CloudApplicationContext;
import com.skyable.common.constants.kafka.KafkaTopicConstants;
import com.skyable.device.config.webSocket.WebSocketServer;
import com.skyable.device.entity.vehicle.Vehicle;
import com.skyable.device.service.DeviceMapService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.stream.Collectors;

/**
 * Description:
 *
 * @author yangJun
 * @date: 2023-08-18-14:12
 */
@Service
@Component
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class VehicleDataKafkaListener {
    private final DeviceMapService deviceMapService;

    @KafkaListener(topics = KafkaTopicConstants.TOPIC_VEHICLE_RECORD, groupId = "rx_1_thing", containerFactory = "batchFactory")
    public void dealDeviceDataToScript(List<ConsumerRecord<String, String>> recordList) {
        recordList.parallelStream()
                .map(ConsumerRecord::value)
                .forEach(this::saveVehicleData);
    }

    private void saveVehicleData(String jsonValue) {
        log.info("kafka data:" + jsonValue);
        deviceMapService.saveBatch(jsonValue);
    }
}
package com.skyable.device.listener.Vehicle;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @ClassName KafkaConsumerConfig
 * @Description Kafka消费者配置
 * @Author gaoy
 * @Date 2021/2/25 15:02
 */
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.concurrency}")
    private int concurrency;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private int maxPollRecords;


    /**
     * 批量消费工厂bean
     * @return
     */
    @Bean
    KafkaListenerContainerFactory batchFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new
                ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        // 开启批量监听
        factory.setBatchListener(true);
        factory.setConcurrency(concurrency);
        // 设置手动提交ackMode
        // factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    @Bean
    public Map consumerConfigs() {
        Map<String,Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        //设置每次接收Message的数量
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //开启幂等性。
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
        return props;
    }

}

到了这里,关于kafak消费数据,webSocket实时推送数据到前端的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • webSocket实现数据的实时推送(附:前后端代码)

            之前开发的一个管理系统项目中,首页是数据大屏展示,一开始我是用JS的 setInterval() 方法,设置一个时间,每过时间发起一次 ajax 请求。虽然也能凑活着用,但总感觉不是最优的方法,而且还比较占用资源,所以学习 WebSocke ,以下是本人的一些学习心得及前后端的

    2024年02月02日
    浏览(51)
  • WebSocket:实现实时互动、数据推送的利器,你了解多少

    WebSocket技术是一种基于TCP协议的全双工通信协议,它允许浏览器和服务器之间进行实时、双向的通信。相比传统的HTTP请求-响应模式,WebSocket提供了持久连接,可以实时地推送数据,减少了通信的延迟。 WebSocket的工作原理是通过建立一条持久连接来实现实时通信。首先,浏览

    2024年01月18日
    浏览(46)
  • vue和node使用websocket实现数据推送,实时聊天

    需求:node做后端根据websocket,连接数据库,数据库的字段改变后,前端不用刷新页面也能更新到数据,前端也可以发送消息给后端,后端接受后把前端消息做处理再推送给前端展示 使用node ./app.js运行项目 在需要使用websocket连接的页面引入 默认如下: id为243 在数据库改为

    2024年02月15日
    浏览(51)
  • Vue使用WebSocket实现实时获取后端推送的数据。

    Vue可以使用WebSocket实现实时获取后端推送的数据。 1.在Vue项目中安装WebSocket库 可以使用npm或yarn安装WebSocket库: 2.创建WebSocket连接 在Vue组件中创建WebSocket连接,连接到后端WebSocket服务器,代码如下: 上面的代码中,使用WebSocket连接到后端WebSocket服务器,通过监听onmessage事件,

    2024年02月08日
    浏览(50)
  • python 中使用Kafka模块进行鉴权数据推送和消费

    最近刚好要用到kafka进行数据传输,又要鉴权,就研究了一下kafka的鉴权推送和消费,现在将代码放出来,有兴趣的可以看一下,鉴权的加密方式各有不同,所以需要注意哦! 一、生产者 生产者采用的是异步推送的形式,另外加入了计数模块,担心因为脚本推送后未回调但是

    2024年02月13日
    浏览(46)
  • java:Kafka生产者推送数据与消费者接收数据(参数配置以及案例)

    bootstrap.servers :Kafka集群中的Broker列表,格式为host1:port1,host2:port2,…。生产者会从这些Broker中选择一个可用的Broker作为消息发送的目标Broker。 acks :Broker对消息的确认模式。可选值为0、1、all。0表示生产者不会等待Broker的任何确认消息;1表示生产者会等待Broker的Leader副本确认

    2024年02月16日
    浏览(47)
  • 掌握实时数据流:使用Apache Flink消费Kafka数据

            导读:使用Flink实时消费Kafka数据的案例是探索实时数据处理领域的绝佳方式。不仅非常实用,而且对于理解现代数据架构和流处理技术具有重要意义。         Apache Flink  是一个在 有界 数据流和 无界 数据流上进行有状态计算分布式处理引擎和框架。Flink 设计旨

    2024年02月03日
    浏览(81)
  • 流批一体计算引擎-4-[Flink]消费kafka实时数据

    Python3.6.9 Flink 1.15.2消费Kafaka Topic PyFlink基础应用之kafka 通过PyFlink作业处理Kafka数据 PyFlink需要特定的Python版本,Python 3.6, 3.7, 3.8 or 3.9。 1.3.1 python3和pip3的配置 一、系统中安装了多个版本的python3 。 二、环境变量path作用顺序 三、安装Pyflink 1.3.2 配置Flink Kafka连接 (1)在https://mvnr

    2024年02月06日
    浏览(43)
  • 多个数据webSocket推送太快导致前端渲染卡顿问题优化

    作者代码写的不怎么样,谅解!主要思路就是把websocket接收到的数据用一个数组暂存起来,达到一定数量一起修改统一渲染,可根据项目数据推送数据的速度适当调解数组大小,然后再加了一个可能一段时间内都到不到数组达标渲染数量,就使用定时器直接做渲染,防止数据

    2024年02月12日
    浏览(49)
  • 实战Flink Java api消费kafka实时数据落盘HDFS

    在Java api中,使用flink本地模式,消费kafka主题,并直接将数据存入hdfs中。 flink版本1.13 kafka版本0.8 hadoop版本3.1.4 为了完成 Flink 从 Kafka 消费数据并实时写入 HDFS 的需求,通常需要启动以下组件: 确保 Zookeeper 在运行,因为 Flink 的 Kafka Consumer 需要依赖 Zookeeper。 确保 Kafka Serve

    2024年01月24日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包