Flink - souce算子

这篇具有很好参考价值的文章主要介绍了Flink - souce算子。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

水善利万物而不争,处众人之所恶,故几于道💦

目录

  1. 从Java的集合中读取数据
  2. 从本地文件中读取数据
  3. 从HDFS中读取数据
  4. 从Socket中读取数据
  5. 从Kafka中读取数据
  6. 自定义Source

官方文档 - Flink1.13

Flink - souce算子,Flink,flink,大数据,source,从Kafka中读取数据,自定义source,从HDFS中读取数据


1. 从Java的集合中读取数据

fromCollection(waterSensors)

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    List<WaterSensor> waterSensors = Arrays.asList(
            new WaterSensor("ws_001", 1577844001L, 45),
            new WaterSensor("ws_002", 1577844015L, 43),
            new WaterSensor("ws_003", 1577844020L, 42));
    
    env
            .fromCollection(waterSensors)
            .print();

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

运行结果:
Flink - souce算子,Flink,flink,大数据,source,从Kafka中读取数据,自定义source,从HDFS中读取数据

2. 从本地文件中读取数据

readTextFile(“input/words.txt”),支持相对路径和绝对路径

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    env.readTextFile("input/words.txt").print();

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }

}

运行结果:
Flink - souce算子,Flink,flink,大数据,source,从Kafka中读取数据,自定义source,从HDFS中读取数据

3. 从HDFS中读取数据

