SparkStreaming【实例演示】

这篇具有很好参考价值的文章主要介绍了SparkStreaming【实例演示】。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

1、环境准备

  1. 启动Zookeeper和Kafka集群
  2. 导入依赖:
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.2.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.2.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.2.4</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.30</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.10</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.14.2</version>
        </dependency>

2、模拟生产数据

通过循环来不断生产随机数据、使用Kafka来发布订阅消息。

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import java.util.Properties
import scala.collection.mutable.ListBuffer
import scala.util.Random

// 生产模拟数据
object MockData {

  def main(args: Array[String]): Unit = {

    // 生成模拟数据
    // 格式: timestamp area city userid adid
    // 含义:   时间戳    省份  城市   用户  广告

    // 生产数据 => Kafka => SparkStreaming => 分析处理
    // 设置Zookeeper属性
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")

    val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)

    while (true){

      mockData().foreach(
        (data: String) => {
          // 向 Kafka 中生成数据
          val record = new ProducerRecord[String,String]("testTopic",data)
          producer.send(record)
          println(record)
        }
      )

      Thread.sleep(2000)

    }


  }
  def mockData(): ListBuffer[String] = {
    val list = ListBuffer[String]()
    val areaList = ListBuffer[String]("华东","华南","华北","华南")
    val cityList = ListBuffer[String]("北京","西安","上海","广东")

    for (i <- 1 to 30){
      val area = areaList(new Random().nextInt(4))
      val city = cityList(new Random().nextInt(4))
      val userid = new Random().nextInt(6) + 1
      val adid = new Random().nextInt(6) + 1

      list.append(s"${System.currentTimeMillis()} ${area} ${city} ${userid} ${adid}")
    }
    list
  }

}

3、模拟消费数据

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// 消费数据
object Kafka_req1 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka req1")
    val ssc = new StreamingContext(conf,Seconds(3))

    // 定义Kafka参数: kafka集群地址、消费者组名称、key序列化、value序列化
    val kafkaPara: Map[String,Object] = Map[String,Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG ->"lyh",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // 读取Kafka数据创建DStream
    val kafkaDStream: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,  //优先位置
      ConsumerStrategies.Subscribe[String,String](Set("testTopic"),kafkaPara) // 消费策略:(订阅多个主题,配置参数)
    )

    // 将每条消息的KV取出
    val valueDStream: DStream[String] = kafkaDStream.map(_.value())

    // 计算WordCount
    valueDStream.print()

   // 开启任务
    ssc.start()
    ssc.awaitTermination()

  }

}

4、需求1 广告黑名单

实现实时的动态黑名单机制:将每天对某个广告点击超过 100 次的用户拉黑。(黑名单保存到 MySQL 中。)

先判断用户是否已经在黑名单中?过滤:判断用户点击是否超过阈值?拉入黑名单:更新用户的点击数量,并获取最新的点击数据再判断是否超过阈值?拉入黑名单:不做处理

需要两张表:黑名单、点击数量表。

create table black_list (userid char(1));
CREATE TABLE user_ad_count (
dt varchar(255),
userid CHAR (1),
adid CHAR (1),
count BIGINT,
PRIMARY KEY (dt, userid, adid)
);

JDBC工具类

import com.alibaba.druid.pool.DruidDataSourceFactory

import java.sql.Connection
import java.util.Properties
import javax.sql.DataSource

object JDBCUtil {
  var dataSource: DataSource = init()
  //初始化连接池
  def init(): DataSource = {
    val properties = new Properties()
    properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
    properties.setProperty("url", "jdbc:mysql://hadoop102:3306/spark-streaming?useUnicode=true&characterEncoding=UTF-8&useSSL=false")
    properties.setProperty("username", "root")
    properties.setProperty("password", "000000")
    properties.setProperty("maxActive", "50")

    DruidDataSourceFactory.createDataSource(properties)
  }
  //获取连接对象
  def getConnection(): Connection ={
    dataSource.getConnection
  }
}

需求实现:
 

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.Connection
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.ListBuffer

