【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)

这篇具有很好参考价值的文章主要介绍了【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

需要源码请点赞关注收藏后评论区留言私信~~~

Flume、Kafka区别和侧重点

1)Kafka 是一个非常通用的系统,你可以有许多生产者和消费者共享多个主题Topics。相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase等发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。如果数据被多个系统消费的话,使用kafka;如果数据有多个生产者场景,或者有写入Hbase、HDFS操作,使用Flume。

2)Flume可以使用拦截器实时处理数据。而Kafka需要外部的流处理系统才能做到。

3)Kafka和Flume都是可靠的系统,通过适当的配置能保证零数据丢失。然而,Flume不支持副本事件。如果Flume代理的一个节点奔溃了,即使使用了可靠的文件管道方式,你也将丢失这些事件直到你恢复这些磁盘。如果你需要一个高可靠行的管道,那么使用Kafka是个更好的选择。

Spark Streaming与Flume、Kafka整合与开发

此开发示例的功能是商品实时交易数据统计分析,通过Flume实时收集交易订单,将数据分发Kafka,Kafka将数据传输到Spark Streaming,Spark Streaming统计商品的销售量。实现主要有以下几个步骤:

1)通过LOG日志模拟产生实时交易数据

2)Flume收集模拟产生实时交易数据

3)Flume将数据发送给Kafka消息队列

4)Spark Streaming接收Kafka消息队列的消息,每5秒进行数据统计

具体实现如下:

1)新建MAVEN项目,名称为RealtimeAnalysis,新建过程请见第9章。在POM.XML文件中加入依赖包

2)在工程的resource目录下新建log4j.properties文件,其中注意的是log4j.appender.flume.Hostname的配置,要配置成你安装flume的服务器

3)在工程的test目录下新建java类LoggerGenerator,此类用于不断模拟产生订单交易数据,在此类中每6秒调用一次PaymentInfo交易的实体类的random方法是模拟产生订单交易数据方法,数据以JSON格式返回。其中PaymentInfo是交易的实体类,用三个成员变量,分别是订单编号、商品编号、商品价格,LoggerGenerator为模拟日志生成类 

4)在安装Flume服务器的conf目录下新建文件log4j_flume.properties,其中注意的是sinks.kafka_sink.brokerList配置的是连接Kafka集群的地址和端口号

5)启动flume,命令如下:

./kafka-server-start.sh /hadoop/kafka_2.11-2.4.1/config/server.properties &

6)新建topic,名称为 logtoflume,命令如下:

kafka-topics.sh
  --zookeeper 172.16.106.69:2181,172.16.106.70:2181,172.16.106.71:2181
 --topic logtoflume --replication-factor 1 --partitions 1  --create

7)新建scala类KafkaConsumerMsg,接收kafka下的topic队列,名称为logtoflume的数据,并做统计

8)启动LoggerGenerator不断模拟产生订单交易数据,运行效果如下:

【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)

 9)启动KafkaConsumerMsg接收kafka下的topic队列的数据,并做统计,运行效果如下:

【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)

部分代码如下

import com.alibaba.fastjson.JSONObject;

import java.util.Random;
import java.util.UUID;
public class PaymentInfo {
    private static final long serialVersionUID = 1L;
    private String orderId;//订单编号
    private String productId;//商品编号
    private long productPrice;//商品价格
    public PaymentInfo() {
    }
    public static long getSerialVersionUID() {
        return serialVersionUID;
    }
    public String getOrderId() {
        return orderId;
    }
    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }
    public String getProductId() {
        return productId;
    }
    public void setProductId(String productId) {
        this.productId = productId;
    }
    public long getProductPrice() {
        return productPrice;
    }
    public void setProductPrice(long productPrice) {
        this.productPrice = productPrice;
    }
    @Override
    public String toString() {
        return "PaymentInfo{" +
                "orderId='" + orderId + '\'' +
                ", productId='" + productId + '\'' +
                ", productPrice=" + productPrice +
                '}';
    }
    //模拟订单数据
    public String random() {
        Random r = new Random();
        this.orderId = UUID.randomUUID().toString().replaceAll( "-", "" );
        this.productPrice = r.nextInt( 1000 );
        this.productId = r.nextInt( 10 ) + "";
        JSONObject obj = new JSONObject();
        String jsonString = obj.toJSONString( this );
        return jsonString;
    }
}

创作不易 觉得有帮助请点赞关注收藏~~~文章来源地址https://www.toymoban.com/news/detail-435299.html

