主题
数据集成
数据集成说明
数据集成定义
数据集成支持自助式的数据清洗工作,包括字段过滤,字段重命名,字段添加/删除,字段修改类型,表关联,表聚合,表合并等。支持输出到不同目标数据库。
数据集成项目结构说明
字段 | 类型 | 描述 |
---|---|---|
id | INTEGER | 数据集成项目的 id |
title | STRING | 数据集成项目的标题 |
createdAt | DATETIME | 数据集成项目创建的时间 |
createdBy | INTEGER | 数据集成项目创建用户的 id |
updatedAt | DATETIME | 数据集成项目最后修改的时间 |
updatedBy | INTEGER | 数据集成项目最后修改用户的 id |
options | OBJECT | 数据集成项目的配置 |
options.nodes | OBJECT 数组 | 数据集成项目的节点连线关系 |
options.nodes[].he | OBJECT | 节点HE表达式,语法参考HQL参考和衡石函数参考 |
options.nodes[].options | OBJECT | 节点其他附加信息 |
options.nodes[].options.title | STRING | 节点名称 |
options.nodes[].options.nodeType | STRING | 节点的类型,详见数据集成项目节点类型 |
options.nodes[].options.samplingMethod | STRING | 输入节点的取样方法,详见输入节点的取样方法 |
options.nodes[].options.samplingNum | INTEGER | 输入节点的取样个数 |
options.nodes[].options.inputStrategy | STRING | 输入节点的输入策略,见输入节点的输入策略 |
options.nodes[].options.incrementalField | STRING 数组 | 输入节点的增量字段名列表,从左到右的区分度越来越细,比如年,月,日,时,分,秒,毫秒,也可以简单用自增主键 |
options.nodes[].options.sourceTitle | STRING | 输入节点的来源名字,对于数据集是数据集名字,对于连接是连接名字,对于文件是原始文件名 |
options.nodes[].options.validateSchema | BOOL | 输出节点的输出配置,是否验证schema |
options.nodes[].options.tableAction | STRING | 输出节点的输出配置,表操作,见输出节点的表操作 |
options.nodes[].options.updateMethod | STRING | 输出节点的输出配置,更新方法,见输出节点的更新方法 |
options.nodes[].options.keyFields | STRING 数组 | 输出节点的输出配置,键字段 |
options.nodes[].options.preSql | STRING | 输出节点的输出配置,加载前sql |
options.nodes[].options.postSql | STRING | 输出节点的输出配置,加载后sql |
options.nodes[].options.createTableProperties | STRING | 输出节点的输出配置,建表属性 |
options.defaultOutputConnectionId | INTEGER | 数据集成项目的默认输出目标连接id |
options.defaultPath | STRING 数组 | 数据集成项目的默认输出路径 |
status | STRING | 数据集成项目最近一次执行的状态,见执行计划中的任务执行的状态说明 |
latestStartAt | DATETIME | 数据集成项目最近一次任务执行开始执行的时间 |
costTime | INTEGER | 数据集成项目最近一次任务执行耗时,单位是秒 |
updatePart | STRING | 仅修改的时候需要,表示要修改的目标部分,见数据集成项目修改目标说明 |
entityGroup | STRING | 数据集成的执行计划类别,用于管理执行计划,固定为PIPELINE |
entityKey | STRING | 数据集成的执行计划关键字,用于管理执行计划 |
execDetail | OBJECT | 创建执行计划需要用到的任务描述信息,详见执行计划 |
数据集成项目修改目标说明
值 | 意义 |
---|---|
TITLE_AND_PATH | 修改标题和输出默认配置,包括title和 options.defaultOutputConnectionId,options.defaultPath |
NODES | 修改节点的信息,包括options.nodes |
数据集成项目节点类型
值 | 意义 |
---|---|
CONNECTION | 数据连接 |
FILE | 文件上传 |
DATASET | 数据集市中的数据集 |
SQL | SQL节点 |
SINK | 输出节点 |
JOIN | 联合节点 |
UNION | 合并节点 |
AGGREGATE | 聚合节点 |
PIVOT | 行转列节点 |
UNPIVOT | 列转行节点 |
SELECT | 选择节点 |
输入节点的取样方法
值 | 意义 |
---|---|
DEFAULT | limit n,顺序依赖具体数据源的实现 |
RANDOM | 随机 + limit n |
ALL | 全量 |
输入节点的输入策略
值 | 意义 |
---|---|
ALL | 全量 |
INCREMENTAL | 增量 |
输出节点的表操作
值 | 意义 |
---|---|
NONE | 无 |
RECREATE | 重建表 |
TRUNCATE | 清空数据 |
输出节点的更新方法
值 | 意义 |
---|---|
INSERT | 插入 |
UPSERT | 更新 |
任务执行结果说明
字段 | 类型 | 描述 |
---|---|---|
schema | OBJECT数组 | 每一个元素表示一个字段的属性,与数据集的字段结构相同 |
data | OBJECT数组 | 每一个元素是表示一行数据的数组 |
randomable | BOOL | 数据源是否支持随机获取 |
各个类型节点样例
数据集输入节点
json
{
"he": {
"kind": "function",
"op": "app_dataset",
"args": [
101126, // 应用id
1 // 数据集id
],
"uid": "input-2" // 节点唯一id
},
"options": {
"title": "A_IVT_MOVIE",
"nodeType": "DATASET",
"samplingMethod": "DEFAULT",
"samplingNum": 1000,
"inputStrategy": "INCREMENTAL",
"incrementalField": [
"f1",
"f2"
]
}
}
数据连接输入节点
json
{
"he": {
"kind": "function",
"op": "db_source",
"args": [
25055, // 数据连接id
[ // schema路径,支持多层,数组元素每个表示一层
"test"
],
"客户" // 表名
],
"uid": "input-3" // 节点唯一id
},
"options": {
"title": "客户",
"nodeType": "CONNECTION",
"samplingMethod": "DEFAULT",
"samplingNum": 1000,
"inputStrategy": "INCREMENTAL",
"incrementalField": [
"f1",
"f2"
]
}
}
sql输入节点
json
{
"he": {
"kind": "function",
"op": "raw_sql",
"args": [
25055, // 数据连接id
"select * from city" // 执行的sql
],
"uid": "input-3" // 节点唯一id
},
"options": {
"title": "sql输入节点",
"nodeType": "SQL",
"samplingMethod": "DEFAULT",
"samplingNum": 1000,
"inputStrategy": "INCREMENTAL",
"incrementalField": [
"f1",
"f2"
]
}
}
文件输入节点
节点信息生成需要先调用文件保存接口后生成,见使用新上传的文件表格为内容添加数据到内部连接并返回连接和表信息
json
{
"he": {
"kind": "function",
"op": "db_source",
"args": [
1240,
[
"hengshi_internal_engine_tmp_schema"
],
"f_u_7_1630930370000_18151_0"
],
"uid": "input-1"
},
"options": {
"title": "date.txt",
"nodeType": "FILE",
"samplingMethod": "DEFAULT",
"samplingNum": 1000,
"inputStrategy": "ALL",
"sourceTitle": "date.txt"
}
}
行转列节点
json
{
"he":{
"kind":"function",
"op":"pivot",
"args":[
{ // 上游引用的节点id
"kind": "reference",
"op": "uid-1"
},
[ // 分组列
{"kind": "field", "op": "year"}
],
{ // 聚合列
"kind": "function",
"op": "sum",
"args": [
{
"kind": "field",
"op": "val"
}
]
},
{"kind": "field", "op": "month"}, // 转置列
[ // 转置列列值对应的列名,没有表示自动生成
{"kind": "constant", "op": "一月", "uid": "Jan"},
{"kind": "constant", "op": "二月", "uid": "Feb"},
{"kind": "constant", "op": "三月", "uid": "Mar"},
{"kind": "constant", "op": "四月", "uid": "Apr"},
{"kind": "constant", "op": "五月", "uid": "May"},
{"kind": "constant", "op": "六月", "uid": "Jun"}
]
],
"uid":"uid-2"
},
"options":{
"title":"pivot行转列节点",
"nodeType":"PIVOT",
"customizedFieldName":true
}
}
列转行节点
json
{
"he":{
"kind":"function",
"op":"unpivot",
"args": [
{"kind": "reference", "op": "uid-1"}, // 上游节点
{"kind": "field", "op": "month", "type": "number"}, // 存放列转行的key列,需要明确的 type
{"kind": "field", "op": "val"}, // 存放列转行的 value 列
[
{"kind": "field", "op": "Jan", "value": 1}, // 待转的列, value 表示转换后的key值
{"kind": "field", "op": "Feb", "value": 2},
{"kind": "field", "op": "Mar", "value": 3},
{"kind": "field", "op": "Apr", "value": 4},
{"kind": "field", "op": "May", "value": 5},
{"kind": "field", "op": "Jun", "value": 6}
]
],
"uid":"uid-3"
},
"options":{
"title":"unpivot列转换节点",
"nodeType":"UNPIVOT"
}
}
聚合节点
json
{
"he":{
"kind": "function",
"op": "summarize",
"args": [
{"kind": "reference", "op": "uid-1"}, // 上游节点引用id
{"kind": "field", "op": "id", "uid": "id2"}, // 聚合的字段列表,可以是纯字段或者是聚合表达式
{
"kind": "function",
"op": "trunc_year",
"args": [{"kind": "field", "op": "pubdate"}],
"uid": "pubdate2"
},
{
"kind": "function",
"op": "trunc_millisecond",
"args": [{"kind": "field", "op": "release_time"}],
"uid": "release_time2"
},
{
"kind": "function",
"op": "count",
"args": [{"kind": "field", "op": "id"}]
"uid": "votes"
},
{"kind": "formula", "op": "sum({rate_num})", "uid": "rate_num_sum"},
],
"uid":"uid-3"
},
"options":{
"title":"聚合节点",
"nodeType":"AGGREGATE",
"aggStartIndex":4 // 分组列和聚合列的分界线,仅在和衡石前端交互有用
}
}
联合节点
json
{
"he": {
"kind": "function",
"op": "left_join", // 支持left_join,right_join,inner_join,full_join
"args": [
{"kind": "reference", "op": "input-1"}, // 联合左表引用的节点id
{"kind": "reference", "op": "input-2"}, // 联合右表引用的节点id
{ // 联合的条件
"kind": "function",
"op": "and",
"args": [
{
"kind": "function",
"op": "=",
"args": [
{
"kind": "field",
"dataset": "input-1", // 字段来自的节点id
"op": "id"
},
{
"kind": "field",
"dataset": "input-2", // 字段来自的节点id
"op": "id"
}
]
}
]
}
],
"uid": "join-uid-1"
},
"options": {
"title": "联合节点",
"nodeType": "JOIN"
}
}
合并节点
json
{
"he": {
"kind": "function",
"op": "append",
"args": [
{"kind": "reference", "op": "uid-1"}, // 合并的第一个节点的唯一id引用,结果以这个节点的字段为准
{"kind": "reference", "op": "uid-2"}, // 合并的其他节点的唯一id引用
{"kind": "reference", "op": "uid-3"}
],
"uid": "union-uid-1" // 节点唯一id
},
"options": {
"title": "合并节点",
"nodeType": "UNION"
}
}
选择节点
使用表达式过滤的例子
json
{
"he": {
"kind": "function",
"op": "select_fields_complete",
"args": [
{"kind": "reference", "op": "uid-1"}, // 上游引用的节点id
[
{"kind": "field", "op": "*"}, // 保留的字段,*表示全部
{"kind": "field", "op": "f1", "uid": "别名1"}, // 重命名字段
{"kind": "field", "op": "f2", "type": "string"}, // 修改字段类型
{"kind": "formula", "op": "{f1} + {f2} * {f3}", "uid": "别名2"} // 新增表达式字段
],
[ // 过滤条件,这里是一个表达式过滤
{
"kind": "formula", "op": "{f1} > 10 and {f2} + {f3} > 100"
}
]
],
"uid": "select-uid-1"
},
"options": {
"title": "选择节点",
"nodeType": "SELECT"
}
}
使用简单过滤条件的例子
json
{
"he": {
"kind": "function",
"op": "select_fields_complete",
"args": [
{"kind": "reference", "op": "uid-1"},
[
{"kind": "field", "op": "*"},
{"kind": "field", "op": "f1", "uid": "别名1"},
{"kind": "field", "op": "f2", "type": "string"},
{"kind": "formula", "op": "{f1} + {f2} * {f3}", "uid": "别名2"}
],
[ // 过滤条件,这里是一个简单过滤条件
{
"kind":"function",
"op":"and",
"args":[
{
"kind":"function",
"op":"=",
"args":[
{"kind":"field","op":"id","type":"number"},
{"kind":"constant","op":1,"type":"number"}
]
},
{
"kind":"function",
"op":"isnotnull",
"args":[
{"kind":"field","op":"short_name","type":"string"}
]
}
]
}
]
],
"uid": "select-uid-1"
},
"options": {
"title": "选择节点",
"nodeType": "SELECT"
}
}
输出节点
json
{
"he": {
"kind": "function",
"op": "db_output",
"args": [
"input-1", // 要输出的上游节点id
1, // 输出的数据连接id
[ // 输出的schema路径,支持多层,每层一个元素
"test"
],
"movie" // 输出表名
],
"uid": "output-1" // 节点唯一id
},
"options": {
"title": "output-1",
"nodeType": "SINK",
"validateSchema": false,
"tableAction": "RECREATE",
"updateMethod": "INSERT",
"keyFields": [
"f3",
"f4"
],
"preSql": "",
"postSql": ""
}
}
接口说明
新建一个数据集成项目
请求URL
http
POST /api/pipelines HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
请求参数
URL 参数
字段 | 类型 | 是否必须 | 说明 |
---|
Request Body 参数
字段 | 类型 | 是否必须 | 描述 |
---|---|---|---|
title | STRING | 是 | 数据集成项目的标题 |
options.defaultOutputConnectionId | INTEGER | 是 | 数据集成项目的默认输出目标连接id |
options.defaultPath | STRING 数组 | 是 | 数据集成项目的默认输出路径 |
返回对象的格式说明
字段 | 类型 | 说明 |
---|---|---|
version | STRING | 当前系统版本哈希值 |
data | OBJECT | 见数据集成项目结构说明 |
接口示例:
http
POST /api/pipelines HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
// Request Body:
{
"title": "wss p2",
"options": {
"defaultOutputConnectionId": 5,
"defaultPath": [
"public"
]
}
}
http
HTTP/1.1 200 Ok
Content-Type: application/json
{
"version": "version@9a5e106#6730f0d",
"code": 0,
"msg": "success",
"data": {
"id": 2,
"title": "wss p2",
"createdBy": 1,
"createdAt": "2020-04-02 15:08:33",
"updatedBy": 1,
"updatedAt": "2020-04-02 15:08:33",
"options": {
"defaultPath": [
"public"
],
"defaultOutputConnectionId": 5
}
}
}
删除一个数据集成项目
请求URL
DELETE /api/pipelines/{pipelineId}
请求参数
URL 参数
字段 | 类型 | 是否必须 | 说明 |
---|
Request Body 参数
字段 | 类型 | 是否必须 | 描述 |
---|
返回对象的格式说明
字段 | 类型 | 说明 |
---|---|---|
version | STRING | 当前系统版本哈希值 |
code | 错误码 | 请求成功返回0 |
msg | STRING | 请求成功返回success |
接口示例:
http
DELETE /api/pipelines/1 HTTP/1.1
http
HTTP/1.1 200 Ok
Content-Type: application/json
{
"code": 0,
"msg": "success",
"version": "version@9a5e106#6730f0d",
}
修改一个数据集成项目
请求URL
PUT /api/pipelines/{pipelineId}
请求参数
URL 参数
字段 | 类型 | 是否必须 | 说明 |
---|
Request Body 参数
字段见 数据集成项目结构说明,其中updatePart是必须的,后端根据修改的部分对该部分做校验。
返回对象的格式说明
字段 | 类型 | 说明 |
---|---|---|
version | STRING | 当前系统版本哈希值 |
data | OBJECT | 见数据集成项目结构说明 |
接口示例: 1 修改标题和默认输出路径
http
PUT /api/pipelines/1 HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
// Request Body:
{
"updatePart": "TITLE_AND_PATH",
"title": "wss p3",
"options": {
"defaultOutputConnectionId": 2,
"defaultPath": [
"test"
]
}
}
http
HTTP/1.1 200 Ok
Content-Type: application/json
{
"version": "version@9a5e106#6730f0d",
"code": 0,
"msg": "success",
"data": {
"id": 2,
"title": "wss p3",
"createdBy": 1,
"createdAt": "2020-04-02 15:08:33",
"updatedBy": 1,
"updatedAt": "2020-04-02 15:08:33",
"options": {
"defaultPath": [
"test"
],
"defaultOutputConnectionId": 2
}
}
}
接口示例: 2 添加数据集类型输入节点
http
PUT /api/pipelines/1 HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
// Request Body:
{
"updatePart": "NODES",
"options": {
"nodes": [
{
"he": {
"kind": "function",
"op": "app_dataset",
"args": [
101126,
1
],
"uid": "input-2"
},
"options": {
"title": "A_IVT_MOVIE",
"nodeType": "DATASET",
"samplingMethod": "DEFAULT",
"samplingNum": 1000,
"inputStrategy": "INCREMENTAL",
"incrementalField": [
"f1",
"f2"
]
}
}
]
}
}
http
HTTP/1.1 200 Ok
Content-Type: application/json
{
"version": "version@9a5e106#6730f0d",
"code": 0,
"msg": "success",
"data": {
"id": 2,
"title": "wss p3",
"createdBy": 1,
"createdAt": "2020-04-02 15:08:33",
"updatedBy": 1,
"updatedAt": "2020-04-02 15:08:33",
"options": {
"defaultPath": [
"test"
],
"defaultOutputConnectionId": 2,
"nodes": [
{
"he": {
"kind": "function",
"op": "app_dataset",
"args": [
101126,
1
],
"uid": "input-2"
},
"options": {
"title": "A_IVT_MOVIE",
"nodeType": "DATASET",
"samplingMethod": "DEFAULT",
"samplingNum": 1000,
"inputStrategy": "INCREMENTAL",
"incrementalField": [
"f1",
"f2"
]
}
}
]
}
}
}
接口示例: 3 添加文件类型输入节点, 文件上传接口和返回信息参考上传文件准备作为输入节点接口
http
PUT /api/pipelines/1 HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
// Request Body:
{
"updatePart": "NODES",
"options": {
"nodes": [
{
"he": {
"kind": "function",
"op": "db_source",
"args": [
1240,
[
"hengshi_internal_engine_tmp_schema"
],
"f_u_7_1630930370000_18151_0"
],
"uid": "input-1"
},
"options": {
"title": "date.txt",
"nodeType": "FILE",
"samplingMethod": "DEFAULT",
"samplingNum": 1000,
"inputStrategy": "ALL",
"sourceTitle": "date.txt"
}
}
]
}
}
http
HTTP/1.1 200 Ok
Content-Type: application/json
{
"version": "version@9a5e106#6730f0d",
"code": 0,
"msg": "success",
"data": {
"id": 2,
"title": "wss p3",
"createdBy": 1,
"createdAt": "2020-04-02 15:08:33",
"updatedBy": 1,
"updatedAt": "2020-04-02 15:08:33",
"options": {
"defaultPath": [
"test"
],
"defaultOutputConnectionId": 2,
"nodes": [
{
"he": {
"kind": "function",
"op": "db_source",
"args": [
1240,
[
"hengshi_internal_engine_tmp_schema"
],
"f_u_7_1630930370000_18151_0"
],
"uid": "input-1"
},
"options": {
"title": "date.txt",
"nodeType": "FILE",
"samplingMethod": "DEFAULT",
"samplingNum": 1000,
"inputStrategy": "ALL"
}
}
]
}
}
}
接口示例: 4 添加连接类型的输入节点
http
PUT /api/pipelines/1 HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
// Request Body:
{
"updatePart": "NODES",
"options": {
"nodes": [
{
"he": {
"kind": "function",
"op": "db_source",
"args": [
25055,
[
"test"
],
"客户"
],
"uid": "input-3"
},
"options": {
"title": "客户",
"nodeType": "CONNECTION",
"samplingMethod": "DEFAULT",
"samplingNum": 1000,
"inputStrategy": "INCREMENTAL",
"incrementalField": [
"f1",
"f2"
]
}
}
]
}
}
http
HTTP/1.1 200 Ok
Content-Type: application/json
{
"version": "version@9a5e106#6730f0d",
"code": 0,
"msg": "success",
"data": {
"id": 2,
"title": "wss p3",
"createdBy": 1,
"createdAt": "2020-04-02 15:08:33",
"updatedBy": 1,
"updatedAt": "2020-04-02 15:08:33",
"options": {
"defaultPath": [
"test"
],
"defaultOutputConnectionId": 2,
"nodes": [
{
"he": {
"kind": "function",
"op": "db_source",
"args": [
25055,
[
"test"
],
"客户"
],
"uid": "input-3"
},
"options": {
"title": "客户",
"nodeType": "CONNECTION",
"samplingMethod": "DEFAULT",
"samplingNum": 1000,
"inputStrategy": "INCREMENTAL",
"incrementalField": [
"f1",
"f2"
]
}
}
]
}
}
}
接口示例: 5 添加输出节点
http
PUT /api/pipelines/1 HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
// Request Body:
{
"updatePart": "NODES",
"options": {
"nodes": [
{
"he": {
"kind": "function",
"op": "db_output",
"args": [
"input-1",
1,
[
"test"
],
"movie"
],
"uid": "output-1"
},
"options": {
"title": "output-1",
"nodeType": "SINK",
"validateSchema": false,
"tableAction": "RECREATE",
"updateMethod": "INSERT",
"keyFields": [
"f3",
"f4"
],
"preSql": "",
"postSql": ""
}
}
]
}
}
http
HTTP/1.1 200 Ok
Content-Type: application/json
{
"version": "version@9a5e106#6730f0d",
"code": 0,
"msg": "success",
"data": {
"id": 2,
"title": "wss p3",
"createdBy": 1,
"createdAt": "2020-04-02 15:08:33",
"updatedBy": 1,
"updatedAt": "2020-04-02 15:08:33",
"options": {
"defaultPath": [
"test"
],
"defaultOutputConnectionId": 2,
"nodes": [
{
"he": {
"kind": "function",
"op": "db_output",
"args": [
"input-1",
1,
[
"test"
],
"movie"
],
"uid": "output-1"
},
"options": {
"title": "output-1",
"nodeType": "SINK",
"validateSchema": false,
"tableAction": "RECREATE",
"updateMethod": "INSERT",
"keyFields": [
"f3",
"f4"
],
"preSql": "",
"postSql": ""
}
}
]
}
}
}
查看一个数据集成项目
请求URL
GET /api/pipelines/{pipelineId}
请求参数
URL 参数
字段 | 类型 | 是否必须 | 说明 |
---|
Request Body 参数
字段 | 类型 | 是否必须 | 描述 |
---|
返回对象的格式说明
字段 | 类型 | 说明 |
---|---|---|
version | STRING | 当前系统版本哈希值 |
data | OBJECT | 见数据集成项目结构说明 |
接口示例:
http
GET /api/pipelines/1 HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
http
HTTP/1.1 200 Ok
Content-Type: application/json
{
"data": {
"id": 1,
"title": "数据集成1",
"status": "PENDING",
"latestStartAt": "2019-05-05 17:55:55.123",
"costTime": 60,
"options": {
"nodes": [
{
"he": {
"kind": "function",
"op": "app_dataset",
"args": [
101126,
1
],
"uid": "input-2"
},
"options": {
"title": "A_IVT_MOVIE",
"nodeType": "DATASET",
"samplingMethod": "DEFAULT",
"samplingNum": 1000,
"inputStrategy": "INCREMENTAL",
"incrementalField": [
"f1",
"f2"
]
}
}
],
"defaultOutputConnectionId": 1,
"defaultPath": [
"public"
]
}
}
}
获取所有数据集成项目
请求URL
http
GET /api/pipelines HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
请求参数
URL 参数
字段 | 类型 | 是否必须 | 说明 |
---|---|---|---|
offset | INTEGER | 可选 | 分页偏移量,默认是0 |
limit | INTEGER | 可选 | 分页获取个数,默认是10 |
Request Body 参数
字段 | 类型 | 是否必须 | 描述 |
---|
返回对象的格式说明
字段 | 类型 | 说明 |
---|---|---|
version | STRING | 当前系统版本哈希值 |
data | OBJECT 数组 | 每个对象见见数据集成项目结构说明 |
接口示例:
http
GET /api/pipelines HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
http
HTTP/1.1 200 Ok
Content-Type: application/json
{
"data": [
{
"id": 1,
"title": "数据集成1",
"status": "PENDING",
"latestStartAt": "2019-05-05 17:55:55.123",
"costTime": 60,
"options": {
"nodes": [
{
"he": {
"kind": "function",
"op": "app_dataset",
"args": [
101126,
1
],
"uid": "input-2"
},
"options": {
"title": "A_IVT_MOVIE",
"nodeType": "DATASET",
"samplingMethod": "DEFAULT",
"samplingNum": 1000,
"inputStrategy": "INCREMENTAL",
"incrementalField": [
"f1",
"f2"
]
}
}
],
"defaultOutputConnectionId": 1,
"defaultPath": [
"public"
]
}
},
...
],
...
}
使用新上传的文件表格为内容添加数据到内部连接并返回连接和表信息
请求URL
http
POST /api/pipelines/{pipelineId}/nodes/files/{fileId}/sheets/{sheetId}/save HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
请求参数
URL 参数
字段 | 类型 | 是否必须 | 说明 |
---|
Request Body 参数
注意,这个options是描述文件内容的options,不是节点的options。
字段 | 类型 | 是否必须 | 描述 |
---|---|---|---|
options | OBJECT | 是 | 描述文件内容 |
options.delimiter | STRING | 是 | 文件的分隔符,见文件分隔符 |
options.encoding | STRING | 是 | 文件的编码 |
options.header | INTEGER | 是 | 表头所在行数,从0开始 |
options.origin | STRING | 是 | 文件元素类,可能为file_excel,file_csv |
options.range | OBJECT数组 | 是 | 数据在文件中的行列范围 |
options.range[].xbegin | INTEGER | 是 | 开始列数,从0开始,包含自身 |
options.range[].xend | INTEGER | 是 | 结束列数,从0开始,包含自身 |
options.range[].ybegin | INTEGER | 是 | 开始行数,从0开始,包含自身 |
options.range[].yend | INTEGER | 是 | 结束行数,从0开始,包含自身 |
transpose | BOOL | 是 | 是否做行列翻转 |
title | STRING | 是 | 文件的名字 |
文件分隔符
值 | 意义 |
---|---|
colon | 英文的冒号(😃 |
comma | 英文的逗号(,) |
pipe | 英文的竖线(|) |
semicolon | 英文的分号(😉 |
tab | tab符号(\t) |
返回对象的格式说明
字段 | 类型 | 说明 |
---|---|---|
version | STRING | 当前系统版本哈希值 |
data | OBJECT | 见数据集成项目结构说明 中options.nodes的说明 |
接口示例:
http
POST /api/pipelines/2/nodes/files/56c8905fcc06bd0b210b29401e336194/sheets/0/save HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
// Request Body:
{
"options": {
"delimiter": "comma",
"encoding": "UTF-8",
"header": 1,
"origin": "file_csv",
"range": [
{
"xbegin": 1,
"xend": 6,
"ybegin": 2,
"yend": 10
}
],
"transpose": false
},
"title": "date.txt"
}
http
HTTP/1.1 200 Ok
Content-Type: application/json
{
"version": "version@9a5e106#6730f0d",
"data": {
"he": {
"kind": "function",
"op": "db_source",
"args": [
1240,
[
"hengshi_internal_engine_tmp_schema"
],
"f_u_7_1630930370000_18151_0"
]
},
"options": {
"sourceTitle": "date.txt"
}
}
}
预览一个节点的数据
请求URL
http
GET /api/pipelines/{pipelineId}/nodes/{uid}/data HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
请求参数
URL 参数
字段 | 类型 | 是否必须 | 说明 |
---|---|---|---|
offset | INTEGER | 可选 | 分页偏移量,默认是0 |
limit | INTEGER | 可选 | 分页获取个数,默认是1000,可以用0来表示只获取schema |
Request Body 参数
字段 | 类型 | 是否必须 | 描述 |
---|
返回对象的格式说明
字段 | 类型 | 说明 |
---|---|---|
version | STRING | 当前系统版本哈希值 |
data | OBJECT | 见任务执行结果说明 |
errorMap | OBJECT | 具体节点的错误信息,是一个字符串到字符串的map |
接口示例:
http
GET /api/pipelines/1/nodes/input-1/data HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
http
HTTP/1.1 200 Ok
Content-Type: application/json
{
"data": {
"data": [
...
],
"schema": [
...
],
"randomable": false
},
...
}
根据当前设置预览一个节点的数据
请求URL
http
POST /api/pipelines/{pipelineId}/nodes/{uid}/data HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
请求参数
URL 参数
字段 | 类型 | 是否必须 | 说明 |
---|---|---|---|
offset | INTEGER | 可选 | 分页偏移量,默认是0 |
limit | INTEGER | 可选 | 分页获取个数,默认是1000,可以用0来表示只获取schema |
Request Body 参数
字段 | 类型 | 是否必须 | 描述 |
---|---|---|---|
options | OBJECT | 是 | 字段和数据集成项目结构说明中的options一致 |
返回对象的格式说明
字段 | 类型 | 说明 |
---|---|---|
version | STRING | 当前系统版本哈希值 |
data | OBJECT | 见任务执行结果说明 |
errorMap | OBJECT | 具体节点的错误信息,是一个字符串到字符串的map |
接口示例:
http
POST /api/pipelines/1/nodes/input-1/data HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
// Request Body:
{
"options":{
"defaultOutputConnectionId":1,
"defaultPath":["public"],
"nodes":[...]
}
}
http
HTTP/1.1 200 Ok
Content-Type: application/json
{
"data": {
"data": [
...
],
"schema": [
...
],
"randomable": false
},
...
}
获取一个节点的debug sql
请求URL
http
GET /api/pipelines/{pipelineId}/nodes/{uid}/sql-debug HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
请求参数
URL 参数
字段 | 类型 | 是否必须 | 说明 |
---|
Request Body 参数
字段 | 类型 | 是否必须 | 描述 |
---|
返回对象的格式说明
字段 | 类型 | 说明 |
---|---|---|
version | STRING | 当前系统版本哈希值 |
data | STRING | 当前节点预计会生成的sql |
errorMap | OBJECT | 具体节点的错误信息,是一个字符串到字符串的map |
接口示例:
http
GET /api/pipelines/1/nodes/input-1/sql-debug HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
http
HTTP/1.1 200 Ok
Content-Type: application/json
{
"data": "select * from table",
...
}
根据当前设置获取一个节点的debug sql
请求URL
http
POST /api/pipelines/{pipelineId}/nodes/{uid}/sql-debug HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
请求参数
URL 参数
字段 | 类型 | 是否必须 | 说明 |
---|
Request Body 参数
字段 | 类型 | 是否必须 | 描述 |
---|---|---|---|
options | OBJECT | 是 | 字段和数据集成项目结构说明中的options一致 |
返回对象的格式说明
字段 | 类型 | 说明 |
---|---|---|
version | STRING | 当前系统版本哈希值 |
data | STRING | 当前节点预计会生成的sql |
errorMap | OBJECT | 具体节点的错误信息,是一个字符串到字符串的map |
接口示例:
http
POST /api/pipelines/1/nodes/input-1/sql-debug HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
// Request Body:
{
"options":{
"defaultOutputConnectionId":1,
"defaultPath":["public"],
"nodes":[...]
}
}
http
HTTP/1.1 200 Ok
Content-Type: application/json
{
"data": "select * from table",
...
}
根据当前设置预览一个节点的某个字段的排重值数据
请求URL
http
POST /api/pipelines/{pipelineId}/nodes/{uid}/fields/{fieldName}/distinct HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
请求参数
URL 参数
字段 | 类型 | 是否必须 | 说明 |
---|---|---|---|
offset | INTEGER | 可选 | 分页偏移量,默认是0 |
limit | INTEGER | 可选 | 分页获取个数,默认是1000,可以用0来表示只获取schema |
Request Body 参数
字段 | 类型 | 是否必须 | 描述 |
---|---|---|---|
options | OBJECT | 是 | 字段和数据集成项目结构说明中的options一致 |
返回对象的格式说明
字段 | 类型 | 说明 |
---|---|---|
version | STRING | 当前系统版本哈希值 |
data | OBJECT | 见任务执行结果说明 |
errorMap | OBJECT | 具体节点的错误信息,是一个字符串到字符串的map |
接口示例:
http
POST /api/pipelines/1/nodes/input-1/fields/month/distinct HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
// Request Body:
{
"options":{
"defaultOutputConnectionId":1,
"defaultPath":["public"],
"nodes":[...]
}
}
http
HTTP/1.1 200 Ok
Content-Type: application/json
{
"data": {
"data": [
["Jan"],
["Feb"],
["Mar"],
["Apr"],
["May"],
...
],
"schema": [
...
]
},
...
}
列出当前用户的支持输出的连接列表
请求URL
http
GET /api/connections/writable HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
请求参数
URL 参数
字段 | 类型 | 是否必须 | 说明 |
---|
Request Body 参数
字段 | 类型 | 是否必须 | 描述 |
---|
返回对象的格式说明
字段 | 类型 | 说明 |
---|---|---|
version | STRING | 当前系统版本哈希值 |
data | OBJECT 数组 | 每个成员描述连接的信息 |
接口示例:
http
GET /api/connections/writable HTTP/1.1
Accept: application/json
// Response:
{
"version": "version@9a5e106#6730f0d",
"code": 0,
"msg": "success",
"data": [
{
"id": 5,
"options": {
"type": "postgresql",
"maxConnNum": 10,
"config": {},
"protocol": "http",
"outputAble": true
},
"createdBy": 1,
"createdAt": "2020-04-01 16:57:51",
"updatedBy": 1,
"updatedAt": "2020-04-01 16:57:51",
"visible": true,
"title": "pg wss",
"status": 0,
"refreshStats": {},
"hsVersion": 0,
"accessCount": 1,
"creator": {
"id": 1,
"name": "hengshiwss",
"email": "anonymous@hengshi.com",
"userAttributes": {
"name": "hengshiwss",
"id": 1,
"email": "anonymous@hengshi.com"
}
},
"updater": {
"id": 1,
"name": "hengshiwss",
"email": "anonymous@hengshi.com",
"userAttributes": {
"name": "hengshiwss",
"id": 1,
"email": "anonymous@hengshi.com"
}
},
"auth": false
}
],
"totalHits": 1,
"offset": 0
}
获取指定数据集成项目的状态
请求URL
http
GET /api/pipelines/status HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
请求参数
URL 参数
字段 | 类型 | 是否必须 | 说明 |
---|---|---|---|
ids | STRING | 是 | 需要查询状态的pipeline的id列表,用英文逗号分隔 |
Request Body 参数
字段 | 类型 | 是否必须 | 描述 |
---|
返回对象的格式说明
字段 | 类型 | 说明 |
---|---|---|
version | STRING | 当前系统版本哈希值 |
data | OBJECT | 是一个map,key是pipeline的id,value是状态,见执行计划中的任务执行的状态说明,id不存在或者没权限的,map里面不返回 |
接口示例:
http
GET /api/pipelines/status?ids=1,2,3 HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
http
HTTP/1.1 200 Ok
Content-Type: application/json
{
"data": {
"1": "PENDING",
"2": "RUNNING",
"3": "SUCCESSFUL"
},
...
}
根据当前配置立即执行接口
请求URL
http
POST /api/schedules HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
请求参数
URL 参数
字段 | 类型 | 是否必须 | 描述 |
---|
Request Body 参数
这个接口重用执行计划管理中的立即执行
接口,参数比那边多两个以下参数
字段 | 类型 | 是否必须 | 描述 |
---|---|---|---|
execDetail.jobParams.uid | STRING | 否 | 节点的uid,当非空的时候是执行指定节点,否则按当前options配置执行整个项目 |
execDetail.jobParams.options | OBJECT | 是 | 当前项目的设置,字段和数据集成项目结构说明中的options一致 |
返回对象的格式说明
字段 | 类型 | 说明 |
---|---|---|
version | STRING | 当前系统版本哈希值 |
data | OBJECT | 见执行计划说明中执行计划结构说明 |
接口示例1: 根据options立即执行数据集成
http
POST /api/schedules HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
// Request Body:
{
"entityGroup": "PIPELINE",
"entityKey": "1",
"planItems": [
{
"triggerType": "ONCE"
}
],
"execDetail": {
"jobClass": "com.hengshi.nangaparbat.schedulejob.PipelineJob",
"jobParams": {
"pipeline": 1,
"options": {
"defaultOutputConnectionId":1,
"defaultPath":["public"],
"nodes":[...]
}
},
"retryTimes": 1
}
}
http
HTTP/1.1 200 Ok
Content-Type: application/json
{
"code": 0,
"data": {
"id": 1,
"entityGroup": "PIPELINE",
"entityKey": "1",
"enabled": true,
"planItems": [
{
"id": 3,
"triggerType": "ONCE",
"planId": 1
}
],
"execDetail": {
"jobClass": "com.hengshi.nangaparbat.schedulejob.PipelineJob",
"jobParams": {
"pipeline": 1,
"options": {
"defaultOutputConnectionId":1,
"defaultPath":["public"],
"nodes":[...]
}
},
"retryTimes": 1
},
"title": "pl1",
"createdAt": "2020-03-05 15:01:02",
"createdBy": 1,
"updatedAt": "2020-03-05 15:01:02",
"updatedBy": 1
},
"msg": "success",
"version": "version@9a5e106#6730f0d",
}
接口示例2: 根据当前options配置立即执行一个(一般是输出)节点
http
POST /api/schedules HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
// Request Body:
{
"entityGroup": "PIPELINE",
"entityKey": "1",
"planItems": [
{
"triggerType": "ONCE"
}
],
"execDetail": {
"jobClass": "com.hengshi.nangaparbat.schedulejob.PipelineJob",
"jobParams": {
"pipeline": 1,
"uid": "output-1",
"options": {
"defaultOutputConnectionId":1,
"defaultPath":["public"],
"nodes":[...]
}
},
"retryTimes": 1
}
}
http
HTTP/1.1 200 Ok
Content-Type: application/json
{
"code": 0,
"data": {
"id": 1,
"entityGroup": "PIPELINE",
"entityKey": "1",
"enabled": true,
"planItems": [
{
"id": 3,
"triggerType": "ONCE",
"planId": 1
}
],
"execDetail": {
"jobClass": "com.hengshi.nangaparbat.schedulejob.PipelineJob",
"jobParams": {
"pipeline": 1,
"uid": "output-1",
"options": {
"defaultOutputConnectionId":1,
"defaultPath":["public"],
"nodes":[...]
}
},
"retryTimes": 1
},
"title": "pl1",
"createdAt": "2020-03-05 15:01:02",
"createdBy": 1,
"updatedAt": "2020-03-05 15:01:02",
"updatedBy": 1
},
"msg": "success",
"version": "version@9a5e106#6730f0d",
}
查询任务执行具体节点错误信息
请求URL
http
GET /api/contexts/{contextId}/errors HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
请求参数
URL 参数
字段 | 类型 | 是否必须 | 说明 |
---|---|---|---|
block | BOOL | 可选 | 是否阻塞,为true表示等到执行完再返回,否则返回空data,默认是true |
Request Body 参数
字段 | 类型 | 是否必须 | 描述 |
---|
返回对象的格式说明
字段 | 类型 | 说明 |
---|---|---|
version | STRING | 当前系统版本哈希值 |
data | OBJECT | 是一个map,map的key是节点的uid,value是具体的错误信息 |
接口示例:
http
GET /api/contexts/{contextId}/errors HTTP/1.1
Accept: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
http
HTTP/1.1 200 Ok
Content-Type: application/json
{
"code": 0,
"data": {
"uid-1": "文件节点没有上传",
"uid-2": "输出节点没有上游",
...
},
"msg": "success",
"version": "version@9a5e106#6730f0d",
}
复制一个数据集成项目
请求URL
http
POST /api/pipelines/{pipelineId}/duplicate HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
请求参数
URL 参数
字段 | 类型 | 是否必须 | 说明 |
---|
Request Body 参数
字段 | 类型 | 是否必须 | 描述 |
---|
返回对象的格式说明
字段 | 类型 | 说明 |
---|---|---|
version | STRING | 当前系统版本哈希值 |
data | OBJECT | 见数据集成项目结构说明 |
接口示例:
http
POST /api/pipelines/1/duplicate HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
// Request Body:
{}
http
HTTP/1.1 200 Ok
Content-Type: application/json
{
"version": "version@9a5e106#6730f0d",
"code": 0,
"msg": "success",
"data": {
"id": 2,
"title": "数据集成1 (1)",
"createdBy": 1,
"createdAt": "2022-04-02 15:08:33",
"updatedBy": 1,
"updatedAt": "2022-04-02 15:08:33",
"options": {
"defaultPath": [
"public"
],
"defaultOutputConnectionId": 5,
"nodes":[...]
}
}
}
根据当前设置获取输出节点的建表属性模板
请求URL
http
POST /api/pipelines/{pipelineId}/nodes/{uid}/create-table-props-template HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
请求参数
URL 参数
字段 | 类型 | 是否必须 | 说明 |
---|
Request Body 参数
字段 | 类型 | 是否必须 | 描述 |
---|---|---|---|
options | OBJECT | 是 | 字段和数据集成项目结构说明中的options一致 |
返回对象的格式说明
字段 | 类型 | 说明 |
---|---|---|
version | STRING | 当前系统版本哈希值 |
data | STRING | 根据当前输出的数据源类型和字段生成的建表属性模板 |
接口示例:
http
POST /api/pipelines/1/nodes/input-1/create-table-props-template HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
// Request Body:
{
"options":{
"defaultOutputConnectionId":1,
"defaultPath":["public"],
"nodes":[...]
}
}
http
HTTP/1.1 200 Ok
Content-Type: application/json
{
"data": "distributed by (f3) partition by list(f2) (DEFAULT PARTITION pd,partition p1 values(1),partition p2 values (2),partition p3 values (3))",
...
}
根据当前建表属性等设置测试建表
请求URL
http
POST /api/pipelines/{pipelineId}/nodes/{uid}/test-create HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
请求参数
URL 参数
字段 | 类型 | 是否必须 | 说明 |
---|
Request Body 参数
字段 | 类型 | 是否必须 | 描述 |
---|---|---|---|
options | OBJECT | 是 | 字段和数据集成项目结构说明中的options一致 |
返回对象的格式说明
字段 | 类型 | 说明 |
---|---|---|
version | STRING | 当前系统版本哈希值 |
msg | STRING | http code不是200的话,返回具体的错误信息 |
接口示例:
http
POST /api/pipelines/1/nodes/input-1/test-create HTTP/1.1
Content-Type: application/json
Cookie: csrf=183f1c4...; sid=26ee552d...; _USER_SESSION_ID=f2a01083...
// Request Body:
{
"options":{
"defaultOutputConnectionId":1,
"defaultPath":["public"],
"nodes":[...]
}
}
http
HTTP/1.1 200 Ok
Content-Type: application/json
{
"msg": "ERROR: column "f3" named in 'DISTRIBUTED BY' clause does not exist",
...
}