// 消费数据
object Kafka_req1 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka req1")
    val ssc = new StreamingContext(conf,Seconds(3))

    // 定义Kafka参数: kafka集群地址、消费者组名称、key序列化、value序列化
    val kafkaPara: Map[String,Object] = Map[String,Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG ->"lyh",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // 读取Kafka数据创建DStream
    val kafkaDStream: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,  //优先位置
      ConsumerStrategies.Subscribe[String,String](Set("testTopic"),kafkaPara) // 消费策略:(订阅多个主题,配置参数)
    )

    val clickData: DStream[AdClickData] = kafkaDStream.map(
      kafkaData => {
        val data = kafkaData.value()
        val datas = data.split(" ")
        AdClickData(datas(0), datas(1), datas(2), datas(3),datas(4))
      }
    )


    val ds: DStream[((String,String,String),Int)] = clickData.transform( //周期性地拿到 RDD 数据
      rdd => {
        // todo 周期性获取黑名单数据,就要周期性读取MySQL中的数据
        val black_list = ListBuffer[String]()
        val con: Connection = JDBCUtil.getConnection()
        val stmt = con.prepareStatement("select * from black_list")

        val rs = stmt.executeQuery()
        while (rs.next()) {
          black_list.append(rs.getString(1))
        }

        rs.close()
        stmt.close()
        con.close()

        // todo 判断用户是否在黑名单当中,在就过滤掉
        val filterRDD = rdd.filter(
          data => {
            !black_list.contains(data.user)
          }
        )

        // todo 如果不在,那么统计点击数量
        filterRDD.map(
          data => {

            val sdf = new SimpleDateFormat("yyyy-MM-dd")
            val day = sdf.format(new Date(data.ts.toLong))
            val user = data.user
            val ad = data.ad

            ((day, user, ad), 1) // 返回键值对
          }
        ).reduceByKey(_ + _)

      }
    )

    ds.foreachRDD(
      rdd => {
        rdd.foreach {
          case ((day, user, ad), count) => {
            println(s"$day $user $ad $count")
            if (count>=30){
              // todo 如果统计数量超过点击阈值(30),拉入黑名单
              val con = JDBCUtil.getConnection()
              val stmt = con.prepareStatement(
                """
                  |insert into black_list values(?)
                  |on duplicate key
                  |update userid=?
                  |""".stripMargin
              )
              stmt.setString(1,user)
              stmt.setString(2,user)
              stmt.executeUpdate()
              stmt.close()
              con.close()
            }else{
              // todo 如果没有超过阈值,更新到当天点击数量
              val con = JDBCUtil.getConnection()
              val stmt = con.prepareStatement(
                """
                  |select *
                  |from user_ad_count
                  |where dt=? and userid=? and adid=?
                  |""".stripMargin)
              stmt.setString(1,day)
              stmt.setString(2,user)
              stmt.setString(3,ad)
              val rs = stmt.executeQuery()

              if (rs.next()){ //如果存在数据
                val stmt1 = con.prepareStatement(
                  """
                    |update user_ad_count
                    |set count=count+?
                    |where dt=? and userid=? and adid=?
                    |""".stripMargin)
                stmt1.setInt(1,count)
                stmt1.setString(2,day)
                stmt1.setString(3,user)
                stmt1.setString(4,ad)
                stmt1.executeUpdate()
                stmt1.close()
                // todo 如果更新后的点击数量超过阈值,拉入黑名单
                val stmt2 = con.prepareStatement(
                  """
                    |select *
                    |from user_ad_count
                    |where dt=? and userid=? and adid=?
                    |""".stripMargin)
                stmt2.setString(1,day)
                stmt2.setString(2,user)
                stmt2.setString(3,ad)
                val rs1 = stmt2.executeQuery()
                if (rs1.next()){
                  val stmt3 = con.prepareStatement(
                    """
                      |insert into black_list(userid) values(?)
                      |on duplicate key
                      |update userid=?
                      |""".stripMargin)
                  stmt3.setString(1,user)
                  stmt3.setString(2,user)
                  stmt3.executeUpdate()
                  stmt3.close()
                }
                rs1.close()
                stmt2.close()
              }else{
                // todo 如果不存在数据,那么新增
                val stmt1 = con.prepareStatement(
                  """
                    |insert into user_ad_count(dt,userid,adid,count) values(?,?,?,?)
                    |""".stripMargin)
                stmt1.setString(1,day)
                stmt1.setString(2,user)
                stmt1.setString(3,ad)
                stmt1.setInt(4,count)
                stmt1.executeUpdate()
                stmt1.close()
              }
              rs.close()
              stmt.close()
              con.close()
            }
          }
        }
      }
    )


    // 开启任务
    ssc.start()
    ssc.awaitTermination()

  }

  // 广告点击数据
  case class AdClickData(ts: String,area: String,city: String,user: String,ad: String)

}

5、需求2 广告实时点击数据

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

import java.text.SimpleDateFormat
import java.util.Date

