Prerequisites
- Kubernetes managed with Rancher, with Alibaba Cloud NAS for storage
- A physical machine (used to mount the PVCs for dags, plugins, and logs)
- A MySQL database and a Redis instance
- A base image containing Airflow and its dependencies
This deployment runs Airflow on Kubernetes with the CeleryExecutor, not the KubernetesExecutor.
Building the base image
Dockerfile
The base image is the official Airflow 2.6.0 slim image for Python 3.10:
FROM apache/airflow:slim-2.6.0-python3.10
USER root
EXPOSE 8080 5555 8793
COPY config/airflow.cfg /opt/airflow/airflow.cfg
RUN set -ex \
&& buildDeps=' \
freetds-dev \
libkrb5-dev \
libsasl2-dev \
libssl-dev \
libffi-dev \
libpq-dev \
git \
python3-dev \
gcc \
sasl2-bin \
libsasl2-2 \
libsasl2-dev \
libsasl2-modules \
' \
&& apt-get update -yqq \
&& apt-get upgrade -yqq \
&& apt-get install -yqq --no-install-recommends \
$buildDeps \
freetds-bin \
build-essential \
default-libmysqlclient-dev \
apt-utils \
curl \
rsync \
netcat \
locales \
procps \
telnet
USER airflow
RUN pip install --no-cache-dir \
    celery \
    flower \
    pymysql \
    mysqlclient \
    redis \
    livy==0.6.0 \
    apache-airflow-providers-mysql \
    apache-airflow-providers-apache-hive
RUN airflow db init
# Keep the base image clean: remove the config file once the metadata database has been initialized
RUN rm -rf /opt/airflow/airflow.cfg
Build the base image and push it to the image registry:
- Initializing the metadata database happens as part of building the Airflow base image
Deployment code repository: https://github.com/itnoobzzy/EasyAirflow.git
After cloning the repository, cd into the EasyAirflow project, create a logs directory, and run sudo chmod -R 777 logs, for example:
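A minimal sketch of those preparation steps on the command line (the target directory is wherever you keep the project):
```bash
git clone https://github.com/itnoobzzy/EasyAirflow.git
cd EasyAirflow
mkdir -p logs
sudo chmod -R 777 logs
```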
Because the metadata database is initialized while the base image is built, the configuration file has to be adjusted first. Four changes are needed (an illustrative excerpt follows the build commands below):
- Rename the template config: mv config/default_airflow.cfg config/airflow.cfg
- In airflow.cfg, set executor to CeleryExecutor
- Point sql_alchemy_conn at the existing MySQL database; note that the connection string must use the mysql+pymysql driver
- Set broker_url and result_backend: the broker uses Redis for messaging, while result_backend stores results in MySQL and must use the db+mysql driver prefix
- Run the build and push the image to the registry:
docker build -t airflow:2.6.0 .
docker tag airflow:2.6.0 itnoobzzy/airflow:v2.6.0-python3.10
docker push itnoobzzy/airflow:v2.6.0-python3.10
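For illustration only, after those four edits the relevant lines in config/airflow.cfg might look like the excerpt below; the hostnames, credentials, and database names are placeholders, not values from the repository:
```ini
executor = CeleryExecutor
sql_alchemy_conn = mysql+pymysql://airflow:<password>@<mysql-host>:3306/airflow
broker_url = redis://<redis-host>:6379/0
result_backend = db+mysql://airflow:<password>@<mysql-host>:3306/airflow
```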
Deployment steps
- Create the namespace airflow-v2
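If you prefer the command line to the Rancher UI, the namespace can also be created with kubectl (assuming kubectl is pointed at the same cluster):
```bash
kubectl create namespace airflow-v2
```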
- Create the PVCs
volumes.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/volumes.yaml
After importing the file into Rancher, three PVCs are created, storing dags, logs, and plugins respectively.
Mount the PVCs on the physical machine so that dags, logs, and plugins are easy to manage: check a PVC's details and run the corresponding mount command; a sketch of mounting airflow-dags-pv on the machine follows below.
After mounting, run df -h to verify that dags, logs, and plugins are all mounted correctly.
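As a hedged example, mounting the NAS export behind airflow-dags-pv could look like the following; the NAS server address and export path are placeholders that should be copied from the PV/PVC details in Rancher:
```bash
# Placeholder NAS endpoint and export path -- read the real values from the PV details
sudo mkdir -p /data/app/k8s/EasyAirflow/dags
sudo mount -t nfs <nas-id>.cn-hangzhou.nas.aliyuncs.com:/airflow/dags /data/app/k8s/EasyAirflow/dags
df -h | grep EasyAirflow   # confirm the mount shows up
```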
- Create the ConfigMap
configmap.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/configmap.yaml
Import the YAML file into Rancher.
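Equivalently, the ConfigMap that the Deployments reference (airflow-configmap) could be created straight from the config file with kubectl, assuming the same key name airflow.cfg as in the repository's configmap.yaml:
```bash
kubectl -n airflow-v2 create configmap airflow-configmap \
  --from-file=airflow.cfg=config/airflow.cfg
```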
- Create the Secret (optional)
secret.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/secret.yaml
Import the YAML file into Rancher; note that the database connection strings in secret.yaml must be base64-encoded.
Storing the connection strings in a Kubernetes Secret lets the Deployment manifests pull them into environment variables via secretKeyRef.
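For example, the base64 values for secret.yaml can be generated as follows (the connection strings are placeholders; -n avoids encoding a trailing newline):
```bash
echo -n 'mysql+pymysql://airflow:<password>@<mysql-host>:3306/airflow' | base64
echo -n 'redis://<redis-host>:6379/0' | base64
echo -n 'db+mysql://airflow:<password>@<mysql-host>:3306/airflow' | base64
```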
- Create the scheduler Deployment
scheduler-dp.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/scheduler-dp.yaml
kind: Deployment
apiVersion: apps/v1
metadata:
  name: airflow-scheduler
  namespace: airflow-v2
spec:
  replicas: 1
  selector:
    matchLabels:
      tier: airflow
      component: scheduler
      release: v2.6.0
  template:
    metadata:
      labels:
        tier: airflow
        component: scheduler
        release: v2.6.0
      annotations:
        cluster-autoscaler.kubernetes.io/safe-to-evict: "true"
    spec:
      restartPolicy: Always
      terminationGracePeriodSeconds: 10
      containers:
        - name: scheduler
          image: itnoobzzy/airflow:v2.6.0-python3.10
          imagePullPolicy: IfNotPresent
          args: ["airflow", "scheduler"]
          env:
            - name: AIRFLOW__CORE__FERNET_KEY
              value: cwmLHK76Sp9XclhLzHwCNXNiAr04OSMKQ--6WXRjmss=
            - name: AIRFLOW__CORE__EXECUTOR
              value: CeleryExecutor
            - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
            - name: AIRFLOW__CELERY__BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: broker_url
            - name: AIRFLOW__CELERY__RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: result_backend
          volumeMounts:
            - name: logs-pv
              mountPath: "/opt/airflow/logs"
            - name: dags-pv
              mountPath: "/opt/airflow/dags"
            - name: plugins-pv
              mountPath: "/opt/airflow/plugins"
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
      volumes:
        - name: config
          configMap:
            name: airflow-configmap
        - name: logs-pv
          persistentVolumeClaim:
            claimName: airflow-logs-pvc
        - name: dags-pv
          persistentVolumeClaim:
            claimName: airflow-dags-pvc
        - name: plugins-pv
          persistentVolumeClaim:
            claimName: airflow-plugins-pvc
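After importing the Deployment, a quick way to check that the scheduler Pod is healthy (assuming kubectl access to the cluster) is:
```bash
kubectl -n airflow-v2 get pods -l component=scheduler
kubectl -n airflow-v2 logs deployment/airflow-scheduler --tail=50
```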
- Create the webserver Deployment and Service
webserver.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/webserver.yaml
kind: Deployment
apiVersion: apps/v1
metadata:
  name: airflow-webserver
  namespace: airflow-v2
spec:
  replicas: 1
  selector:
    matchLabels:
      tier: airflow
      component: webserver
      release: v2.6.0
  template:
    metadata:
      labels:
        tier: airflow
        component: webserver
        release: v2.6.0
      annotations:
        cluster-autoscaler.kubernetes.io/safe-to-evict: "true"
    spec:
      restartPolicy: Always
      terminationGracePeriodSeconds: 10
      containers:
        - name: webserver
          image: itnoobzzy/airflow:v2.6.0-python3.10
          imagePullPolicy: IfNotPresent
          args: ["airflow", "webserver"]
          env:
            - name: AIRFLOW__CORE__FERNET_KEY
              value: cwmLHK76Sp9XclhLzHwCNXNiAr04OSMKQ--6WXRjmss=
            - name: AIRFLOW__CORE__EXECUTOR
              value: CeleryExecutor
            - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
            - name: AIRFLOW__CELERY__BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: broker_url
            - name: AIRFLOW__CELERY__RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: result_backend
          volumeMounts:
            - name: logs-pv
              mountPath: "/opt/airflow/logs"
            - name: dags-pv
              mountPath: "/opt/airflow/dags"
            - name: plugins-pv
              mountPath: "/opt/airflow/plugins"
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
      volumes:
        - name: config
          configMap:
            name: airflow-configmap
        - name: logs-pv
          persistentVolumeClaim:
            claimName: airflow-logs-pvc
        - name: dags-pv
          persistentVolumeClaim:
            claimName: airflow-dags-pvc
        - name: plugins-pv
          persistentVolumeClaim:
            claimName: airflow-plugins-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: airflow-webserver-svc
spec:
  type: ClusterIP
  ports:
    - name: airflow-webserver
      port: 8080
      targetPort: 8080
      protocol: TCP
  selector:
    tier: airflow
    component: webserver
    release: v2.6.0
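Before the Ingress is in place, the webserver Service can be reached for a quick sanity check via port forwarding (assuming the Service was imported into the airflow-v2 namespace):
```bash
kubectl -n airflow-v2 port-forward svc/airflow-webserver-svc 8080:8080
# then open http://localhost:8080 on the local machine
```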
- Create the flower Deployment and Service
flower.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/flower.yaml
kind: Deployment
apiVersion: apps/v1
metadata:
  name: airflow-flower
  namespace: airflow-v2
spec:
  replicas: 1
  selector:
    matchLabels:
      tier: airflow
      component: flower
      release: v2.6.0
  template:
    metadata:
      labels:
        tier: airflow
        component: flower
        release: v2.6.0
      annotations:
        cluster-autoscaler.kubernetes.io/safe-to-evict: "true"
    spec:
      restartPolicy: Always
      terminationGracePeriodSeconds: 10
      containers:
        - name: flower
          image: itnoobzzy/airflow:v2.6.0-python3.10
          imagePullPolicy: IfNotPresent
          args: ["airflow", "celery", "flower"]
          env:
            - name: AIRFLOW__CORE__FERNET_KEY
              value: cwmLHK76Sp9XclhLzHwCNXNiAr04OSMKQ--6WXRjmss=
            - name: AIRFLOW__CORE__EXECUTOR
              value: CeleryExecutor
            - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
            - name: AIRFLOW__CELERY__BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: broker_url
            - name: AIRFLOW__CELERY__RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: result_backend
          volumeMounts:
            - name: logs-pv
              mountPath: "/opt/airflow/logs"
            - name: dags-pv
              mountPath: "/opt/airflow/dags"
            - name: plugins-pv
              mountPath: "/opt/airflow/plugins"
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
      volumes:
        - name: config
          configMap:
            name: airflow-configmap
        - name: logs-pv
          persistentVolumeClaim:
            claimName: airflow-logs-pvc
        - name: dags-pv
          persistentVolumeClaim:
            claimName: airflow-dags-pvc
        - name: plugins-pv
          persistentVolumeClaim:
            claimName: airflow-plugins-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: airflow-flower-svc
spec:
  type: ClusterIP
  ports:
    - name: airflow-flower
      port: 5555
      targetPort: 5555
      protocol: TCP
  selector:
    tier: airflow
    component: flower
    release: v2.6.0
- Create the worker Deployment
worker.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/worker.yaml
kind: Deployment
apiVersion: apps/v1
metadata:
  name: airflow-worker
  namespace: airflow-v2
spec:
  replicas: 1
  selector:
    matchLabels:
      tier: airflow
      component: worker
      release: v2.6.0
  template:
    metadata:
      labels:
        tier: airflow
        component: worker
        release: v2.6.0
      annotations:
        cluster-autoscaler.kubernetes.io/safe-to-evict: "true"
    spec:
      restartPolicy: Always
      terminationGracePeriodSeconds: 10
      containers:
        - name: worker
          image: itnoobzzy/airflow:v2.6.0-python3.10
          imagePullPolicy: IfNotPresent
          args: ["airflow", "celery", "worker"]
          env:
            - name: AIRFLOW__CORE__FERNET_KEY
              value: cwmLHK76Sp9XclhLzHwCNXNiAr04OSMKQ--6WXRjmss=
            - name: AIRFLOW__CORE__EXECUTOR
              value: CeleryExecutor
            - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
            - name: AIRFLOW__CELERY__BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: broker_url
            - name: AIRFLOW__CELERY__RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: result_backend
          volumeMounts:
            - name: logs-pv
              mountPath: "/opt/airflow/logs"
            - name: dags-pv
              mountPath: "/opt/airflow/dags"
            - name: plugins-pv
              mountPath: "/opt/airflow/plugins"
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
      volumes:
        - name: config
          configMap:
            name: airflow-configmap
        - name: logs-pv
          persistentVolumeClaim:
            claimName: airflow-logs-pvc
        - name: dags-pv
          persistentVolumeClaim:
            claimName: airflow-dags-pvc
        - name: plugins-pv
          persistentVolumeClaim:
            claimName: airflow-plugins-pvc
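Worker capacity can then be adjusted by changing the Deployment's replica count, for example:
```bash
kubectl -n airflow-v2 scale deployment airflow-worker --replicas=3
```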
- Create the Ingress for the webserver and flower
ingress.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/ingress.yaml
---
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: airflow-ingress
spec:
  rules:
    - host: airflow-webserver.akulaku.com
      http:
        paths:
          - path: /
            backend:
              serviceName: airflow-webserver-svc
              servicePort: 8080
    - host: airflow-flower.akulaku.com
      http:
        paths:
          - path: /
            backend:
              serviceName: airflow-flower-svc
              servicePort: 5555
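Note that the extensions/v1beta1 Ingress API was removed in Kubernetes 1.22, so on newer clusters the same rules need to be expressed with networking.k8s.io/v1. A hedged sketch (the ingress class and namespace depend on your cluster setup), applied directly with kubectl:
```bash
kubectl -n airflow-v2 apply -f - <<'EOF'
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: airflow-ingress
spec:
  rules:
    - host: airflow-webserver.akulaku.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: airflow-webserver-svc
                port:
                  number: 8080
    - host: airflow-flower.akulaku.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: airflow-flower-svc
                port:
                  number: 5555
EOF
```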
Verification
After the deployment, open http://airflow-webserver.akulaku.com/ in a browser to reach the webserver UI (make sure the domain resolves, e.g. via an entry in /etc/hosts). The initial admin username and password are both admin.
Open http://airflow-flower.akulaku.com/ to reach the flower UI.
The worker names shown in flower are the names of the worker Pods.
Trigger a DAG run and check the corresponding logs on the machine where the PVCs are mounted:
/data/app/k8s/EasyAirflow/logs on that machine is the directory where the logs PVC from the earlier step is mounted:
(base) [admin@data-landsat-test03 logs]$ view /data/app/k8s/EasyAirflow/logs/dag_id\=tutorial/run_id\=manual__2023-05-15T09\:27\:44.253944+00\:00/task_id\=sleep/attempt\=1.log
dag_id=tutorial/ dag_processor_manager/ scheduler/
[2023-05-15T09:27:47.187+0000] {taskinstance.py:1125} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: tutorial.sleep manual__2023-05-15T09:27:44.253944+00:00 [queued]>
[2023-05-15T09:27:47.195+0000] {taskinstance.py:1125} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: tutorial.sleep manual__2023-05-15T09:27:44.253944+00:00 [queued]>
[2023-05-15T09:27:47.195+0000] {taskinstance.py:1331} INFO - Starting attempt 1 of 4
[2023-05-15T09:27:47.206+0000] {taskinstance.py:1350} INFO - Executing <Task(BashOperator): sleep> on 2023-05-15 09:27:44.253944+00:00
[2023-05-15T09:27:47.209+0000] {standard_task_runner.py:57} INFO - Started process 71 to run task
[2023-05-15T09:27:47.213+0000] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'tutorial', 'sleep', 'manual__2023-05-15T09:27:44.253944+00:00', '--job-id', '75', '--raw', '--subdir', 'DAGS_FOLDER/tutorial.py', '--cfg-path', '/tmp/tmpy65q1a3h']
[2023-05-15T09:27:47.213+0000] {standard_task_runner.py:85} INFO - Job 75: Subtask sleep
[2023-05-15T09:27:47.260+0000] {task_command.py:410} INFO - Running <TaskInstance: tutorial.sleep manual__2023-05-15T09:27:44.253944+00:00 [running]> on host airflow-worker-6f9ffb7fb8-t6j9p
[2023-05-15T09:27:47.330+0000] {taskinstance.py:1568} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='airflow@example.com' AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='tutorial' AIRFLOW_CTX_TASK_ID='sleep' AIRFLOW_CTX_EXECUTION_DATE='2023-05-15T09:27:44.253944+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-05-15T09:27:44.253944+00:00'
[2023-05-15T09:27:47.331+0000] {subprocess.py:63} INFO - Tmp dir root location:
/tmp
[2023-05-15T09:27:47.331+0000] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'sleep 5']
[2023-05-15T09:27:47.355+0000] {subprocess.py:86} INFO - Output:
[2023-05-15T09:27:52.360+0000] {subprocess.py:97} INFO - Command exited with return code 0
[2023-05-15T09:27:52.383+0000] {taskinstance.py:1368} INFO - Marking task as SUCCESS. dag_id=tutorial, task_id=sleep, execution_date=20230515T092744, start_date=20230515T092747, end_date=20230515T092752
[2023-05-15T09:27:52.426+0000] {local_task_job_runner.py:232} INFO - Task exited with return code 0
[2023-05-15T09:27:52.440+0000] {taskinstance.py:2674} INFO - 0 downstream tasks scheduled from follow-on schedule check
Summary
This deployment puts Airflow on Kubernetes without using the KubernetesExecutor, so Pods are not stopped automatically after tasks finish and there is no built-in scale-to-zero cost saving.
However, Airflow mostly runs batch jobs concentrated in a time window. In our case, large offline jobs run at night, so some worker Pods can be stopped during the day and scaled back up in the evening according to the actual load.
There are a few things to keep in mind when starting and stopping worker Pods (a sketch follows the list):
- Can the start/stop be automated, stopping worker Pods at a fixed time during the day and starting them again at night?
- Shutdown needs to be graceful: before a worker stops, wait for its running tasks to finish (or only kill the task processes after a maximum grace period), and make sure no new tasks are routed to a Pod that is about to stop.
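One possible starting point, assuming kubectl access from a host with cron: scale the worker Deployment down in the morning and back up in the evening. On SIGTERM Celery performs a warm shutdown, i.e. it stops accepting new tasks and waits for running ones to finish, but the terminationGracePeriodSeconds of 10 used above would likely need to be raised so long-running tasks are not killed early. The times and replica counts below are assumptions:
```bash
# Illustrative crontab entries on a host that can reach the cluster
0 8  * * * kubectl -n airflow-v2 scale deployment airflow-worker --replicas=0
0 20 * * * kubectl -n airflow-v2 scale deployment airflow-worker --replicas=3
```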
I will keep digging into the questions above and share any good solutions and steps in a follow-up.