Cloud Native: Flink on k8s Explained with Hands-On Practice
I. Overview
At its core, Flink is a streaming dataflow execution engine that supports both stream-processing and batch-processing applications on the same runtime. For distributed computation over data streams it provides data distribution, data exchange, and fault-tolerance mechanisms.
Flink website: https://flink.apache.org/
Documentation for each version: https://nightlies.apache.org/flink/
Flink on k8s official docs: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/
You can also refer to my earlier article: Big Data Hadoop Series: Real-Time/Stream Computing Engine Flink (Flink Environment Deployment)
GitHub: https://github.com/apache/flink/tree/release-1.14.6/

II. Flink Run Modes
Official docs: https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/deployment/overview/
Flink on YARN has three run modes:
- yarn-session (Session Mode)
- yarn-cluster (Per-Job Mode)
- Application Mode
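As a quick reference, the three modes can be mapped to the CLI entry points used in the rest of this article. The helper function below is purely illustrative (not part of Flink); it just prints the association.

```shell
# Illustrative helper: map a deployment mode to the CLI entry point
# used to launch it (based on the commands shown later in this article).
mode_entrypoint() {
  case "$1" in
    session)     echo "./bin/kubernetes-session.sh, then ./bin/flink run --target kubernetes-session" ;;
    application) echo "./bin/flink run-application --target kubernetes-application" ;;
    per-job)     echo "YARN only; deprecated since Flink 1.15" ;;
    *)           return 1 ;;
  esac
}

mode_entrypoint session
mode_entrypoint application
mode_entrypoint per-job
```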
[Note] Per-Job mode is deprecated: it is only supported by YARN and was deprecated in Flink 1.15; it will be removed as part of FLINK-26000.

III. Hands-On: Flink on k8s
1) Download Flink
Download page: https://flink.apache.org/downloads.html

```shell
wget https://dlcdn.apache.org/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz
```

2) Build the base image

```shell
docker pull apache/flink:1.14.6-scala_2.12
docker tag apache/flink:1.14.6-scala_2.12 myharbor.com/bigdata/flink:1.14.6-scala_2.12
docker push myharbor.com/bigdata/flink:1.14.6-scala_2.12
```

3) Session mode
A Flink Session cluster runs as a long-lived Kubernetes Deployment. You can run multiple Flink jobs on one Session cluster; each job is submitted after the cluster has been deployed.
A Flink Session cluster deployment on Kubernetes consists of at least three components:
- a Deployment running the JobManager
- a Deployment for the pool of TaskManagers
- a Service exposing the JobManager's REST and UI ports

1. Native Kubernetes mode
Configuration parameters: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#kubernetes-namespace

[1] Build the image

Dockerfile:

```dockerfile
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN export LANG=zh_CN.UTF-8
```
Build the image:

```shell
docker build -t myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 . --no-cache
# Push the image
docker push myharbor.com/bigdata/flink-session:1.14.6-scala_2.12
```

[2] Create the namespace and serviceaccount

```shell
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```

[3] Create the Flink cluster

```shell
./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=my-first-flink-cluster \
  -Dkubernetes.container.image=myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 \
  -Dkubernetes.namespace=flink \
  -Dkubernetes.jobmanager.service-account=flink-service-account \
  -Dkubernetes.rest-service.exposed.type=NodePort
```
[4] Submit a job

```shell
./bin/flink run \
  --target kubernetes-session \
  -Dkubernetes.cluster-id=my-first-flink-cluster \
  -Dkubernetes.namespace=flink \
  -Dkubernetes.jobmanager.service-account=flink-service-account \
  ./examples/streaming/TopSpeedWindowing.jar

# With resource parameters (-D options must precede the jar;
# the duplicated -Dkubernetes.taskmanager.cpu flag has been removed)
./bin/flink run \
  --target kubernetes-session \
  -Dkubernetes.cluster-id=my-first-flink-cluster \
  -Dkubernetes.namespace=flink \
  -Dkubernetes.jobmanager.service-account=flink-service-account \
  -Dkubernetes.taskmanager.cpu=2000m \
  -Dexternal-resource.limits.kubernetes.cpu=4000m \
  -Dexternal-resource.limits.kubernetes.memory=10Gi \
  -Dexternal-resource.requests.kubernetes.cpu=2000m \
  -Dexternal-resource.requests.kubernetes.memory=8Gi \
  ./examples/streaming/WordCount.jar
```
[Note] Mind the JDK version; JDK 8 currently works fine.
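One way to act on this note is to check the JDK major version baked into the image before deploying. The sketch below parses a version string; the sample stands in for the output of something like `docker run --rm <image> java -version`, and the helper name is illustrative.

```shell
# Sample stands in for the real `java -version` output of the image.
sample='openjdk version "1.8.0_212"'

# Extract the major version: "1.8.0_212" -> 8, "11.0.16" -> 11
jdk_major() {
  local v
  v=$(echo "$1" | sed -n 's/.*version "\([^"]*\)".*/\1/p')
  case "$v" in
    1.*) echo "${v#1.}" | cut -d. -f1 ;;
    *)   echo "$v" | cut -d. -f1 ;;
  esac
}

major=$(jdk_major "$sample")
echo "JDK major version: $major"
if [ "$major" = "8" ]; then
  echo "OK: JDK 8 detected"
else
  echo "WARNING: JDK $major is untested with this setup" >&2
fi
```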
[5] Check the status

```shell
kubectl get pods -n flink
kubectl logs -f my-first-flink-cluster-taskmanager-1-1
```
[6] Tear down the Flink cluster

```shell
kubectl delete deployment/my-first-flink-cluster -n flink
kubectl delete ns flink --force
```

2. Standalone mode

[1] Build the image
The default user is flink; here I change it to admin (pick whichever user your organization needs). The startup script can be copied out of one of the pods started above.
Startup script docker-entrypoint.sh:

```shell
#!/usr/bin/env bash
###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
###############################################################################

COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec admin
    else
        # Others
        echo gosu admin
    fi
}

copy_plugins_if_required() {
  if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
    return 0
  fi

  echo "Enabling required built-in plugins"
  for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
    echo "Linking ${target_plugin} to plugin directory"
    plugin_name=${target_plugin%.jar}

    mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
    if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
      echo "Plugin ${target_plugin} does not exist. Exiting."
      exit 1
    else
      ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
      echo "Successfully enabled ${target_plugin}"
    fi
  done
}

set_config_option() {
  local option=$1
  local value=$2

  # escape periods for usage in regular expressions
  local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")

  # either override an existing entry, or append a new one
  if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
    sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
  else
    echo "${option}: ${value}" >> "${CONF_FILE}"
  fi
}

prepare_configuration() {
    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
    set_config_option blob.server.port 6124
    set_config_option query.server.port 6125

    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
    fi

    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}

maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
        JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
        JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
        if [ -f "$JEMALLOC_PATH" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
        elif [ -f "$JEMALLOC_FALLBACK" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
        else
            if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                MSG_PATH=$JEMALLOC_PATH
            else
                MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
            fi
            echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
        fi
    fi
}

maybe_enable_jemalloc

copy_plugins_if_required

prepare_configuration

args=("$@")
if [ "$1" = "help" ]; then
    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
    printf "    Or $(basename "$0") help\n\n"
    printf "By default, the Flink image adopts jemalloc as the default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
    exit 0
elif [ "$1" = "jobmanager" ]; then
    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; then
    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
    args=("${args[@]:1}")
    echo "Starting History Server"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; then
    args=("${args[@]:1}")
    echo "Starting Task Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fi

args=("${args[@]}")

# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"
```
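To see the override-or-append behavior of the script's set_config_option helper outside the container, it can be exercised against a throwaway flink-conf.yaml. This is a standalone sketch: the temp file stands in for the real config, and the quoting is normalized for copy-paste use.

```shell
# Throwaway config file standing in for $FLINK_HOME/conf/flink-conf.yaml
CONF_FILE=$(mktemp)
printf 'jobmanager.rpc.address: localhost\n' > "$CONF_FILE"

set_config_option() {
  local option=$1
  local value=$2
  # escape periods so the option key can be used in a regular expression
  local escaped_option
  escaped_option=$(echo "${option}" | sed -e 's/\./\\./g')
  # either override an existing entry, or append a new one
  if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
    sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
  else
    echo "${option}: ${value}" >> "${CONF_FILE}"
  fi
}

set_config_option jobmanager.rpc.address flink-jobmanager   # overrides the existing entry
set_config_option blob.server.port 6124                     # appends a new entry
cat "$CONF_FILE"
```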
Dockerfile:

```dockerfile
FROM myharbor.com/bigdata/centos:7.9.2009
USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the timezone (default is UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache

ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
ADD flink-1.14.6-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH

# Directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/

#RUN mkdir home
COPY docker-entrypoint.sh /opt/apache/
RUN chmod +x /opt/apache/docker-entrypoint.sh

RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
RUN chown -R admin:admin /opt/apache

# Working directory
WORKDIR $FLINK_HOME

# Exposed ports
EXPOSE 6123 8081

# The entrypoint script runs when a container starts, not at build time
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
```
Build the image:

```shell
docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache
# Push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
# Delete the image
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
```

[2] Create the namespace and serviceaccount

```shell
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```

[3] Write the YAML manifests

flink-configuration-configmap.yaml:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
```

jobmanager-service.yaml (optional; only needed in non-HA mode):

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
```

jobmanager-rest-service.yaml (optional; exposes the JobManager REST port as a NodePort on the Kubernetes nodes):

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
```

taskmanager-query-state-service.yaml (optional; exposes the TaskManager queryable-state port as a NodePort on the Kubernetes nodes):

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
```
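The flink-conf.yaml in the ConfigMap pins both taskmanager.memory.process.size (2728m) and taskmanager.memory.flink.size (2280m). The gap between the two is what remains for JVM metaspace and overhead; this small calculation just makes that budget explicit so you can adjust the numbers consistently.

```shell
# TaskManager memory budget from the ConfigMap above
process_mb=2728   # taskmanager.memory.process.size
flink_mb=2280     # taskmanager.memory.flink.size

# Whatever the Flink framework does not get is left for JVM metaspace + overhead
jvm_mb=$((process_mb - flink_mb))
echo "JVM metaspace + overhead budget: ${jvm_mb}m"
```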
The manifests above are shared between modes.

jobmanager-session-deployment-non-ha.yaml:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
```

taskmanager-session-deployment.yaml:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
```

[4] Create the Flink cluster

```shell
kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink
# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink
# Create the deployments for the cluster
kubectl create -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl create -f taskmanager-session-deployment.yaml -n flink
```
Reverse-engineering a Dockerfile from an image:

```shell
alias whaler="docker run -t --rm -v /var/run/docker.sock:/var/run/docker.sock:ro pegleg/whaler"
whaler flink:1.14.6-scala_2.12
```
Check the status:

```shell
kubectl get pods,svc -n flink -owide
```
Web UI: http://192.168.182.110:30081/#/overview
[5] Submit a job

```shell
./bin/flink run -m local-168-182-110:30081 ./examples/streaming/WordCount.jar
```
```shell
kubectl logs flink-taskmanager-54649bf96c-zjtkh -n flink
```
[6] Tear down the Flink cluster

```shell
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f taskmanager-session-deployment.yaml -n flink
kubectl delete -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl delete ns flink --force
```

[7] Access the Flink web UI
The port is the nodePort defined in jobmanager-rest-service.yaml.
http://192.168.182.110:30081/#/overview
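Since the UI port comes from the nodePort field, it must fall inside the Kubernetes default NodePort range (30000-32767), or the API server will reject the Service. A quick sanity check, as a sketch:

```shell
# Check that a port is inside the default Kubernetes NodePort range
valid_nodeport() {
  [ "$1" -ge 30000 ] && [ "$1" -le 32767 ]
}

if valid_nodeport 30081; then
  echo "30081 is a valid NodePort"
fi
```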
4) Application mode (recommended)
A basic Flink Application cluster deployment on Kubernetes consists of three components:
- an application that runs the JobManager
- a Deployment for the pool of TaskManagers
- a Service exposing the JobManager's REST and UI ports

1. Native Kubernetes mode (commonly used)

[1] Build the image

Dockerfile:

```dockerfile
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN export LANG=zh_CN.UTF-8
RUN mkdir -p $FLINK_HOME/usrlib
COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/
```
Build the image:

```shell
docker build -t myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 . --no-cache
# Push the image
docker push myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
# Delete the image
docker rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
```

[2] Create the namespace and serviceaccount

```shell
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```

[3] Create the Flink cluster and submit the job

```shell
./bin/flink run-application \
  --target kubernetes-application \
  -Dkubernetes.cluster-id=my-first-application-cluster \
  -Dkubernetes.container.image=myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 \
  -Dkubernetes.jobmanager.replicas=1 \
  -Dkubernetes.namespace=flink \
  -Dkubernetes.jobmanager.service-account=flink-service-account \
  -Dexternal-resource.limits.kubernetes.cpu=2000m \
  -Dexternal-resource.limits.kubernetes.memory=2Gi \
  -Dexternal-resource.requests.kubernetes.cpu=1000m \
  -Dexternal-resource.requests.kubernetes.memory=1Gi \
  -Dkubernetes.rest-service.exposed.type=NodePort \
  local:///opt/flink/usrlib/TopSpeedWindowing.jar
```
[Note] local is the only scheme supported in application mode. local refers to the environment inside the pod/container, not the host machine.
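A common mistake is passing a host path instead of a local:// URI; the cluster would then fail at startup. A small guard like the following catches it before submission (the helper name is illustrative, not part of Flink):

```shell
# Illustrative guard: application mode requires a local:// jar path
require_local_scheme() {
  case "$1" in
    local:///*) return 0 ;;
    *) echo "ERROR: application mode requires a local:// jar path, got: $1" >&2
       return 1 ;;
  esac
}

require_local_scheme local:///opt/flink/usrlib/TopSpeedWindowing.jar && echo "jar path OK"
```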
Check the status:

```shell
kubectl get pods,svc -n flink
```
```shell
kubectl logs -f my-first-application-cluster-taskmanager-1-1 -n flink
```
[4] Tear down the Flink cluster

```shell
kubectl delete deployment/my-first-application-cluster -n flink
kubectl delete ns flink --force
```

2. Standalone mode

[1] Build the image
The startup script docker-entrypoint.sh is identical to the one shown in the Standalone section of Session mode above.
Dockerfile:

```dockerfile
FROM myharbor.com/bigdata/centos:7.9.2009
USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the timezone (default is UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache

ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
ADD flink-1.14.6-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH

# Directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/

#RUN mkdir home
COPY docker-entrypoint.sh /opt/apache/

RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
RUN chown -R admin:admin /opt/apache
# The script is copied to /opt/apache/, so make it executable there
RUN chmod +x /opt/apache/docker-entrypoint.sh

# Working directory
WORKDIR $FLINK_HOME

# Exposed ports
EXPOSE 6123 8081

# The entrypoint script runs when a container starts, not at build time
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
```

Build the image:

```shell
docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache
# Push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
# Delete the image
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
```

[2] Create the namespace and serviceaccount

```shell
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```

[3] Write the YAML manifests
flink-configuration-configmap.yaml, jobmanager-service.yaml, jobmanager-rest-service.yaml, and taskmanager-query-state-service.yaml are the same shared manifests shown in the Standalone section of Session mode above.
jobmanager-application-non-ha.yaml (non-HA):

```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
      - name: jobmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["standalone-job", "--job-classname", "org.apache.flink.examples.java.wordcount.WordCount", "--output", "/tmp/result"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.14.6/usrlib
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/nfsdata/flink/application/job-artifacts
```
[Note] Mind the job-artifacts mount (hostPath /mnt/nfsdata/flink/application/job-artifacts, mounted at $FLINK_HOME/usrlib); a shared directory (e.g. NFS) is best here so every node sees the same jars.
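The staging step implied by that note can be sketched as follows; a temp directory stands in for the real shared mount, and WordCount.jar is just a placeholder file.

```shell
# Stand-in for /mnt/nfsdata/flink/application/job-artifacts (the hostPath
# mounted at $FLINK_HOME/usrlib in the manifests)
ARTIFACTS_DIR=$(mktemp -d)

# Placeholder for the real job jar
echo "fake jar contents" > WordCount.jar

# Stage the jar into the shared directory with world-readable permissions
install -m 0644 WordCount.jar "$ARTIFACTS_DIR/"
ls -l "$ARTIFACTS_DIR"
```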
taskmanager-job-deployment.yaml:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.14.6/usrlib
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/nfsdata/flink/application/job-artifacts
```

[4] Create the Flink cluster and submit the job

```shell
kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink
# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink
# Create the job and the deployment for the cluster
kubectl create -f jobmanager-application-non-ha.yaml -n flink
kubectl create -f taskmanager-job-deployment.yaml -n flink
```
Check the status:

```shell
kubectl get pods,svc -n flink
```
[5] Tear down the Flink cluster

```shell
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f jobmanager-application-non-ha.yaml -n flink
kubectl delete -f taskmanager-job-deployment.yaml -n flink
kubectl delete ns flink --force
```

[6] Inspect

```shell
kubectl get pods,svc -n flink
kubectl exec -it flink-taskmanager-54cb7fc57c-g484q -n flink -- bash
```
That concludes this walkthrough of Flink on k8s, which only uses the official examples; enterprise cases will follow in later posts. If you have any questions, feel free to leave me a comment. I will keep publishing cloud-native and big-data tutorials, so stay tuned.