object Kafka_req2 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka req2")
    val ssc = new StreamingContext(conf,Seconds(3))

    // 定义Kafka参数: kafka集群地址、消费者组名称、key序列化、value序列化
    val kafkaPara: Map[String,Object] = Map[String,Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG ->"lyh",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // 读取Kafka数据创建DStream
    val kafkaDStream: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,  //优先位置
      ConsumerStrategies.Subscribe[String,String](Set("testTopic"),kafkaPara) // 消费策略:(订阅多个主题,配置参数)
    )

    // 对DStream进行转换操作
    val clickData: DStream[AdClickData] = kafkaDStream.map(
      kafkaData => {
        val data = kafkaData.value()
        val datas = data.split(" ")
        AdClickData(datas(0), datas(1), datas(2), datas(3),datas(4))
      }
    )

    val ds: DStream[((String, String, String, String), Int)] = clickData.map(
      (data: AdClickData) => {
        val sdf = new SimpleDateFormat("yyyy-MM-dd")
        val day = sdf.format(new Date(data.ts.toLong))
        val area = data.area
        val city = data.city
        val ad = data.ad
        ((day, area, city, ad), 1)
      }
    ).reduceByKey(_+_)

    ds.foreachRDD(
      rdd=>{
        rdd.foreachPartition(
          iter => {
            val con = JDBCUtil.getConnection()
            val stmt = con.prepareStatement(
              """
                |insert into area_city_ad_count (dt,area,city,adid,count)
                |values (?,?,?,?,?)
                |on duplicate key
                |update count=count+?
                |""".stripMargin)
            iter.foreach {
              case ((day, area, city, ad), sum) => {
                println(s"$day $area $city $ad $sum")
                stmt.setString(1,day)
                stmt.setString(2,area)
                stmt.setString(3,city)
                stmt.setString(4,ad)
                stmt.setInt(5,sum)
                stmt.setInt(6,sum)
                stmt.executeUpdate()
              }
            }
            stmt.close()
            con.close()
          }
        )
      }
    )

    ssc.start()
    ssc.awaitTermination()

  }

  // 广告点击数据
  case class AdClickData(ts: String,area: String,city: String,user: String,ad: String)


}

需求3、一段时间内的广告点击数据

注意:窗口范围和滑动范围必须是收集器收集数据间隔的整数倍!!

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}


object Kafka_req3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka req3")
    val ssc = new StreamingContext(conf,Seconds(5)) //每5s收集器收集一次数据形成一个RDD加入到DStream中

    // 定义Kafka参数: kafka集群地址、消费者组名称、key序列化、value序列化
    val kafkaPara: Map[String,Object] = Map[String,Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG ->"lyh",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // 读取Kafka数据创建DStream
    val kafkaDStream: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,  //优先位置
      ConsumerStrategies.Subscribe[String,String](Set("testTopic"),kafkaPara) // 消费策略:(订阅多个主题,配置参数)
    )

    val adClickData = kafkaDStream.map(
      (kafkaData: ConsumerRecord[String, String]) => {
        val data = kafkaData.value()
        val datas = data.split(" ")
        AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
      }
    )

    val ds = adClickData.map(
      data => {
        val ts = data.ts.toLong
        /**
         * 为了结果展示的时候更加美观: ts=1698477282712ms
         * 我们希望统计的数据的是近一分钟的数据(每10s展示一次):
         * 15:10:00 ~ 15:11:00
         * 15:10:10 ~ 15:11:10
         * 15:10:20 ~ 15:11:20
         * ...
         * ts/1000 => 1698477282s (我们把秒换成0好看点) ts/10*10=1698477280s => 转成ms ts*1000 = 1698477282000ms
         * 所以就是 ts / 10000 * 10000
         */
        val newTs = ts / 10000 * 10000
        (newTs, 1)
      }
    ).reduceByKeyAndWindow((_: Int)+(_:Int), Seconds(60), Seconds(10))  //windowDurations和slideDuration都必须是收集器收集频率的整数倍

    ds.print()

    ssc.start()
    ssc.awaitTermination()

  }
  // 广告点击数据
  case class AdClickData(ts: String,area: String,city: String,user: String,ad: String)

}

运行结果:格式(毫秒,点击次数)

SparkStreaming【实例演示】,Spark,spark,大数据

产生的数据

SparkStreaming【实例演示】,Spark,spark,大数据文章来源地址https://www.toymoban.com/news/detail-715695.html

