k8s 搭建基于session模式的flink集群

这篇具有很好参考价值的文章主要介绍了k8s 搭建基于session模式的flink集群。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.flink集群搭建

不废话直接上代码,都是基于官网的,在此记录一下 Kubernetes | Apache Flink

flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2    
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.pekko.name = org.apache.pekko
    logger.pekko.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF    

jobmanager-service.yaml Optional service, which is only necessary for non-HA mode.

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager

Session cluster resource definitions #

jobmanager-session-deployment-non-ha.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: apache/flink:latest
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

taskmanager-session-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: apache/flink:latest
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

 kubectl apply -f xxx.yaml 或者 kubectl apply -f ./flink  flink为文件夹,存放的是以上这几个.yaml文件

k8s 搭建基于session模式的flink集群,kubernetes,flink,容器

为flink的ui界面添加nodeport即可外部访问

k8s 搭建基于session模式的flink集群,kubernetes,flink,容器

2. demo代码测试

创建一个maven工程,pom.xml引入依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>test-platform</artifactId>
        <groupId>com.test</groupId>
        <version>2.0.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flink-demo</artifactId>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.0</flink.version>
        <log4j.version>2.20.0</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <scope>compile</scope>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <scope>compile</scope>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <scope>compile</scope>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>

</project>

log4j2.xml:

<?xml version="1.0" encoding="UTF-8"?>
<configuration monitorInterval="5">
    <Properties>
        <property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" />
        <!-- LOG_LEVEL 配置你需要的日志输出级别       -->
        <property name="LOG_LEVEL" value="INFO" />
    </Properties>

    <appenders>
        <console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="${LOG_PATTERN}"/>
            <ThresholdFilter level="${LOG_LEVEL}" onMatch="ACCEPT" onMismatch="DENY"/>
        </console>
    </appenders>

    <loggers>
        <root level="${LOG_LEVEL}">
            <appender-ref ref="Console"/>
        </root>
    </loggers>

</configuration>

计数代码:

package com.test.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountUnboundStreamDemo {

    public static void main(String[] args) throws Exception {
        // TODO 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
//                3, // 尝试重启的次数
//                Time.of(10, TimeUnit.SECONDS) // 间隔
//        ));
        // TODO 2.读取数据
        DataStreamSource<String> lineDS = env.socketTextStream("192.168.0.28", 7777);

        // TODO 3.处理数据: 切分、转换、分组、聚合
        // TODO 3.1 切分、转换
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = lineDS //<输入类型, 输出类型>
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // 按照 空格 切分
                        String[] words = value.split(" ");
                        for (String word : words) {
                            // 转换成 二元组 (word,1)
                            Tuple2<String, Integer> wordsAndOne = Tuple2.of(word, 1);
                            // 通过 采集器 向下游发送数据
                            out.collect(wordsAndOne);
                        }
                    }
                });
        // TODO 3.2 分组
        KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOneDS.keyBy(
                new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                }
        );
        // TODO 3.3 聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumDS = wordAndOneKS.sum(1);

        // TODO 4.输出数据
        sumDS.print("接收到的数据=======").setParallelism(1);

        // TODO 5.执行:类似 sparkstreaming最后 ssc.start()
        env.execute(sumDS.getClass().getSimpleName());
    }

}

打成jar包导入flink dashboard:

k8s 搭建基于session模式的flink集群,kubernetes,flink,容器

在另一台机器上运行 nc -lk -p 7777,如果出现连接拒绝,查看是否放开端口号

k8s 搭建基于session模式的flink集群,kubernetes,flink,容器

k8s查看读取到的数据

k8s 搭建基于session模式的flink集群,kubernetes,flink,容器文章来源地址https://www.toymoban.com/news/detail-695798.html

