
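Data Integration

Data integration overview

Definition

Data integration supports self-service data cleansing. This includes field filtering, field renaming, adding and removing fields, changing field types, joining tables, aggregating tables and appending (unioning) tables. Results can be written to different target databases.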

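Data integration project structure

| Field | Type | Description |
|---|---|---|
| id | INTEGER | ID of the data integration project |
| title | STRING | Title of the project |
| createdAt | DATETIME | When the project was created |
| createdBy | INTEGER | ID of the user who created the project |
| updatedAt | DATETIME | When the project was last modified |
| updatedBy | INTEGER | ID of the user who last modified the project |
| options | OBJECT | Project configuration |
| options.nodes | OBJECT array | The project's nodes and the links between them |
| options.nodes[].he | OBJECT | The node's HE expression; for the syntax, see the HQL reference and the HENGSHI function reference |
| options.nodes[].options | OBJECT | Additional node information |
| options.nodes[].options.title | STRING | Node name |
| options.nodes[].options.nodeType | STRING | Node type; see Data integration project node types |
| options.nodes[].options.samplingMethod | STRING | Sampling method of an input node; see Input node sampling methods |
| options.nodes[].options.samplingNum | INTEGER | Number of rows an input node samples |
| options.nodes[].options.inputStrategy | STRING | Input strategy of an input node; see Input node input strategies |
| options.nodes[].options.incrementalField | STRING array | Incremental field names of an input node, ordered from coarsest to finest granularity (for example year, month, day, hour, minute, second, millisecond); an auto-increment primary key alone also works |
| options.nodes[].options.sourceTitle | STRING | Source name of an input node: the dataset name for a dataset, the connection name for a connection, or the original file name for a file |
| options.nodes[].options.validateSchema | BOOL | Output setting of an output node: whether to validate the schema |
| options.nodes[].options.tableAction | STRING | Output setting of an output node: table operation; see Output node table operations |
| options.nodes[].options.updateMethod | STRING | Output setting of an output node: update method; see Output node update methods |
| options.nodes[].options.keyFields | STRING array | Output setting of an output node: key fields |
| options.nodes[].options.preSql | STRING | Output setting of an output node: SQL to run before loading |
| options.nodes[].options.postSql | STRING | Output setting of an output node: SQL to run after loading |
| options.nodes[].options.createTableProperties | STRING | Output setting of an output node: table creation properties |
| options.defaultOutputConnectionId | INTEGER | ID of the project's default output connection |
| options.defaultPath | STRING array | Default output path of the project |
| status | STRING | Status of the project's most recent run; see the task execution statuses in Execution plans |
| latestStartAt | DATETIME | Start time of the project's most recent run |
| costTime | INTEGER | Duration of the project's most recent run, in seconds |
| updatePart | STRING | Used only when updating: the part of the project to update; see Data integration project update targets |
| entityGroup | STRING | Execution plan category for data integration, used to manage execution plans; always PIPELINE |
| entityKey | STRING | Execution plan key for data integration, used to manage execution plans |
| execDetail | OBJECT | Task description needed to create an execution plan; see Execution plans |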

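Data integration project update targets

| Value | Meaning |
|---|---|
| TITLE_AND_PATH | Update the title and the default output settings: title, options.defaultOutputConnectionId and options.defaultPath |
| NODES | Update the node information: options.nodes |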

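Data integration project node types

| Value | Meaning |
|---|---|
| CONNECTION | Data connection |
| FILE | File upload |
| DATASET | Dataset from the data mart |
| SQL | SQL node |
| SINK | Output node |
| JOIN | Join node |
| UNION | Union (append) node |
| AGGREGATE | Aggregate node |
| PIVOT | Pivot node (rows to columns) |
| UNPIVOT | Unpivot node (columns to rows) |
| SELECT | Select node |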

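Input node sampling methods

| Value | Meaning |
|---|---|
| DEFAULT | limit n; the row order depends on the data source implementation |
| RANDOM | Random order, then limit n |
| ALL | All rows |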

Input node input strategies

| Value | Meaning |
|---|---|
| ALL | Full load |
| INCREMENTAL | Incremental load |

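Output node table operations

| Value | Meaning |
|---|---|
| NONE | No table operation |
| RECREATE | Recreate the table |
| TRUNCATE | Clear (truncate) the table's data |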

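Output node update methods

| Value | Meaning |
|---|---|
| INSERT | Insert rows |
| UPSERT | Upsert: update existing rows and insert new ones |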

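Task execution result

| Field | Type | Description |
|---|---|---|
| schema | OBJECT array | Each element describes the properties of one field; the structure is the same as dataset fields |
| data | OBJECT array | Each element is an array representing one row |
| randomable | BOOL | Whether the data source supports random sampling |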

Node examples by type

Dataset input node
json
{
        "he": {
          "kind": "function",
          "op": "app_dataset",
          "args": [
            101126,                  // app ID
            1                        // dataset ID
          ],
          "uid": "input-2"           // unique node ID
        },
        "options": {
          "title": "A_IVT_MOVIE",
          "nodeType": "DATASET",
          "samplingMethod": "DEFAULT",
          "samplingNum": 1000,
          "inputStrategy": "INCREMENTAL",
          "incrementalField": [
            "f1",
            "f2"
          ]
        }
      }
Data connection input node
json
{
        "he": {
          "kind": "function",
          "op": "db_source",
          "args": [
            25055,                 // data connection ID
            [                      // schema path; supports multiple levels, one array element per level
              "test"
            ],
            "客户"                 // table name
          ],
          "uid": "input-3"        // unique node ID
        },
        "options": {
          "title": "客户",
          "nodeType": "CONNECTION",
          "samplingMethod": "DEFAULT",
          "samplingNum": 1000,
          "inputStrategy": "INCREMENTAL",
          "incrementalField": [
            "f1",
            "f2"
          ]
        }
      }
SQL input node
json
{
  "he": {
    "kind": "function",
    "op": "raw_sql",
    "args": [
      25055,                         // data connection ID
      "select * from city"           // SQL to execute
    ],
    "uid": "input-3"                 // unique node ID
  },
  "options": {
    "title": "sql输入节点",
    "nodeType": "SQL",
    "samplingMethod": "DEFAULT",
    "samplingNum": 1000,
    "inputStrategy": "INCREMENTAL",
    "incrementalField": [
      "f1",
      "f2"
    ]
  }
}
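File input node

The node information is produced by first calling the file save API. See "Load a newly uploaded file sheet into the internal connection and return the connection and table information".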

json
{
        "he": {
          "kind": "function",
          "op": "db_source",
          "args": [
            1240,
            [
              "hengshi_internal_engine_tmp_schema"
            ],
            "f_u_7_1630930370000_18151_0"
          ],
          "uid": "input-1"
        },
        "options": {
          "title": "date.txt",
          "nodeType": "FILE",
          "samplingMethod": "DEFAULT",
          "samplingNum": 1000,
          "inputStrategy": "ALL",
          "sourceTitle": "date.txt"
        }
      }
Pivot node (rows to columns)
json
{
  "he":{
    "kind":"function",
    "op":"pivot",
    "args":[
      {                                                // reference to the upstream node ID
        "kind": "reference",
        "op": "uid-1"
      },
      [                                                // group-by columns
        {"kind": "field", "op": "year"}
      ],
      {                                                // aggregate column
        "kind": "function",
        "op": "sum",
        "args": [
          {
            "kind": "field",
            "op": "val"
          }
        ]
      },
      {"kind": "field", "op": "month"},                // pivot column
      [                                                // output column name (uid) for each pivot column value; if omitted, names are generated automatically
        {"kind": "constant", "op": "一月", "uid": "Jan"},
        {"kind": "constant", "op": "二月", "uid": "Feb"},
        {"kind": "constant", "op": "三月", "uid": "Mar"},
        {"kind": "constant", "op": "四月", "uid": "Apr"},
        {"kind": "constant", "op": "五月", "uid": "May"},
        {"kind": "constant", "op": "六月", "uid": "Jun"}
      ]
    ],
    "uid":"uid-2"
  },
  "options":{
    "title":"pivot行转列节点",
    "nodeType":"PIVOT",
    "customizedFieldName":true
  }
}
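The pivot node is configured declaratively. The Python sketch below models what the example above is understood to compute, assuming sum aggregation and rows held in memory as dicts. It groups by `year`, sums `val`, and spreads each listed `month` value into the column named by its `uid`. It is only a conceptual model. HENGSHI turns nodes into SQL (see the debug SQL endpoints), and `pivot_sum` is a hypothetical name, not its implementation.

```python
from collections import defaultdict


def pivot_sum(rows, group_field, pivot_field, value_field, columns):
    """Conceptual model of a pivot node with a sum aggregate.

    rows:    input rows as dicts (a hypothetical in-memory stand-in for the upstream node)
    columns: maps a pivot value (the constant's "op", e.g. "一月")
             to its output column name (the constant's "uid", e.g. "Jan")
    """
    # Each group starts with every listed output column set to None (no data yet).
    groups = defaultdict(lambda: dict.fromkeys(columns.values()))
    for row in rows:
        cells = groups[row[group_field]]  # every group gets an output row, as with GROUP BY
        name = columns.get(row[pivot_field])
        if name is None:
            continue  # pivot values that are not listed do not become columns
        current = cells[name]
        cells[name] = row[value_field] if current is None else current + row[value_field]
    return [{group_field: key, **cells} for key, cells in groups.items()]


rows = [
    {"year": 2020, "month": "一月", "val": 3},
    {"year": 2020, "month": "一月", "val": 2},
    {"year": 2020, "month": "二月", "val": 5},
    {"year": 2021, "month": "一月", "val": 1},
]
print(pivot_sum(rows, "year", "month", "val", {"一月": "Jan", "二月": "Feb"}))
```

A group with no rows for a listed value keeps `None` in that column. This mirrors the NULL that a SQL `SUM` over no matching rows would produce.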
Unpivot node (columns to rows)
json
{
  "he":{
    "kind":"function",
    "op":"unpivot",
    "args": [
      {"kind": "reference", "op": "uid-1"},                            // upstream node
      {"kind": "field", "op": "month", "type": "number"},              // key column that receives the unpivoted keys; needs an explicit type
      {"kind": "field", "op": "val"},                                  // value column that receives the unpivoted values
      [
        {"kind": "field", "op": "Jan", "value": 1},                    // columns to unpivot; value is the key written for that column
        {"kind": "field", "op": "Feb", "value": 2},
        {"kind": "field", "op": "Mar", "value": 3},
        {"kind": "field", "op": "Apr", "value": 4},
        {"kind": "field", "op": "May", "value": 5},
        {"kind": "field", "op": "Jun", "value": 6}
      ]
    ],
    "uid":"uid-3"
  },
  "options":{
    "title":"unpivot列转换节点",
    "nodeType":"UNPIVOT"
  }
}
Aggregate node
json
{
  "he":{
    "kind": "function",
    "op": "summarize",
    "args": [
        {"kind": "reference", "op": "uid-1"},                        // reference to the upstream node ID
        {"kind": "field", "op": "id", "uid": "id2"},                 // output fields: plain fields or aggregate expressions
        {
          "kind": "function",
          "op": "trunc_year",
          "args": [{"kind": "field", "op": "pubdate"}],
          "uid": "pubdate2"
        },
        {
          "kind": "function",
          "op": "trunc_millisecond",
          "args": [{"kind": "field", "op": "release_time"}],
          "uid": "release_time2"
        },
        {
          "kind": "function",
          "op": "count",
          "args": [{"kind": "field", "op": "id"}],
          "uid": "votes"
        },
        {"kind": "formula", "op": "sum({rate_num})", "uid": "rate_num_sum"}
    ],
    "uid":"uid-3"
  },
  "options":{
    "title":"聚合节点",
    "nodeType":"AGGREGATE",
    "aggStartIndex":4                                                 // boundary between group-by columns and aggregate columns; only used when interacting with the HENGSHI frontend
  }
}
Join node
json
{
  "he": {
    "kind": "function",
    "op": "left_join",                                   // supports left_join, right_join, inner_join, full_join
    "args": [
      {"kind": "reference", "op": "input-1"},           // node ID referenced as the left table
      {"kind": "reference", "op": "input-2"},           // node ID referenced as the right table
      {                                                 // join condition
        "kind": "function",
        "op": "and",
        "args": [
          {
            "kind": "function",
            "op": "=",
            "args": [
              {
                "kind": "field",
                "dataset": "input-1",                 // ID of the node the field comes from
                "op": "id"
              },
              {
                "kind": "field",
                "dataset": "input-2",                // ID of the node the field comes from
                "op": "id"
              }
            ]
          }
        ]
      }
    ],
    "uid": "join-uid-1"
  },
  "options": {
    "title": "联合节点",
    "nodeType": "JOIN"
  }
}
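A join condition that compares several column pairs gets repetitive to write by hand. The Python sketch below produces the same structure as the example above: an `and` over `=` comparisons whose field operands carry a `dataset` key naming the source node. `join_node` and `equi_join_condition` are hypothetical helper names, not part of any HENGSHI SDK. The sketch covers only equality conditions, and the allowed join ops are the four listed in the example's comment.

```python
JOIN_OPS = {"left_join", "right_join", "inner_join", "full_join"}


def equi_join_condition(left_uid, right_uid, pairs):
    """HE `and` of `=` comparisons; pairs is a list of (left_field, right_field)."""
    return {
        "kind": "function",
        "op": "and",
        "args": [
            {
                "kind": "function",
                "op": "=",
                "args": [
                    # "dataset" names the node each field comes from
                    {"kind": "field", "dataset": left_uid, "op": left},
                    {"kind": "field", "dataset": right_uid, "op": right},
                ],
            }
            for left, right in pairs
        ],
    }


def join_node(uid, title, left_uid, right_uid, pairs, how="left_join"):
    """A JOIN node shaped like the example above."""
    if how not in JOIN_OPS:
        raise ValueError(f"unsupported join op: {how}")
    return {
        "he": {
            "kind": "function",
            "op": how,
            "args": [
                {"kind": "reference", "op": left_uid},
                {"kind": "reference", "op": right_uid},
                equi_join_condition(left_uid, right_uid, pairs),
            ],
            "uid": uid,
        },
        "options": {"title": title, "nodeType": "JOIN"},
    }
```

For example, `join_node("join-uid-1", "联合节点", "input-1", "input-2", [("id", "id")])` rebuilds the node shown above.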
Union (append) node
json
{
  "he": {
    "kind": "function",
    "op": "append",
    "args": [
      {"kind": "reference", "op": "uid-1"},  // first node to append; the result uses this node's fields
      {"kind": "reference", "op": "uid-2"},  // references to the other nodes to append
      {"kind": "reference", "op": "uid-3"}
    ],
    "uid": "union-uid-1"                     // unique node ID
  },
  "options": {
    "title": "合并节点",
    "nodeType": "UNION"
  }
}
Select node

Example with an expression filter

json
{
  "he": {
    "kind": "function",
    "op": "select_fields_complete",
    "args": [
      {"kind": "reference", "op": "uid-1"},                                   // reference to the upstream node ID
      [
        {"kind": "field", "op": "*"},                                         // fields to keep; * means all
        {"kind": "field", "op": "f1", "uid": "别名1"},                         // rename a field
        {"kind": "field", "op": "f2", "type": "string"},                      // change a field's type
        {"kind": "formula", "op": "{f1} + {f2} * {f3}", "uid": "别名2"}        // add a computed expression field
      ],
      [                                                                       // filter conditions; here an expression filter
        {
          "kind": "formula", "op": "{f1} > 10 and {f2} + {f3} > 100"
        }
      ]
    ],
    "uid": "select-uid-1"
  },
  "options": {
    "title": "选择节点",
    "nodeType": "SELECT"
  }
}

Example with a simple filter condition

json
{
  "he": {
    "kind": "function",
    "op": "select_fields_complete",
    "args": [
      {"kind": "reference", "op": "uid-1"},
      [
        {"kind": "field", "op": "*"},
        {"kind": "field", "op": "f1", "uid": "别名1"},
        {"kind": "field", "op": "f2", "type": "string"},
        {"kind": "formula", "op": "{f1} + {f2} * {f3}", "uid": "别名2"}
      ],
      [                                                                         // filter conditions; here a simple filter condition
        {
          "kind":"function",
          "op":"and",
          "args":[
            {
              "kind":"function",
              "op":"=",
              "args":[
                {"kind":"field","op":"id","type":"number"},
                {"kind":"constant","op":1,"type":"number"}
              ]
            },
            {
              "kind":"function",
              "op":"isnotnull",
              "args":[
                {"kind":"field","op":"short_name","type":"string"}
              ]
            }
          ]
        }
      ]
    ],
    "uid": "select-uid-1"
  },
  "options": {
    "title": "选择节点",
    "nodeType": "SELECT"
  }
}
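The simple filter condition above is a tree of function nodes. The hypothetical Python helpers below build the same shapes. They cover only the `and`, `=` and `isnotnull` operators that appear in the example. The `"number"` and `"string"` type strings are copied from it, and any other operator or type would be an assumption.

```python
def typed_field(name, type_):
    return {"kind": "field", "op": name, "type": type_}


def eq(name, value, type_="number"):
    """`=` comparison of a field with a constant of the same type."""
    return {
        "kind": "function",
        "op": "=",
        "args": [typed_field(name, type_), {"kind": "constant", "op": value, "type": type_}],
    }


def is_not_null(name, type_="string"):
    return {"kind": "function", "op": "isnotnull", "args": [typed_field(name, type_)]}


def all_of(*conditions):
    """Combine conditions with `and`."""
    return {"kind": "function", "op": "and", "args": list(conditions)}


def select_node(uid, upstream_uid, fields, conditions, title="选择节点"):
    """A SELECT node: upstream reference, kept/derived fields, filter conditions."""
    return {
        "he": {
            "kind": "function",
            "op": "select_fields_complete",
            "args": [{"kind": "reference", "op": upstream_uid}, fields, conditions],
            "uid": uid,
        },
        "options": {"title": title, "nodeType": "SELECT"},
    }
```

For example, `all_of(eq("id", 1), is_not_null("short_name"))` reproduces the filter in the example above.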
Output node
json
{
        "he": {
          "kind": "function",
          "op": "db_output",
          "args": [
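            "input-1",                  // ID of the upstream node to output
            1,                          // ID of the output data connection
            [                           // output schema path; supports multiple levels, one element per level
              "test"
            ],
            "movie"                     // output table name
          ],
          "uid": "output-1"             // unique node ID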
        },
        "options": {
          "title": "output-1",
          "nodeType": "SINK",
          "validateSchema": false,
          "tableAction": "RECREATE",
          "updateMethod": "INSERT",
          "keyFields": [
            "f3",
            "f4"
          ],
          "preSql": "",
          "postSql": ""
        }
      }
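Nodes refer to one another by uid. Most nodes use `{"kind": "reference", "op": ...}` arguments. The output node is different: it names its upstream node as a plain string in `args[0]`. Before sending a NODES update, a client might check that every referenced uid is defined by some node. The Python sketch below is a hypothetical client-side check. The server does its own validation, and its rules are not described here.

```python
def upstream_uids(node):
    """uids that a node reads from, in argument order."""
    he = node["he"]
    if he["op"] == "db_output":
        # The output node names its upstream node with a plain string in args[0].
        return [he["args"][0]]
    found = []

    def walk(value):
        # Collect every {"kind": "reference"} object, however deeply nested.
        if isinstance(value, dict):
            if value.get("kind") == "reference":
                found.append(value["op"])
            for child in value.values():
                walk(child)
        elif isinstance(value, list):
            for child in value:
                walk(child)

    walk(he.get("args", []))
    return found


def dangling_references(nodes):
    """Sorted uids that some node references but no node in the list defines."""
    known = {node["he"]["uid"] for node in nodes}
    referenced = {uid for node in nodes for uid in upstream_uids(node)}
    return sorted(referenced - known)
```

Input nodes such as `app_dataset` and `db_source` have no references, so they contribute nothing to the referenced set.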

API reference

Create a data integration project

Request URL

http
POST /api/pipelines HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

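Request parameters

URL parameters

None.

Request Body parameters

| Field | Type | Required | Description |
|---|---|---|---|
| title | STRING | | Title of the project |
| options.defaultOutputConnectionId | INTEGER | | ID of the project's default output connection |
| options.defaultPath | STRING array | | Default output path of the project |

Response format

| Field | Type | Description |
|---|---|---|
| version | STRING | Hash of the current system version |
| data | OBJECT | See Data integration project structure |

Example: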

http
POST /api/pipelines HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

// Request Body:
{
  "title": "wss p2",
  "options": {
    "defaultOutputConnectionId": 5,
    "defaultPath": [
      "public"
    ]
  }
}
http
HTTP/1.1 200 Ok
Content-Type: application/json

{
  "version": "version@9a5e106#6730f0d",
  "code": 0,
  "msg": "success",
  "data": {
    "id": 2,
    "title": "wss p2",
    "createdBy": 1,
    "createdAt": "2020-04-02 15:08:33",
    "updatedBy": 1,
    "updatedAt": "2020-04-02 15:08:33",
    "options": {
      "defaultPath": [
        "public"
      ],
      "defaultOutputConnectionId": 5
    }
  }
}
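Here is a minimal sketch of the same create request, using only the Python standard library. The base URL and cookie value are placeholders. Authentication is assumed to work by forwarding the session cookies shown in the example. The function builds the request but does not send it; `build_create_request` is a hypothetical name.

```python
import json
import urllib.request


def build_create_request(base_url, title, connection_id, path, cookie):
    """Build (but do not send) POST /api/pipelines with the documented body."""
    body = {
        "title": title,
        "options": {"defaultOutputConnectionId": connection_id, "defaultPath": list(path)},
    }
    return urllib.request.Request(
        base_url.rstrip("/") + "/api/pipelines",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json", "Cookie": cookie},
        method="POST",
    )
```

Passing the result to `urllib.request.urlopen(...)` would send it. For instance, `build_create_request("http://localhost:8080", "wss p2", 5, ["public"], cookie)` targets a hypothetical local server.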

Delete a data integration project

Request URL

DELETE /api/pipelines/{pipelineId}

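Request parameters

URL parameters

None.

Request Body parameters

None.

Response format

| Field | Type | Description |
|---|---|---|
| version | STRING | Hash of the current system version |
| code | INTEGER | Error code; 0 on success |
| msg | STRING | "success" on success |

Example: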

http
DELETE /api/pipelines/1 HTTP/1.1
http
HTTP/1.1 200 Ok
Content-Type: application/json

{
  "code": 0,
  "msg": "success",
  "version": "version@9a5e106#6730f0d"
}
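The create, update and delete responses share an envelope with `code`, `msg` and `version`. The sketch below unwraps that envelope. It assumes every response carries `code`, as those examples do, that 0 means success and that any other value signals an error. The meaning of specific non-zero codes is not documented here, and `PipelineApiError` and `unwrap` are hypothetical names.

```python
import json


class PipelineApiError(Exception):
    """Hypothetical error type for a response whose code is not 0."""


def unwrap(raw):
    """Parse a response body and return its `data` field (None if absent)."""
    payload = json.loads(raw)
    code = payload.get("code")
    if code != 0:
        raise PipelineApiError(f"code={code}, msg={payload.get('msg')}")
    return payload.get("data")
```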

Update a data integration project

Request URL

PUT /api/pipelines/{pipelineId}

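Request parameters

URL parameters

None.

Request Body parameters

The fields are listed in Data integration project structure. updatePart is required, and the backend validates the part of the project that it names.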
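Because updatePart determines what the backend validates, a client can build one body per update target. The sketch below uses hypothetical helper names. The field sets follow the update target table and the examples in this section. Whether a NODES update replaces the whole node list or merges it is not stated here, so the sketch simply sends whatever list it is given.

```python
def title_and_path_update(title, connection_id, path):
    """Body for updatePart TITLE_AND_PATH: title and default output settings."""
    return {
        "updatePart": "TITLE_AND_PATH",
        "title": title,
        "options": {"defaultOutputConnectionId": connection_id, "defaultPath": list(path)},
    }


def nodes_update(nodes):
    """Body for updatePart NODES: the node list goes under options.nodes."""
    return {"updatePart": "NODES", "options": {"nodes": list(nodes)}}
```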

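Response format

| Field | Type | Description |
|---|---|---|
| version | STRING | Hash of the current system version |
| data | OBJECT | See Data integration project structure |

Example 1: update the title and the default output path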

http
PUT /api/pipelines/1 HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

// Request Body:
{
  "updatePart": "TITLE_AND_PATH",
  "title": "wss p3",
  "options": {
    "defaultOutputConnectionId": 2,
    "defaultPath": [
      "test"
    ]
  }
}
http
HTTP/1.1 200 Ok
Content-Type: application/json

{
  "version": "version@9a5e106#6730f0d",
  "code": 0,
  "msg": "success",
  "data": {
    "id": 2,
    "title": "wss p3",
    "createdBy": 1,
    "createdAt": "2020-04-02 15:08:33",
    "updatedBy": 1,
    "updatedAt": "2020-04-02 15:08:33",
    "options": {
      "defaultPath": [
        "test"
      ],
      "defaultOutputConnectionId": 2
    }
  }
}

Example 2: add a dataset input node

http
PUT /api/pipelines/1 HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

// Request Body:
{
  "updatePart": "NODES",
  "options": {
    "nodes": [
      {
        "he": {
          "kind": "function",
          "op": "app_dataset",
          "args": [
            101126,
            1
          ],
          "uid": "input-2"
        },
        "options": {
          "title": "A_IVT_MOVIE",
          "nodeType": "DATASET",
          "samplingMethod": "DEFAULT",
          "samplingNum": 1000,
          "inputStrategy": "INCREMENTAL",
          "incrementalField": [
            "f1",
            "f2"
          ]
        }
      }
    ]
  }
}
http
HTTP/1.1 200 Ok
Content-Type: application/json

{
  "version": "version@9a5e106#6730f0d",
  "code": 0,
  "msg": "success",
  "data": {
    "id": 2,
    "title": "wss p3",
    "createdBy": 1,
    "createdAt": "2020-04-02 15:08:33",
    "updatedBy": 1,
    "updatedAt": "2020-04-02 15:08:33",
    "options": {
      "defaultPath": [
        "test"
      ],
      "defaultOutputConnectionId": 2,
      "nodes": [
        {
          "he": {
            "kind": "function",
            "op": "app_dataset",
            "args": [
              101126,
              1
            ],
            "uid": "input-2"
          },
          "options": {
            "title": "A_IVT_MOVIE",
            "nodeType": "DATASET",
            "samplingMethod": "DEFAULT",
            "samplingNum": 1000,
            "inputStrategy": "INCREMENTAL",
            "incrementalField": [
              "f1",
              "f2"
            ]
          }
        }
      ]
    }
  }
}

Example 3: add a file input node. For the file upload API and its response, see the API for uploading a file to use as an input node.

http
PUT /api/pipelines/1 HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

// Request Body:
{
  "updatePart": "NODES",
  "options": {
    "nodes": [
      {
        "he": {
          "kind": "function",
          "op": "db_source",
          "args": [
            1240,
            [
              "hengshi_internal_engine_tmp_schema"
            ],
            "f_u_7_1630930370000_18151_0"
          ],
          "uid": "input-1"
        },
        "options": {
          "title": "date.txt",
          "nodeType": "FILE",
          "samplingMethod": "DEFAULT",
          "samplingNum": 1000,
          "inputStrategy": "ALL",
          "sourceTitle": "date.txt"
        }
      }
    ]
  }
}
http
HTTP/1.1 200 Ok
Content-Type: application/json

{
  "version": "version@9a5e106#6730f0d",
  "code": 0,
  "msg": "success",
  "data": {
    "id": 2,
    "title": "wss p3",
    "createdBy": 1,
    "createdAt": "2020-04-02 15:08:33",
    "updatedBy": 1,
    "updatedAt": "2020-04-02 15:08:33",
    "options": {
      "defaultPath": [
        "test"
      ],
      "defaultOutputConnectionId": 2,
      "nodes": [
        {
          "he": {
            "kind": "function",
            "op": "db_source",
            "args": [
              1240,
              [
                "hengshi_internal_engine_tmp_schema"
              ],
              "f_u_7_1630930370000_18151_0"
            ],
            "uid": "input-1"
          },
          "options": {
            "title": "date.txt",
            "nodeType": "FILE",
            "samplingMethod": "DEFAULT",
            "samplingNum": 1000,
            "inputStrategy": "ALL"
          }
        }
      ]
    }
  }
}

Example 4: add a connection input node

http
PUT /api/pipelines/1 HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

// Request Body:
{
  "updatePart": "NODES",
  "options": {
    "nodes": [
      {
        "he": {
          "kind": "function",
          "op": "db_source",
          "args": [
            25055,
            [
              "test"
            ],
            "客户"
          ],
          "uid": "input-3"
        },
        "options": {
          "title": "客户",
          "nodeType": "CONNECTION",
          "samplingMethod": "DEFAULT",
          "samplingNum": 1000,
          "inputStrategy": "INCREMENTAL",
          "incrementalField": [
            "f1",
            "f2"
          ]
        }
      }
    ]
  }
}
http
HTTP/1.1 200 Ok
Content-Type: application/json

{
  "version": "version@9a5e106#6730f0d",
  "code": 0,
  "msg": "success",
  "data": {
    "id": 2,
    "title": "wss p3",
    "createdBy": 1,
    "createdAt": "2020-04-02 15:08:33",
    "updatedBy": 1,
    "updatedAt": "2020-04-02 15:08:33",
    "options": {
      "defaultPath": [
        "test"
      ],
      "defaultOutputConnectionId": 2,
      "nodes": [
        {
          "he": {
            "kind": "function",
            "op": "db_source",
            "args": [
              25055,
              [
                "test"
              ],
              "客户"
            ],
            "uid": "input-3"
          },
          "options": {
            "title": "客户",
            "nodeType": "CONNECTION",
            "samplingMethod": "DEFAULT",
            "samplingNum": 1000,
            "inputStrategy": "INCREMENTAL",
            "incrementalField": [
              "f1",
              "f2"
            ]
          }
        }
      ]
    }
  }
}

Example 5: add an output node

http
PUT /api/pipelines/1 HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

// Request Body:
{
  "updatePart": "NODES",
  "options": {
    "nodes": [
      {
        "he": {
          "kind": "function",
          "op": "db_output",
          "args": [
            "input-1",
            1,
            [
              "test"
            ],
            "movie"
          ],
          "uid": "output-1"
        },
        "options": {
          "title": "output-1",
          "nodeType": "SINK",
          "validateSchema": false,
          "tableAction": "RECREATE",
          "updateMethod": "INSERT",
          "keyFields": [
            "f3",
            "f4"
          ],
          "preSql": "",
          "postSql": ""
        }
      }
    ]
  }
}
http
HTTP/1.1 200 Ok
Content-Type: application/json

{
  "version": "version@9a5e106#6730f0d",
  "code": 0,
  "msg": "success",
  "data": {
    "id": 2,
    "title": "wss p3",
    "createdBy": 1,
    "createdAt": "2020-04-02 15:08:33",
    "updatedBy": 1,
    "updatedAt": "2020-04-02 15:08:33",
    "options": {
      "defaultPath": [
        "test"
      ],
      "defaultOutputConnectionId": 2,
      "nodes": [
        {
          "he": {
            "kind": "function",
            "op": "db_output",
            "args": [
              "input-1",
              1,
              [
                "test"
              ],
              "movie"
            ],
            "uid": "output-1"
          },
          "options": {
            "title": "output-1",
            "nodeType": "SINK",
            "validateSchema": false,
            "tableAction": "RECREATE",
            "updateMethod": "INSERT",
            "keyFields": [
              "f3",
              "f4"
            ],
            "preSql": "",
            "postSql": ""
          }
        }
      ]
    }
  }
}

Get a data integration project

Request URL

GET /api/pipelines/{pipelineId}

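Request parameters

URL parameters

None.

Request Body parameters

None.

Response format

| Field | Type | Description |
|---|---|---|
| version | STRING | Hash of the current system version |
| data | OBJECT | See Data integration project structure |

Example: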

http
GET /api/pipelines/1 HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
http
HTTP/1.1 200 Ok
Content-Type: application/json

{
  "data": {
    "id": 1,
    "title": "数据集成1",
    "status": "PENDING",
    "latestStartAt": "2019-05-05 17:55:55.123",
    "costTime": 60,
    "options": {
      "nodes": [
        {
          "he": {
            "kind": "function",
            "op": "app_dataset",
            "args": [
              101126,
              1
            ],
            "uid": "input-2"
          },
          "options": {
            "title": "A_IVT_MOVIE",
            "nodeType": "DATASET",
            "samplingMethod": "DEFAULT",
            "samplingNum": 1000,
            "inputStrategy": "INCREMENTAL",
            "incrementalField": [
              "f1",
              "f2"
            ]
          }
        }
      ],
      "defaultOutputConnectionId": 1,
      "defaultPath": [
        "public"
      ]
    }
  }
}

List all data integration projects

Request URL

http
GET /api/pipelines HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

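Request parameters

URL parameters

| Field | Type | Required | Description |
|---|---|---|---|
| offset | INTEGER | Optional | Pagination offset; defaults to 0 |
| limit | INTEGER | Optional | Number of items per page; defaults to 10 |

Request Body parameters

None.

Response format

| Field | Type | Description |
|---|---|---|
| version | STRING | Hash of the current system version |
| data | OBJECT array | Each object is described in Data integration project structure |

Example: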

http
GET /api/pipelines HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
http
HTTP/1.1 200 Ok
Content-Type: application/json

{
  "data": [
    {
      "id": 1,
      "title": "数据集成1",
      "status": "PENDING",
      "latestStartAt": "2019-05-05 17:55:55.123",
      "costTime": 60,
      "options": {
        "nodes": [
          {
            "he": {
              "kind": "function",
              "op": "app_dataset",
              "args": [
                101126,
                1
              ],
              "uid": "input-2"
            },
            "options": {
              "title": "A_IVT_MOVIE",
              "nodeType": "DATASET",
              "samplingMethod": "DEFAULT",
              "samplingNum": 1000,
              "inputStrategy": "INCREMENTAL",
              "incrementalField": [
                "f1",
                "f2"
              ]
            }
          }
        ],
        "defaultOutputConnectionId": 1,
        "defaultPath": [
          "public"
        ]
      }
    },
    ...
  ],
  ...
}
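`offset` and `limit` page through the project list. The example elides the response's other fields, so the sketch below does not rely on a total count. Instead, it assumes that a `data` array shorter than `limit` marks the last page. `fetch_page` is a hypothetical callable you supply, for example one that GETs `list_url(...)` and returns `data`. This keeps the paging logic testable without a server, and `base_url` is a placeholder.

```python
from urllib.parse import urlencode


def list_url(base_url, offset=0, limit=10):
    """URL for one page of GET /api/pipelines."""
    query = urlencode({"offset": offset, "limit": limit})
    return f"{base_url.rstrip('/')}/api/pipelines?{query}"


def iter_pipelines(fetch_page, limit=10):
    """Yield every project, one page at a time.

    fetch_page(offset, limit) performs the request and returns the response's data array.
    """
    if limit <= 0:
        raise ValueError("limit must be positive")
    offset = 0
    while True:
        page = fetch_page(offset, limit)
        yield from page
        if len(page) < limit:  # a short (or empty) page is assumed to be the last one
            return
        offset += limit
```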

Load a newly uploaded file sheet into the internal connection and return the connection and table information

Request URL

http
POST /api/pipelines/{pipelineId}/nodes/files/{fileId}/sheets/{sheetId}/save HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

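Request parameters

URL parameters

None.

Request Body parameters

Note: this options object describes the file content. It is not the node's options.

| Field | Type | Required | Description |
|---|---|---|---|
| options | OBJECT | | Describes the file content |
| options.delimiter | STRING | | Field delimiter of the file; see File delimiters |
| options.encoding | STRING | | Character encoding of the file |
| options.header | INTEGER | | Row number of the header, starting from 0 |
| options.origin | STRING | | File source type: file_excel or file_csv |
| options.range | OBJECT array | | Row and column range of the data within the file |
| options.range[].xbegin | INTEGER | | Start column, starting from 0, inclusive |
| options.range[].xend | INTEGER | | End column, starting from 0, inclusive |
| options.range[].ybegin | INTEGER | | Start row, starting from 0, inclusive |
| options.range[].yend | INTEGER | | End row, starting from 0, inclusive |
| transpose | BOOL | | Whether to transpose rows and columns |
| title | STRING | | File name |

File delimiters

| Value | Meaning |
|---|---|
| colon | Colon (:) |
| comma | Comma (,) |
| pipe | Vertical bar (\|) |
| semicolon | Semicolon (;) |
| tab | Tab character (\t) |

Response format

| Field | Type | Description |
|---|---|---|
| version | STRING | Hash of the current system version |
| data | OBJECT | A node, as described for options.nodes in Data integration project structure |

Example: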

http
POST /api/pipelines/2/nodes/files/56c8905fcc06bd0b210b29401e336194/sheets/0/save HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

// Request Body:
{
  "options": {
    "delimiter": "comma",
    "encoding": "UTF-8",
    "header": 1,
    "origin": "file_csv",
    "range": [
      {
        "xbegin": 1,
        "xend": 6,
        "ybegin": 2,
        "yend": 10
      }
    ],
    "transpose": false
  },
  "title": "date.txt"
}
http
HTTP/1.1 200 Ok
Content-Type: application/json

{
  "version": "version@9a5e106#6730f0d",
  "data": {
    "he": {
      "kind": "function",
      "op": "db_source",
      "args": [
        1240,
        [
          "hengshi_internal_engine_tmp_schema"
        ],
        "f_u_7_1630930370000_18151_0"
      ]
    },
    "options": {
      "sourceTitle": "date.txt"
    }
  }
}
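The save response contains only `he` (without a `uid`) and `options.sourceTitle`. To use it as a FILE input node, the client still has to add a unique node ID and the node options. The sketch below fills those in with the values from the file input node example: DEFAULT sampling of 1000 rows and the ALL input strategy. The caller chooses the `uid`, and `file_input_node` is a hypothetical helper name.

```python
def file_input_node(saved, uid, sampling_num=1000):
    """Turn the `data` object returned by the save endpoint into a FILE input node."""
    source_title = saved["options"]["sourceTitle"]
    he = dict(saved["he"], uid=uid)  # copy he, then add the caller-chosen unique node ID
    return {
        "he": he,
        "options": {
            "title": source_title,
            "nodeType": "FILE",
            "samplingMethod": "DEFAULT",
            "samplingNum": sampling_num,
            "inputStrategy": "ALL",
            "sourceTitle": source_title,
        },
    }
```

The resulting node can be sent in a NODES update, as in Example 3 of the update API.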

Preview a node's data

Request URL

http
GET /api/pipelines/{pipelineId}/nodes/{uid}/data HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

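Request parameters

URL parameters

| Field | Type | Required | Description |
|---|---|---|---|
| offset | INTEGER | Optional | Pagination offset; defaults to 0 |
| limit | INTEGER | Optional | Number of rows to fetch; defaults to 1000; use 0 to fetch only the schema |

Request Body parameters

None.

Response format

| Field | Type | Description |
|---|---|---|
| version | STRING | Hash of the current system version |
| data | OBJECT | See Task execution result |
| errorMap | OBJECT | Error messages for specific nodes, as a map from string to string |

Example: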

http
GET /api/pipelines/1/nodes/input-1/data HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
http
HTTP/1.1 200 Ok
Content-Type: application/json

{
  "data": {
    "data": [
      ...
    ],
    "schema": [
      ...
    ],
    "randomable": false
  },
  ...
}
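The sketch below builds the preview URL with the documented `offset` and `limit` query parameters, where `limit=0` requests only the schema. `base_url` is a placeholder and the helper names are hypothetical. The uid is percent-encoded in case it contains characters that are not URL-safe; this is a precaution, not a documented requirement.

```python
from urllib.parse import quote, urlencode


def preview_url(base_url, pipeline_id, uid, offset=0, limit=1000):
    """URL for GET /api/pipelines/{pipelineId}/nodes/{uid}/data."""
    path = f"/api/pipelines/{pipeline_id}/nodes/{quote(uid, safe='')}/data"
    query = urlencode({"offset": offset, "limit": limit})
    return f"{base_url.rstrip('/')}{path}?{query}"


def schema_only_url(base_url, pipeline_id, uid):
    """limit=0 asks for the schema without any rows."""
    return preview_url(base_url, pipeline_id, uid, limit=0)
```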

Preview a node's data based on the current settings

Request URL

http
POST /api/pipelines/{pipelineId}/nodes/{uid}/data HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

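Request parameters

URL parameters

| Field | Type | Required | Description |
|---|---|---|---|
| offset | INTEGER | Optional | Pagination offset; defaults to 0 |
| limit | INTEGER | Optional | Number of rows to fetch; defaults to 1000; use 0 to fetch only the schema |

Request Body parameters

| Field | Type | Required | Description |
|---|---|---|---|
| options | OBJECT | | Same fields as options in Data integration project structure |

Response format

| Field | Type | Description |
|---|---|---|
| version | STRING | Hash of the current system version |
| data | OBJECT | See Task execution result |
| errorMap | OBJECT | Error messages for specific nodes, as a map from string to string |

Example: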

http
POST /api/pipelines/1/nodes/input-1/data HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

// Request Body:
{
  "options":{
    "defaultOutputConnectionId":1,
    "defaultPath":["public"],
    "nodes":[...]
  }
}
http
HTTP/1.1 200 Ok
Content-Type: application/json

{
  "data": {
    "data": [
      ...
    ],
    "schema": [
      ...
    ],
    "randomable": false
  },
  ...
}

Get a node's debug SQL

Request URL

http
GET /api/pipelines/{pipelineId}/nodes/{uid}/sql-debug HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

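Request parameters

URL parameters

None.

Request Body parameters

None.

Response format

| Field | Type | Description |
|---|---|---|
| version | STRING | Hash of the current system version |
| data | STRING | The SQL the node is expected to generate |
| errorMap | OBJECT | Error messages for specific nodes, as a map from string to string |

Example: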

http
GET /api/pipelines/1/nodes/input-1/sql-debug HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
http
HTTP/1.1 200 Ok
Content-Type: application/json

{
  "data": "select * from table",
  ...
}

Get a node's debug SQL based on the current settings

Request URL

http
POST /api/pipelines/{pipelineId}/nodes/{uid}/sql-debug HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

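Request parameters

URL parameters

None.

Request Body parameters

| Field | Type | Required | Description |
|---|---|---|---|
| options | OBJECT | | Same fields as options in Data integration project structure |

Response format

| Field | Type | Description |
|---|---|---|
| version | STRING | Hash of the current system version |
| data | STRING | The SQL the node is expected to generate |
| errorMap | OBJECT | Error messages for specific nodes, as a map from string to string |

Example: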

http
POST /api/pipelines/1/nodes/input-1/sql-debug HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

// Request Body:
{
  "options":{
    "defaultOutputConnectionId":1,
    "defaultPath":["public"],
    "nodes":[...]
  }
}
http
HTTP/1.1 200 Ok
Content-Type: application/json

{
  "data": "select * from table",
  ...
}

Preview the distinct values of a node's field based on the current settings

Request URL

http
POST /api/pipelines/{pipelineId}/nodes/{uid}/fields/{fieldName}/distinct HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

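Request parameters

URL parameters

| Field | Type | Required | Description |
|---|---|---|---|
| offset | INTEGER | Optional | Pagination offset; defaults to 0 |
| limit | INTEGER | Optional | Number of rows to fetch; defaults to 1000; use 0 to fetch only the schema |

Request Body parameters

| Field | Type | Required | Description |
|---|---|---|---|
| options | OBJECT | | Same fields as options in Data integration project structure |

Response format

| Field | Type | Description |
|---|---|---|
| version | STRING | Hash of the current system version |
| data | OBJECT | See Task execution result |
| errorMap | OBJECT | Error messages for specific nodes, as a map from string to string |

Example: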

http
POST /api/pipelines/1/nodes/input-1/fields/month/distinct HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

// Request Body:
{
  "options":{
    "defaultOutputConnectionId":1,
    "defaultPath":["public"],
    "nodes":[...]
  }
}
http
HTTP/1.1 200 Ok
Content-Type: application/json

{
  "data": {
    "data": [
      ["Jan"],
      ["Feb"],
      ["Mar"],
      ["Apr"],
      ["May"],
      ...
    ],
    "schema": [
      ...
    ]
  },
  ...
}
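Field names may contain non-ASCII characters, so the `{fieldName}` path segment should be percent-encoded. The sketch below builds the URL and flattens the single-column rows in the response's `data.data` into a plain list. The helper names and the base URL are hypothetical.

```python
from urllib.parse import quote, urlencode


def distinct_url(base_url, pipeline_id, uid, field_name, offset=0, limit=1000):
    """URL for POST /api/pipelines/{pipelineId}/nodes/{uid}/fields/{fieldName}/distinct."""
    path = (f"/api/pipelines/{pipeline_id}/nodes/{quote(uid, safe='')}"
            f"/fields/{quote(field_name, safe='')}/distinct")
    query = urlencode({"offset": offset, "limit": limit})
    return f"{base_url.rstrip('/')}{path}?{query}"


def distinct_values(result):
    """Flatten the response's data object ({"data": [["Jan"], ["Feb"]], ...}) to ["Jan", "Feb"]."""
    return [row[0] for row in result["data"]]
```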

List the connections the current user can write output to

Request URL

http
GET /api/connections/writable HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

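Request parameters

URL parameters

None.

Request Body parameters

None.

Response format

| Field | Type | Description |
|---|---|---|
| version | STRING | Hash of the current system version |
| data | OBJECT array | Each element describes one connection |

Example: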

http
GET /api/connections/writable HTTP/1.1
Accept: application/json

// Response:
{
  "version": "version@9a5e106#6730f0d",
  "code": 0,
  "msg": "success",
  "data": [
    {
      "id": 5,
      "options": {
        "type": "postgresql",
        "maxConnNum": 10,
        "config": {},
        "protocol": "http",
        "outputAble": true
      },
      "createdBy": 1,
      "createdAt": "2020-04-01 16:57:51",
      "updatedBy": 1,
      "updatedAt": "2020-04-01 16:57:51",
      "visible": true,
      "title": "pg wss",
      "status": 0,
      "refreshStats": {},
      "hsVersion": 0,
      "accessCount": 1,
      "creator": {
        "id": 1,
        "name": "hengshiwss",
        "email": "anonymous@hengshi.com",
        "userAttributes": {
          "name": "hengshiwss",
          "id": 1,
          "email": "anonymous@hengshi.com"
        }
      },
      "updater": {
        "id": 1,
        "name": "hengshiwss",
        "email": "anonymous@hengshi.com",
        "userAttributes": {
          "name": "hengshiwss",
          "id": 1,
          "email": "anonymous@hengshi.com"
        }
      },
      "auth": false
    }
  ],
  "totalHits": 1,
  "offset": 0
}

Get the status of specified data integration projects

Request URL

http
GET /api/pipelines/status HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

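Request parameters

URL parameters

| Field | Type | Required | Description |
|---|---|---|---|
| ids | STRING | | Comma-separated list of the IDs of the pipelines to query |

Request Body parameters

None.

Response format

| Field | Type | Description |
|---|---|---|
| version | STRING | Hash of the current system version |
| data | OBJECT | A map from pipeline ID to status (see the task execution statuses in Execution plans); IDs that do not exist, or that you lack permission for, are omitted from the map |

Example: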

http
GET /api/pipelines/status?ids=1,2,3 HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
http
HTTP/1.1 200 Ok
Content-Type: application/json

{
  "data": {
    "1": "PENDING",
    "2": "RUNNING",
    "3": "SUCCESSFUL"
  },
  ...
}
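The sketch below builds the `ids` query in the comma-separated form shown above and lists the requested IDs that are missing from the response map. Per the table above, a missing ID means the project does not exist or you lack permission for it; the response does not say which. The helper names and the base URL are hypothetical.

```python
def status_url(base_url, ids):
    """URL for GET /api/pipelines/status with comma-separated IDs."""
    joined = ",".join(str(int(i)) for i in ids)  # integer IDs need no escaping
    return f"{base_url.rstrip('/')}/api/pipelines/status?ids={joined}"


def missing_ids(ids, status_map):
    """Requested IDs absent from data (map keys are strings in the example)."""
    return [i for i in ids if str(i) not in status_map]
```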

Run immediately with the current configuration

Request URL

http
POST /api/schedules HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

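Request parameters

URL parameters

| Field | Type | Required | Description |
| --- | --- | --- | --- |

Request Body parameters

This endpoint reuses the run-immediately endpoint of Execution Plan management. Compared with that endpoint, it accepts the following two additional parameters:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| execDetail.jobParams.uid | STRING | | UID of a node. When non-empty, only that node is executed; otherwise the whole project is executed according to the current options |
| execDetail.jobParams.options | OBJECT | | Current project settings; the fields are the same as options in the data integration project structure |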

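Response object format

| Field | Type | Description |
| --- | --- | --- |
| version | STRING | Hash of the current system version |
| data | OBJECT | Execution plan structure, as described in the Execution Plans documentation |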

Example 1: run the whole data integration project immediately according to options

http
POST /api/schedules HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

// Request Body:
{
    "entityGroup": "PIPELINE",
    "entityKey": "1",
    "planItems": [
        {
            "triggerType": "ONCE"
        }
    ],
    "execDetail": {
        "jobClass": "com.hengshi.nangaparbat.schedulejob.PipelineJob",
        "jobParams": {
          "pipeline": 1,
          "options": {
            "defaultOutputConnectionId":1,
            "defaultPath":["public"],
            "nodes":[...]
          }
        },
        "retryTimes": 1
    }
}
http
HTTP/1.1 200 Ok
Content-Type: application/json

{
    "code": 0,
    "data": {
        "id": 1,
        "entityGroup": "PIPELINE",
        "entityKey": "1",
        "enabled": true,
        "planItems": [
            {
                "id": 3,
                "triggerType": "ONCE",
                "planId": 1
            }
        ],
        "execDetail": {
            "jobClass": "com.hengshi.nangaparbat.schedulejob.PipelineJob",
            "jobParams": {
              "pipeline": 1,
              "options": {
                "defaultOutputConnectionId":1,
                "defaultPath":["public"],
                "nodes":[...]
              }
            },
            "retryTimes": 1
        },
        "title": "pl1",
        "createdAt": "2020-03-05 15:01:02",
        "createdBy": 1,
        "updatedAt": "2020-03-05 15:01:02",
        "updatedBy": 1
    },
    "msg": "success",
    "version": "version@9a5e106#6730f0d"
}

Example 2: immediately run a single node (usually an output node) according to the current options

http
POST /api/schedules HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

// Request Body:
{
    "entityGroup": "PIPELINE",
    "entityKey": "1",
    "planItems": [
        {
            "triggerType": "ONCE"
        }
    ],
    "execDetail": {
        "jobClass": "com.hengshi.nangaparbat.schedulejob.PipelineJob",
        "jobParams": {
          "pipeline": 1,
          "uid": "output-1",
          "options": {
            "defaultOutputConnectionId":1,
            "defaultPath":["public"],
            "nodes":[...]
          }
        },
        "retryTimes": 1
    }
}
http
HTTP/1.1 200 Ok
Content-Type: application/json

{
    "code": 0,
    "data": {
        "id": 1,
        "entityGroup": "PIPELINE",
        "entityKey": "1",
        "enabled": true,
        "planItems": [
            {
                "id": 3,
                "triggerType": "ONCE",
                "planId": 1
            }
        ],
        "execDetail": {
            "jobClass": "com.hengshi.nangaparbat.schedulejob.PipelineJob",
            "jobParams": {
              "pipeline": 1,
              "uid": "output-1",
              "options": {
                "defaultOutputConnectionId":1,
                "defaultPath":["public"],
                "nodes":[...]
              }
            },
            "retryTimes": 1
        },
        "title": "pl1",
        "createdAt": "2020-03-05 15:01:02",
        "createdBy": 1,
        "updatedAt": "2020-03-05 15:01:02",
        "updatedBy": 1
    },
    "msg": "success",
    "version": "version@9a5e106#6730f0d"
}
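The two request bodies differ only in execDetail.jobParams.uid. The Python sketch below builds either one. run_now_body is a hypothetical helper. Setting entityKey to the pipeline ID as a string is an assumption inferred from the examples (entityKey "1" for pipeline 1), not a documented rule.

```python
import json
from typing import Any, Optional

PIPELINE_JOB_CLASS = "com.hengshi.nangaparbat.schedulejob.PipelineJob"


def run_now_body(pipeline_id: int, options: dict[str, Any],
                 uid: Optional[str] = None, retry_times: int = 1) -> dict[str, Any]:
    """Build a POST /api/schedules body that runs a data integration project once.

    With uid=None the whole project runs; with a uid only that node runs.
    """
    job_params: dict[str, Any] = {"pipeline": pipeline_id, "options": options}
    if uid:  # Example 1 omits uid entirely when running the whole project
        job_params["uid"] = uid
    return {
        "entityGroup": "PIPELINE",
        "entityKey": str(pipeline_id),  # assumption: mirrors the pipeline ID
        "planItems": [{"triggerType": "ONCE"}],
        "execDetail": {
            "jobClass": PIPELINE_JOB_CLASS,
            "jobParams": job_params,
            "retryTimes": retry_times,
        },
    }


# nodes is left empty purely for illustration
opts = {"defaultOutputConnectionId": 1, "defaultPath": ["public"], "nodes": []}
print(json.dumps(run_now_body(1, opts, uid="output-1"), indent=2))
```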

Query per-node error messages of a task execution

Request URL

http
GET /api/contexts/{contextId}/errors HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

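Request parameters

URL parameters

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| block | BOOL | Optional | Whether to block. If true, the request waits until execution finishes before returning; otherwise it returns empty data immediately. Defaults to true |

Request Body parameters

| Field | Type | Required | Description |
| --- | --- | --- | --- |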

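Response object format

| Field | Type | Description |
| --- | --- | --- |
| version | STRING | Hash of the current system version |
| data | OBJECT | A map whose keys are node UIDs and whose values are the corresponding error messages |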

Example:

http
GET /api/contexts/{contextId}/errors HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
http
HTTP/1.1 200 Ok
Content-Type: application/json

{
    "code": 0,
    "data": {
      "uid-1": "No file has been uploaded for the file node",
      "uid-2": "The output node has no upstream node",
      ...
    },
    "msg": "success",
    "version": "version@9a5e106#6730f0d"
}
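A Python sketch of a client for this endpoint, assuming the contextId is already known (this section does not say where it comes from). It sends block as lowercase true/false and flattens the uid-to-message map into readable lines. The host and helper names are hypothetical.

```python
import json
from urllib.parse import quote, urlencode


def errors_url(base_url: str, context_id: str, block: bool = True) -> str:
    """Build GET /api/contexts/{contextId}/errors; block is sent as lowercase true/false."""
    path = f"/api/contexts/{quote(str(context_id), safe='')}/errors"
    query = urlencode({"block": str(block).lower()})
    return f"{base_url.rstrip('/')}{path}?{query}"


def format_node_errors(response_text: str) -> list[str]:
    """Turn the uid -> message map into 'uid: message' lines, sorted by uid.

    With block=false the data may be empty, which yields an empty list.
    """
    data = json.loads(response_text).get("data") or {}
    return [f"{uid}: {msg}" for uid, msg in sorted(data.items())]
```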

Duplicate a data integration project

Request URL

http
POST /api/pipelines/{pipelineId}/duplicate HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

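Request parameters

URL parameters

| Field | Type | Required | Description |
| --- | --- | --- | --- |

Request Body parameters

| Field | Type | Required | Description |
| --- | --- | --- | --- |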

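Response object format

| Field | Type | Description |
| --- | --- | --- |
| version | STRING | Hash of the current system version |
| data | OBJECT | The new project, as described in the data integration project structure |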

Example:

http
POST /api/pipelines/1/duplicate HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

// Request Body:
{}
http
HTTP/1.1 200 Ok
Content-Type: application/json

{
  "version": "version@9a5e106#6730f0d",
  "code": 0,
  "msg": "success",
  "data": {
    "id": 2,
    "title": "Data Integration 1 (1)",
    "createdBy": 1,
    "createdAt": "2022-04-02 15:08:33",
    "updatedBy": 1,
    "updatedAt": "2022-04-02 15:08:33",
    "options": {
      "defaultPath": [
        "public"
      ],
      "defaultOutputConnectionId": 5,
      "nodes":[...]
    }
  }
}
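The duplicate request has an empty JSON body, so a client only needs the path. Below is a Python sketch using urllib.request that prepares the request without sending it. The host, cookie string and helper name are placeholders.

```python
import json
import urllib.request


def duplicate_request(base_url: str, pipeline_id: int, cookie: str) -> urllib.request.Request:
    """Prepare (but do not send) POST /api/pipelines/{pipelineId}/duplicate with an empty JSON body."""
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/pipelines/{pipeline_id}/duplicate",
        data=json.dumps({}).encode("utf-8"),
        headers={"Content-Type": "application/json", "Cookie": cookie},
        method="POST",
    )
```

Sending it with urllib.request.urlopen(req) and reading json.load(resp)["data"] would, per the response above, return the copy, which has its own id.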

Get the table-creation properties template of an output node according to the current settings

Request URL

http
POST /api/pipelines/{pipelineId}/nodes/{uid}/create-table-props-template HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

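Request parameters

URL parameters

| Field | Type | Required | Description |
| --- | --- | --- | --- |

Request Body parameters

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| options | OBJECT | | Same fields as options in the data integration project structure |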

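Response object format

| Field | Type | Description |
| --- | --- | --- |
| version | STRING | Hash of the current system version |
| data | STRING | Table-creation properties template generated from the current output data source type and fields |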

Example:

http
POST /api/pipelines/1/nodes/output-1/create-table-props-template HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

// Request Body:
{
  "options":{
    "defaultOutputConnectionId":1,
    "defaultPath":["public"],
    "nodes":[...]
  }
}
http
HTTP/1.1 200 Ok
Content-Type: application/json

{
  "data": "distributed by (f3) partition by list(f2) (DEFAULT PARTITION pd,partition p1 values(1),partition p2 values (2),partition p3 values (3))",
  ...
}
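A Python sketch that prepares the template request. It wraps the current options under the options key, as the Request Body table describes, and URL-quotes the node uid. The host, cookie and helper name are hypothetical. Copying the returned string into the output node's options.createTableProperties is an assumption based on that field's description.

```python
import json
import urllib.request
from urllib.parse import quote


def props_template_request(base_url: str, pipeline_id: int, uid: str,
                           options: dict, cookie: str) -> urllib.request.Request:
    """Prepare POST /api/pipelines/{pipelineId}/nodes/{uid}/create-table-props-template.

    The body wraps the project's current options under the "options" key.
    """
    url = (f"{base_url.rstrip('/')}/api/pipelines/{pipeline_id}"
           f"/nodes/{quote(uid, safe='')}/create-table-props-template")
    return urllib.request.Request(
        url,
        data=json.dumps({"options": options}).encode("utf-8"),
        headers={"Content-Type": "application/json", "Cookie": cookie},
        method="POST",
    )

# Assumed follow-up once the response is parsed:
#   node["options"]["createTableProperties"] = response["data"]
```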

Test table creation using the current table-creation properties and other settings

Request URL

http
POST /api/pipelines/{pipelineId}/nodes/{uid}/test-create HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

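Request parameters

URL parameters

| Field | Type | Required | Description |
| --- | --- | --- | --- |

Request Body parameters

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| options | OBJECT | | Same fields as options in the data integration project structure |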

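Response object format

| Field | Type | Description |
| --- | --- | --- |
| version | STRING | Hash of the current system version |
| msg | STRING | If the HTTP status code is not 200, contains the specific error message |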

Example:

http
POST /api/pipelines/1/nodes/output-1/test-create HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...

// Request Body:
{
  "options":{
    "defaultOutputConnectionId":1,
    "defaultPath":["public"],
    "nodes":[...]
  }
}
http
HTTP/1.1 200 Ok
Content-Type: application/json

{
  "msg": "ERROR:  column \"f3\" named in 'DISTRIBUTED BY' clause does not exist",
  ...
}
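According to the table above, msg carries the error when the HTTP status is not 200. Below is a Python sketch that follows that reading. urllib.request.urlopen raises urllib.error.HTTPError for non-2xx responses, and extract_msg reads msg from the error body, falling back to the raw text if the body is not a JSON object with a string msg. Both helper names are hypothetical, and req is assumed to be a prepared POST to the test-create path.

```python
import json
import urllib.error
import urllib.request
from typing import Optional


def extract_msg(body: bytes) -> str:
    """Pull `msg` out of a JSON error body, falling back to the raw text."""
    text = body.decode("utf-8", errors="replace")
    try:
        payload = json.loads(text)
    except json.JSONDecodeError:
        return text
    if isinstance(payload, dict) and isinstance(payload.get("msg"), str):
        return payload["msg"]
    return text


def run_test_create(req: urllib.request.Request) -> Optional[str]:
    """Send a prepared test-create request; return None on 2xx, else the error message."""
    try:
        with urllib.request.urlopen(req):
            return None  # 2xx: the test table creation succeeded
    except urllib.error.HTTPError as err:
        return extract_msg(err.read())
```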

HENGSHI SENSE API User Manual