Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

这篇具有很好参考价值的文章主要介绍了Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、案例说明

现有一电商网站数据文件,名为buyer_favorite1,记录了用户对商品的收藏数据,数据以“\t”键分割,数据内容及数据格式如下:
Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作,kafka,spark,java

二、前置准备工作

项目环境说明
Linux Ubuntu 16.04

jdk-7u75-linux-x64

scala-2.10.4

kafka_2.10-0.8.2.2

spark-1.6.0-bin-hadoop2.6

开启hadoop集群,zookeeper服务,开启kafka服务。再另开启一个窗口,在/apps/kafka/bin目录下创建一个topic。

/apps/zookeeper/bin/zkServer.sh start 
cd /apps/kafka  
bin/kafka-server-start.sh config/server.properties &  
cd /apps/kafka  
bin/kafka-topics.sh \  
--create \  
--zookeeper localhost:2181 \  
--replication-factor 1 \  
--topic kafkasendspark \  
--partitions 1

三、编写程序代码创建kafka的producer

1、新创一个文件folder命名为lib,并将jar包添加进来。(可以从我的博客主页资源里面下载)
2、进入以下界面,移除Scala Library。

Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作,kafka,spark,java

3、操作完成后,再点击Add Library选项

Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作,kafka,spark,java

4、进入以下界面

Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作,kafka,spark,java

5、点击完成即可
6、最后创建如下项目结构的文件

Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作,kafka,spark,java

四、编写代码,运行程序

编写生产者代码

package my.kafka;  
import java.io.BufferedReader;  
import java.io.File;  
import java.io.FileNotFoundException;  
import java.io.FileReader;  
import java.io.IOException;  
import java.util.Properties;  
import kafka.javaapi.producer.Producer;  
import kafka.producer.KeyedMessage;  
import kafka.producer.ProducerConfig;  
public class KafkaSend {  
    private final Producer<String, String> producer;  
  
    public final static String TOPIC = "kafkasendspark";  
  
    public KafkaSend(){  
        Properties props = new Properties();  
        // 此处配置的是kafka的端口  
        props.put("metadata.broker.list", "localhost:9092");  
        // 配置value的序列化类  
        props.put("serializer.class", "kafka.serializer.StringEncoder");  
        // 配置key的序列化类  
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");  
        props.put("request.required.acks", "-1");  
        producer = new Producer<String, String>(new ProducerConfig(props));  
    }  
  
    void produce() {  
        int lineNo = 1;  
        File file = new File("/data/case6/buyer_favorite1");  
        BufferedReader reader = null;  
        try {  
            reader = new BufferedReader(new FileReader(file));  
            String tempString = null;  
  
            while ( (tempString = reader.readLine()) != null ) {  
                String key = String.valueOf(lineNo);  
                String data = tempString;  
                producer.send(new KeyedMessage<String, String>(TOPIC, key, data));  
                System.out.println(data);  
                lineNo++;  
  
                Thread.sleep(100);  
  
            }  
        } catch (FileNotFoundException e) {  
            System.err.println(e.getMessage());  
        } catch (IOException e) {  
            System.err.println(e.getMessage());  
        } catch (InterruptedException e) {  
            System.err.println(e.getMessage());  
        }  
    }  
    public static void main(String[] args) {  
        System.out.println("start");  
        new KafkaSend().produce();  
        System.out.println("finish");  
    }  
}  

编写消费者代码

package my.scala  
import org.apache.spark.SparkConf  
import org.apache.spark.streaming.StreamingContext  
import org.apache.spark.streaming.Seconds  
import scala.collection.immutable.Map  
import org.apache.spark.streaming.kafka.KafkaUtils  
import kafka.serializer.StringDecoder  
import kafka.serializer.StringDecoder  
object SparkReceive {  
  def main(args: Array[String]) {  
  
    val sparkConf = new SparkConf().setAppName("countuser").setMaster("local")  
    val ssc = new StreamingContext(sparkConf, Seconds(2))  
    ssc.checkpoint("checkpoint")  
    val topics = Set("kafkasendspark")  
    val brokers = "localhost:9092"  
    val zkQuorum = "localhost:2181"  
  
    val kafkaParams = Map[String, String](  
        "metadata.broker.list" -> brokers,  
        "serializer.class" -> "kafka.serializer.StringEncoder"  
    )  
  
  
    val lines = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topics)  
    val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {  
      //通过Spark内部的reduceByKey按key规约,然后这里传入某key当前批次的Seq/List,再计算当前批次的总和  
      val currentCount = currValues.sum  
      // 已累加的值  
      val previousCount = prevValueState.getOrElse(0)  
      // 返回累加后的结果,是一个Option[Int]类型  
      Some(currentCount + previousCount)  
    }  
    val result=lines.map(line => (line._2.split("\t")) ).map( row => (row(0),1) ).updateStateByKey[Int](addFunc).print()  
  