到了这里,关于k8s 搭建基于session模式的flink集群的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 二进制搭建 Kubernetes与k8s集群搭建(一)

    目录 二进制搭建 Kubernetes v1.20     操作系统初始化配置 部署 docker引擎 部署 etcd 集群 准备签发证书环境 在 master01 节点上操作      生成Etcd证书 在 node01 节点上操作 在 node02 节点上操作 部署 Master 组件 在 master01 节点上操作 部署 Worker Node 组件 在所有 node 节点上操作 在 mas

    2024年02月06日
    浏览(69)
  • Linux搭建 Kubernetes(K8S)集群详情教程

    当搭建 Kubernetes 集群时,涉及的详细步骤可能较多,以下是详细的 Kubernetes 单节点集群搭建步骤: 步骤 1: 准备工作 确保满足以下基本要求: 一台运行 Ubuntu 18.04 或更高版本的机器。 2 GB 或更多内存。 2 个 CPU 核心或更多。 安装了 Docker。 步骤 2: 安装 Docker 步骤 3: 安装 kubea

    2024年01月17日
    浏览(61)
  • Kubernetes(K8s)使用 kubeadm 方式搭建多 master 高可用 K8s 集群

    本篇主要针对上篇文章的单 master 节点的 K8s 集群上搭建多 master 节点集群 和 LB 负载均衡服务器。 Kubernetes(K8S)集群搭建基础入门教程 虚拟机 IP 地址: IP 操作系统 主机名称 192.168.2.121 centos7.9 k8s-master01 192.168.2.124 centos7.9 k8s-master02 192.168.2.125 centos7.9 k8s-node01 192.168.2.126 centos

    2023年04月26日
    浏览(52)
  • 【kubernetes】k8s高可用集群搭建(三主三从)

    目录 【kubernetes】k8s高可用集群搭建(三主三从) 一、服务器设置 二、环境配置 1、关闭防火墙 2、关闭selinux 3、关闭swap 4、修改主机名(根据主机角色不同,做相应修改) 5、主机名映射 6、将桥接的IPv4流量传递到iptables的链 7、时间同步 8、master之间进行免密登录设置 三、

    2024年02月09日
    浏览(42)
  • 使用containerd从0搭建k8s(kubernetes)集群

    准备两台服务器节点,如果需要安装虚拟机,可以参考《wmware和centos安装过程》 机器名 IP 角色 CPU 内存 centos01 192.168.109.130 master 4核 2G centos02 192.168.109.131 node 4核 2G 设置主机名,所有节点都执行 关闭防火墙,所有节点都执行 关闭swap内存,所有节点都执行 配置网桥,所有节点

    2024年02月08日
    浏览(66)
  • 搭建单机版K8S运行Flink集群

    环境要求 操作系统: CentOS 7.x 64位 Kubernetes版本:v1.16.2 Docker版本:19.03.13-ce Flink版本:1.14.3 使用中国YUM及镜像源  1.安装Kubernetes: 1.1 创建文件:/etc/yum.repos.d/kubernetes.repo,内容如下: 1.2  执行安装命令:  1.3 启动kubelet服务并设置开机自启: 2.安装Docker: 2.1 创建文件:

    2023年04月26日
    浏览(47)
  • 【云原生-K8s-1】kubeadm搭建k8s集群(一主两从)完整教程及kubernetes简介

    🍁 博主简介   🏅云计算领域优质创作者   🏅华为云开发者社区专家博主   🏅阿里云开发者社区专家博主 💊 交流社区: 运维交流社区 欢迎大家的加入!   Kubernetes(简称:k8s) 是Google在2014年6月开源的一个容器集群管理系统,使用Go语言开发,用于管理云平台中多

    2024年02月07日
    浏览(66)
  • K8s(kubernetes)集群搭建及dashboard安装、基础应用部署

    本质是一组服务器集群,在集群每个节点上运行特定的程序,来对节点中的容器进行管理。实现资源管理的自动化。 自我修复 弹性伸缩 服务发现 负载均衡 版本回退 存储编排 控制节点(master)-控制平面 APIserver :资源操作的唯一入口 scheduler :集群资源调度,将Pod调度到node节

    2024年02月08日
    浏览(53)
  • Kubernetes(k8s)集群搭建,完整无坑,不需要科学上网~

    k8s集群,每一台机器需要2核CPU+2G的内存。 我们此次搭建的集群环境,各个版本如下: Docker 18.09.0 kubeadm-1.14.0-0 kubelet-1.14.0-0 kubectl-1.14.0-0 k8s.gcr.io/kube-apiserver:v1.14.0 k8s.gcr.io/kube-controller-manager:v1.14.0 k8s.gcr.io/kube-scheduler:v1.14.0 k8s.gcr.io/kube-proxy:v1.14.0 k8s.gcr.io/pause:3.1 k8s.gcr.io/etcd:3.

    2024年02月12日
    浏览(36)
  • kubernetes(k8s)安装、集群搭建、可视化界面、完全卸载

    官网:https://kubernetes.io/zh-cn/docs/concepts/overview/ Kubernetes 是一个可移植、可扩展的开源平台, 用于管理容器化的工作负载和服务 ,可促进声明式配置和自动化。 Kubernetes 作用: 服务发现和负载均衡 Kubernetes 可以使用 DNS 名称或自己的 IP 地址来暴露容器。 如果进入容器的流量很

    2024年02月02日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包