【Flink实战】玩转Flink里面核心的Source Operator实战

这篇具有很好参考价值的文章主要介绍了【Flink实战】玩转Flink里面核心的Source Operator实战。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

🚀 作者 :“大数据小禅”

🚀 文章简介 :【Flink实战】玩转Flink里面核心的Source Operator实战

🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬


Flink 的API层级介绍Source Operator速览

  • Flink的API层级 为流式/批式处理应用程序的开发提供了不同级别的抽象

    • 第一层是最底层的抽象为有状态实时流处理,抽象实现是 Process Function,用于底层处理

    • 第二层抽象是 Core APIs,许多应用程序不需要使用到上述最底层抽象的 API,而是使用 Core APIs 进行开发

      • 例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等,此层 API 中处理的数据类型在每种编程语言中都有其对应的类。
    • 第三层抽象是 Table API。 是以表Table为中心的声明式编程API,Table API 使用起来很简洁但是表达能力差

      • 类似数据库中关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等
      • 允许用户在编写应用程序时将 Table API 与 DataStream/DataSet API 混合使用
    • 第四层最顶层抽象是 SQL,这层程序表达式上都类似于 Table API,但是其程序实现都是 SQL 查询表达式

      • SQL 抽象与 Table API 抽象之间的关联是非常紧密的
    • 注意:Table和SQL层变动多,还在持续发展中,大致知道即可,核心是第一和第二层
      【Flink实战】玩转Flink里面核心的Source Operator实战,Flink,flink,大数据

  • Flink编程模型

【Flink实战】玩转Flink里面核心的Source Operator实战,Flink,flink,大数据

  • Source来源

    • 元素集合

      • env.fromElements
      • env.fromColletion
      • env.fromSequence(start,end);
    • 文件/文件系统

      • env.readTextFile(本地文件);
      • env.readTextFile(HDFS文件);
    • 基于Socket

      • env.socketTextStream(“ip”, 8888)
    • 自定义Source,实现接口自定义数据源,rich相关的api更丰富

      • 并行度为1

        • SourceFunction
        • RichSourceFunction
      • 并行度大于1

        • ParallelSourceFunction
        • RichParallelSourceFunction
  • Connectors与第三方系统进行对接(用于source或者sink都可以)

    • Flink本身提供Connector例如kafka、RabbitMQ、ES等
    • 注意:Flink程序打包一定要将相应的connetor相关类打包进去,不然就会失败
  • Apache Bahir连接器

    • 里面也有kafka、RabbitMQ、ES的连接器更多
  • 总结 和外部系统进行读取写入的

    • 第一种 Flink 里面预定义的 source 和 sink。
    • 第二种 Flink 内部也提供部分 Boundled connectors。
    • 第三种是第三方 Apache Bahir 项目中的连接器。
    • 第四种是通过异步 IO 方式
      • 异步I/O是Flink提供的非常底层的与外部系统交互

Flink 预定义的Source 数据源 案例实战

  • Source来源
    • 元素集合
      • env.fromElements
      • env.fromColletion
      • env.fromSequence(start,end);
 public static void main(String [] args) throws Exception {

        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //相同类型元素的数据流 source
        DataStream<String> stringDS1 = env.fromElements("java,SpringBoot", "spring cloud,redis", "kafka,小滴课堂");
        stringDS1.print("stringDS1");

        DataStream<String> stringDS2 = env.fromCollection(Arrays.asList("微服务项目大课,java","alibabacloud,rabbitmq","hadoop,hbase"));
        stringDS2.print("stringDS2");

        DataStreamSource<Long> longDS3 = env.fromSequence(0,10);
        longDS3.print("longDS3");

        //DataStream需要调用execute,可以取个名称
        env.execute("xdclass job");
    }

  • 文件/文件系统
    • env.readTextFile(本地文件);
    • env.readTextFile(HDFS文件);
public static void main(String [] args) throws Exception {

        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> textDS = env.readTextFile("/Users/xdclass/Desktop/xdclass_access.log");
        //DataStream<String> textDS = env.readTextFile("hdfs://xdclass_node:8010/file/log/words.txt");
        textDS.print();
        env.execute("xdclass job");
}
  • 基于Socket
    • env.socketTextStream(“ip”, 8888)
   public static void main(String [] args) throws Exception {

        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> stringDataStream = env.socketTextStream("127.0.0.1",8888);
        stringDataStream.print();
        env.execute(" job");
}

Flink自定义的Source 数据源案例-订单来源实战

  • 自定义Source,实现接口自定义数据源

    • 并行度为1

      • SourceFunction
      • RichSourceFunction
    • 并行度大于1

      • ParallelSourceFunction
      • RichParallelSourceFunction
    • Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等

  • 创建接口

