需要源码请点赞关注收藏后评论区留言私信~~~
Flume、Kafka区别和侧重点
1)Kafka 是一个非常通用的系统,你可以有许多生产者和消费者共享多个主题Topics。相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase等发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。如果数据被多个系统消费的话,使用kafka;如果数据有多个生产者场景,或者有写入Hbase、HDFS操作,使用Flume。
2)Flume可以使用拦截器实时处理数据。而Kafka需要外部的流处理系统才能做到。
3)Kafka和Flume都是可靠的系统,通过适当的配置能保证零数据丢失。然而,Flume不支持副本事件。如果Flume代理的一个节点奔溃了,即使使用了可靠的文件管道方式,你也将丢失这些事件直到你恢复这些磁盘。如果你需要一个高可靠行的管道,那么使用Kafka是个更好的选择。
Spark Streaming与Flume、Kafka整合与开发
此开发示例的功能是商品实时交易数据统计分析,通过Flume实时收集交易订单,将数据分发Kafka,Kafka将数据传输到Spark Streaming,Spark Streaming统计商品的销售量。实现主要有以下几个步骤:
1)通过LOG日志模拟产生实时交易数据
2)Flume收集模拟产生实时交易数据
3)Flume将数据发送给Kafka消息队列
4)Spark Streaming接收Kafka消息队列的消息,每5秒进行数据统计
具体实现如下:
1)新建MAVEN项目,名称为RealtimeAnalysis,新建过程请见第9章。在POM.XML文件中加入依赖包
2)在工程的resource目录下新建log4j.properties文件,其中注意的是log4j.appender.flume.Hostname的配置,要配置成你安装flume的服务器
3)在工程的test目录下新建java类LoggerGenerator,此类用于不断模拟产生订单交易数据,在此类中每6秒调用一次PaymentInfo交易的实体类的random方法是模拟产生订单交易数据方法,数据以JSON格式返回。其中PaymentInfo是交易的实体类,用三个成员变量,分别是订单编号、商品编号、商品价格,LoggerGenerator为模拟日志生成类
4)在安装Flume服务器的conf目录下新建文件log4j_flume.properties,其中注意的是sinks.kafka_sink.brokerList配置的是连接Kafka集群的地址和端口号
5)启动flume,命令如下:
./kafka-server-start.sh /hadoop/kafka_2.11-2.4.1/config/server.properties &
6)新建topic,名称为 logtoflume,命令如下:
kafka-topics.sh
--zookeeper 172.16.106.69:2181,172.16.106.70:2181,172.16.106.71:2181
--topic logtoflume --replication-factor 1 --partitions 1 --create
7)新建scala类KafkaConsumerMsg,接收kafka下的topic队列,名称为logtoflume的数据,并做统计
8)启动LoggerGenerator不断模拟产生订单交易数据,运行效果如下:
9)启动KafkaConsumerMsg接收kafka下的topic队列的数据,并做统计,运行效果如下:
部分代码如下文章来源:https://www.toymoban.com/news/detail-435299.html
import com.alibaba.fastjson.JSONObject;
import java.util.Random;
import java.util.UUID;
public class PaymentInfo {
private static final long serialVersionUID = 1L;
private String orderId;//订单编号
private String productId;//商品编号
private long productPrice;//商品价格
public PaymentInfo() {
}
public static long getSerialVersionUID() {
return serialVersionUID;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getProductId() {
return productId;
}
public void setProductId(String productId) {
this.productId = productId;
}
public long getProductPrice() {
return productPrice;
}
public void setProductPrice(long productPrice) {
this.productPrice = productPrice;
}
@Override
public String toString() {
return "PaymentInfo{" +
"orderId='" + orderId + '\'' +
", productId='" + productId + '\'' +
", productPrice=" + productPrice +
'}';
}
//模拟订单数据
public String random() {
Random r = new Random();
this.orderId = UUID.randomUUID().toString().replaceAll( "-", "" );
this.productPrice = r.nextInt( 1000 );
this.productId = r.nextInt( 10 ) + "";
JSONObject obj = new JSONObject();
String jsonString = obj.toJSONString( this );
return jsonString;
}
}
创作不易 觉得有帮助请点赞关注收藏~~~文章来源地址https://www.toymoban.com/news/detail-435299.html
到了这里,关于【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!