Seatunnel整合xxl-job实现批处理任务定时执行
创作时间:
作者:
@小白创作中心
Seatunnel整合xxl-job实现批处理任务定时执行
引用
CSDN
1.
https://blog.csdn.net/liang520521/article/details/144562776
Seatunnel是一个开源的大数据处理框架,支持批处理和流处理两种模式,广泛应用于数据集成、数据转换和数据加载等场景。xxl-job是一个分布式任务调度平台,具有简单易用、功能强大等特点。本文将介绍如何使用xxl-job调度器结合Seatunnel实现批处理任务的定时执行,解决Seatunnel Web没有定时任务的问题。
环境要求
在执行器的环境中需要存在Seatunnel环境,并且配置了Seatunnel环境变量。
任务创建
批处理任务脚本
将处理任务添加到Shell脚本中,具体脚本如下。保存后运行即可。通过修改config_content
值实现任务的修改。
#!/bin/bash
SEATUNNEL_CMD="$SEATUNNEL_HOME/bin/seatunnel.sh"
# 定义任务停止时执行的清理操作
exit_func() {
# 在这里放入你希望在任务停止时执行的操作,比如释放资源、记录日志等
$SEATUNNEL_CMD -can "$JOB_ID"
exit;
}
# 捕获 SIGINT (Ctrl+C) 和 SIGTERM (手动终止) 信号
trap exit_func SIGINT SIGTERM SIGHUP SIGQUIT SIGKILL
# 将配置内容写入变量
config_content=$(cat <<EOL
env {
# You can set SeaTunnel environment configuration here
parallelism = 2
job.mode = "BATCH"
checkpoint.interval = 10000
}
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
parallelism = 2
result_table_name = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
# If you would like to get more information about how to configure SeaTunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/source
}
sink {
Console {
}
}
EOL
)
echo "开始执行任务"
## 同步任务,日志中会打印运行日志
echo "$config_content" | $SEATUNNEL_CMD --config /dev/stdin
## 异步任务,日志不会记录运行日志
# 将配置内容写入标准输入并传递给 SeaTunnel
# SUBJOB_OUTPUT=$(echo "$config_content" | $SEATUNNEL_CMD --config /dev/stdin --async 2>&1)
# #JOB_ID=$(echo "$SUBJOB_OUTPUT" | grep "job name" | awk -F'job id: ' '{print $2}' | awk -F',' '{print $1}')
# #echo "任务Id: $JOB_ID"
# # 监控任务状态
# while true; do
# # 查询任务状态
# STATUS_OUTPUT=$($SEATUNNEL_CMD -j "$JOB_ID" 2>&1)
# TASK_STATE=$(echo "$STATUS_OUTPUT" | grep "$JOB_ID" | awk -F'"jobStatus":"' '{print $2}' | awk -F '","' '{print $1}')
# if [[ "$TASK_STATE" == "FINISHED" ]]; then
# echo "任务完成, 状态: $TASK_STATE"
# exit 0
# fi
# # 检查任务是否已完成
# if [[ "$TASK_STATE" != "RUNNING" ]]; then
# echo "任务已结束,状态:$TASK_STATE"
# exit 1
# else
# echo "任务运行中 ... 状态: $TASK_STATE"
# # 等待 5 秒后再次查询
# sleep 5
# fi
# done
流处理任务脚本
针对流操作还有部分问题:
- 任务无法自启动
- 两次任务无法实现增量同步,每次同步对于Seatunnel来说,都是新任务。
#!/bin/bash
SEATUNNEL_CMD="$SEATUNNEL_HOME/bin/seatunnel.sh"
SEATUNNEL_HOST=localhost
SEATUNNEL_PORT=5801
# 定义任务停止时执行的清理操作
exit_func() {
# 在这里放入你希望在任务停止时执行的操作,比如释放资源、记录日志等
$SEATUNNEL_CMD -can "$JOB_ID"
exit;
}
# 捕获 SIGINT (Ctrl+C) 和 SIGTERM (手动终止) 信号
trap exit_func SIGINT SIGTERM SIGHUP SIGQUIT SIGKILL
# 将配置内容写入变量
config_content=$(cat <<EOL
env {
parallelism = 2
job.mode = "STREAMING"
checkpoint.interval = 2000
}
source {
FakeSource {
parallelism = 2
plugin_output = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
}
sink {
Console {
}
}
EOL
)
echo "开始执行任务"
echo "-------- 配置信息 --------------"
echo "$config_content"
echo "-------- end --------------"
# 将配置内容写入标准输入并传递给 SeaTunnel
SUBJOB_OUTPUT=$(echo "$config_content" | $SEATUNNEL_CMD --config /dev/stdin --async 2>&1)
JOB_ID=$(echo "$SUBJOB_OUTPUT" | grep "job name" | awk -F'job id: ' '{print $2}' | awk -F',' '{print $1}')
echo "任务Id: $JOB_ID"
# 监控任务状态
while true; do
STATUS_OUTPUT=$(curl -s http://$SEATUNNEL_HOST:$SEATUNNEL_PORT/hazelcast/rest/maps/job-info/$JOB_ID)
echo $(date "+%Y-%m-%d %H:%M:%S.%3N") "写入数量 : "$(echo "$STATUS_OUTPUT" | awk -F'"SinkWriteCount":"' '{print $2}' | awk -F '","' '{print $1}')", 读取数量 :"$(echo "$STATUS_OUTPUT" | awk -F'"SourceReceivedCount":"' '{print $2}' | awk -F '","' '{print $1}')
TASK_STATE=$(echo "$STATUS_OUTPUT" | awk -F'"jobStatus":"' '{print $2}' | awk -F '","' '{print $1}')
if [[ "$TASK_STATE" == "FINISHED" ]]; then
echo "任务完成, 状态: $TASK_STATE"
exit 0
fi
if [[ "$TASK_STATE" != "RUNNING" ]]; then
echo "任务已结束,状态:$TASK_STATE"
exit 1
else
echo "任务运行中 ... 状态: $TASK_STATE"
sleep 300
fi
done
热门推荐
《魁拔》系列:道阻且长 行则将至
重症百日咳临床药物救治措施
心阳虚和心阴虚的区别及现状
人工淡水栖息地在支撑本土鱼类生态方面的潜力
如何提升用户激活,引爆产品增长?
孕妇饮用五指毛桃汤:营养价值与安全指南
减肥成功的第一步:吃对早餐
高尿酸血症和痛风的科普:尿酸盐结晶形成的影响因素
链游系统源码开发的市场机会与挑战
脂肪瘤並非肥胖者才會有,瘦子也會!中醫曝多吃3種蔬菜來預防
爱尔兰苏打面包的制作方法
北京大学数学科学学院郭帅副教授在BCOV猜想方面取得重要进展
企业单方调整工作地点引发劳动争议?HR必知的5个合规操作要点
诺和盈美团首发预约;无锡虹桥医院注销;药店售价省际联动
南宁自驾游攻略,如何规划一场完美的自驾之旅?
八字测正缘出现的年份 八字合多容易遇到正缘
《混沌怎么包》:在复杂生活中寻找方向与内心平静的智慧探索
二手房买卖合同履行指南:注意事项、撤销程序与成立条件
如何撰写一份令人印象深刻的软件测试试用期总结报告?
ABS材料物理特性详解:从基础性能到改性应用
草酸对瓷砖的损害及保养方法
公证+资产自行处置:多元解纷新路径
盐包热敷,粗盐与细盐的选择
先进封装之铟片回流焊介绍
国风带动新热潮 一根蚕丝的故事未完待续
电视背景墙设计全攻略:风格、材料与储物功能一文掌握
和艾滋病人吃饭会传染艾滋病?真相可能让你意外!
自由潜水带给我异于常人的勇气和真正的自由
四位名医治疗心悸的临床医案
股票中的MTR指标:概念、计算及应用详解