到了这里,关于SparkStreaming【实例演示】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 2023_Spark_实验五:Scala面向对象部分演示(一)(IDEA开发)

    1、面向对象的基本概念 把数据及对数据的操作方法放在一起,作为一个相互依存的整体——对象,面向 对象的三大特征:  封装  继承  多态 2、类的定义 简单类和无参方法 如果要开发main方法,需要将main方法定义在该类的伴生对象中,即:object对 象中,(后续做详细的讨

    2024年02月10日
    浏览(34)
  • 2023_Spark_实验六:Scala面向对象部分演示(二)(IDEA开发)

    7、Scala中的apply方法() 遇到如下形式的表达式时,apply方法就会被调用: Object(参数1,参数2,......,参数N) 通常,这样一个apply方法返回的是伴生类的对象;其作用是为了省略new Object的apply方法举例: 8、Scala中的继承 Scala和Java一样,使用extends扩展类。 案例一:

    2024年02月10日
    浏览(41)
  • Spark Streaming简介与代码实例

    Spark Streaming是准实时流处理框架,处理响应时间一般以分钟为单位,处理实时数据的延迟时间一般是秒级别的;其他容易混淆的例如Storm实时流处理框架,处理响应是毫秒级。 在我们项目实施选择流框架时需要看具体业务场景:使用MapReduce和Spark进行大数据处理,能够解决很

    2024年02月03日
    浏览(41)
  • 使用Spark ALS模型 + Faiss向量检索实现用户扩量实例

    1、通过ALS模型实现用户/商品Embedding的效果,获得其向量表示 准备训练数据, M = (U , I, R) 即 用户集U、商品集I、及评分数据R。 (1)商品集I的选择:可以根据业务目标确定商品候选集,比如TopK热度召回、或者流行度不高但在业务用户中区分度比较高的商品集等。个人建议量

    2024年02月13日
    浏览(27)
  • [Spark、hadoop]DStream的窗口操作、输出操作、实例——实现网站热词排序

    目录   DStream窗口操作 DStream输出操作 DStream实例——实现网站热词排序 DStream的概述 Dstream(Discretized Stream)是Spark Streaming数据的基本传输单位。它表示一个连续的数据流,这个数据流可以是原始的数据输入流,也可以是将原始的数据输入流通过转换生成已处理的数据输入流 特点

    2024年02月02日
    浏览(28)
  • 使用Pycharm运行spark实例时没有pyspark包(ModuleNotFoundError: No module named ‘py4j‘)

    在安装并配置pyspark,下载并打开Pycharm(专业版)后进行spark实例操作(笔者以统计文件中的行数为例)时,运行程序后提示ModuleNotFoundError: No module named \\\'py4j\\\': 1.下载py4j包后下载pyspark包 打开新终端,在终端中输入(若在pycharm中进行下载可能导致下载失败,这里指定使用清华

    2024年04月26日
    浏览(27)
  • 大数据技术之Spark(一)——Spark概述

    大数据技术之Spark(一)——Spark概述 Apache Spark是一个开源的、强大的分布式 查询和处理引擎 ,它提供MapReduce的灵活性和可扩展性,但速度明显要快上很多;拿数据存储在内存中的时候来说,它比Apache Hadoop 快100倍,访问磁盘时也要快上10倍。 Spark 是一种由 Scala 语言开发的快

    2024年02月14日
    浏览(26)
  • 大数据技术之Spark——Spark SQL

            Spark SQL是Spark用于结构化数据处理的Spark模块。         我们之前学习过hive,hive是一个基于hadoop的SQL引擎工具,目的是为了简化mapreduce的开发。由于mapreduce开发效率不高,且学习较为困难,为了提高mapreduce的开发效率,出现了hive,用SQL的方式来简化mapreduce:hiv

    2024年02月12日
    浏览(36)
  • 【大数据开发 Spark】第一篇:Spark 简介、Spark 的核心组成(5大模块)、Spark 的主要特征(4大特征)、Spark 对比 MapReduce

    初步了解一项技术,最好的方式就是去它的官网首页,一般首页都会有十分官方且准确的介绍,学习 Spark 也不例外, 官方介绍:Apache Spark ™是一种多语言引擎,用于在单节点机器或集群上执行数据工程、数据科学和机器学习。 我们可以得知,Spark 可以单节点运行,也可以搭

    2024年02月05日
    浏览(36)
  • 【spark大数据】spark大数据处理技术入门项目--购物信息分析

    购物信息分析基于spark 目录 本案例中三个文案例中需要处理的文件为 order_goods.txt、products.txt 以及 orders.txt 三个文件,三个文件的说明如下 一、本实训项目针对实验数据主要完成了哪些处理? 二、Hadoop+Spark集群环境的搭建步骤有哪些?(只介绍完全分布式集群环境的搭建)

    2023年04月08日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包