DML语句
Load
该语句用于向指定的table导入数据。该操作会同时更新和此table相关的base index和rollup index的数据。这是一个异步操作,任务提交成功则返回。执行后可使用SHOW LOAD命令查看进度。
NULL导入的时候用\N来表示。如果需要将其他字符串转化为NULL,可以使用replace_value进行转化。
语法:
- LOAD LABEL load_label
- (
- data_desc1[, data_desc2, ...]
- )
- broker
- [opt_properties];
load_label
load_label是当前导入批次的标签,由用户指定,需要保证在一个database是唯一的。也就是说,之前在某个database成功导入的label不能在这个database中再使用。该label用来唯一确定database中的一次导入,便于管理和查询。
语法:
- [database_name.]your_label
data_desc
用于具体描述一批导入数据。
语法:
- DATA INFILE
- (
- "file_path1 [, file_path2, ...]
- )
- [NEGATIVE]
- INTO TABLE table_name
- [PARTITION (p1, p2)]
- [COLUMNS TERMINATED BY "column_separator"]
- [(column_list)]
- [SET (k1 = func(k2))]
说明:
file_path,broker中的文件路径,可以指定到一个文件,也可以用/*通配符指定某个目录下的所有文件。
NEGATIVE:如果指定此参数,则相当于导入一批“负”数据。用于抵消之前导入的同一批数据。该参数仅适用于存在value列,并且value列的聚合类型为SUM的情况。不支持Broker方式导入
PARTITION:如果指定此参数,则只会导入指定的分区,导入分区以外的数据会被过滤掉。如果不指定,默认导入table的所有分区。
column_separator:用于指定导入文件中的列分隔符。默认为\t。如果是不可见字符,则需要加\\x作为前缀,使用十六进制来表示分隔符。如hive文件的分隔符\x01,指定为"\\x01"
column_list:用于指定导入文件中的列和table中的列的对应关系。当需要跳过导入文件中的某一列时,将该列指定为table中不存在的列名即可,语法:
- (col_name1, col_name2, ...)
SET: 如果指定此参数,可以将源文件某一列按照函数进行转化,然后将转化后的结果导入到table中。目前支持的函数有:
strftime(fmt, column) 日期转换函数
fmt: 日期格式,形如%Y%m%d%H%M%S (年月日时分秒)
column: column_list中的列,即输入文件中的列。存储内容应为数字型的时间戳。如果没有column_list,则按照palo表的列顺序默认输入文件的列。
time_format(output_fmt, input_fmt, column) 日期格式转化
output_fmt: 转化后的日期格式,形如%Y%m%d%H%M%S (年月日时分秒)
input_fmt: 转化前column列的日期格式,形如%Y%m%d%H%M%S (年月日时分秒)
column: column_list中的列,即输入文件中的列。存储内容应为input_fmt格式的日期字符串。如果没有column_list,则按照palo表的列顺序默认输入文件的列。
alignment_timestamp(precision, column) 将时间戳对齐到指定精度
precision: year|month|day|hour
column: column_list中的列,即输入文件中的列。存储内容应为数字型的时间戳。 如果没有column_list,则按照palo表的列顺序默认输入文件的列。
注意:对齐精度为year、month的时候,只支持20050101~20191231范围内的时间戳。
default_value(value) 设置某一列导入的默认值,不指定则使用建表时列的默认值
md5sum(column1, column2, …) 将指定的导入列的值求md5sum,返回32位16进制字符串
replace_value(old_value[, new_value]) 导入文件中指定的old_value替换为new_value。new_value如不指定则使用建表时列的默认值
hll_hash(column) 用于将表或数据里面的某一列转化成HLL列的数据结构
broker
用于指定导入使用的Broker
语法:
- WITH BROKER broker_name ("key"="value"[,...])
这里需要指定具体的Broker name, 以及所需的Broker属性. 开源hdfs Broker支持的属性如下: - fs.defaultFS:默认文件系统 - username: 访问hdfs的用户名 - password: 访问hdfs的用户密码 - dfs.nameservices: ha模式的hdfs中必须配置这个,指定hdfs服务的名字.以下参数都是在ha hdfs中需要指定.例子: "dfs.nameservices" = "palo" - dfs.ha.namenodes.xxx:ha模式中指定namenode的名字,多个名字以逗号分隔,ha模式中必须配置。其中xxx表示dfs.nameservices配置的value.例子: "dfs.ha.namenodes.palo" = "nn1,nn2" - dfs.namenode.rpc-address.xxx.nn: ha模式中指定namenode的rpc地址信息,ha模式中必须配置。其中nn表示dfs.ha.namenodes.xxx中配置的一个namenode的名字。例子: "dfs.namenode.rpc-address.palo.nn1" = "host:port" - dfs.client.failover.proxy.provider: ha模式中指定client连接namenode的provider,默认为:org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider - hadoop.security.authentication: 鉴权方式,包含simple和kerberos两种值。默认是SIMPLE模式。如果要使用kerberos,那必须配置以下配置。 - kerberos_principal: 指定kerberos的principal - kerberos_keytab: 指定kerberos的keytab文件路径 - kerberos_keytab_content: 指定kerberos中keytab文件内容经过base64编码之后的内容。这个跟kerberos_keytab配置二选一就可以.
opt_properties
用于指定一些特殊参数。
语法:
- [PROPERTIES ("key"="value", ...)]
可以指定如下参数:
timeout:指定导入操作的超时时间。默认不超时。单位秒
max_filter_ratio:最大容忍可过滤(数据不规范等原因)的数据比例。默认零容忍。
load_delete_flag:指定该导入是否通过导入key列的方式删除数据,仅适用于UNIQUE KEY,导入时可不指定value列。默认为false (不支持Broker方式导入)
exe_mem_limit:在Broker Load方式时生效,指定导入执行时,后端可使用的最大内存。
举例:
- 1.导入一批数据,指定个别参数
- LOAD LABEL example_db.label1
- (
- DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
- INTO TABLE my_table
- )
- WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password")
- PROPERTIES
- (
- "timeout"="3600",
- "max_filter_ratio"="0.1",
- );
- 2.导入一批数据,包含多个文件。导入不同的 table,指定分隔符,指定列对应关系
- LOAD LABEL example_db.label2
- (
- DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file1")
- INTO TABLE my_table_1
- COLUMNS TERMINATED BY ","
- (k1, k3, k2, v1, v2),
- DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file2")
- INTO TABLE my_table_2
- COLUMNS TERMINATED BY "\t"
- (k1, k2, k3, v2, v1)
- )
- WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
- 3.导入一批数据,指定hive的默认分隔符\x01,并使用通配符*指定目录下的所有文件
- LOAD LABEL example_db.label3
- (
- DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/*")
- NEGATIVE
- INTO TABLE my_table
- COLUMNS TERMINATED BY "\\x01"
- )
- WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
- 4.导入一批“负”数据
- LOAD LABEL example_db.label4
- (
- DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/old_file")
- NEGATIVE
- INTO TABLE my_table
- COLUMNS TERMINATED BY "\t"
- )
- WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
- 5.导入一批数据,指定分区
- LOAD LABEL example_db.label5
- (
- DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
- INTO TABLE my_table
- PARTITION (p1, p2)
- COLUMNS TERMINATED BY ","
- (k1, k3, k2, v1, v2)
- )
- WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
- 6.导入一批数据,指定分区, 并对导入文件的列做一些转化,如下:
k1将tmp_k1时间戳列转化为datetime类型的数据
k2将tmp_k2 date类型的数据转化为datetime的数据
k3将tmp_k3时间戳列转化为天级别时间戳
k4指定导入默认值为1
k5将tmp_k1、tmp_k2、tmp_k3列计算md5串
导入语句为:
- LOAD LABEL example_db.label6
- (
- DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
- INTO TABLE my_table
- PARTITION (p1, p2)
- COLUMNS TERMINATED BY ","
- (tmp_k1, tmp_k2, tmp_k3, v1, v2)
- SET (
- k1 = strftime("%Y-%m-%d %H:%M:%S", tmp_k1)),
- k2 = time_format("%Y-%m-%d %H:%M:%S", "%Y-%m-%d", tmp_k2)),
- k3 = alignment_timestamp("day", tmp_k3),
- k4 = default_value("1"),
- k5 = md5sum(tmp_k1, tmp_k2, tmp_k3)
- )
- )
- WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
7.导入数据到含有HLL列的表,可以是表中的列或者数据里面的列
- LOAD LABEL example_db.label7
- (
- DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
- INTO TABLE my_table
- PARTITION (p1, p2)
- COLUMNS TERMINATED BY ","
- SET (
- v1 = hll_hash(k1),
- v2 = hll_hash(k2)
- )
- )
- WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
- LOAD LABEL example_db.label8
- (
- DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
- INTO TABLE `my_table`
- PARTITION (p1, p2)
- COLUMNS TERMINATED BY ","
- (k1, k2, tmp_k3, tmp_k4, v1, v2)
- SET (
- v1 = hll_hash(tmp_k3),
- v2 = hll_hash(tmp_k4)
- )
- )
- WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
8.从ha模式的hdfs的路径中导入数据 LOAD LABEL table1_20170707 ( DATA INFILE("hdfs://bdos/palo/table1_data") INTO TABLE table1 ) WITH BROKER hdfs ( "fs.defaultFS"="hdfs://bdos", "username"="hdfs_user", "password"="hdfs_password", "dfs.nameservices"="bdos", "dfs.ha.namenodes.bdos"="nn1,nn2", "dfs.namenode.rpc-address.bdos.nn1"="host1:port1", "dfs.namenode.rpc-address.bdos.nn2"="host2:port2") PROPERTIES ( "timeout"="3600", "max_filter_ratio"="0.1" );
9.基于keytab的kerberos鉴权访问hdfs LOAD LABEL table1_20170707 ( DATA INFILE("hdfs://bdos/palo/table1_data") INTO TABLE table1 ) WITH BROKER hdfs ( "fs.defaultFS"="hdfs://bdos", "username"="hdfs_user", "password"="hdfs_password", "dfs.nameservices"="bdos", "dfs.ha.namenodes.bdos"="nn1,nn2", "dfs.namenode.rpc-address.bdos.nn1"="host1:port1", "dfs.namenode.rpc-address.bdos.nn2"="host2:port2", "hadoop.security.authentication"="kerberos", "kerberos_principal"="palo@BAIDU.COM", "kerberos_keytab"="/home/palo/palo.keytab")) PROPERTIES ( "timeout"="3600", "max_filter_ratio"="0.1" );
10.使用keytab content的kerberos鉴权访问hdfs LOAD LABEL table1_20170707 ( DATA INFILE("hdfs://bdos/palo/table1_data") INTO TABLE table1 ) WITH BROKER hdfs ( "fs.defaultFS"="hdfs://bdos", "username"="hdfs_user", "password"="hdfs_password", "dfs.nameservices"="bdos", "dfs.ha.namenodes.bdos"="nn1,nn2", "dfs.namenode.rpc-address.bdos.nn1"="host1:port1", "dfs.namenode.rpc-address.bdos.nn2"="host2:port2", "hadoop.security.authentication"="kerberos", "kerberos_principal"="palo@BAIDU.COM", "kerberos_keytab_content"="BQIAAABEAAEACUJBSURVLkNPTQAEcGFsbw")) PROPERTIES ( "timeout"="3600", "max_filter_ratio"="0.1" );
小批量导入
小批量导入是Palo新提供的一种导入方式,这种导入方式可以使用户不依赖 Hadoop,从而完成导入。此种导入方式提交任务并不是通过MySQL客户端,而是通过http协议来完成的。用户通过http协议将导入描述和数据一同发送给Palo,Palo在接收任务成功后,会立即返回给用户成功信息,但是此时,数据并未真正导入。用户需要通过 'SHOW LOAD' 命令来查看具体的导入结果。
语法:
- curl --location-trusted -u user:passwd -T data.file http://fe.host:port/api/{db}/{table}/_load?label=xxx
参数说明:
user:用户如果是在default_cluster中的,user即为user_name。否则为user_name@cluster_name。
label:用于指定这一批次导入的label,用于后期进行作业状态查询等。 这个参数是必须传入的。
columns: 用于描述导入文件中对应的列名字。如果不传入,那么认为文件中的列顺序与建表的顺序一致,指定的方式为逗号分隔,例如:columns=k1,k2,k3,k4
column_separator: 用于指定列与列之间的分隔符,默认的为'\t'。需要注意的是,这里应使用url编码,例如需要指定'\t’为分隔符,那么应该传入’column_separator=%09';需要指定'\x01’为分隔符,那么应该传入’column_separator=%01'
max_filter_ratio: 用于指定允许过滤不规范数据的最大比例,默认是0,不允许过滤。自定义指定应该如下:'max_filter_ratio=0.2',含义是允许20%的错误率。
hll:用于指定数据里面和表里面的HLL列的对应关系,表中的列和数据里面指定的列(如果不指定columns,则数据列里面的列也可以是表里面的其它非HLL列)通过","分割,指定多个hll列使用“:”分割,例如: 'hll1,cuid:hll2,device'
举例:
- 1.将本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表(用户是defalut_cluster中的)
- curl --location-trusted -u root:root -T testData http://fe.host:port/api/testDb/testTbl/_load?label=123
- 2.将本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表(用户是test_cluster中的)
- curl --location-trusted -u root@test_cluster:root -T testData http://fe.host:port/api/testDb/testTbl/_load?label=123
- 3.将本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表, 允许20%的错误率(用户是defalut_cluster中的)
- curl --location-trusted -u root -T testData http://fe.host:port/api/testDb/testTbl/_load?label=123\$amp;max_filter_ratio=0.2
- 4.将本地文件'testData'中的数据导入到数据库'testDb'中'testTbl'的表, 允许20%的错误率,并且指定文件的列名(用户是defalut_cluster中的)
- curl --location-trusted -u root -T testData http://fe.host:port/api/testDb/testTbl/_load?label=123\$amp;max_filter_ratio=0.2\$amp;columns=k1,k2,k3
- 5.使用streaming方式导入(用户是defalut_cluster中的)
- seq 1 10 | awk '{OFS="\\t"}{print $1, $1 * 10}' | curl --location-trusted -u root -T testData http://fe.host:port/api/testDb/testTbl/_load?label=123
- 6.导入含有HLL列的表,可以是表中的列或者数据中的列用于生成HLL列(用户是defalut_cluster中的)
- curl --location-trusted -u root -T testData http://host:port/api/testDb/testTbl/_load?label=123\&max_filter_ratio=0.2\&hll=hll_column1,k1:hll_column2,k2
- curl --location-trusted -u root -T testData http://host:port/api/testDb/testTbl/_load?label=123\&max_filter_ratio=0.2\&hll=hll_column1,tmp_k4:hll_column2,tmp_k5\&columns=k1,k2,k3,tmp_k4,tmp_k5
Cancel Load
Cancel load用于撤销指定load label的导入作业。这是一个异步操作,任务提交成功就返回。提交后可以使用show load命令查看进度。
语法:
- CANCEL LOAD [FROM db_name] WHERE LABEL = "load_label";
举例:
- 撤销数据库 example_db 上, label 为 example_db_test_load_label 的导入作业
- CANCEL LOAD FROM example_db WHERE LABEL = "example_db_test_load_label";
Export
该语句用于将指定表的数据导出到指定位置。这是一个异步操作,任务提交成功则返回。执行后可使用 SHOW EXPORT 命令查看进度。
语法:
- EXPORT TABLE table_name
- [PARTITION (p1[,p2])]
- TO export_path
- [opt_properties]
- broker;
table_name
当前要导出的表的表名,目前支持engine为olap和mysql的表的导出。
partition
可以只导出指定表的某些指定分区
export_path
导出的路径,需为目录。目前不能导出到本地,需要导出到broker。
opt_properties
用于指定一些特殊参数。
语法:
- [PROPERTIES ("key"="value", ...)]
可以指定如下参数:
column_separator:指定导出的列分隔符,默认为\t。
line_delimiter:指定导出的行分隔符,默认为\n。
broker
用于指定导出使用的broker
语法:
- WITH BROKER broker_name ("key"="value"[,...])
这里需要指定具体的broker name, 以及所需的broker属性
举例:
- 1.将testTbl表中的所有数据导出到hdfs上
- EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" WITH BROKER "broker_name" ("username"="xxx", "password"="yyy");
- 2.将testTbl表中的分区p1,p2导出到hdfs上
- EXPORT TABLE testTbl PARTITION (p1,p2) TO "hdfs://hdfs_host:port/a/b/c" WITH BROKER "broker_name" ("username"="xxx", "password"="yyy");
- 3.将testTbl表中的所有数据导出到hdfs上,以","作为列分隔符
- EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" PROPERTIES ("column_separator"=",") WITH BROKER "broker_name" ("username"="xxx", "password"="yyy");
Delete
该语句用于按条件删除指定table(base index) partition中的数据。该操作会同时删除和此相关的rollup index的数据。
语法:
- DELETE FROM table_name PARTITION partition_name WHERE
- column_name1 op value[ AND column_name2 op value ...];
说明:
op的可选类型包括:=, <, >, <=, >=, !=
只能指定key列上的条件。
条件之间只能是“与”的关系。若希望达成“或”的关系,需要将条件分写在两个 DELETE语句中。
如果没有创建partition,partition_name 同 table_name。
注意:
- 该语句可能会降低执行后一段时间内的查询效率,影响程度取决于语句中指定的删除条件的数量,指定的条件越多,影响越大。
举例:
- 1.删除 my_table partition p1 中 k1 列值为 3 的数据行
- DELETE FROM my_table PARTITION p1 WHERE k1 = 3;
- 2.删除 my_table partition p1 中 k1 列值大于等于 3 且 k2 列值为 "abc" 的数据行
- DELETE FROM my_table PARTITION p1 WHERE k1 >= 3 AND k2 = "abc";