SparkStreaming学习——读取socket的数据和kafka生产者的消息

这篇具有很好参考价值的文章主要介绍了SparkStreaming学习——读取socket的数据和kafka生产者的消息。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、Spark Streaming概述

二、添加依赖

三、配置log4j

1.依赖下载好后打开IDEA最左侧的外部库

2.找到spark-core

3.找到apache.spark目录

4.找到log4j-defaults.properties文件

5.将该文件放在资源目录下,并修改文件名

6.修改log4j.properties第19行的内容

四、Spark Streaming读取Socket数据流

1.代码编写

2.开启nc -lk

3.启动Scala程序

五、Spark Streaming读取kafka消息

1.代码编写

2.开启生产者sparkkafkastu并生产消息

3. 运行scala代码


一、Spark Streaming概述

        Spark Streaming 用于流式数据的处理。Spark Streaming 支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的RDD如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。

SparkStreaming学习——读取socket的数据和kafka生产者的消息

         Spark Streaming与Flink的区别:Spark Streaming是基于秒级别,而Flink是基于毫秒级别,是真正的实时流,Spark Streaming属于伪实时。因此,在选择实时流计算框架时,如果对实时速度要求不高的话,选择Spark Streaming基本足够。

        Spark Streaming的编程抽象是离散化流,也就是DStream。它是一个 RDD 序列,每个RDD代表数据流中一个时间片内的数据。

SparkStreaming学习——读取socket的数据和kafka生产者的消息

        应用于 DStream 上的转换操作都会转换为底层RDD上的操作。如对行 DStream中的每个RDD应用flatMap操作以生成单词 DStream 的RDD。 SparkStreaming学习——读取socket的数据和kafka生产者的消息

二、添加依赖

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.version>3.1.2</spark.version>
    <mysql.version>8.0.29</mysql.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <!--  https://mvnrepository.com/artifact/org.apache.spark/spark-core  -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>

三、配置log4j

1.依赖下载好后打开IDEA最左侧的外部库

SparkStreaming学习——读取socket的数据和kafka生产者的消息

2.找到spark-core

SparkStreaming学习——读取socket的数据和kafka生产者的消息

3.找到apache.spark目录

SparkStreaming学习——读取socket的数据和kafka生产者的消息

4.找到log4j-defaults.properties文件

SparkStreaming学习——读取socket的数据和kafka生产者的消息

5.将该文件放在资源目录下,并修改文件名

SparkStreaming学习——读取socket的数据和kafka生产者的消息

6.修改log4j.properties第19行的内容

log4j.rootCategory=ERROR, console

四、Spark Streaming读取Socket数据流

1.代码编写

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamDemo1 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkstream1")
    // 定义流,采集周期3秒
    val streamingContext = new StreamingContext(conf, Seconds(3))
    // TODO 配置数据源为指定机器和端口
    val socketLineStream: ReceiverInputDStream[String] = streamingContext.socketTextStream("lxm147", 8888)
    // TODO 业务处理
    val wordStream: DStream[String] = socketLineStream.flatMap(_.split("\\s+"))
    val mapStream: DStream[(String, Int)] = wordStream.map((_, 1))
    val wordCountStream: DStream[(String, Int)] = mapStream.reduceByKey(_ + _)

    // TODO 输出结果
    wordCountStream.print()
    // TODO 启动采集器
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

2.开启nc -lk

SparkStreaming学习——读取socket的数据和kafka生产者的消息

3.启动Scala程序

SparkStreaming学习——读取socket的数据和kafka生产者的消息

五、Spark Streaming读取kafka消息

1.代码编写

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingKafkaSource {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("sparkKafkaStream").setMaster("local[*]")
    val streamingContext = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map(
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "lxm147:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.GROUP_ID_CONFIG -> "sparkstreamgroup1")
    )
    
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      // 如果没有topic需要创建
      // kafka-topics.sh --create --zookeeper lxm147:2181 --topic sparkkafkastu --partitions 1 --replication-factor 1
      ConsumerStrategies.Subscribe(Set("sparkkafkastu"), kafkaParams)
    )

    // KeyValue(key,value)
    val wordCountStream: DStream[(String, Int)] = kafkaStream.flatMap(_.value().toString.split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _)

    wordCountStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

2.开启生产者sparkkafkastu并生产消息

SparkStreaming学习——读取socket的数据和kafka生产者的消息

