救救家长:疫情封控下packetbeat+kafka+ES套件监控青少年上网行为

这篇具有很好参考价值的文章主要介绍了救救家长:疫情封控下packetbeat+kafka+ES套件监控青少年上网行为。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

缘起

疫情,不少孩子封控在家,需要上网课,但是老是抑制不住地去打游戏或看视频。
朋友圈里面,某位技术大牛这么描述疫情封控期间,他与孩子的居家“战争”:

孩子上网课已经一个多月了,孩子因为爱玩游戏爱看B站,与我斗智斗勇好几回,目前战斗情况如下:
上课时间玩手机游戏 ~ 没收手机
在电脑上装手机模拟器 继续玩手机游戏 ~ 卸载模拟器
在电脑上看B站 ~ 设置host文件屏蔽B站域名
在电脑上看芒果TV ~ 继续设置屏蔽域名
继续安装手机模拟器、找到host文件删除屏蔽,看B站玩游戏 ~ 被打,被卸载各种软件,被警告再发现就换Linux操作系统
解封后,先买个企业级路由器管控起来… 或者再装个摄像头再加上AI人体姿态识别?😭

因本文是技术文章,在这里咱暂先不讨论教育之道,先提供一个技术方案,解决家长的燃眉之急,不用企业级路由器,也不用AI人体姿态识别。
我的方案是:packetbeat+kafka+ES套件,以大数据可视化方式监控孩子。
这个方案的特点:

  • 一是抓住了矛盾的主要方面,疫情封控期间,孩子因网课有正当理由使用电脑等设备,监控重点是上网行为,尤其在上课时间段内不能打游戏看视频
  • 二是最大程度利用了现有设备,不需要额外添置监控设备

架构设计

救救家长:疫情封控下packetbeat+kafka+ES套件监控青少年上网行为
架构设计上有个难点,我的方案中考虑了异地监控的情况,比如孩子在杭州上课,我在新加坡工作,考虑跨国网络的不稳定性,我在常规的packetbeat和ES集群之间,加了一个准备放在阿里云上的kafka消息队列,这样就不用穿透内网,也不怕网络不稳丢消息了,远在新加坡Shopee云的监控主机可以用异步方式读取kafka。

效果展示

监控实图1:
救救家长:疫情封控下packetbeat+kafka+ES套件监控青少年上网行为
监控实图2:
救救家长:疫情封控下packetbeat+kafka+ES套件监控青少年上网行为
孩子访问了啥网站,啥时候访问,网络地址、流量情况一应俱全:) 虽然和教育的目的相背,但是国内就好这一口不是😄 IT前辈大牛目前还在用物理手段,我已经升级到 云+大数据了,我要把这个项目开源,造福中国的父母,嘿嘿

安装运行

注意版本问题!如下未特别说明的,都采用7.8.1版本,版本不匹配会有坑。

packetbeat

总体过程是:官网下载,unzip解压到本地目录,配置yml文件,后启动运行

10360* sudo ./packetbeat -e -c packetbeat.yml
10362  cd packetbeat-7.8.1-darwin-x86_64
10363* sudo ./packetbeat setup --dashboards
10364* sudo ./packetbeat -e -c packetbeat.yml

配置packetbeat.yml 输出到Kafka

# ---------------------------- Kafka -------------------------------------------
output.kafka:
  hosts: ["localhost:9092"]
  topic: packetbeat
  required_acks: 1
Kafka

采用docker-compose方式安装

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.18.37
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  kafka_manager:
    image: sheepkiller/kafka-manager
    container_name: kafka_manager
    ports:
      - 9000:9000
    environment:
      ZK_HOSTS: "zookeeper:2181"
      APPLICATION_SECRET: "random-secret"
    command: -Dpidfile.path=/dev/null

消费端代码如下供参考,也可以用logstash直接拉取

package main

import (
    "fmt"
    "github.com/Shopify/sarama"
)

