Flink DataStream之使用filter实现分流

这篇具有很好参考价值的文章主要介绍了Flink DataStream之使用filter实现分流。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

  • 新建类
package test01;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SplitStreamByFilter {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
        //设置并行度
        executionEnvironment.setParallelism(1);
        //监听数据端口
        DataStreamSource<String> dataSource = executionEnvironment.socketTextStream("localhost", 9999);
        //使用filter对数据进行分流,这里测试如果是以H开头的分为一个流,以M开头的分为一个流。缺点是每条数据都会执行一下各个filter
        SingleOutputStreamOperator<String> hWord = dataSource.filter(value -> value.startsWith("H"));
        SingleOutputStreamOperator<String> mWord = dataSource.filter(value -> value.startsWith("M"));
        //打印分流,这里print可以使用ctrl+p看到print有个参数,这样就可以在打印时在开头位置加上一些信息。
        hWord.print("以H开头:");
        mWord.print("以M开头:");

        executionEnvironment.execute();


    }
}
  • 启动netcat和程序

Flink DataStream之使用filter实现分流,大数据之Flink,flink,大数据

Flink DataStream之使用filter实现分流,大数据之Flink,flink,大数据 可以看到输入的"World"由于不满足两个filter中的任何一个,所以数据被舍弃。"Monday"和"Hello"分别打印在两个不同的流中。

 文章来源地址https://www.toymoban.com/news/detail-547993.html

到了这里,关于Flink DataStream之使用filter实现分流的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Apache Flink】Flink DataStream API的基本使用

    Flink DataStream API的基本使用 Flink DataStream API主要用于处理无界和有界数据流 。 无界数据流 是一个持续生成数据的数据源,它没有明确的结束点,例如实时的交易数据或传感器数据。这种类型的数据流需要使用Apache Flink的实时处理功能来连续地处理和分析。 有界数据流 是一个

    2024年02月06日
    浏览(37)
  • Flink CDC-Oracle CDC配置及DataStream API实现代码...可实现监控采集一个数据库的多个表

    使用sysdba角色登录到Oracle数据库 确保Oracle归档日志(Archive Log)已启用 若未启用归档日志, 需运行以下命令启用归档日志 设置归档日志存储大小及位置 设置数据库恢复文件存储区域的大小(如归档重做日志文件、控制文件备份等) 设置恢复文件的实际物理存储路径;scope=spfile参数

    2024年02月05日
    浏览(48)
  • Flink DataStream之从Kafka读数据

    搭建Kafka 参考:centos7下kafka2.12-2.1.0的安装及使用_kafka2.12-2.1.0 steam_QYHuiiQ的博客-CSDN博客  启动zookeeper 启动kafka 查看进程  创建topic 查看topic列表 导入pom依赖 新建类 启动程序 在终端向kafka生产数据,同时观察程序控制台flink的读取情况  如图说明flink从kafka成功读取数据。

    2024年02月13日
    浏览(50)
  • Flink|《Flink 官方文档 - DataStream API - 状态与容错 - 使用状态》学习笔记

    学习文档:Flink 官方文档 - DataStream API - 状态与容错 - 使用状态 相关文档: 有状态流处理背后的概念:Flink|《Flink 官方文档 - 概念透析 - 有状态流处理》学习笔记 Redis 过期 key 的删除机制:Redis|过期 key 的删除机制 学习笔记如下: 如果要使用键控状态,则必须要为 DataS

    2024年02月03日
    浏览(42)
  • Flink DataStream之输出数据到File中

    新建类 启动程序并启动nc -lp 输入数据: 正在写入的文件会有inprogress的标识(在指定的目录下生成文件时会按照日期的年月日时进行分目录,因为我在执行时的时间是2023/7/12 22点,所以它就会自动生成一个2023-07-12--22目录,分桶策略也可以自己在代码中配置。):  当满足滚

    2024年02月15日
    浏览(39)
  • 【Flink】DataStream API使用之源算子(Source)

    创建环境之后,就可以构建数据的业务处理逻辑了,Flink可以从各种来源获取数据,然后构建DataStream进项转换。一般将数据的输入来源称为数据源(data source),而读取数据的算子就叫做源算子(source operator)。所以,Source就是整个程序的输入端。 Flink中添加source的方式,是

    2024年02月10日
    浏览(37)
  • Flink 读写MySQL数据(DataStream和Table API)

    Flink提供了基于JDBC的方式,可以将读取到的数据写入到MySQL中;本文通过两种方式将数据下入到MySQL数据库,其他的基于JDBC的数据库类似,另外,Table API方式的Catalog指定为Hive Catalog方式,持久化DDL操作。 另外,JDBC 连接器允许使用 JDBC 驱动程序从任何关系数据库读取数据并将

    2023年04月09日
    浏览(44)
  • flink1.16使用消费/生产kafka之DataStream

    flink高级版本后,消费kafka数据一种是Datastream 一种之tableApi。 上官网 Kafka | Apache Flink 引入依赖 flink和kafka的连接器,里面内置了kafka-client 使用方法 很简单一目了然。 topic和partition  反序列化 其实就是实现接口 DeserializationSchema 的deserialize()方法 把byte转为你想要的类型。 起

    2024年02月16日
    浏览(41)
  • 【Flink实战】Flink中的分流

    Flink中的分流 在Flink中将数据流切分为多个子数据流,子数据流称为”旁路输出数据流“。 拆分流数据的方式 Split,已经废弃,不推荐使用 Fliter SideOut,推荐使用 Fliter分流的Java实现 SideOut分流的Java实现 SideOutPut 是 Flink 框架推荐的分流方法,在使用 SideOutPut 时,需要按照以下

    2024年02月10日
    浏览(37)
  • Flink DataStream API CDC同步MySQL数据到StarRocks

    Flink:1.16.1 pom文件如下 Java代码 SourceAndSinkInfo 类,用于定义source和sink的IP、端口、账号、密码信息 DataCenterShine实体类,字段与数据库一一对应。 StarRocksPrimary 实体类 FieldInfo注解类,用于标记字段序号、是否为主键、是否为空,后续生成TableSchema需要使用到。 TableName 注解类,

    2024年02月03日
    浏览(75)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包