到了这里,关于【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于Kafka和Spark实现实时计算系统

    Apache Kafka是一个分布式的流处理平台。它最初是由LinkedIn开发并开源的,现在已经成为Apache软件基金会旗下的顶级项目之一。Kafka主要用于实时流数据的高吞吐量传输、存储和处理,例如日志收集、流式的ETL以及实时的Web日志等。 Apache Spark是一个用于大规模数据处理的通用引

    2024年02月10日
    浏览(41)
  • Spark Streaming + Kafka构建实时数据流

    1. 使用Apache Kafka构建实时数据流 参考文档链接:https://cloud.tencent.com/developer/article/1814030 2. 数据见UserBehavior.csv 数据解释:本次实战用到的数据集是CSV文件,里面是一百零四万条淘宝用户行为数据,该数据来源是阿里云天池公开数据集 根据这一csv文档运用Kafka模拟实时数据流,

    2024年02月12日
    浏览(43)
  • 基于Flume+spark+Flask的分布式实时日志分析与入侵检测系统

    完整项目地址:https://download.csdn.net/download/lijunhcn/88463174 简介 LogVision是一个整合了web日志聚合、分发、实时分析、入侵检测、数据存储与可视化的日志分析解决方案。聚合采用Apache Flume,分发采用Apache Kafka,实时处理采用Spark Streaming,入侵检测采用Spark MLlib,数据存储使用H

    2024年01月16日
    浏览(44)
  • Flume实现Kafka数据持久化存储到HDFS

    写在前面:博主是一只经过实战开发历练后投身培训事业的“小山猪”,昵称取自动画片《狮子王》中的“彭彭”,总是以乐观、积极的心态对待周边的事物。本人的技术路线从Java全栈工程师一路奔向大数据开发、数据挖掘领域,如今终有小成,愿将昔日所获与大家交流一二

    2024年02月06日
    浏览(41)
  • 基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化

    目标 : 实现FineBI访问MySQL结果数据集的配置 实施 安装FineBI 参考《FineBI Windows版本安装手册.docx》安装FineBI 配置连接 数据准备 小结 实现FineBI访问MySQL结果数据集的配置 目标 : 实现FineBI实时报表构建 路径 step1:实时报表构建 step2:实时报表配置 step3:实时刷新测试 实施 实

    2024年02月04日
    浏览(39)
  • 实时大数据流处理技术:Spark Streaming与Flink的深度对比

    引言 在当前的大数据时代,企业和组织越来越多地依赖于实时数据流处理技术来洞察和响应业务事件。实时数据流处理不仅能够加快数据分析的速度,还能提高决策的效率和准确性。Apache Spark Streaming和Apache Flink是目前两个主要的实时数据流处理框架,它们各自拥有独特的特

    2024年03月10日
    浏览(59)
  • 拼多多根据ID取商品详情原数据 API 实现实时数据获取的完整指南

    在电商行业中,商品详情页是用户了解商品信息、进行购买决策的重要页面。为了提高用户体验和促进销售,电商平台通常会提供商品详情的API接口,以便第三方应用能够实时获取商品数据。本文将介绍如何使用拼多多获得的根据ID取商品详情原数据的API实现实时数据获取,

    2024年01月21日
    浏览(67)
  • Spark+Kafka构建实时分析Dashboard

    Spark+Kafka构建实时分析Dashboard【林子雨】 官方实验步骤:https://dblab.xmu.edu.cn/post/spark-kafka-dashboard/ 前几天刚做完这个实验,学了不少知识,也遇到了不少问题,在这里记录一下自己的实验过程,与小伙伴们一起探讨下。 案例概述(详情见官网) (1)安装Spark 详细步骤见官网

    2024年02月13日
    浏览(44)
  • Sqoop与Flume的集成:实时数据采集

    将Sqoop与Flume集成是实现实时数据采集和传输的重要步骤之一。Sqoop用于将数据从关系型数据库导入到Hadoop生态系统中,而Flume用于数据流的实时采集、传输和处理。本文将深入探讨如何使用Sqoop与Flume集成,提供详细的步骤、示例代码和最佳实践,以确保能够成功实现实时数据

    2024年01月23日
    浏览(49)
  • 使用Flume-KafkaSource实时采集Avro格式数据

    Flume是一个可靠、可扩展且具有高可用性的分布式系统,用于在大规模数据集群中进行高效的日志聚合、收集和传输。Kafka是一个分布式流处理平台,用于处理高容量的实时数据流。本文将介绍如何使用Flume的KafkaSource来实时采集Avro格式的数据,并提供相应的源代码。 首先,确

    2024年02月07日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包