const TOPIC = "packetbeat"
func main() {
    consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
    if err != nil {
        fmt.Printf("fail to start consumer, err:%v\n", err)
        return
    }
    partitionList, err := consumer.Partitions(TOPIC) // 根据topic取到所有的分区
    if err != nil {
        fmt.Printf("fail to get list of partition:err%v\n", err)
        return
    }
    fmt.Println(partitionList)
    for partition := range partitionList { // 遍历所有的分区
        // 针对每个分区创建一个对应的分区消费者
        pc, err := consumer.ConsumePartition(TOPIC, int32(partition), sarama.OffsetOldest)
    fmt.Printf("---%+v\n", pc)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
            return
        }
        defer pc.AsyncClose()
        // 异步从每个分区消费信息
    i := 0
        for msg := range pc.Messages() {
                fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
        i++
        if i > 5 {
            return
        }
        }
    }
}
logstash

安装过程参考官网说明,配置

input {
    kafka {
            bootstrap_servers => "localhost:59471"
            topics => ["packetbeat"]
    }
}

output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "packetbeat-7.8.1-2022.05.01-000001"
        document_type => "_doc"
    }
    stdout { codec => rubydebug }
}
ES集群+kibana

也是docker-compose安装文章来源地址https://www.toymoban.com/news/detail-409534.html

services:

  elasticsearch01:
    build:
      context: elasticsearch/
      args:
        ELK_VERSION: ${ELK_VERSION:-7.8.1}
    volumes:
      - type: bind
        source: ./elasticsearch/elasticsearch.yml
        target: /usr/share/elasticsearch/config/elasticsearch.yml
        read_only: true
      - ./data_elasticsearch01:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
    environment:
      - node.name=elasticsearch01
      - discovery.seed_hosts=elasticsearch02
      - cluster.initial_master_nodes=elasticsearch01,elasticsearch02
      - bootstrap.memory_lock=true
      #- "ES_JAVA_OPTS=-Xms${ES_HEAP_SIZE:-2g} -Xmx${ES_HEAP_SIZE:-2g}"
    ulimits:
      memlock:
        soft: -1
        hard: -1

  elasticsearch02:
    build:
      context: elasticsearch/
      args:
        ELK_VERSION: ${ELK_VERSION:-7.8.1}
    volumes:
      - type: bind
        source: ./elasticsearch/elasticsearch.yml
        target: /usr/share/elasticsearch/config/elasticsearch.yml
        read_only: true
      - ./data_elasticsearch02:/usr/share/elasticsearch/data
    environment:
      - node.name=elasticsearch02
      - discovery.seed_hosts=elasticsearch01
      - cluster.initial_master_nodes=elasticsearch01,elasticsearch02
      - bootstrap.memory_lock=true
      #- "ES_JAVA_OPTS=-Xms${ES_HEAP_SIZE:-2g} -Xmx${ES_HEAP_SIZE:-2g}"
    ulimits:
      memlock:
        soft: -1
        hard: -1

  kibana:
    build:
      context: kibana/
      args:
        ELK_VERSION: ${ELK_VERSION:-7.8.1}
    volumes:
      - type: bind
        source: ./kibana/kibana.yml
        target: /usr/share/kibana/config/kibana.yml
        read_only: true
    ports:
      - "5601:5601"
    environment:
      - elasticsearch.hosts=["http://elasticsearch01:9200"]

