Seatunnel整合xxl-job实现批处理任务定时执行
创作时间:
作者:
@小白创作中心
Seatunnel整合xxl-job实现批处理任务定时执行
引用
CSDN
1.
https://blog.csdn.net/liang520521/article/details/144562776
Seatunnel是一个开源的大数据处理框架,支持批处理和流处理两种模式,广泛应用于数据集成、数据转换和数据加载等场景。xxl-job是一个分布式任务调度平台,具有简单易用、功能强大等特点。本文将介绍如何使用xxl-job调度器结合Seatunnel实现批处理任务的定时执行,解决Seatunnel Web没有定时任务的问题。
环境要求
在执行器的环境中需要存在Seatunnel环境,并且配置了Seatunnel环境变量。
任务创建
批处理任务脚本
将处理任务添加到Shell脚本中,具体脚本如下。保存后运行即可。通过修改config_content
值实现任务的修改。
#!/bin/bash
SEATUNNEL_CMD="$SEATUNNEL_HOME/bin/seatunnel.sh"
# 定义任务停止时执行的清理操作
exit_func() {
# 在这里放入你希望在任务停止时执行的操作,比如释放资源、记录日志等
$SEATUNNEL_CMD -can "$JOB_ID"
exit;
}
# 捕获 SIGINT (Ctrl+C) 和 SIGTERM (手动终止) 信号
trap exit_func SIGINT SIGTERM SIGHUP SIGQUIT SIGKILL
# 将配置内容写入变量
config_content=$(cat <<EOL
env {
# You can set SeaTunnel environment configuration here
parallelism = 2
job.mode = "BATCH"
checkpoint.interval = 10000
}
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
parallelism = 2
result_table_name = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
# If you would like to get more information about how to configure SeaTunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/source
}
sink {
Console {
}
}
EOL
)
echo "开始执行任务"
## 同步任务,日志中会打印运行日志
echo "$config_content" | $SEATUNNEL_CMD --config /dev/stdin
## 异步任务,日志不会记录运行日志
# 将配置内容写入标准输入并传递给 SeaTunnel
# SUBJOB_OUTPUT=$(echo "$config_content" | $SEATUNNEL_CMD --config /dev/stdin --async 2>&1)
# #JOB_ID=$(echo "$SUBJOB_OUTPUT" | grep "job name" | awk -F'job id: ' '{print $2}' | awk -F',' '{print $1}')
# #echo "任务Id: $JOB_ID"
# # 监控任务状态
# while true; do
# # 查询任务状态
# STATUS_OUTPUT=$($SEATUNNEL_CMD -j "$JOB_ID" 2>&1)
# TASK_STATE=$(echo "$STATUS_OUTPUT" | grep "$JOB_ID" | awk -F'"jobStatus":"' '{print $2}' | awk -F '","' '{print $1}')
# if [[ "$TASK_STATE" == "FINISHED" ]]; then
# echo "任务完成, 状态: $TASK_STATE"
# exit 0
# fi
# # 检查任务是否已完成
# if [[ "$TASK_STATE" != "RUNNING" ]]; then
# echo "任务已结束,状态:$TASK_STATE"
# exit 1
# else
# echo "任务运行中 ... 状态: $TASK_STATE"
# # 等待 5 秒后再次查询
# sleep 5
# fi
# done
流处理任务脚本
针对流操作还有部分问题:
- 任务无法自启动
- 两次任务无法实现增量同步,每次同步对于Seatunnel来说,都是新任务。
#!/bin/bash
SEATUNNEL_CMD="$SEATUNNEL_HOME/bin/seatunnel.sh"
SEATUNNEL_HOST=localhost
SEATUNNEL_PORT=5801
# 定义任务停止时执行的清理操作
exit_func() {
# 在这里放入你希望在任务停止时执行的操作,比如释放资源、记录日志等
$SEATUNNEL_CMD -can "$JOB_ID"
exit;
}
# 捕获 SIGINT (Ctrl+C) 和 SIGTERM (手动终止) 信号
trap exit_func SIGINT SIGTERM SIGHUP SIGQUIT SIGKILL
# 将配置内容写入变量
config_content=$(cat <<EOL
env {
parallelism = 2
job.mode = "STREAMING"
checkpoint.interval = 2000
}
source {
FakeSource {
parallelism = 2
plugin_output = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
}
sink {
Console {
}
}
EOL
)
echo "开始执行任务"
echo "-------- 配置信息 --------------"
echo "$config_content"
echo "-------- end --------------"
# 将配置内容写入标准输入并传递给 SeaTunnel
SUBJOB_OUTPUT=$(echo "$config_content" | $SEATUNNEL_CMD --config /dev/stdin --async 2>&1)
JOB_ID=$(echo "$SUBJOB_OUTPUT" | grep "job name" | awk -F'job id: ' '{print $2}' | awk -F',' '{print $1}')
echo "任务Id: $JOB_ID"
# 监控任务状态
while true; do
STATUS_OUTPUT=$(curl -s http://$SEATUNNEL_HOST:$SEATUNNEL_PORT/hazelcast/rest/maps/job-info/$JOB_ID)
echo $(date "+%Y-%m-%d %H:%M:%S.%3N") "写入数量 : "$(echo "$STATUS_OUTPUT" | awk -F'"SinkWriteCount":"' '{print $2}' | awk -F '","' '{print $1}')", 读取数量 :"$(echo "$STATUS_OUTPUT" | awk -F'"SourceReceivedCount":"' '{print $2}' | awk -F '","' '{print $1}')
TASK_STATE=$(echo "$STATUS_OUTPUT" | awk -F'"jobStatus":"' '{print $2}' | awk -F '","' '{print $1}')
if [[ "$TASK_STATE" == "FINISHED" ]]; then
echo "任务完成, 状态: $TASK_STATE"
exit 0
fi
if [[ "$TASK_STATE" != "RUNNING" ]]; then
echo "任务已结束,状态:$TASK_STATE"
exit 1
else
echo "任务运行中 ... 状态: $TASK_STATE"
sleep 300
fi
done
热门推荐
眼科专家推荐:角膜地形图的应用
同一件衣服,为啥别人没事,你穿就起静电?
如何有效去除静电:全面指南
高铁让江西旅游“如虎添翼”:冬日最美乡村风光尽在掌握
科学素质提升|守“胃”健康,戒掉伤胃坏习惯!
京沪高铁开通后,沿线城市的财政收入真的增加了吗?
荆门高铁通车:家乡经济新引擎
疫苗与免疫学:从基本原理到最新进展
从 x86 到 ARM64:CPU 架构的进化与未来
白平衡:摄影中不可忽视的重要因素
倾斜容器中的水:从实验现象到流体力学原理
从零开始学Premiere Pro:打造专业级视频作品的完整指南
医疗保险理赔知识详解
保险理赔材料由谁提供?
张吉怀高铁:一条贯穿绿水青山的生态之路
荆门高铁通车,家乡经济迎来新机遇
行书笔法标准教程(珍藏版)
衣柜深度多少合适?标准答案来了!
定制衣柜深度大揭秘:高效收纳不求人!
小户型衣柜设计攻略:从布局到收纳,让小空间也能拥有大容量
单核细胞绝对值:艾滋病监测的新视角
单核细胞绝对值:艾滋病患者健康管理的重要指标
新《射雕英雄传》黄蓉被梅超风艳压?网友热议
庄达菲版黄蓉引热议:当经典遭遇创新
翁美玲版黄蓉:从3000人中脱颖而出的荧幕经典
云南亲子游攻略:五天四晚行程规划,轻松玩转春城
冬天游览云南,必备物品和推荐行程
深圳地铁沿线15个绝美景点推荐,总有一处让你流连忘返
深圳地铁沿线打卡最美自然景观!
本地孃孃推荐的4个昆明菜市场,好吃又好逛!