Seatunnel整合xxl-job实现批处理任务定时执行
创作时间:
作者:
@小白创作中心
Seatunnel整合xxl-job实现批处理任务定时执行
引用
CSDN
1.
https://blog.csdn.net/liang520521/article/details/144562776
Seatunnel是一个开源的大数据处理框架,支持批处理和流处理两种模式,广泛应用于数据集成、数据转换和数据加载等场景。xxl-job是一个分布式任务调度平台,具有简单易用、功能强大等特点。本文将介绍如何使用xxl-job调度器结合Seatunnel实现批处理任务的定时执行,解决Seatunnel Web没有定时任务的问题。
环境要求
在执行器的环境中需要存在Seatunnel环境,并且配置了Seatunnel环境变量。
任务创建
批处理任务脚本
将处理任务添加到Shell脚本中,具体脚本如下。保存后运行即可。通过修改config_content值实现任务的修改。
#!/bin/bash
SEATUNNEL_CMD="$SEATUNNEL_HOME/bin/seatunnel.sh"
# 定义任务停止时执行的清理操作
exit_func() {
# 在这里放入你希望在任务停止时执行的操作,比如释放资源、记录日志等
$SEATUNNEL_CMD -can "$JOB_ID"
exit;
}
# 捕获 SIGINT (Ctrl+C) 和 SIGTERM (手动终止) 信号
trap exit_func SIGINT SIGTERM SIGHUP SIGQUIT SIGKILL
# 将配置内容写入变量
config_content=$(cat <<EOL
env {
# You can set SeaTunnel environment configuration here
parallelism = 2
job.mode = "BATCH"
checkpoint.interval = 10000
}
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
parallelism = 2
result_table_name = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
# If you would like to get more information about how to configure SeaTunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/source
}
sink {
Console {
}
}
EOL
)
echo "开始执行任务"
## 同步任务,日志中会打印运行日志
echo "$config_content" | $SEATUNNEL_CMD --config /dev/stdin
## 异步任务,日志不会记录运行日志
# 将配置内容写入标准输入并传递给 SeaTunnel
# SUBJOB_OUTPUT=$(echo "$config_content" | $SEATUNNEL_CMD --config /dev/stdin --async 2>&1)
# #JOB_ID=$(echo "$SUBJOB_OUTPUT" | grep "job name" | awk -F'job id: ' '{print $2}' | awk -F',' '{print $1}')
# #echo "任务Id: $JOB_ID"
# # 监控任务状态
# while true; do
# # 查询任务状态
# STATUS_OUTPUT=$($SEATUNNEL_CMD -j "$JOB_ID" 2>&1)
# TASK_STATE=$(echo "$STATUS_OUTPUT" | grep "$JOB_ID" | awk -F'"jobStatus":"' '{print $2}' | awk -F '","' '{print $1}')
# if [[ "$TASK_STATE" == "FINISHED" ]]; then
# echo "任务完成, 状态: $TASK_STATE"
# exit 0
# fi
# # 检查任务是否已完成
# if [[ "$TASK_STATE" != "RUNNING" ]]; then
# echo "任务已结束,状态:$TASK_STATE"
# exit 1
# else
# echo "任务运行中 ... 状态: $TASK_STATE"
# # 等待 5 秒后再次查询
# sleep 5
# fi
# done
流处理任务脚本
针对流操作还有部分问题:
- 任务无法自启动
- 两次任务无法实现增量同步,每次同步对于Seatunnel来说,都是新任务。
#!/bin/bash
SEATUNNEL_CMD="$SEATUNNEL_HOME/bin/seatunnel.sh"
SEATUNNEL_HOST=localhost
SEATUNNEL_PORT=5801
# 定义任务停止时执行的清理操作
exit_func() {
# 在这里放入你希望在任务停止时执行的操作,比如释放资源、记录日志等
$SEATUNNEL_CMD -can "$JOB_ID"
exit;
}
# 捕获 SIGINT (Ctrl+C) 和 SIGTERM (手动终止) 信号
trap exit_func SIGINT SIGTERM SIGHUP SIGQUIT SIGKILL
# 将配置内容写入变量
config_content=$(cat <<EOL
env {
parallelism = 2
job.mode = "STREAMING"
checkpoint.interval = 2000
}
source {
FakeSource {
parallelism = 2
plugin_output = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
}
sink {
Console {
}
}
EOL
)
echo "开始执行任务"
echo "-------- 配置信息 --------------"
echo "$config_content"
echo "-------- end --------------"
# 将配置内容写入标准输入并传递给 SeaTunnel
SUBJOB_OUTPUT=$(echo "$config_content" | $SEATUNNEL_CMD --config /dev/stdin --async 2>&1)
JOB_ID=$(echo "$SUBJOB_OUTPUT" | grep "job name" | awk -F'job id: ' '{print $2}' | awk -F',' '{print $1}')
echo "任务Id: $JOB_ID"
# 监控任务状态
while true; do
STATUS_OUTPUT=$(curl -s http://$SEATUNNEL_HOST:$SEATUNNEL_PORT/hazelcast/rest/maps/job-info/$JOB_ID)
echo $(date "+%Y-%m-%d %H:%M:%S.%3N") "写入数量 : "$(echo "$STATUS_OUTPUT" | awk -F'"SinkWriteCount":"' '{print $2}' | awk -F '","' '{print $1}')", 读取数量 :"$(echo "$STATUS_OUTPUT" | awk -F'"SourceReceivedCount":"' '{print $2}' | awk -F '","' '{print $1}')
TASK_STATE=$(echo "$STATUS_OUTPUT" | awk -F'"jobStatus":"' '{print $2}' | awk -F '","' '{print $1}')
if [[ "$TASK_STATE" == "FINISHED" ]]; then
echo "任务完成, 状态: $TASK_STATE"
exit 0
fi
if [[ "$TASK_STATE" != "RUNNING" ]]; then
echo "任务已结束,状态:$TASK_STATE"
exit 1
else
echo "任务运行中 ... 状态: $TASK_STATE"
sleep 300
fi
done
热门推荐
基金买卖全攻略:从普通基金到ETF,这些技巧助你把握投资时机
《摩尔庄园》霸气归来:持续霸榜、营收破亿,童年情怀IP游戏的火热与隐忧
全面解析:工伤赔偿各级标准及更高赔偿额度一览
文学作品的创作动机分析
定金与保证金及预付款有什么区别
锂电池VS铅酸电池:同容量下,为何锂电池能跑得更远?
膝关节肿胀疼痛是因为“积水”了?
5部法国情调满满的电影,大概只有法国才能拍得出这些片
预售合同资金断裂如何维权?一文详解法律责任与合同查询
秦朝的政治制度及其历史影响
炒鞋现象引发关注:是投资还是传销?
风光互补发电系统案例解析
哪些材质的包装纸箱可以回收再利用?
如何自行进行汽车打蜡?这种保养对车辆外观有何重要性?
谁是青岛第三海底隧道的“受益者”?
铁路部门提示新规:每位旅客行李限重20KG,超出需缴费
江苏徐州:一条“美丽农路”串起乡村发展好风光
红豆薏米不煮熟可以直接喝吗?专家为你解答
股东权益怎么计算:从公式到法律保障的全面解析
为什么德国和北欧高校成为2024年留学的主流目的地?
电影《金刚川》,影片中的叙事创新,让人眼前一亮
如何把钙补到位?药师教你科学使用钙补充剂
牙龈反复出血怎么办?医生提醒:这6个治疗方法,守护你牙龈健康
零基础也能拿证!ASFC无人机驾驶员证报考全攻略,手把手教你飞上天!
新鲜肉类冰箱保存全攻略:从准备到注意事项一文详解
中国10大示范步行街,南北大不同,都是城市的精华,你去过几个?
煤矿安全生产综合信息:保障矿业安全,促进可持续发展
宝宝敏感肌护理全攻略:从清洁到保湿,告别红脸蛋
湿气重的人,建议这样调理
小体积,大能量:GIS在现代电力系统的关键角色