到了这里,关于救救家长:疫情封控下packetbeat+kafka+ES套件监控青少年上网行为的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Logstash输入Kafka输出Es配置

    Logstash是一个开源的数据收集引擎,具有实时管道功能。它可以从各种数据源中动态地统一和标准化数据,并将其发送到你选择的目的地。 Logstash 的早期目标主要是用于收集日志,但现在的功能已经远远超出这个范围。任何事件类型都可以通过Logstash进行分析,通过输入、过

    2024年02月03日
    浏览(39)
  • filebeat采集日志数据到kafka(一)(filebeat->kafka->logstash->es)

    一、filebeat安装 filebeat-kafka版本适配 1、安装包下载 https://www.elastic.co/cn/downloads/past-releases#filebeat 解压 2、新建yml配置文件,如test.yml 3、Filebeat启动和停止 启动:./filebeat -c test.yml 停止:kill -9 PID

    2024年02月16日
    浏览(87)
  • 海量kafka数据入es速度优化处理

    主要是涉及到kafka 消费端到es 的数据处理 kafka端 1、批量消费(效果相当明显) 2、kafka 设置topic多分区,增加kafka的消费并行度(效果相当明显) es 端 1、采用批量插入,批量插入效率较单条插入效率高很多(效果相当明显,一次批量插入数据大小限制在5M内) 2、调整es 中索

    2024年02月12日
    浏览(61)
  • logstash同步数据从kafka到es集群

    背景:需求是这样的,原始文件是txt文件(每天300个文件),最终想要的结果是每天将txt中的数据加载到es中,开始的想法是通过logstash加载数据到es中,但是对logstash不太熟悉,不知道怎么讲程序弄成读取一个txt文件到es中以后,就将这个txt原始文件备份并且删除掉,然后就想

    2024年02月15日
    浏览(41)
  • Java+springboot+vue智慧校园源码,数据云平台Web端+小程序教师端+小程序家长端

    Java+springboot+vue +element-ui+小程序+电子班牌:Java Android+演示+自主版权。 智慧校园电子班牌人脸识别系统全套源码, 包含:数据云平台Web端+小程序教师端+小程序家长端+电子班牌 学生端。 电子班牌系统又称之为智慧班牌,是当前校园数字化信息化建设、文化建设的主流,是校

    2024年02月02日
    浏览(41)
  • ElasticStack日志分析平台-ES 集群、Kibana与Kafka

    Elasticsearch 是一个开源的分布式搜索和分析引擎,Logstash 和 Beats 收集的数据可以存储在 Elasticsearch 中进行搜索和分析。 Elasticsearch为所有类型的数据提供近乎实时的搜索和分析:一旦数据被索引,它就可以立即被搜索和分析,这种实时性使得用户能够即时获取最新数据的搜索

    2024年02月04日
    浏览(40)
  • 通过kafka connector实现mysql数据自动同步es

    整体思路: 1、使用 io.debezium.connector.mysql.MySqlConnector 自动同步数据到kafka消息队列 2、通过listener监听消息队列,代码控制数据插入es ps:其实有更简单的方式:在此基础上使用ElasticsearchSinkConnector、ksql,完成数据的转换与自动同步es,全程无需代码控制,后续本地跑通流程后

    2024年02月08日
    浏览(44)
  • EFLK日志平台(filebeat-->kafka-->logstash-->es-->kiabana)

    ELK平台是一套完整的日志集中处理解决方案,将 ElasticSearch、Logstash 和 Kiabana 三个开源工具配合使用, 完成更强大的用户对日志的查询、排序、统计需求。 1. es安装(单节点部署) 前提 安装 2.es web 查看 浏览器访问 http://esIP:9200 或者本地访问curl ‘http://localhost:9200/?pretty’ 安

    2024年02月10日
    浏览(36)
  • 用seatunnel替代logstash,把数据从kafka抽取到ES

    seatunnel(2.1.3)调用spark-sql(2.4)、flink-sql(1.14)对结构化数据进行处理;能够通过配置,在一个任务里调度多个source和sink 在seatunnel源码里升级elasticsearch-spark组件,添加spark-catalyst的依赖后,重新打包 指定spark任务启动参数 定义临时动态表,kafka consumer官方配置参数都添加在consume

    2024年02月13日
    浏览(49)
  • 使用Flink MySQL cdc分别sink到ES、Kafka、Hudi

    [flink-1.13.1-bin-scala_2.11.tgz](https://archive.apache.org/dist/flink/flink-1.13.1/flink-1.13.1-bin-scala_2.11.tgz) [hadoop-2.7.3.tar.gz](https://archive.apache.org/dist/hadoop/common/hadoop-2.7.3/hadoop-2.7.3.tar.gz) [flink-cdc-connectors](https://github.com/ververica/flink-cdc-connectors)(git clone源码编译) [hudi](https://github.com/apache/hudi)(git

    2024年02月03日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包