Seatunnel整合xxl-job实现批处理任务定时执行
创作时间:
作者:
@小白创作中心
Seatunnel整合xxl-job实现批处理任务定时执行
引用
CSDN
1.
https://blog.csdn.net/liang520521/article/details/144562776
Seatunnel是一个开源的大数据处理框架,支持批处理和流处理两种模式,广泛应用于数据集成、数据转换和数据加载等场景。xxl-job是一个分布式任务调度平台,具有简单易用、功能强大等特点。本文将介绍如何使用xxl-job调度器结合Seatunnel实现批处理任务的定时执行,解决Seatunnel Web没有定时任务的问题。
环境要求
在执行器的环境中需要存在Seatunnel环境,并且配置了Seatunnel环境变量。
任务创建
批处理任务脚本
将处理任务添加到Shell脚本中,具体脚本如下。保存后运行即可。通过修改config_content值实现任务的修改。
#!/bin/bash
SEATUNNEL_CMD="$SEATUNNEL_HOME/bin/seatunnel.sh"
# 定义任务停止时执行的清理操作
exit_func() {
# 在这里放入你希望在任务停止时执行的操作,比如释放资源、记录日志等
$SEATUNNEL_CMD -can "$JOB_ID"
exit;
}
# 捕获 SIGINT (Ctrl+C) 和 SIGTERM (手动终止) 信号
trap exit_func SIGINT SIGTERM SIGHUP SIGQUIT SIGKILL
# 将配置内容写入变量
config_content=$(cat <<EOL
env {
# You can set SeaTunnel environment configuration here
parallelism = 2
job.mode = "BATCH"
checkpoint.interval = 10000
}
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
parallelism = 2
result_table_name = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
# If you would like to get more information about how to configure SeaTunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/source
}
sink {
Console {
}
}
EOL
)
echo "开始执行任务"
## 同步任务,日志中会打印运行日志
echo "$config_content" | $SEATUNNEL_CMD --config /dev/stdin
## 异步任务,日志不会记录运行日志
# 将配置内容写入标准输入并传递给 SeaTunnel
# SUBJOB_OUTPUT=$(echo "$config_content" | $SEATUNNEL_CMD --config /dev/stdin --async 2>&1)
# #JOB_ID=$(echo "$SUBJOB_OUTPUT" | grep "job name" | awk -F'job id: ' '{print $2}' | awk -F',' '{print $1}')
# #echo "任务Id: $JOB_ID"
# # 监控任务状态
# while true; do
# # 查询任务状态
# STATUS_OUTPUT=$($SEATUNNEL_CMD -j "$JOB_ID" 2>&1)
# TASK_STATE=$(echo "$STATUS_OUTPUT" | grep "$JOB_ID" | awk -F'"jobStatus":"' '{print $2}' | awk -F '","' '{print $1}')
# if [[ "$TASK_STATE" == "FINISHED" ]]; then
# echo "任务完成, 状态: $TASK_STATE"
# exit 0
# fi
# # 检查任务是否已完成
# if [[ "$TASK_STATE" != "RUNNING" ]]; then
# echo "任务已结束,状态:$TASK_STATE"
# exit 1
# else
# echo "任务运行中 ... 状态: $TASK_STATE"
# # 等待 5 秒后再次查询
# sleep 5
# fi
# done
流处理任务脚本
针对流操作还有部分问题:
- 任务无法自启动
- 两次任务无法实现增量同步,每次同步对于Seatunnel来说,都是新任务。
#!/bin/bash
SEATUNNEL_CMD="$SEATUNNEL_HOME/bin/seatunnel.sh"
SEATUNNEL_HOST=localhost
SEATUNNEL_PORT=5801
# 定义任务停止时执行的清理操作
exit_func() {
# 在这里放入你希望在任务停止时执行的操作,比如释放资源、记录日志等
$SEATUNNEL_CMD -can "$JOB_ID"
exit;
}
# 捕获 SIGINT (Ctrl+C) 和 SIGTERM (手动终止) 信号
trap exit_func SIGINT SIGTERM SIGHUP SIGQUIT SIGKILL
# 将配置内容写入变量
config_content=$(cat <<EOL
env {
parallelism = 2
job.mode = "STREAMING"
checkpoint.interval = 2000
}
source {
FakeSource {
parallelism = 2
plugin_output = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
}
sink {
Console {
}
}
EOL
)
echo "开始执行任务"
echo "-------- 配置信息 --------------"
echo "$config_content"
echo "-------- end --------------"
# 将配置内容写入标准输入并传递给 SeaTunnel
SUBJOB_OUTPUT=$(echo "$config_content" | $SEATUNNEL_CMD --config /dev/stdin --async 2>&1)
JOB_ID=$(echo "$SUBJOB_OUTPUT" | grep "job name" | awk -F'job id: ' '{print $2}' | awk -F',' '{print $1}')
echo "任务Id: $JOB_ID"
# 监控任务状态
while true; do
STATUS_OUTPUT=$(curl -s http://$SEATUNNEL_HOST:$SEATUNNEL_PORT/hazelcast/rest/maps/job-info/$JOB_ID)
echo $(date "+%Y-%m-%d %H:%M:%S.%3N") "写入数量 : "$(echo "$STATUS_OUTPUT" | awk -F'"SinkWriteCount":"' '{print $2}' | awk -F '","' '{print $1}')", 读取数量 :"$(echo "$STATUS_OUTPUT" | awk -F'"SourceReceivedCount":"' '{print $2}' | awk -F '","' '{print $1}')
TASK_STATE=$(echo "$STATUS_OUTPUT" | awk -F'"jobStatus":"' '{print $2}' | awk -F '","' '{print $1}')
if [[ "$TASK_STATE" == "FINISHED" ]]; then
echo "任务完成, 状态: $TASK_STATE"
exit 0
fi
if [[ "$TASK_STATE" != "RUNNING" ]]; then
echo "任务已结束,状态:$TASK_STATE"
exit 1
else
echo "任务运行中 ... 状态: $TASK_STATE"
sleep 300
fi
done
热门推荐
小学课文《过桥》从歌颂老师变为激励学生,经过怎样的外科手术?
C#实现多线程的几种方式
筛网目数测量标准详解
C语言整数和小数的存储
金融投资理财的目标是什么?这些目标在实现过程中需要注意哪些问题?
欧国联德国VS波黑前瞻预测分析 德意志锁定八强席位
马岛缟狸:马达加斯加特有珍稀物种
【ACL配置秘籍】:迈普交换机访问控制列表的创建与管理全攻略
【计算机网络基础】ACL
魔芋的三种美味吃法,健康又营养
泡沫灭火器年检时间要求及注意事项
泡沫灭火器不能用于扑救什么火灾
可可制品完全解析:从可可液块到巧克力块
感冒的一般症状
宝宝口腔溃疡怎么办?8个实用方法帮你轻松应对
药物科普丨常见的海洋天然药物有哪些?其活性是什么?
科普 | 一周游泳几次,游泳频率知多少
考事业编在哪里看岗位 考事业编需要什么条件和学历
紧急提醒!慎防冒充“军人”网络交友诱导虚假投资理财诈骗
《修真界第一病秧子》:渡人渡己的修真传奇
墙体拉结筋的长度和规范是怎么样的
西梅汁真的能帮助改善便秘吗
苹果电脑word怎么语音输入
劳务合同工资标准:从基础工资到律师费承担全解析
慢性咽炎吃什么食物最好效果
库里在NBA中的影响力被名宿和前球员高度认可
动火作业“六个要”“十不准”,一定要牢记!
减肥时期火锅蘸料:如何选择既美味又健康的蘸料
如何正确看待黄金投资?这种投资的风险如何评估?
健康公平性与全人群身心健康研究中取得系列进展