3. 运行scala代码

 SparkStreaming学习——读取socket的数据和kafka生产者的消息文章来源地址https://www.toymoban.com/news/detail-426475.html

到了这里,关于SparkStreaming学习——读取socket的数据和kafka生产者的消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka - 获取 Topic 生产者发布数据命令

    从头开始获取 20 条数据(等价于时间升序) ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic your-topic --from-beginning --max-messages 20 获取最新 20 条数据(等价于时间降序)去掉 --from-beginning 即可(默认) ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic your-topic --max-me

    2024年02月14日
    浏览(39)
  • 大数据开发之Kafka(概述、快速入门、生产者)

    Kafka是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。 发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。 目前企业中比较常见的消息队列产品主要有Kafka、ActiveM

    2024年01月19日
    浏览(59)
  • Kafka3.0.0版本——生产者 数据去重

    1.1、至少一次 至少一次(At Least Once )的含义 生产者发送数据到kafka集群,kafka集群至少接收到一次数据。 至少一次的条件: ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2 1.2、最多一次 最多一次(At Most Once )的含义 生产者发送数据到kafka集群,

    2024年02月01日
    浏览(41)
  • Kafka3.0.0版本——生产者数据有序与乱序

    单分区内,数据有序。如下图partion0、partion1、partion2分区内,各自分区内的数据有序。 2.1、kafka1.x版本之前保证数据单分区有序的条件 kafka在1.x版本之前保证数据单分区有序,条件如下: 2.2、kafka1.x版本及以后保证数据单分区有序的条件 未开启幂等性 开启幂等性 2.3、kafka1

    2023年04月27日
    浏览(45)
  • java:Kafka生产者推送数据与消费者接收数据(参数配置以及案例)

    bootstrap.servers :Kafka集群中的Broker列表,格式为host1:port1,host2:port2,…。生产者会从这些Broker中选择一个可用的Broker作为消息发送的目标Broker。 acks :Broker对消息的确认模式。可选值为0、1、all。0表示生产者不会等待Broker的任何确认消息;1表示生产者会等待Broker的Leader副本确认

    2024年02月16日
    浏览(47)
  • kafka-保证数据不重复-生产者开启幂等性和事务的作用?

    适用于消息在写入到服务器日志后,由于网络故障,生产者没有及时收到服务端的 ACK 消息,生产者误以为消息没有持久化到服务端,导致生产者重复发送该消息,造成了消息的重复现象,而幂等性就是为了解决该问题。 通过3个值的唯一性去重: PID:生产者ID 分区号 seq:单

    2024年02月14日
    浏览(44)
  • Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解

    kafka尚硅谷视频: 10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili ​      1. producer初始化:加载默认配置,以及配置的参数,开启网络线程      2. 拦截器拦截      3. 序列化器进行消息key, value序列化      4. 进行分区      5. kafka broker集群 获取metaData      6. 消息缓存到

    2024年02月11日
    浏览(48)
  • [kafka消息生产被阻塞] - 如何解决Kafka生产者阻塞的问题

    [kafka消息生产被阻塞] - 如何解决Kafka生产者阻塞的问题 Kafka是一个高度可扩展的分布式流平台,用于构建实时数据管道和流处理应用程序。作为一个广泛使用的消息代理系统,Kafka在数据传输方面表现出色,但是在极端情况下,它可能会出现生产者阻塞的问题。这可能会导致

    2024年02月11日
    浏览(49)
  • 三、Kafka生产者1---Kafka生产者初始化-new KafkaProducer

    概述 本文主要是分享Kafka初始化生产者的 大体过程 初始化过程中会新建很多对象,本文暂先分享部分对象 1.分区器---Partitioner partitioner 2.重试时间---long retryBackoffMs 3.序列化器---SerializerK keySerializer,SerializerV valueSerializer 4.拦截器--- ListProducerInterceptorK, V interceptorList 5.累加器-

    2024年03月14日
    浏览(62)
  • Apache Kafka - 重识Kafka生产者

    Kafka 生产者是 Apache Kafka 中的一个重要组件,它负责将数据发送到 Kafka 集群中。在实时数据处理和流式处理应用程序中,Kafka 生产者扮演着非常重要的角色。 这里我们将介绍 Kafka 生产者的概念、工作原理以及如何使用 Kafka 生产者。 Kafka 生产者是一种用于将数据发送到 Kafk

    2024年02月05日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包