    ssc.start();  
    ssc.awaitTermination()  
  }  
}  

五、运行程序

在Eclipse的SparkReceive类中右键并点击==>Run As==>Scala Application选项。

然后在KafkaSend类中:右键点击==>Run As==>Jave Application选项。

即可在控制窗口Console中查看输出结果为:
Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作,kafka,spark,java文章来源地址https://www.toymoban.com/news/detail-538132.html

到了这里,关于Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 推荐系统架构设计实践:Spark Streaming+Kafka构建实时推荐系统架构

    作者:禅与计算机程序设计艺术 推荐系统(Recommendation System)一直都是互联网领域一个非常火热的话题。其主要目标是在用户多样化的信息环境中,通过分析用户的偏好、消费习惯等数据,提供个性化的信息推送、商品推荐、购物指导等服务。如何设计一个推荐系统的架构及

    2024年02月08日
    浏览(53)
  • 大数据——Spark Streaming

    Spark Streaming是一个可扩展、高吞吐、具有容错性的流式计算框架。 之前我们接触的spark-core和spark-sql都是离线批处理任务,每天定时处理数据,对于数据的实时性要求不高,一般都是T+1的。但在企业任务中存在很多的实时性的任务需求,列如双十一的京东阿里都会要求做一个

    2024年02月07日
    浏览(45)
  • Spark Streaming实时数据处理

    作者:禅与计算机程序设计艺术 Apache Spark™Streaming是一个构建在Apache Spark™之上的快速、微批次、容错的流式数据处理系统,它可以对实时数据进行高吞吐量、低延迟地处理。Spark Streaming既可用于流计算场景也可用于离线批处理场景,而且可以将结构化或无结构化数据源(如

    2024年02月06日
    浏览(54)
  • 大数据编程实验四:Spark Streaming

    一、目的与要求 1、通过实验掌握Spark Streaming的基本编程方法; 2、熟悉利用Spark Streaming处理来自不同数据源的数据。 3、熟悉DStream的各种转换操作。 4、熟悉把DStream的数据输出保存到文本文件或MySQL数据库中。 二、实验内容 1.参照教材示例,利用Spark Streaming对三种类型的基

    2024年02月03日
    浏览(53)
  • Spark Streaming实时流式数据处理

    作者:禅与计算机程序设计艺术 Apache Spark Streaming 是 Apache Spark 提供的一个用于高吞吐量、容错的流式数据处理引擎。它可以实时的接收数据并在系统内部以微批次的方式进行处理,并将结果输出到文件、数据库或实时消息系统中。Spark Streaming 支持 Java、Scala 和 Python 编程语言

    2024年02月08日
    浏览(50)
  • Unity Render Streaming实现自定义数据传输

    Uinity Render Streaming中已有的脚本实现了Video和Audio的收发和InputSystem相关事件的收发,那么如何简单的实现自定义的数据收发呢?(本文实现的是Unity端到Unity端的数据收发) 关于Unity Render Streaming包,其自身提供了与信令服务器建立连接的基础件脚本RenderStreaming.cs,还提供了一

    2024年02月07日
    浏览(52)
  • 实时大数据流处理技术:Spark Streaming与Flink的深度对比

    引言 在当前的大数据时代,企业和组织越来越多地依赖于实时数据流处理技术来洞察和响应业务事件。实时数据流处理不仅能够加快数据分析的速度,还能提高决策的效率和准确性。Apache Spark Streaming和Apache Flink是目前两个主要的实时数据流处理框架,它们各自拥有独特的特

    2024年03月10日
    浏览(62)
  • 在Spring Boot中使用Spark Streaming进行实时数据处理和流式计算

    引言: 在当今大数据时代,实时数据处理和流式计算变得越来越重要。Apache Spark作为一个强大的大数据处理框架,提供了Spark Streaming模块,使得实时数据处理变得更加简单和高效。本文将深入浅出地介绍如何在Spring Boot中使用Spark Streaming进行实时数据处理和流式计算,并提供

    2024年03月27日
    浏览(48)
  • 数据平台的实时处理:Streaming和Apache Kafka

    随着数据的增长和数据处理的复杂性,实时数据处理变得越来越重要。实时数据处理是指在数据产生时或者数据产生后的很短时间内对数据进行处理的技术。这种技术在各个领域都有广泛的应用,如实时推荐、实时监控、实时分析、实时语言翻译等。 在实时数据处理中,St

    2024年04月14日
    浏览(42)
  • 大数据流处理与实时分析:Spark Streaming和Flink Stream SQL的对比与选择

    作者:禅与计算机程序设计艺术

    2024年02月07日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包