问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

Seatunnel整合xxl-job实现批处理任务定时执行

创作时间:
作者:
@小白创作中心

Seatunnel整合xxl-job实现批处理任务定时执行

引用
CSDN
1.
https://blog.csdn.net/liang520521/article/details/144562776

Seatunnel是一个开源的大数据处理框架,支持批处理和流处理两种模式,广泛应用于数据集成、数据转换和数据加载等场景。xxl-job是一个分布式任务调度平台,具有简单易用、功能强大等特点。本文将介绍如何使用xxl-job调度器结合Seatunnel实现批处理任务的定时执行,解决Seatunnel Web没有定时任务的问题。

环境要求

在执行器的环境中需要存在Seatunnel环境,并且配置了Seatunnel环境变量。

任务创建

批处理任务脚本

将处理任务添加到Shell脚本中,具体脚本如下。保存后运行即可。通过修改config_content值实现任务的修改。

#!/bin/bash
SEATUNNEL_CMD="$SEATUNNEL_HOME/bin/seatunnel.sh"
# 定义任务停止时执行的清理操作
exit_func() {
    # 在这里放入你希望在任务停止时执行的操作,比如释放资源、记录日志等
    $SEATUNNEL_CMD -can "$JOB_ID"
    exit;
}
# 捕获 SIGINT (Ctrl+C) 和 SIGTERM (手动终止) 信号
trap exit_func SIGINT SIGTERM SIGHUP SIGQUIT SIGKILL
# 将配置内容写入变量
config_content=$(cat <<EOL
env {
  # You can set SeaTunnel environment configuration here
  parallelism = 2
  job.mode = "BATCH"
  checkpoint.interval = 10000
}
source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
  FakeSource {
    parallelism = 2
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
  # If you would like to get more information about how to configure SeaTunnel and see full list of source plugins,
  # please go to https://seatunnel.apache.org/docs/connector-v2/source
}
sink {
  Console {
  }
}
EOL
)
echo "开始执行任务"
## 同步任务,日志中会打印运行日志
echo "$config_content" | $SEATUNNEL_CMD --config /dev/stdin
## 异步任务,日志不会记录运行日志
# 将配置内容写入标准输入并传递给 SeaTunnel
# SUBJOB_OUTPUT=$(echo "$config_content" | $SEATUNNEL_CMD --config /dev/stdin --async 2>&1)
# #JOB_ID=$(echo "$SUBJOB_OUTPUT" | grep "job name" | awk -F'job id: ' '{print $2}' | awk -F',' '{print $1}')
# #echo "任务Id: $JOB_ID"
# # 监控任务状态
# while true; do
#     # 查询任务状态
#     STATUS_OUTPUT=$($SEATUNNEL_CMD -j "$JOB_ID" 2>&1)
#     TASK_STATE=$(echo "$STATUS_OUTPUT" | grep "$JOB_ID" | awk -F'"jobStatus":"' '{print $2}' | awk -F '","' '{print $1}')
#     if [[ "$TASK_STATE" == "FINISHED" ]]; then
#         echo "任务完成, 状态: $TASK_STATE"
#         exit 0
#     fi
#     # 检查任务是否已完成
#     if [[ "$TASK_STATE" != "RUNNING" ]]; then
#         echo "任务已结束,状态:$TASK_STATE"
#         exit 1
#     else
#         echo "任务运行中 ... 状态: $TASK_STATE"
#         # 等待 5 秒后再次查询
#         sleep 5
#     fi
# done

流处理任务脚本

针对流操作还有部分问题:

  1. 任务无法自启动
  2. 两次任务无法实现增量同步,每次同步对于Seatunnel来说,都是新任务。
#!/bin/bash
SEATUNNEL_CMD="$SEATUNNEL_HOME/bin/seatunnel.sh"
SEATUNNEL_HOST=localhost
SEATUNNEL_PORT=5801
# 定义任务停止时执行的清理操作
exit_func() {
    # 在这里放入你希望在任务停止时执行的操作,比如释放资源、记录日志等
    $SEATUNNEL_CMD -can "$JOB_ID"
    exit;
}
# 捕获 SIGINT (Ctrl+C) 和 SIGTERM (手动终止) 信号
trap exit_func SIGINT SIGTERM SIGHUP SIGQUIT SIGKILL
# 将配置内容写入变量
config_content=$(cat <<EOL
env {
  parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}
source {
  FakeSource {
    parallelism = 2
    plugin_output = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}
sink {
  Console {
  }
}
EOL
)
echo "开始执行任务"
echo "--------    配置信息    --------------"
echo "$config_content"
echo "--------    end    --------------"
# 将配置内容写入标准输入并传递给 SeaTunnel
SUBJOB_OUTPUT=$(echo "$config_content" | $SEATUNNEL_CMD --config /dev/stdin --async 2>&1)
JOB_ID=$(echo "$SUBJOB_OUTPUT" | grep "job name" | awk -F'job id: ' '{print $2}' | awk -F',' '{print $1}')
echo "任务Id: $JOB_ID"
# 监控任务状态
while true; do
    STATUS_OUTPUT=$(curl -s http://$SEATUNNEL_HOST:$SEATUNNEL_PORT/hazelcast/rest/maps/job-info/$JOB_ID)
    echo $(date "+%Y-%m-%d %H:%M:%S.%3N") "写入数量 : "$(echo "$STATUS_OUTPUT" | awk -F'"SinkWriteCount":"' '{print $2}' | awk -F '","' '{print $1}')", 读取数量 :"$(echo "$STATUS_OUTPUT" | awk -F'"SourceReceivedCount":"' '{print $2}' | awk -F '","' '{print $1}')
    
    TASK_STATE=$(echo "$STATUS_OUTPUT" | awk -F'"jobStatus":"' '{print $2}' | awk -F '","' '{print $1}')
    if [[ "$TASK_STATE" == "FINISHED" ]]; then
        echo "任务完成, 状态: $TASK_STATE"
        exit 0
    fi
    
    if [[ "$TASK_STATE" != "RUNNING" ]]; then
        echo "任务已结束,状态:$TASK_STATE"
        exit 1
    else
        echo "任务运行中 ... 状态: $TASK_STATE"
        sleep 300
    fi
done
© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号