Table of Contents
Creating an Execution Environment
1. getExecutionEnvironment
2. createLocalEnvironment
3. createRemoteEnvironment
Execution Mode
1. How to Configure BATCH Mode
2. When to Choose BATCH Mode
Triggering Program Execution
Data Source Operations
Reading from a Kafka Source
Custom Sources
Creating an Execution Environment
Every Flink program starts from an execution environment, which is the program's entry point to the Flink runtime. There are three ways to create one.

1. getExecutionEnvironment
The most common approach: getExecutionEnvironment() decides by itself which kind of environment to return — a local environment when the program runs standalone (for example, inside an IDE) and a cluster environment when it is submitted to a cluster.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2. createLocalEnvironment
createLocalEnvironment() explicitly returns a local environment; an optional argument sets the default parallelism (otherwise it defaults to the number of CPU cores).

StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
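As a brief sketch, two related variants of this factory method (the web-UI variant additionally requires the flink-runtime-web dependency on the classpath):

// local environment with an explicit default parallelism of 2
StreamExecutionEnvironment localEnv2 = StreamExecutionEnvironment.createLocalEnvironment(2);

// local environment that also starts the Flink web UI
// (Configuration is org.apache.flink.configuration.Configuration)
StreamExecutionEnvironment webEnv =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());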
3. createRemoteEnvironment
createRemoteEnvironment() returns an environment that submits the job to a specific remote JobManager; it needs the host, the port, and the JAR file(s) to ship.

StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
        "host",                // JobManager hostname
        1234,                  // JobManager port
        "path/to/jarFile.jar"  // JAR file to ship to the JobManager
);
Execution Mode
Before Flink 1.12, batch jobs used the separate DataSet API with its own environment; the DataStream API has since unified the two and supports STREAMING (the default), BATCH, and AUTOMATIC runtime modes.

// batch environment (legacy DataSet API)
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
// stream environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
1. How to Configure BATCH Mode
The runtime mode can be set on the command line when submitting the job, which is generally preferred because it keeps execution settings out of the code:

bin/flink run -Dexecution.runtime-mode=BATCH ...

It can also be set in the program itself:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
2. When to Choose BATCH Mode
As a rule of thumb: use BATCH mode for bounded data when only the final result matters, and keep the default STREAMING mode for unbounded data or whenever intermediate, incrementally updated results are useful. The sketch below illustrates the difference.
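A minimal word-count sketch (class and input names are illustrative): in STREAMING mode every record updates the printed count, e.g. (hello,1) followed by (hello,2), while in BATCH mode only the final count per key is emitted.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BatchModeWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // comment this line out to compare with the default STREAMING behavior
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.fromElements("hello world", "hello flink")
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    for (String word : line.split(" ")) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(t -> t.f0)
                .sum(1)
                .print();
        env.execute();
    }
}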
Triggering Program Execution
Flink programs are lazily evaluated: the API calls above only build the dataflow graph. The job is actually submitted, and runs until finished, only when execute() is called on the environment:

env.execute();
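execute() blocks the calling thread until the job terminates. A brief sketch of the non-blocking alternative, executeAsync(), available on StreamExecutionEnvironment since Flink 1.10 (the job name here is illustrative):

// blocking: returns a JobExecutionResult once the job has finished
// JobExecutionResult result = env.execute("my-job");

// non-blocking: submits the job and returns a JobClient immediately
// (JobClient is org.apache.flink.core.execution.JobClient)
JobClient client = env.executeAsync("my-job");
System.out.println("Submitted job " + client.getJobID());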
Data Source Operations

Reading from a Kafka Source
To consume Kafka data, first add the Kafka connector dependency to the project's pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

Then create a FlinkKafkaConsumer with the consumer properties and pass it to addSource:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class SourceKafkaTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop102:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // read the "clicks" topic as a stream of strings
        DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<String>(
                "clicks",
                new SimpleStringSchema(),
                properties
        ));
        stream.print("Kafka");
        env.execute();
    }
}
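Note that in newer Flink versions (1.14+) FlinkKafkaConsumer is deprecated in favor of the KafkaSource builder API. A sketch of the equivalent job, reusing the broker, topic, and group id from above:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceKafkaNewApiTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop102:9092")
                .setTopics("clicks")
                .setGroupId("consumer-group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // fromSource replaces addSource for the new unified source API
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .print("Kafka");
        env.execute();
    }
}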
Custom Sources
If none of the built-in connectors fits, you can implement a source yourself. ClickSource below implements SourceFunction and emits one random click Event per second:
package com.atmk.stream.app;
import com.atmk.stream.entity.Event;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Calendar;
import java.util.Random;
/**
* @author:lss
* @date:2022/11/3 17:18
* @description:some
*/
public class ClickSource implements SourceFunction<Event> {
    // flag controlling data generation; volatile because cancel() is called from another thread
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        // pick randomly from fixed sets of users and URLs
        Random random = new Random();
        String[] users = {"Mary", "Bob", "Alice", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1"};
        while (running) {
            ctx.collect(new Event(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    Calendar.getInstance().getTimeInMillis()
            ));
            // emit one click event per second to make the output easy to follow
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
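The Event entity referenced above is not shown in the original; a minimal sketch of what ClickSource assumes (public fields and a public no-argument constructor make it a valid Flink POJO type):

package com.atmk.stream.entity;

import java.sql.Timestamp;

public class Event {
    public String user;
    public String url;
    public Long timestamp;

    // a public no-argument constructor is required by Flink's POJO serializer
    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{user='" + user + "', url='" + url
                + "', timestamp=" + new Timestamp(timestamp) + "}";
    }
}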
package com.atmk.stream.app;
import com.atmk.stream.entity.Event;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author:lss
* @date:2022/11/3 17:26
* @description:some
*/
public class SourceCustom {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // plug the custom source function in via addSource
        DataStreamSource<Event> stream = env.addSource(new ClickSource());
        stream.print("SourceCustom");
        env.execute();
    }
}

Running this prints one event per second with the "SourceCustom" prefix.
A plain SourceFunction is inherently non-parallel: giving it a parallelism other than 1 makes the job fail at submission time, as the following example shows.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceThrowException {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // throws IllegalArgumentException: The parallelism of non parallel operator must be 1.
        env.addSource(new ClickSource()).setParallelism(2).print();
        env.execute();
    }
}
To run a source with parallelism greater than 1, implement ParallelSourceFunction instead; every parallel subtask then runs its own copy of run():

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import java.util.Random;

public class ParallelSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new CustomSource()).setParallelism(2).print();
        env.execute();
    }

    public static class CustomSource implements ParallelSourceFunction<Integer> {
        private volatile boolean running = true;
        private final Random random = new Random();

        @Override
        public void run(SourceContext<Integer> sourceContext) throws Exception {
            // each subtask keeps emitting random integers until cancelled
            while (running) {
                sourceContext.collect(random.nextInt());
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
Sample output (the prefix is the index of the printing subtask):

2> -686169047
2> 429515397
2> -223516288
2> 1137907312
2> -380165730
2> 2082090389