4、Flink Stream-Batch Unification, the 18 Transformation Operators in Detail, and the Flink-Kafka Source and Sink


Flink Series Articles

I. The Flink Column

The Flink column introduces individual topics systematically, each accompanied by concrete examples.

  • 1、Flink deployment series
    Covers the basics of deploying and configuring Flink.

  • 2、Flink fundamentals series
    Covers Flink fundamentals such as terminology, architecture, the programming model, programming guides, basic DataStream API usage, and the four cornerstones.

  • 3、Flink Table API and SQL basics series
    Covers basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, catalogs, and so on.

  • 4、Flink Table API and SQL advanced and application series
    Covers applied Table API and SQL topics that are more closely tied to real production use and involve somewhat more demanding development work.

  • 5、Flink monitoring series
    Covers topics related to day-to-day operations and monitoring.

II. The Flink Examples Column

The Flink examples column supplements the Flink column. It generally does not introduce conceptual material; instead it provides concrete, ready-to-use examples. This column has no sub-directories; the link titles themselves indicate what each article covers.

For the entry point to all articles in both columns, see: Flink Series Articles Index



This article walks through unified stream/batch development, the detailed source, transformation, and sink APIs, and Flink's Kafka integration, including the functionality of the 18 transformation operators with example code.
It only covers the concepts; for detailed examples see the series article Flink (5): detailed examples of source, transformations and sink.
Some images and text in this article come from the Internet.
The article has two parts: an introduction to unified stream/batch processing, and Flink with Kafka.

I. The Unified Stream and Batch API

In nature, data is produced as a stream of events. Whether it is event data from a web server, trades from a stock exchange, or sensor readings from machines on a factory floor, the data originates as a stream. When you analyze data, you can organize your processing around either bounded or unbounded streams, and the model you choose determines how the program is executed and processed.

  • Batch processing is the paradigm for processing bounded data streams. In this mode you can choose to ingest the entire data set before producing any results, which means you can sort the whole data set, compute statistics over it, or summarize it before emitting the output.
  • Stream processing, by contrast, involves unbounded data streams. At least in theory, the input never ends, so the program must process arriving data continuously.

In Flink, an application is composed of streaming dataflows that are transformed by user-defined operators. These dataflows form directed graphs that start with one or more sources and end with one or more sinks.
[Figure: a streaming dataflow with sources, operators, and sinks]
Often there is a one-to-one correspondence between the transformations in the program code and the operators in the dataflow. Sometimes, however, one transformation may consist of several operators, as shown in the figure above.
A Flink application can consume real-time data from streaming sources such as message queues or distributed logs (for example Apache Kafka or Kinesis), and it can also consume bounded, historic data from a variety of sources. Likewise, the result streams produced by a Flink application can be sent to a wide variety of sinks.

2、DataStream API

The DataStream API supports both streaming and batch execution modes.
Flink's core APIs were originally designed for specific scenarios. Although the Table API / SQL already provides a unified API for stream and batch processing, users working at the lower level still had to choose between two different APIs: the DataSet API for batch and the DataStream API for streaming. Since batch processing is a special case of stream processing, merging the two into a single API brings some clear benefits, for example:

  • Reusability: a job can switch freely between streaming and batch execution modes without rewriting any code, so the same job can be reused to process both real-time and historical data.
  • Easier maintenance: a unified API means streaming and batch can share the same set of connectors and the same code base, and mixed stream/batch execution, such as backfilling, becomes easy to implement.

The unified DataStream API supports efficient batch execution (FLIP-134). The DataSet API will be deprecated (FLIP-131), and its functionality will be absorbed into the DataStream API and the Table API / SQL.
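
For example, a minimal sketch (assuming Flink 1.12 or later, where RuntimeExecutionMode is available) of switching a simple DataStream job to batch execution:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchModeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // BATCH runs this bounded DataStream program with batch semantics;
        // AUTOMATIC would choose the mode from the boundedness of the sources,
        // and the same switch can be made at submission time with -Dexecution.runtime-mode=BATCH.
        env.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);

        env.fromElements(1, 2, 3, 4)
           .map(new MapFunction<Integer, Integer>() {
               @Override
               public Integer map(Integer value) {
                   return 2 * value;
               }
           })
           .print();

        env.execute("runtime-mode sketch");
    }
}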

3、Flink Programming Model

[Figure: the Flink programming model]

4、Programming Steps

Official documentation: Apache Flink 1.12 Documentation: Flink DataStream API Programming Guide
Flink programs look like regular programs that transform DataStreams. Each program consists of the same basic parts:

  • Obtain an execution environment (prepare the env),
  • Load/create the initial data (the source),
  • Specify transformations on this data,
  • Specify where to put the results of your computations (the sink),
  • Trigger the program execution.

1)、Prepare the environment (env)

getExecutionEnvironment() (recommended)
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
Example:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2)、Load the data source

The environment can load many kinds of data sources, such as files, sockets, fromElements, and so on.
Example:
DataStream<String> lines = env.fromElements("flink hadoop hive", "flink hadoop hive", "flink hadoop", "flink");
DataStream<String> text = env.readTextFile("file:///path/to/file");

3)、Apply transformations

Transformations are one of Flink's core capabilities, and there are many of them to choose from.
Example:
DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});

4)、Sink the results

Results can be written to many kinds of sinks, such as relational databases, message queues, HDFS, Redis, and so on.
Example:
writeAsText(String path)
print()

5)、Trigger execution

Once you have specified the complete program you need to trigger the program execution by calling execute() on the StreamExecutionEnvironment. Depending on the type of the ExecutionEnvironment the execution will be triggered on your local machine or your program will be submitted for execution on a cluster.
The execute() method will wait for the job to finish and then return a JobExecutionResult, which contains execution times and accumulator results.
If you don't want to wait for the job to finish, you can trigger asynchronous job execution by calling executeAsync() on the StreamExecutionEnvironment. It will return a JobClient with which you can communicate with the job you just submitted. For instance, here is how to implement the semantics of execute() by using executeAsync().

Example:
final JobClient jobClient = env.executeAsync();
final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();

That last part about program execution is crucial to understanding when and how Flink operations are executed. All Flink programs are executed lazily: when the program's main method is executed, the data loading and transformations do not happen directly. Rather, each operation is created and added to a dataflow graph. The operations are actually executed when the execution is explicitly triggered by an execute() call on the execution environment. Whether the program is executed locally or on a cluster depends on the type of execution environment.
The lazy evaluation lets you construct sophisticated programs that Flink executes as one holistically planned unit.

5、Source, Transformations, and Sink

Official documentation: Apache Flink 1.12 Documentation: Flink DataStream API Programming Guide

1)、Sources

A source is where the program reads its input. Attach a source with StreamExecutionEnvironment.addSource(sourceFunction). The supported sources are file-based, socket-based, collection-based, and custom.
Sources are where your program reads its input from. You can attach a source to your program by using StreamExecutionEnvironment.addSource(sourceFunction). Flink comes with a number of pre-implemented source functions, but you can always write your own custom sources by implementing the SourceFunction for non-parallel sources, or by implementing the ParallelSourceFunction interface or extending the RichParallelSourceFunction for parallel sources.
There are several predefined stream sources accessible from the StreamExecutionEnvironment:

1、File-based

Typically used in testing and production scenarios.
The path can be a local file (including compressed files), an HDFS file, or a directory; a usage sketch follows after the notes below.

  • readTextFile(path) - Reads text files, i.e. files that respect the TextInputFormat specification, line-by-line and returns them as Strings.
  • readFile(fileInputFormat, path) - Reads (once) files as dictated by the specified file input format.
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - This is the method called internally by the two previous ones. It reads files in the path based on the given fileInputFormat. Depending on the provided watchType, this source may periodically monitor (every interval ms) the path for new data (FileProcessingMode.PROCESS_CONTINUOUSLY), or process once the data currently in the path and exit (FileProcessingMode.PROCESS_ONCE). Using the pathFilter, the user can further exclude files from being processed.

IMPLEMENTATION:
Under the hood, Flink splits the file reading process into two sub-tasks, namely directory monitoring and data reading. Each of these sub-tasks is implemented by a separate entity. Monitoring is implemented by a single, non-parallel (parallelism = 1) task, while reading is performed by multiple tasks running in parallel. The parallelism of the latter is equal to the job parallelism. The role of the single monitoring task is to scan the directory (periodically or only once depending on the watchType), find the files to be processed, divide them in splits, and assign these splits to the downstream readers. The readers are the ones who will read the actual data. Each split is read by only one reader, while a reader can read multiple splits, one-by-one.

IMPORTANT NOTES:
If the watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, when a file is modified, its contents are re-processed entirely. This can break the “exactly-once” semantics, as appending data at the end of a file will lead to all its contents being re-processed.
If the watchType is set to FileProcessingMode.PROCESS_ONCE, the source scans the path once and exits, without waiting for the readers to finish reading the file contents. Of course the readers will continue reading until all file contents are read. Closing the source leads to no more checkpoints after that point. This may lead to slower recovery after a node failure, as the job will resume reading from the last checkpoint.
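
For example, a minimal sketch of continuous directory monitoring with readFile; the directory path and the 10-second interval are illustrative:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ContinuousFileSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String path = "file:///tmp/flink-input";               // hypothetical directory to monitor
        TextInputFormat format = new TextInputFormat(new Path(path));

        // Re-scan the directory every 10 seconds; newly discovered files are split and read in parallel.
        DataStream<String> lines = env.readFile(
                format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 10_000L);

        lines.print();
        env.execute("continuous file source");
    }
}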

2、Socket-based

Typically used in testing scenarios; a minimal sketch follows below.
socketTextStream - Reads from a socket. Elements can be separated by a delimiter.
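
A minimal sketch, assuming a text server is listening locally (for example started with nc -lk 9999); the host, port, and delimiter are illustrative:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Reads newline-delimited text from the socket; "\n" is the default delimiter.
        DataStream<String> lines = env.socketTextStream("localhost", 9999, "\n");
        lines.print();
        env.execute("socket source");
    }
}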

3、Collection-based

Typically used for testing and validation; a sketch follows after the list below.

  • fromCollection(Collection) - Creates a data stream from the Java java.util.Collection. All elements in the collection must be of the same type.
  • fromCollection(Iterator, Class) - Creates a data stream from an iterator. The class specifies the data type of the elements returned by the iterator.
  • fromElements(T …) - Creates a data stream from the given sequence of objects. All objects must be of the same type.
  • fromParallelCollection(SplittableIterator, Class) - Creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.
  • generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel. This method has been superseded by fromSequence.
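
A minimal sketch of the collection-based sources; the element values are illustrative:

import java.util.Arrays;
import java.util.List;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CollectionSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        List<String> words = Arrays.asList("flink", "hadoop", "hive");
        // All three sources below are bounded and mainly useful for tests and demos.
        env.fromCollection(words).print();             // from a java.util.Collection
        env.fromElements("flink", "kafka").print();    // from a varargs sequence
        env.fromSequence(1, 10).print();               // replacement for generateSequence(from, to)

        env.execute("collection sources");
    }
}
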
4、Custom

Typically used in testing and production scenarios; a custom source sketch follows below.

  • addSource - Attach a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer<>(…)). See connectors for more details.
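
A minimal sketch of a custom non-parallel source attached with addSource; the MyCounterSource class and its one-second emit rate are illustrative:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CustomSourceSketch {

    // A non-parallel source that emits an increasing counter once per second.
    public static class MyCounterSource implements SourceFunction<Long> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            long counter = 0L;
            while (running) {
                // Emit under the checkpoint lock so records and checkpoints stay consistent.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(counter++);
                }
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new MyCounterSource()).print();
        env.execute("custom source");
    }
}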

2)、Transformations

Operators transform one or more DataStreams into a new DataStream, and an application can combine multiple transformation operators into a sophisticated dataflow topology.
This part describes the basic transformations in the Flink DataStream API, the data-partitioning schemes available after a transformation, and operator chaining strategies.
Overall, operations on streaming data fall into four categories.

  • The first category is operations on individual records, such as filtering out records that do not meet a condition (Filter) or transforming every record (Map).
  • The second category is operations over multiple records. For example, to count the total order volume within one hour, the volumes of all order records in that hour must be added together. To support this kind of operation, the required records are grouped together for processing through a Window.
  • The third category is operations that combine several streams into a single stream. For example, multiple streams can be merged with Union, Join, or Connect. These operations merge with different logic, but they all eventually produce a single, unified stream on which cross-stream operations become possible.
  • The fourth category is the mirror image of merging: a DataStream can also be split, i.e. divided into several streams according to some rule (Split), each being a subset of the original, so that different streams can be processed differently.
1、map

DataStream → DataStream
Applies a function to every element of the stream and returns the results.
Takes one element and produces one element. A map function that doubles the values of the input stream:

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});
2、flatmap

DataStream → DataStream
Turns each element into zero, one, or more elements and returns the flattened result.
Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});
3、Filter

DataStream → DataStream
Filters the elements of the stream by the given condition, keeping only those for which the predicate returns true.

Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});

4、KeyBy
DataStream → KeyedStream
Groups the records of the stream by the specified key.

Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys.
This transformation returns a KeyedStream, which is, among other things, required to use keyed state.

dataStream.keyBy(value -> value.getSomeKey()) // Key by field "someKey"
dataStream.keyBy(value -> value.f0) // Key by the first element of a Tuple

A type cannot be a key if:

  • it is a POJO type but does not override the hashCode() method and relies on the Object.hashCode() implementation.
  • it is an array of any type.
5、Reduce

KeyedStream → DataStream
Aggregates the elements of the keyed stream.

A “rolling” reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.
A reduce function that creates a stream of partial sums:

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});
6、Aggregations

KeyedStream → DataStream
Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
7、Window

KeyedStream → WindowedStream
Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.

dataStream.keyBy(value -> value.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
8、WindowAll

DataStream → AllWindowedStream
Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.
WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
9、Window Apply

WindowedStream → DataStream
AllWindowedStream → DataStream
Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.
Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.

windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
    public void apply (Tuple tuple,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(sum);
    }
});
10、Window Reduce

WindowedStream → DataStream
Applies a functional reduce function to the window and returns the reduced value.

windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
    }
});
11、Aggregations on windows

WindowedStream → DataStream
Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");
12、Union

The union operator merges several data streams of the same type into a new stream of that type, i.e. multiple DataStream[T] are combined into one new DataStream[T]. Data is merged first-in, first-out and is not deduplicated.

DataStream* → DataStream
Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream.

dataStream.union(otherStream1, otherStream2, ...);
13、Window Join

DataStream,DataStream → DataStream
Join two data streams on a given key and a common window.

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...});
14、Interval Join

KeyedStream,KeyedStream → DataStream
Join two elements e1 and e2 of two keyed streams with a common key over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound

// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    .upperBoundExclusive() // optional
    .lowerBoundExclusive() // optional
    .process(new ProcessJoinFunction() {...});
15、Window CoGroup

DataStream,DataStream → DataStream
Cogroups two data streams on a given key and a common window.

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});
16、Connect

connect offers functionality similar to union for combining two data streams, but it differs from union in two ways:
connect can only connect two data streams, whereas union can connect more than two;
the two streams connected by connect may have different data types, whereas the streams passed to union must all have the same type.
After connect, the two DataStreams become a ConnectedStreams, on which a different processing function can be applied to each of the two streams, and state can be shared between them.

DataStream,DataStream → ConnectedStreams
“Connects” two data streams retaining their types. Connect allowing for shared state between the two streams.

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

17、CoMap, CoFlatMap

ConnectedStreams → DataStream
Similar to map and flatMap on a connected data stream

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
});
18、Iterate

DataStream → IterativeStream → DataStream
Creates a “feedback” loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream.

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});

3)、Sinks

1、Sinks supported by Flink

Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams:

  • writeAsText() / TextOutputFormat - Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.
  • writeAsCsv(…) / CsvOutputFormat - Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects.
  • print() / printToErr() - Prints the toString() value of each element on the standard out / standard error stream. Optionally, a prefix (msg) can be provided which is prepended to the output. This can help to distinguish between different calls to print. If the parallelism is greater than 1, the output will also be prepended with the identifier of the task which produced the output.
  • writeUsingOutputFormat() / FileOutputFormat - Method and base class for custom file outputs. Supports custom object-to-bytes conversion.
  • writeToSocket - Writes elements to a socket according to a SerializationSchema
  • addSink - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions.

Note that the write*() methods on DataStream are mainly intended for debugging purposes. They do not participate in Flink’s checkpointing, which means these functions usually have at-least-once semantics. The flushing of data to the target system depends on the implementation of the OutputFormat, so not all elements sent to the OutputFormat will immediately show up in the target system. Also, in failure cases, those records might be lost.
For reliable, exactly-once delivery of a stream into a file system, use the StreamingFileSink. Also, custom implementations through the .addSink(…) method can participate in Flink’s checkpointing for exactly-once semantics.

  • ds.print() writes directly to the console.
  • ds.printToErr() writes to the console via standard error (shown in red in many IDEs).
  • ds.writeAsText("local/HDFS path", WriteMode.OVERWRITE).setParallelism(1)
    When writing to the path, the parallelism set beforehand determines the layout:
    if the parallelism is > 1, the path is a directory;
    if the parallelism is = 1, the path is a single file.
2、Custom sinks

Implement a sink interface or abstract class such as RichSinkFunction yourself; a minimal sketch follows below.
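
A minimal sketch of such a custom sink; the PrintingSink class is illustrative and simply prints each record where a real implementation would write to an external system:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class CustomSinkSketch {

    // A custom sink: open()/close() manage connections, invoke() handles each record.
    public static class PrintingSink extends RichSinkFunction<String> {
        @Override
        public void open(Configuration parameters) throws Exception {
            // Acquire connections / clients to the external system here.
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            // Replace with a real write to JDBC, Redis, HDFS, ...; printing keeps the sketch self-contained.
            System.out.println("sinking: " + value);
        }

        @Override
        public void close() throws Exception {
            // Release external resources here.
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("flink", "kafka", "hive").addSink(new PrintingSink());
        env.execute("custom sink");
    }
}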

6、DataStream Connectors

Flink supports many kinds of DataStream connectors; usage examples will be given later in this series, as shown in the figure below.
[Figure: DataStream connectors supported by Flink]

1)、JDBC

This connector provides a sink that writes data to a JDBC database.
To use it, add the following dependency to your project (along with your JDBC-driver):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc_2.11</artifactId>
  <version>1.12.7</version>
</dependency>

Note that the streaming connectors are currently NOT part of the binary distribution. See how to link with them for cluster execution here.
Created JDBC sink provides at-least-once guarantee. Effectively exactly-once can be achieved using upsert statements or idempotent updates.
Example usage:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
        .fromElements(...)
        .addSink(JdbcSink.sink(
                "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
                (ps, t) -> {
                    ps.setInt(1, t.id);
                    ps.setString(2, t.title);
                    ps.setString(3, t.author);
                    ps.setDouble(4, t.price);
                    ps.setInt(5, t.qty);
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(getDbMetadata().getUrl())
                        .withDriverName(getDbMetadata().getDriverClass())
                        .build()));
env.execute();

2)、Kafka

Official documentation: Kafka | Apache Flink

Flink already ships with a number of bundled connectors, such as the Kafka source and sink and the Elasticsearch sink. When reading from or writing to Kafka, Elasticsearch, or RabbitMQ you can use the corresponding connector APIs directly. Although this code lives in the Flink source repository, it is not really part of the Flink engine logic, and it is not packaged into the binary distribution. So when submitting a job, make sure the relevant connector classes are packaged into the job JAR; otherwise the submission fails with a class-not-found error or an exception while initializing certain classes.

The following settings are all recommended; a builder-based sketch follows after the list.

  • the topic(s) to subscribe to
  • the deserialization schema
  • consumer property: the cluster (bootstrap server) addresses
  • consumer property: the consumer group id (a default is used if not set, but that is inconvenient to manage)
  • consumer property: the offset reset strategy, e.g. earliest/latest...
  • dynamic partition discovery (so that Flink detects when the number of Kafka partitions changes or increases)
  • if checkpointing is not enabled, automatic offset committing can be configured; once checkpointing is enabled (covered later in the series), offsets are committed along with each checkpoint to the checkpoint state and to the default offsets topic
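
As a rough sketch, these recommendations map onto the KafkaSource builder as follows (broker addresses, topic, and group id are placeholders; the full usage example appears in the Kafka Source section below):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker1:9092,broker2:9092")            // cluster addresses
        .setTopics("input-topic")                                     // subscribed topic(s)
        .setGroupId("my-flink-group")                                 // consumer group id
        .setStartingOffsets(
                OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)) // offset reset rule
        .setValueOnlyDeserializer(new SimpleStringSchema())           // deserialization schema
        .setProperty("partition.discovery.interval.ms", "10000")      // dynamic partition discovery
        .build();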

Apache Flink ships with a universal Kafka connector which attempts to track the latest version of the Kafka client. The version of the client it uses may change between Flink releases. Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later. For details on Kafka compatibility, please refer to the official Kafka documentation.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.17.1</version>
</dependency>

Flink’s streaming connectors are not part of the binary distribution.

II. Flink and Kafka

This part covers using Kafka with Flink in more detail; the content is excerpted from the official documentation. Kafka metrics, security, and similar topics are not covered below; see the official documentation for more.
Note in particular:
FlinkKafkaConsumer is deprecated and will be removed with Flink 1.17, please use KafkaSource instead.
FlinkKafkaProducer is deprecated and will be removed with Flink 1.15, please use KafkaSink instead.

1、Kafka Source

1)、Usage

Kafka source provides a builder class for constructing instance of KafkaSource. The code snippet below shows how to build a KafkaSource to consume messages from the earliest offset of topic “input-topic”, with consumer group “my-group” and deserialize only the value of message as string.

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

The following properties are required for building a KafkaSource:

  • Bootstrap servers, configured by setBootstrapServers(String)
  • Topics / partitions to subscribe, see the following Topic-partition subscription for more details.
  • Deserializer to parse Kafka messages, see the following Deserializer for more details.

2)、Topic-partition Subscription

Kafka source provides 3 ways of topic-partition subscription:

  • Topic list, subscribing messages from all partitions in a list of topics. For example:
KafkaSource.builder().setTopics("topic-a", "topic-b");
  • Topic pattern, subscribing messages from all topics whose name matches the provided regular expression. For example:
KafkaSource.builder().setTopicPattern("topic.*");
  • Partition set, subscribing partitions in the provided partition set. For example:
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
        new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
        new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
KafkaSource.builder().setPartitions(partitionSet);

3)、Deserializer

A deserializer is required for parsing Kafka messages. Deserializer (Deserialization schema) can be configured by setDeserializer(KafkaRecordDeserializationSchema), where KafkaRecordDeserializationSchema defines how to deserialize a Kafka ConsumerRecord.

If only the value of Kafka ConsumerRecord is needed, you can use setValueOnlyDeserializer(DeserializationSchema) in the builder, where DeserializationSchema defines how to deserialize binaries of Kafka message value.
You can also use a Kafka Deserializer for deserializing Kafka message value. For example using StringDeserializer for deserializing Kafka message value as string:

import org.apache.kafka.common.serialization.StringDeserializer;
KafkaSource.<String>builder()
        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));

4)、Starting Offset

Kafka source is able to consume messages starting from different offsets by specifying OffsetsInitializer. Built-in initializers include:

KafkaSource.builder()
    // Start from committed offset of the consuming group, without reset strategy
    .setStartingOffsets(OffsetsInitializer.committedOffsets())
    // Start from committed offset, also use EARLIEST as reset strategy if committed offset doesn't exist
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    // Start from the first record whose timestamp is greater than or equals a timestamp (milliseconds)
    .setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))
    // Start from earliest offset
    .setStartingOffsets(OffsetsInitializer.earliest())
    // Start from latest offset
    .setStartingOffsets(OffsetsInitializer.latest());

You can also implement a custom offsets initializer if built-in initializers above cannot fulfill your requirement. (Not supported in PyFlink)
If offsets initializer is not specified, OffsetsInitializer.earliest() will be used by default.

5)、Boundedness

Kafka source is designed to support both streaming and batch running mode. By default, the KafkaSource is set to run in streaming manner, thus never stops until Flink job fails or is cancelled. You can use setBounded(OffsetsInitializer) to specify stopping offsets and set the source running in batch mode. When all partitions have reached their stopping offsets, the source will exit.
You can also set KafkaSource running in streaming mode, but still stop at the stopping offset by using setUnbounded(OffsetsInitializer). The source will exit when all partitions reach their specified stopping offset.
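
For example, a fragment that runs the source as a bounded (batch-style) input, stopping at the offsets that are latest when the job starts; the connection settings are placeholders:

KafkaSource<String> boundedSource = KafkaSource.<String>builder()
        .setBootstrapServers("broker1:9092")
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        // Every partition stops at the offset that was latest at job start; with all sources
        // bounded, the job can run in batch mode and finishes when the offsets are reached.
        .setBounded(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();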

6)、Additional Properties

In addition to properties described above, you can set arbitrary properties for KafkaSource and KafkaConsumer by using setProperties(Properties) and setProperty(String, String). KafkaSource has following options for configuration:

  • client.id.prefix defines the prefix to use for Kafka consumer’s client ID
  • partition.discovery.interval.ms defines the interval in milliseconds for Kafka source to discover new partitions. See Dynamic Partition Discovery below for more details.
  • register.consumer.metrics specifies whether to register metrics of KafkaConsumer in Flink metric group
  • commit.offsets.on.checkpoint specifies whether to commit consuming offsets to Kafka brokers on checkpoint

For configurations of KafkaConsumer, you can refer to Apache Kafka documentation for more details.
Please note that the following keys will be overridden by the builder even if it is configured:

  • key.deserializer is always set to ByteArrayDeserializer
  • value.deserializer is always set to ByteArrayDeserializer
  • auto.offset.reset.strategy is overridden by OffsetsInitializer#getAutoOffsetResetStrategy() for the starting offsets
  • partition.discovery.interval.ms is overridden to -1 when setBounded(OffsetsInitializer) has been invoked
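
A minimal fragment showing how the options above can be set through the builder; all values shown are illustrative:

KafkaSource.<String>builder()
        .setProperty("client.id.prefix", "my-flink-job")          // prefix for the Kafka consumer client ID
        .setProperty("register.consumer.metrics", "true")          // expose KafkaConsumer metrics in the Flink metric group
        .setProperty("commit.offsets.on.checkpoint", "true")       // commit consuming offsets to Kafka on checkpoints
        .setProperty("partition.discovery.interval.ms", "30000");  // look for new partitions every 30 seconds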

7)、Dynamic Partition Discovery

In order to handle scenarios like topic scaling-out or topic creation without restarting the Flink job, Kafka source can be configured to periodically discover new partitions under provided topic-partition subscribing pattern. To enable partition discovery, set a non-negative value for property partition.discovery.interval.ms:

//Partition discovery is disabled by default. You need to explicitly set the partition discovery interval to enable this feature.
KafkaSource.builder()
    .setProperty("partition.discovery.interval.ms", "10000"); // discover new partitions per 10 seconds

8)、Event Time and Watermarks

By default, the record will use the timestamp embedded in Kafka ConsumerRecord as the event time. You can define your own WatermarkStrategy for extract event time from the record itself, and emit watermark downstream:

env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy");

This documentation describes details about how to define a WatermarkStrategy. (Not supported in PyFlink)
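
For instance, a minimal sketch of a concrete strategy that allows 5 seconds of out-of-orderness; MyEvent and its getEventTime() accessor are hypothetical stand-ins for your record type:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

WatermarkStrategy<MyEvent> watermarkStrategy = WatermarkStrategy
        .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        // Take the event time from the record itself instead of the Kafka record timestamp.
        .withTimestampAssigner((event, kafkaTimestamp) -> event.getEventTime());

env.fromSource(kafkaSource, watermarkStrategy, "Kafka Source With Custom Watermark Strategy");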

9)、Idleness

The Kafka Source does not go automatically in an idle state if the parallelism is higher than the number of partitions. You will either need to lower the parallelism or add an idle timeout to the watermark strategy. If no records flow in a partition of a stream for that amount of time, then that partition is considered “idle” and will not hold back the progress of watermarks in downstream operators.
This documentation describes details about how to define a WatermarkStrategy#withIdleness.
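
A minimal fragment that marks a partition idle after one minute without records; the durations are illustrative:

WatermarkStrategy<String> strategy = WatermarkStrategy
        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        // A partition that delivers no records for 1 minute no longer holds back downstream watermarks.
        .withIdleness(Duration.ofMinutes(1));

env.fromSource(kafkaSource, strategy, "Kafka Source With Idleness");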

10)、Consumer Offset Committing

Kafka source commits the current consuming offset when checkpoints are completed, for ensuring the consistency between Flink’s checkpoint state and committed offsets on Kafka brokers.
If checkpointing is not enabled, Kafka source relies on Kafka consumer’s internal automatic periodic offset committing logic, configured by enable.auto.commit and auto.commit.interval.ms in the properties of Kafka consumer.
Note that Kafka source does NOT rely on committed offsets for fault tolerance. Committing offset is only for exposing the progress of consumer and consuming group for monitoring.

2、Kafka Sink

KafkaSink allows writing a stream of records to one or more Kafka topics.

1)、Usage

Kafka sink provides a builder class to construct an instance of a KafkaSink. The code snippet below shows how to write String records to a Kafka topic with a delivery guarantee of at least once.

DataStream<String> stream = ...;
        
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("topic-name")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
        )
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
        
stream.sinkTo(sink);

The following properties are required to build a KafkaSink:

  • Bootstrap servers, setBootstrapServers(String)
  • Record serializer, setRecordSerializer(KafkaRecordSerializationSchema)
  • If you configure the delivery guarantee with DeliveryGuarantee.EXACTLY_ONCE you also have to use setTransactionalIdPrefix(String)

2)、Serializer

You always need to supply a KafkaRecordSerializationSchema to transform incoming elements from the data stream to Kafka producer records. Flink offers a schema builder to provide some common building blocks i.e. key/value serialization, topic selection, partitioning. You can also implement the interface on your own to exert more control.

KafkaRecordSerializationSchema.builder()
    .setTopicSelector((element) -> {<your-topic-selection-logic>})
    .setValueSerializationSchema(new SimpleStringSchema())
    .setKeySerializationSchema(new SimpleStringSchema())
    .setPartitioner(new FlinkFixedPartitioner())
    .build();

It is required to always set a value serialization method and a topic (selection method). Moreover, it is also possible to use Kafka serializers instead of Flink serializer by using setKafkaKeySerializer(Serializer) or setKafkaValueSerializer(Serializer).

3)、Fault Tolerance

Overall the KafkaSink supports three different DeliveryGuarantees. For DeliveryGuarantee.AT_LEAST_ONCE and DeliveryGuarantee.EXACTLY_ONCE Flink’s checkpointing must be enabled. By default the KafkaSink uses DeliveryGuarantee.NONE. Below you can find an explanation of the different guarantees.

  • DeliveryGuarantee.NONE does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated in case of a Flink failure.

  • DeliveryGuarantee.AT_LEAST_ONCE: The sink will wait for all outstanding records in the Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. No messages will be lost in case of any issue with the Kafka brokers but messages may be duplicated when Flink restarts because Flink reprocesses old input records.

  • DeliveryGuarantee.EXACTLY_ONCE: In this mode, the KafkaSink will write all messages in a Kafka transaction that will be committed to Kafka on a checkpoint. Thus, if the consumer reads only committed data (see Kafka consumer config isolation.level), no duplicates will be seen in case of a Flink restart. However, this delays record visibility effectively until a checkpoint is written, so adjust the checkpoint duration accordingly. Please ensure that you use a unique transactionalIdPrefix across your applications running on the same Kafka cluster, so that multiple running jobs do not interfere in their transactions. Additionally, it is highly recommended to tweak the Kafka transaction timeout (see Kafka producer transaction.timeout.ms) to be much larger than the maximum checkpoint duration plus the maximum restart duration, or data loss may happen when Kafka expires an uncommitted transaction. A configuration sketch follows below.
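
A minimal sketch of an exactly-once KafkaSink along these lines; the transactional id prefix and the transaction timeout value are illustrative and must be tuned to your checkpoint and restart durations:

import java.util.Properties;

Properties producerProps = new Properties();
// Must be noticeably larger than the maximum checkpoint duration plus the maximum restart duration.
producerProps.setProperty("transaction.timeout.ms", "900000"); // 15 minutes, illustrative

KafkaSink<String> exactlyOnceSink = KafkaSink.<String>builder()
        .setBootstrapServers("broker1:9092")
        .setKafkaProducerConfig(producerProps)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // Must be unique per application on the same Kafka cluster.
        .setTransactionalIdPrefix("my-app-txn")
        .build();

stream.sinkTo(exactlyOnceSink);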

3、Transformations

This is the same as the transformations introduced above and is not repeated here.

To recap, this article has walked through unified stream/batch development, the detailed source, transformation, and sink APIs, and Flink's integration with Kafka.
