DML语句

Load

该语句用于向指定的table导入数据。该操作会同时更新和此table相关的base index和rollup index的数据。这是一个异步操作,任务提交成功则返回。执行后可使用SHOW LOAD命令查看进度。

NULL导入的时候用\N来表示。如果需要将其他字符串转化为NULL,可以使用replace_value进行转化。

语法:

  1. LOAD LABEL load_label
  2. (
  3. data_desc1[, data_desc2, ...]
  4. )
  5. broker
  6. [opt_properties];

load_label

load_label是当前导入批次的标签,由用户指定,需要保证在一个database是唯一的。也就是说,之前在某个database成功导入的label不能在这个database中再使用。该label用来唯一确定database中的一次导入,便于管理和查询。

语法:

  1. [database_name.]your_label

data_desc

用于具体描述一批导入数据。

语法:

  1. DATA INFILE
  2. (
  3. "file_path1 [, file_path2, ...]
  4. )
  5. [NEGATIVE]
  6. INTO TABLE table_name
  7. [PARTITION (p1, p2)]
  8. [COLUMNS TERMINATED BY "column_separator"]
  9. [(column_list)]
  10. [SET (k1 = func(k2))]

说明:

  • file_path,broker中的文件路径,可以指定到一个文件,也可以用/*通配符指定某个目录下的所有文件。

  • NEGATIVE:如果指定此参数,则相当于导入一批“负”数据。用于抵消之前导入的同一批数据。该参数仅适用于存在value列,并且value列的聚合类型为SUM的情况。不支持Broker方式导入

  • PARTITION:如果指定此参数,则只会导入指定的分区,导入分区以外的数据会被过滤掉。如果不指定,默认导入table的所有分区。

  • column_separator:用于指定导入文件中的列分隔符。默认为\t。如果是不可见字符,则需要加\\x作为前缀,使用十六进制来表示分隔符。如hive文件的分隔符\x01,指定为"\\x01"

  • column_list:用于指定导入文件中的列和table中的列的对应关系。当需要跳过导入文件中的某一列时,将该列指定为table中不存在的列名即可,语法:

  1. (col_name1, col_name2, ...)
  • SET: 如果指定此参数,可以将源文件某一列按照函数进行转化,然后将转化后的结果导入到table中。目前支持的函数有:

  • strftime(fmt, column) 日期转换函数

  • fmt: 日期格式,形如%Y%m%d%H%M%S (年月日时分秒)

  • column: column_list中的列,即输入文件中的列。存储内容应为数字型的时间戳。如果没有column_list,则按照palo表的列顺序默认输入文件的列。

  • time_format(output_fmt, input_fmt, column) 日期格式转化

  • output_fmt: 转化后的日期格式,形如%Y%m%d%H%M%S (年月日时分秒)

  • input_fmt: 转化前column列的日期格式,形如%Y%m%d%H%M%S (年月日时分秒)

  • column: column_list中的列,即输入文件中的列。存储内容应为input_fmt格式的日期字符串。如果没有column_list,则按照palo表的列顺序默认输入文件的列。

  • alignment_timestamp(precision, column) 将时间戳对齐到指定精度

  • precision: year|month|day|hour

  • column: column_list中的列,即输入文件中的列。存储内容应为数字型的时间戳。 如果没有column_list,则按照palo表的列顺序默认输入文件的列。

  • 注意:对齐精度为year、month的时候,只支持20050101~20191231范围内的时间戳。

  • default_value(value) 设置某一列导入的默认值,不指定则使用建表时列的默认值

  • md5sum(column1, column2, …​) 将指定的导入列的值求md5sum,返回32位16进制字符串

  • replace_value(old_value[, new_value]) 导入文件中指定的old_value替换为new_value。new_value如不指定则使用建表时列的默认值

  • hll_hash(column) 用于将表或数据里面的某一列转化成HLL列的数据结构

broker

用于指定导入使用的Broker

语法:

  1. WITH BROKER broker_name ("key"="value"[,...])

这里需要指定具体的Broker name, 以及所需的Broker属性. 开源hdfs Broker支持的属性如下: - fs.defaultFS:默认文件系统 - username: 访问hdfs的用户名 - password: 访问hdfs的用户密码 - dfs.nameservices: ha模式的hdfs中必须配置这个,指定hdfs服务的名字.以下参数都是在ha hdfs中需要指定.例子: "dfs.nameservices" = "palo" - dfs.ha.namenodes.xxx:ha模式中指定namenode的名字,多个名字以逗号分隔,ha模式中必须配置。其中xxx表示dfs.nameservices配置的value.例子: "dfs.ha.namenodes.palo" = "nn1,nn2" - dfs.namenode.rpc-address.xxx.nn: ha模式中指定namenode的rpc地址信息,ha模式中必须配置。其中nn表示dfs.ha.namenodes.xxx中配置的一个namenode的名字。例子: "dfs.namenode.rpc-address.palo.nn1" = "host:port" - dfs.client.failover.proxy.provider: ha模式中指定client连接namenode的provider,默认为:org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider - hadoop.security.authentication: 鉴权方式,包含simple和kerberos两种值。默认是SIMPLE模式。如果要使用kerberos,那必须配置以下配置。 - kerberos_principal: 指定kerberos的principal - kerberos_keytab: 指定kerberos的keytab文件路径 - kerberos_keytab_content: 指定kerberos中keytab文件内容经过base64编码之后的内容。这个跟kerberos_keytab配置二选一就可以.

opt_properties

用于指定一些特殊参数。

语法:

  1. [PROPERTIES ("key"="value", ...)]

可以指定如下参数:

  • timeout:指定导入操作的超时时间。默认不超时。单位秒

  • max_filter_ratio:最大容忍可过滤(数据不规范等原因)的数据比例。默认零容忍。

  • load_delete_flag:指定该导入是否通过导入key列的方式删除数据,仅适用于UNIQUE KEY,导入时可不指定value列。默认为false (不支持Broker方式导入)

  • exe_mem_limit:在Broker Load方式时生效,指定导入执行时,后端可使用的最大内存。

举例:

  1. 1.导入一批数据,指定个别参数
  1. LOAD LABEL example_db.label1
  2. (
  3. DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
  4. INTO TABLE my_table
  5. )
  6. WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password")
  7. PROPERTIES
  8. (
  9. "timeout"="3600",
  10. "max_filter_ratio"="0.1",
  11. );
  1. 2.导入一批数据,包含多个文件。导入不同的 table,指定分隔符,指定列对应关系
  1. LOAD LABEL example_db.label2
  2. (
  3. DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file1")
  4. INTO TABLE my_table_1
  5. COLUMNS TERMINATED BY ","
  6. (k1, k3, k2, v1, v2),
  7. DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file2")
  8. INTO TABLE my_table_2
  9. COLUMNS TERMINATED BY "\t"
  10. (k1, k2, k3, v2, v1)
  11. )
  12. WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
  1. 3.导入一批数据,指定hive的默认分隔符\x01,并使用通配符*指定目录下的所有文件
  1. LOAD LABEL example_db.label3
  2. (
  3. DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/*")
  4. NEGATIVE
  5. INTO TABLE my_table
  6. COLUMNS TERMINATED BY "\\x01"
  7. )
  8. WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
  1. 4.导入一批“负”数据
  1. LOAD LABEL example_db.label4
  2. (
  3. DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/old_file")
  4. NEGATIVE
  5. INTO TABLE my_table
  6. COLUMNS TERMINATED BY "\t"
  7. )
  8. WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
  1. 5.导入一批数据,指定分区
  1. LOAD LABEL example_db.label5
  2. (
  3. DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
  4. INTO TABLE my_table
  5. PARTITION (p1, p2)
  6. COLUMNS TERMINATED BY ","
  7. (k1, k3, k2, v1, v2)
  8. )
  9. WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
  1. 6.导入一批数据,指定分区, 并对导入文件的列做一些转化,如下:
  • k1将tmp_k1时间戳列转化为datetime类型的数据

  • k2将tmp_k2 date类型的数据转化为datetime的数据

  • k3将tmp_k3时间戳列转化为天级别时间戳

  • k4指定导入默认值为1

  • k5将tmp_k1、tmp_k2、tmp_k3列计算md5串

导入语句为:

  1. LOAD LABEL example_db.label6
  2. (
  3. DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
  4. INTO TABLE my_table
  5. PARTITION (p1, p2)
  6. COLUMNS TERMINATED BY ","
  7. (tmp_k1, tmp_k2, tmp_k3, v1, v2)
  8. SET (
  9. k1 = strftime("%Y-%m-%d %H:%M:%S", tmp_k1)),
  10. k2 = time_format("%Y-%m-%d %H:%M:%S", "%Y-%m-%d", tmp_k2)),
  11. k3 = alignment_timestamp("day", tmp_k3),
  12. k4 = default_value("1"),
  13. k5 = md5sum(tmp_k1, tmp_k2, tmp_k3)
  14. )
  15. )
  16. WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");

7.导入数据到含有HLL列的表,可以是表中的列或者数据里面的列

  1. LOAD LABEL example_db.label7
  2. (
  3. DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
  4. INTO TABLE my_table
  5. PARTITION (p1, p2)
  6. COLUMNS TERMINATED BY ","
  7. SET (
  8. v1 = hll_hash(k1),
  9. v2 = hll_hash(k2)
  10. )
  11. )
  12. WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
  1. LOAD LABEL example_db.label8
  2. (
  3. DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
  4. INTO TABLE `my_table`
  5. PARTITION (p1, p2)
  6. COLUMNS TERMINATED BY ","
  7. (k1, k2, tmp_k3, tmp_k4, v1, v2)
  8. SET (
  9. v1 = hll_hash(tmp_k3),
  10. v2 = hll_hash(tmp_k4)
  11. )
  12. )
  13. WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");

8.从ha模式的hdfs的路径中导入数据 LOAD LABEL table1_20170707 ( DATA INFILE("hdfs://bdos/palo/table1_data") INTO TABLE table1 ) WITH BROKER hdfs ( "fs.defaultFS"="hdfs://bdos", "username"="hdfs_user", "password"="hdfs_password", "dfs.nameservices"="bdos", "dfs.ha.namenodes.bdos"="nn1,nn2", "dfs.namenode.rpc-address.bdos.nn1"="host1:port1", "dfs.namenode.rpc-address.bdos.nn2"="host2:port2") PROPERTIES ( "timeout"="3600", "max_filter_ratio"="0.1" );

9.基于keytab的kerberos鉴权访问hdfs LOAD LABEL table1_20170707 ( DATA INFILE("hdfs://bdos/palo/table1_data") INTO TABLE table1 ) WITH BROKER hdfs ( "fs.defaultFS"="hdfs://bdos", "username"="hdfs_user", "password"="hdfs_password", "dfs.nameservices"="bdos", "dfs.ha.namenodes.bdos"="nn1,nn2", "dfs.namenode.rpc-address.bdos.nn1"="host1:port1", "dfs.namenode.rpc-address.bdos.nn2"="host2:port2", "hadoop.security.authentication"="kerberos", "kerberos_principal"="palo@BAIDU.COM", "kerberos_keytab"="/home/palo/palo.keytab")) PROPERTIES ( "timeout"="3600", "max_filter_ratio"="0.1" );

10.使用keytab content的kerberos鉴权访问hdfs LOAD LABEL table1_20170707 ( DATA INFILE("hdfs://bdos/palo/table1_data") INTO TABLE table1 ) WITH BROKER hdfs ( "fs.defaultFS"="hdfs://bdos", "username"="hdfs_user", "password"="hdfs_password", "dfs.nameservices"="bdos", "dfs.ha.namenodes.bdos"="nn1,nn2", "dfs.namenode.rpc-address.bdos.nn1"="host1:port1", "dfs.namenode.rpc-address.bdos.nn2"="host2:port2", "hadoop.security.authentication"="kerberos", "kerberos_principal"="palo@BAIDU.COM", "kerberos_keytab_content"="BQIAAABEAAEACUJBSURVLkNPTQAEcGFsbw")) PROPERTIES ( "timeout"="3600", "max_filter_ratio"="0.1" );

小批量导入

小批量导入是Palo新提供的一种导入方式,这种导入方式可以使用户不依赖 Hadoop,从而完成导入。此种导入方式提交任务并不是通过MySQL客户端,而是通过http协议来完成的。用户通过http协议将导入描述和数据一同发送给Palo,Palo在接收任务成功后,会立即返回给用户成功信息,但是此时,数据并未真正导入。用户需要通过 'SHOW LOAD' 命令来查看具体的导入结果。

语法:

  1. curl --location-trusted -u user:passwd -T data.file http://fe.host:port/api/{db}/{table}/_load?label=xxx

参数说明:

  • user:用户如果是在default_cluster中的,user即为user_name。否则为user_name@cluster_name。

  • label:用于指定这一批次导入的label,用于后期进行作业状态查询等。 这个参数是必须传入的。

  • columns: 用于描述导入文件中对应的列名字。如果不传入,那么认为文件中的列顺序与建表的顺序一致,指定的方式为逗号分隔,例如:columns=k1,k2,k3,k4

  • column_separator: 用于指定列与列之间的分隔符,默认的为'\t'。需要注意的是,这里应使用url编码,例如需要指定'\t’为分隔符,那么应该传入’column_separator=%09';需要指定'\x01’为分隔符,那么应该传入’column_separator=%01'

  • max_filter_ratio: 用于指定允许过滤不规范数据的最大比例,默认是0,不允许过滤。自定义指定应该如下:'max_filter_ratio=0.2',含义是允许20%的错误率。

  • hll:用于指定数据里面和表里面的HLL列的对应关系,表中的列和数据里面指定的列(如果不指定columns,则数据列里面的列也可以是表里面的其它非HLL列)通过","分割,指定多个hll列使用“:”分割,例如: 'hll1,cuid:hll2,device'

举例:

  1. 1.将本地文件'testData'中的数据导入到数据库'testDb''testTbl'的表(用户是defalut_cluster中的)
  1. curl --location-trusted -u root:root -T testData http://fe.host:port/api/testDb/testTbl/_load?label=123
  1. 2.将本地文件'testData'中的数据导入到数据库'testDb''testTbl'的表(用户是test_cluster中的)
  1. curl --location-trusted -u root@test_cluster:root -T testData http://fe.host:port/api/testDb/testTbl/_load?label=123
  1. 3.将本地文件'testData'中的数据导入到数据库'testDb''testTbl'的表, 允许20%的错误率(用户是defalut_cluster中的)
  1. curl --location-trusted -u root -T testData http://fe.host:port/api/testDb/testTbl/_load?label=123\$amp;max_filter_ratio=0.2
  1. 4.将本地文件'testData'中的数据导入到数据库'testDb''testTbl'的表, 允许20%的错误率,并且指定文件的列名(用户是defalut_cluster中的)
  1. curl --location-trusted -u root -T testData http://fe.host:port/api/testDb/testTbl/_load?label=123\$amp;max_filter_ratio=0.2\$amp;columns=k1,k2,k3
  1. 5.使用streaming方式导入(用户是defalut_cluster中的)
  1. seq 1 10 | awk '{OFS="\\t"}{print $1, $1 * 10}' | curl --location-trusted -u root -T testData http://fe.host:port/api/testDb/testTbl/_load?label=123
  1. 6.导入含有HLL列的表,可以是表中的列或者数据中的列用于生成HLL列(用户是defalut_cluster中的)
  1. curl --location-trusted -u root -T testData http://host:port/api/testDb/testTbl/_load?label=123\&max_filter_ratio=0.2\&hll=hll_column1,k1:hll_column2,k2
  1. curl --location-trusted -u root -T testData http://host:port/api/testDb/testTbl/_load?label=123\&max_filter_ratio=0.2\&hll=hll_column1,tmp_k4:hll_column2,tmp_k5\&columns=k1,k2,k3,tmp_k4,tmp_k5

Cancel Load

Cancel load用于撤销指定load label的导入作业。这是一个异步操作,任务提交成功就返回。提交后可以使用show load命令查看进度。

语法:

  1. CANCEL LOAD [FROM db_name] WHERE LABEL = "load_label";

举例:

  1. 撤销数据库 example_db 上, label example_db_test_load_label 的导入作业
  1. CANCEL LOAD FROM example_db WHERE LABEL = "example_db_test_load_label";

Export

该语句用于将指定表的数据导出到指定位置。这是一个异步操作,任务提交成功则返回。执行后可使用 SHOW EXPORT 命令查看进度。

语法:

  1. EXPORT TABLE table_name
  2. [PARTITION (p1[,p2])]
  3. TO export_path
  4. [opt_properties]
  5. broker;

table_name

当前要导出的表的表名,目前支持engine为olap和mysql的表的导出。

partition

可以只导出指定表的某些指定分区

export_path

导出的路径,需为目录。目前不能导出到本地,需要导出到broker。

opt_properties

用于指定一些特殊参数。

语法:

  1. [PROPERTIES ("key"="value", ...)]

可以指定如下参数:

  • column_separator:指定导出的列分隔符,默认为\t。

  • line_delimiter:指定导出的行分隔符,默认为\n。

broker

用于指定导出使用的broker

语法:

  1. WITH BROKER broker_name ("key"="value"[,...])

这里需要指定具体的broker name, 以及所需的broker属性

举例:

  1. 1.testTbl表中的所有数据导出到hdfs
  1. EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" WITH BROKER "broker_name" ("username"="xxx", "password"="yyy");
  1. 2.testTbl表中的分区p1,p2导出到hdfs
  1. EXPORT TABLE testTbl PARTITION (p1,p2) TO "hdfs://hdfs_host:port/a/b/c" WITH BROKER "broker_name" ("username"="xxx", "password"="yyy");
  1. 3.testTbl表中的所有数据导出到hdfs上,以","作为列分隔符
  1. EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" PROPERTIES ("column_separator"=",") WITH BROKER "broker_name" ("username"="xxx", "password"="yyy");

Delete

该语句用于按条件删除指定table(base index) partition中的数据。该操作会同时删除和此相关的rollup index的数据。

语法:

  1. DELETE FROM table_name PARTITION partition_name WHERE
  2. column_name1 op value[ AND column_name2 op value ...];

说明:

  • op的可选类型包括:=, <, >, <=, >=, !=

  • 只能指定key列上的条件。

  • 条件之间只能是“与”的关系。若希望达成“或”的关系,需要将条件分写在两个 DELETE语句中。

  • 如果没有创建partition,partition_name 同 table_name。

注意:

  • 该语句可能会降低执行后一段时间内的查询效率,影响程度取决于语句中指定的删除条件的数量,指定的条件越多,影响越大。

举例:

  1. 1.删除 my_table partition p1 k1 列值为 3 的数据行
  1. DELETE FROM my_table PARTITION p1 WHERE k1 = 3;
  1. 2.删除 my_table partition p1 k1 列值大于等于 3 k2 列值为 "abc" 的数据行
  1. DELETE FROM my_table PARTITION p1 WHERE k1 >= 3 AND k2 = "abc";