Dolphinscheduler2.0.5 内置参数实践

DolphinScheduler 中所涉及的参数值的定义可能来自三种类型:

  • 全局参数:在工作流保存页面定义时定义的变量
  • 上游任务传递的参数:上游任务传递过来的参数
  • 本地参数:节点的自有变量,用户在“自定义参数”定义的变量,并且用户可以在工作流定义时定义该部分变量的值

因为参数的值存在多个来源,当参数名相同时,就需要会存在参数优先级的问题。DolphinScheduler 参数的优先级从高到低为:全局参数 > 上游任务传递的参数 > 本地参数

在上游任务传递的参数的情况下,由于上游可能存在多个任务向下游传递参数。当上游传递的参数名称相同时:

  • 下游节点会优先使用值为非空的参数
  • 如果存在多个值为非空的参数,则按照上游任务的完成时间排序,选择完成时间最早的上游任务对应的参数

基础内置参数

file

衍生内置参数

支持代码中自定义变量名,声明方式:${变量名}。可以是引用 "系统参数"

我们定义这种基准变量为 [...]格式的,[yyyyMMddHHmmss] 是可以任意分解组合的,比如:[yyyyMMdd],[HHmmss], $[yyyy-MM-dd] 等

也可以通过以下两种方式:

1.使用add_months()函数,该函数用于加减月份, 第一个入口参数为[yyyyMMdd],表示返回时间的格式 第二个入口参数为月份偏移量,表示加减多少个月

  • 后 N 年:$[add_months(yyyyMMdd,12*N)]
  • 前 N 年:$[add_months(yyyyMMdd,-12*N)]
  • 后 N 月:$[add_months(yyyyMMdd,N)]
  • 前 N 月:$[add_months(yyyyMMdd,-N)]

2.直接加减数字 在自定义格式后直接“+/-”数字

  • 后 N 周:$[yyyyMMdd+7*N]
  • 前 N 周:$[yyyyMMdd-7*N]
  • 后 N 天:$[yyyyMMdd+N]
  • 前 N 天:$[yyyyMMdd-N]
  • 后 N 小时:$[HHmmss+N/24]
  • 前 N 小时:$[HHmmss-N/24]
  • 后 N 分钟:$[HHmmss+N/24/60]
  • 前 N 分钟:$[HHmmss-N/24/60]

file

实战应用

实际业务应用,需要抽取T-1天的数据,作为变量,可以这样设置:

设置全局变量:

变量名 变量值 备注
global_bizdate ${system.biz.date} 系统内置变量,定时时间前一天
etldate $[yyyyMMdd-1] 前1天,时间写法

源码分析

org/apache/dolphinscheduler/spi/task/paramparser/BusinessTimeUtils.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.spi.task.paramparser;

import static org.apache.dolphinscheduler.spi.task.TaskConstants.PARAMETER_BUSINESS_DATE;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.PARAMETER_CURRENT_DATE;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.PARAMETER_DATETIME;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.PARAMETER_FORMAT_DATE;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.PARAMETER_FORMAT_TIME;
import static org.apache.dolphinscheduler.spi.utils.DateUtils.addDays;
import static org.apache.dolphinscheduler.spi.utils.DateUtils.format;

import org.apache.dolphinscheduler.spi.enums.CommandType;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * business time utils
 */
public class BusinessTimeUtils {
    private BusinessTimeUtils() {
        throw new IllegalStateException("BusinessTimeUtils class");
    }

    /**
     * get business time in parameters by different command types
     *
     * @param commandType command type
     * @param runTime run time or schedule time
     * @return business time
     */
    public static Map<String, String> getBusinessTime(CommandType commandType, Date runTime) {
        Date businessDate = runTime;
        switch (commandType) {
            case COMPLEMENT_DATA:
                break;
            case START_PROCESS:
            case START_CURRENT_TASK_PROCESS:
            case RECOVER_TOLERANCE_FAULT_PROCESS:
            case RECOVER_SUSPENDED_PROCESS:
            case START_FAILURE_TASK_PROCESS:
            case REPEAT_RUNNING:
            case SCHEDULER:
            default:
                businessDate = addDays(new Date(), -1);
                if (runTime != null) {
                    /**
                     * If there is a scheduled time, take the scheduling time. Recovery from failed nodes, suspension of recovery, re-run for scheduling
                     */
                    businessDate = addDays(runTime, -1);
                }
                break;
        }
        Date businessCurrentDate = addDays(businessDate, 1);
        Map<String, String> result = new HashMap<>();
        result.put(PARAMETER_CURRENT_DATE, format(businessCurrentDate, PARAMETER_FORMAT_DATE));
        result.put(PARAMETER_BUSINESS_DATE, format(businessDate, PARAMETER_FORMAT_DATE));
        result.put(PARAMETER_DATETIME, format(businessCurrentDate, PARAMETER_FORMAT_TIME));
        return result;
    }
}

相关文章:
DS 内置参数

为者常成,行者常至