@Data
@AllArgsConstructor
@NoArgsConstructor
public class VideoOrder {
    private String tradeNo;
    private String title;
    private int money;
    private int userId;
    private Date createTime;

}


public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {

	private volatile Boolean flag = true;

    private  Random random = new Random();

    private static List<String> list = new ArrayList<>();
    static {
        list.add("spring boot2.x课程");
        list.add("微服务SpringCloud课程");
        list.add("RabbitMQ消息队列");
        list.add("Kafka课程");
        list.add("Flink流式技术课程");
        list.add("工业级微服务项目大课训练营");
        list.add("Linux课程");
    }

    @Override
    public void run(SourceContext<VideoOrder> ctx) throws Exception {
        while (flag){
            Thread.sleep(1000);
            String id = UUID.randomUUID().toString();
            int userId = random.nextInt(10);
            int money = random.nextInt(100);
            int videoNum = random.nextInt(list.size());
            String title = list.get(videoNum);
            ctx.collect(new VideoOrder(id,title,money,userId,new Date()));
        }
    }

    /**
     * 取消任务
     */
    @Override
    public void cancel() {
        flag = false;
    }
}
  • 案例
public static void main(String [] args) throws Exception {

        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<VideoOrder> videoOrderDataStream = env.addSource(new VideoOrderSource());
        videoOrderDataStream.print();

        //DataStream需要调用execute,可以取个名称
        env.execute("custom source job");
    }

不断产生很多订单

【Flink实战】玩转Flink里面核心的Source Operator实战,Flink,flink,大数据文章来源地址https://www.toymoban.com/news/detail-709626.html

到了这里,关于【Flink实战】玩转Flink里面核心的Source Operator实战的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Flink实战系列】Hash collision on user-specified ID “Kafka Source”

    在使用 fromSource 构建 Kafka Source 的时候,遇到下面的报错,下面就走进源码,分析一下原因。

    2024年02月09日
    浏览(65)
  • 大数据-玩转数据-Flink营销对账

    在电商网站中,订单的支付作为直接与营销收入挂钩的一环,在业务流程中非常重要。对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网站一般会设置一个支付失效时间,超过一段时间不支付的订单就会被取消。另外,对于订单的支付,我们还应保证用

    2024年02月11日
    浏览(38)
  • 大数据-玩转数据-Flink RedisSink

    具体版本根据实际情况确定 参见大数据-玩转数据-Redis 安装与使用 可以根据要写入的redis的不同数据类型进行调整

    2024年02月13日
    浏览(38)
  • 大数据-玩转数据-Flink 容错机制

    在分布式架构中,当某个节点出现故障,其他节点基本不受影响。在 Flink 中,有一套完整的容错机制,最重要就是检查点(checkpoint)。 在流处理中,我们可以用存档读档的思路,把之前的计算结果做个保存,这样重启之后就可以继续处理新数据、而不需要重新计算了。所以

    2024年02月07日
    浏览(48)
  • 大数据-玩转数据-Flink窗口函数

    前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素. window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种. ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对

    2024年02月11日
    浏览(42)
  • 大数据-玩转数据-Flink定时器

    基于处理时间或者事件时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行. Context和OnTimerContext所持有的TimerService对象拥有以下方法: currentProcessingTime(): Long 返回当前处理时间 currentWatermark(): Long 返回当前watermark的时间戳 registerProcessingTimeTimer(timestamp: Long): Unit 会注

    2024年02月10日
    浏览(37)
  • 大数据-玩转数据-Flink状态编程(上)

    有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态,然后在新流入数据的基础上不断更新状态。 SparkStreaming在状态管理这块做的不好, 很多时候需要借助于外部存储(例如Redis)来手动管理状态, 增加了编程的难度。 Flink的状态管理是它的优

    2024年02月09日
    浏览(44)
  • 大数据-玩转数据-Flink恶意登录监控

    对于网站而言,用户登录并不是频繁的业务操作。如果一个用户短时间内频繁登录失败,就有可能是出现了程序的恶意攻击,比如密码暴力破解。 因此我们考虑,应该对用户的登录失败动作进行统计,具体来说,如果同一用户(可以是不同IP)在2秒之内连续两次登录失败,就

    2024年02月07日
    浏览(41)
  • Flink on Kubernetes (flink-operator) 部署Flink

    https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.1/docs/try-flink-kubernetes-operator/quick-start/ 我的部署脚本和官网不一样,有些地方官网不够详细 注意,按照默认配置至少有两台worker https://helm.sh/zh/docs/intro/install/ 安装完成后,资源如下 此时k8s集群就可以支持我们按照fli

    2024年04月14日
    浏览(41)
  • 【Flink进阶】-- 本地部署 Flink kubernetes operator

    目录 1.说明 1.1 版本 1.2 kubernetes 环境 1.3 参考 2.安装步骤 2.1 安装本地 kubernetes 环境

    2024年02月10日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包