readTextFile(“hdfs://hadoop101:8020/flink/data/words.txt”)

要先在pom文件中添加hadoop-client依赖:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
</dependency>
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    env.readTextFile("hdfs://hadoop101:8020/flink/data/words.txt").print();
    
    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

运行结果:
Flink - souce算子,Flink,flink,大数据,source,从Kafka中读取数据,自定义source,从HDFS中读取数据

4. 从Socket中读取数据

socketTextStream(“hadoop101”,9999),这个输入源不支持多个并行度。

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    //从端口中读数据,  windows中 nc -lp 9999     Linux nc -lk 9999
    env.socketTextStream("hadoop101",9999).print();

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

运行结果:
Flink - souce算子,Flink,flink,大数据,source,从Kafka中读取数据,自定义source,从HDFS中读取数据

5. 从Kafka中读取数据

addSource(new FlinkKafkaConsumer<>(“flink_source_kafka”,new SimpleStringSchema(),properties))

第一个参数是topic,

第二个参数是序列化器,序列化器就是在Kafka和flink之间转换数据 - 官方注释:The de-/serializer used to convert between Kafka’s byte messages and Flink’s objects.(反-序列化程序用于在Kafka的字节消息和Flink的对象之间进行转换。)

第三个参数是Kafka的配置。

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port",1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    Properties properties = new Properties();
    // 设置集群地址
    properties.setProperty("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
    // 设置所属消费者组
    properties.setProperty("group.id", "flink_consumer_group");
    env.addSource(new FlinkKafkaConsumer<>("flink_source_kafka",new SimpleStringSchema(),properties)).print();

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

运行结果:
Flink - souce算子,Flink,flink,大数据,source,从Kafka中读取数据,自定义source,从HDFS中读取数据

6. 自定义Source

addSource(new XXXX())

  大多数情况下,前面的数据源已经能够满足需要,但是难免会存在特殊情况的场合,所以flink也提供了能自定义数据源的方式.

public class Flink06_myDefDataSource {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        env.addSource(new RandomWatersensor()).print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

  自定义数据源需要定义一个类,然后实现SourceFunction接口,然后实现其中的两个方法,runcancel,run方法包含具体读数据的逻辑,当调用cancel方法的时候应该可以让run方法中的读数据逻辑停止

public class RandomWatersensor implements SourceFunction<WaterSensor> {
    private Boolean running = true;

    @Override
    public void run(SourceContext<WaterSensor> sourceContext) throws Exception {
        Random random = new Random();
        while (running){
            sourceContext.collect(new WaterSensor(
                    "sensor" + random.nextInt(50),
                    Calendar.getInstance().getTimeInMillis(),
                    random.nextInt(100)
            ));
            Thread.sleep(1000);
        }
    }

    /**
     * 大多数的source在run方法内部都会有一个while循环,
     * 当调用这个方法的时候, 应该可以让run方法中的while循环结束
     */
    @Override
    public void cancel() {
        running = false;
    }

}

运行结果:
Flink - souce算子,Flink,flink,大数据,source,从Kafka中读取数据,自定义source,从HDFS中读取数据文章来源地址https://www.toymoban.com/news/detail-622616.html


demo2 - 自定义从socket中读取数据
public class Flink04_Source_Custom {
    public static void main(String[] args) throws Exception {


        // 1. 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env
          .addSource(new MySource("hadoop102", 9999))
          .print();

        env.execute();
    }

    public static class MySource implements SourceFunction<WaterSensor> {
        private String host;
        private int port;
        private volatile boolean isRunning = true;
        private Socket socket;

        public MySource(String host, int port) {
            this.host = host;
            this.port = port;
        }


        @Override
        public void run(SourceContext<WaterSensor> ctx) throws Exception {
            // 实现一个从socket读取数据的source
            socket = new Socket(host, port);
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            String line = null;
            while (isRunning && (line = reader.readLine()) != null) {
                String[] split = line.split(",");
                ctx.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])));
            }
        }

        /**
         * 大多数的source在run方法内部都会有一个while循环,
         * 当调用这个方法的时候, 应该可以让run方法中的while循环结束
         */

        @Override
        public void cancel() {
            isRunning = false;
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
/*
sensor_1,1607527992000,20
sensor_1,1607527993000,40
sensor_1,1607527994000,50
 */

到了这里,关于Flink - souce算子的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Flink-1.17-教程】-【四】Flink DataStream API(1)源算子(Source)

    DataStream API 是 Flink 的核心层 API。一个 Flink 程序,其实就是对 DataStream 的各种转换。具体来说,代码基本上都由以下几部分构成: Flink 程序可以在各种上下文环境中运行:我们可以在本地 JVM 中执行程序,也可以提交到远程集群上运行。 不同的环境,代码的提交运行的过程会

    2024年01月22日
    浏览(59)
  • Flink读取kafka数据报错

    报错如下: 解决办法: 修改/usr/local/wyh/kafka/kafka_2.12-2.8.1/config下面的server.properties,默认该配置是被注释掉的额,所以需要放开注释并且配置Host:

    2024年02月13日
    浏览(49)
  • flink 从kafka读取数据报错

    报错: Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandl

    2024年02月02日
    浏览(45)
  • Flink Kafka-Source

    Apache Kafka 连接器 Flink 提供了 Apache Kafka 连接器使用精确一次(Exactly-once)的语义在 Kafka topic 中读取和写入数据。 依赖 1. 使用方法 Kafka Source 提供了构建类来创建 KafkaSource 的实例。以下代码片段展示了如何构建 KafkaSource 来消费 “input-topic” 最早位点的数据, 使用消费组 “

    2024年02月14日
    浏览(40)
  • 4.3、Flink任务怎样读取Kafka中的数据

    目录 1、添加pom依赖 2、API使用说明 3、这是一个完整的入门案例 4、Kafka消息应该如何解析 4.1、只获取Kafka消息的value部分 ​4.2、获取完整Kafka消息(key、value、Metadata) 4.3、自定义Kafka消息解析器 5、起始消费位点应该如何设置 ​5.1、earliest() 5.2、latest() 5.3、timestamp() 6、Kafka分区

    2024年02月13日
    浏览(42)
  • 自定义Flink SourceFunction定时读取数据库

    Source 是Flink获取数据输入的地方,可以用StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source,也可以通过实现继承 RichSourceFunction 类编写自定义的

    2024年02月02日
    浏览(36)
  • 【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月03日
    浏览(55)
  • 十八、Flink自定义多并行Source

    1、概述 1)作用 自定义多并行的Source,即Source的并行度可以是1到多个。 2)实现 1.继承RichParallelSourceFunction,重写run()方法。 2、代码实现

    2024年02月08日
    浏览(36)
  • Flink---5、聚合算子、用户自定义函数、物理分区算子、分流、合流

                           星光下的赶路人star的个人主页                        欲买桂花同载酒,终不似,少年游 计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并—这就是

    2024年02月07日
    浏览(46)
  • 【Flink-Kafka-To-RocketMQ】使用 Flink 自定义 Sink 消费 Kafka 数据写入 RocketMQ

    这里的 maven 依赖比较冗余,推荐大家都加上,后面陆续优化。 注意: 1、此程序中所有的相关配置都是通过 Mysql 读取的(生产环境中没有直接写死的,都是通过配置文件动态配置),大家实际测试过程中可以将相关配置信息写死。 2、此程序中 Kafka 涉及到了 Kerberos 认证操作

    2024年02月03日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包