0%

Hive函数

Hive 提供了一些内置函数,如果无法满足需求,还可以编写用户自定义函数(UDF),并在查询中调用这些函数。


内置函数

在 Beeline 或 CLI 中,使用以下命令查看 Hive 内置函数及使用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 查看内置函数
> show functions;
+-------------------------+--+
| tab_name |
+-------------------------+--+
| ! |
| abs |
| upper |
| 下略 |
+-------------------------+--+

# 查看内置函数的用法
> desc function abs;
+-------------------------------------------+--+
| tab_name |
+-------------------------------------------+--+
| abs(x) - returns the absolute value of x |
+-------------------------------------------+--+

# 查看内置函数的用法及用例
> desc function extended abs;
+-------------------------------------------+--+
| tab_name |
+-------------------------------------------+--+
| abs(x) - returns the absolute value of x |
| Example: |
| > SELECT abs(0) FROM src LIMIT 1; |
| 0 |
| > SELECT abs(-5) FROM src LIMIT 1; |
| 5 |
+-------------------------------------------+--+

用户定义函数

用户定义函数必须用 Java 语言编写,因为 Hive 本身也是用 Java 写的。对于其他编程语言,可以考虑使用 SELECT TRANSFORM 查询,让数据流经用户定义的脚本。

Hive 有三种用户定义函数:

  • UDF(User-Defined-Function):作用于单个数据行,且产生一个数据行作为输出。一进一出
  • UDAF(User-Defined Aggregation Function):接受多个输入数据行,并产生一个数据行作为输出。多进一出
  • UDTF(User-Defined Table-Generating Functions):作用于单个数据行,且产生多个数据行(即一个表)作为输出。一进多出

从使用方式上来看,大多数函数(比如数学函数和字符串函数)和 UDF 类似(除了不是用户定义的);聚合函数(比如 COUNT 和 MAX)和 UDAF 类似;带 UDTF 的 SELECT 在使用时有一些限制,相对来说较少使用。

创建UDF

UDF 创建步骤:

  1. 继承 org.apache.hadoop.hive.ql.UDF
  2. 实现 evaluate 函数。evaluate 函数支持重载。
  3. 在 Hive Shell 中注册函数:
1
2
3
4
5
6
7
8
-- 添加jar包路径
ADD jar linux_jar_path
-- 创建函数
CREATE [TEMPORARY] FUNCTION [dbname.]function_name AS class_name;
-- 或是一步到位
CREATE [TEMPORARY] FUNCTION [dbname.]function_name AS class_name USING JAR jar_path;
-- 删除函数
DROP [TEMPORARY] FUNCTION [if exists] [dbname.]function_name;

注意:UDF 必须有返回类型,可以返回 null,但返回类型不能是 void

创建一个名为 strip 的 UDF 函数,实现和 Hive 内置的 trim 函数一样功能:删除字符串首尾的“空白字符”或“指定字符集中的字符”。

创建工程

创建 Maven 工程,配置 pom.xml

pom.xml
1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>

编写代码

创建 Strip 类,继承 org.apache.hadoop.hive.ql.exec.UDF类,实现 evaluate() 方法(支持重载)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class Strip extends UDF {
private Text result = new Text();

//删除字符串首尾的空白字符
public Text evaluate(Text str) {
if (str == null) {
return null;
}
result.set(StringUtils.strip(str.toString()));
return result;
}

//删除字符串首尾的出现在指定字符集中的字符
public Text evaluate(Text str, String stripChars) {
if (str == null) {
return null;
}
result.set(StringUtils.strip(str.toString(), stripChars));
return result;
}
}

一个 UDF 必须满足下面两个条件:

  • 一个 UDF 必须是org.apache.hadoop.hive.ql.exec.UDF的子类。
  • 一个 UDF 必须实现 evaluate() 方法。

其中, evaluate() 方法并非由接口定义,因为它接受的参数个数、参数的数据类型及其返回值的数据类型都是不确定的。Hive 会检查 UDF,看能否找到和函数调用相匹配的 evaluate() 方法。

这个 Strip 类有两个 evaluate() 方法,分别用来删除字符串首尾的空白字符或指定字符集中的字符。实际的字符处理由 org.apache.commons.lang.StringUtils 这个类来完成。

注册函数

将该工程打包得到 hiveudf-strip.jar ,上传到 HDFS,然后在 metastore 中注册这个函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
-- 注册函数并对该函数命名为strip
> CREATE FUNCTION strip AS 'moe.sannaha.hiveudf.Strip' USING JAR 'hdfs://node01:8020/udf/hiveudf-strip.jar';

-- 在查询中调用该函数
> SELECT strip(' sannaha ') FROM dual;
+----------+--+
| _c0 |
+----------+--+
| sannaha |
+----------+--+

> SELECT strip('sannaha','sa') FROM dual;
+-------+--+
| _c0 |
+-------+--+
| nnah |
+-------+--+

-- 删除函数
> DROP FUNCTION strip;

如何实现 dual 表?

Hive 中并没有像 Oracle 中 dual 这样的虚表,调用一些与表数据无关的函数时可能会有所不便。可以自行创建一个 dual 表来实现相似的用法。dual.txt 文件中保存任意一个字符串即可。

1
2
3
CREATE TABLE dual (dual STRING);
load data local inpath '/data/dual.txt' overwrite into table dual;
select rand() from dual;

创建UDAF

Hive 允许使用两种类型的 UDAF:简单的(simple)和通用的(generic)。简单 UDAF 很容易编写,但是由于使用了 Java Reflection,因此会导致性能下降,并且不允许使用可变长度参数列表之类的功能;通用 UDAF 允许这些功能,但编写起来可能不像简单 UDAF 那样直观。

这里只讨论简单 UDAF,通用 UDAF 的例子请参考官方的教程。

计算最大值

创建一个名为 maximum 的简单 UDAF,实现和 Hive 内置的 max 函数的功能——返回最大值。

编写代码

创建 Maximum 类,继承 org.apache.hadoop.hive.ql.exec.UDAF类,且包含一个或多个实现了 org.apache.hadoop.hive.ql.exec.UDAFEvaluator 的静态类,在静态类中编写计算函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;

public class Maximum extends UDAF {
public static class MaximumIntUDAFEvaluator implements UDAFEvaluator {
private IntWritable result;

//初始化计算函数并重设它的内部状态
@Override
public void init() {
result = null;
}

//每次对一个新值进行聚合函数计算时都会调用该方法,接受的参数和在Hive中调用函数的参数对应
//如果输入值合法,返回true
public boolean iterate(IntWritable value) {
if (value == null) {
return true;
}
if (result == null) {
result = new IntWritable(value.get());
} else {
result.set(Math.max(result.get(), value.get()));
}
return true;
}

//Hive需要一个“部分的聚合结果(partial result)”时会调用该方法
//这个方法必须返回一个封装了当前聚合计算状态的对象
public IntWritable terminatePartial() {
return result;
}

//Hive决定要合并“一个部分”和“另一个部分”时会调用该方法
//该方法接受一个对象作为输入,这个对象的类型必须和terminatePartial()方法的返回类型一致
public boolean merge(IntWritable other) {
return iterate(other);
}

//Hive需要最终聚合结果时会调用该方法
public IntWritable terminate() {
return result;
}
}
}

一个计算函数必须实现以下5个方法:

  • init():初始化计算函数并重设它的内部状态。
  • iterate():每次对一个新值进行聚合函数计算时都会调用该方法,接受的参数和在 Hive 中调用函数的参数对应。如果输入值合法,返回true。
  • terminatePartial():Hive 需要一个“部分的聚合结果(partial result)”时会调用该方法,这个方法必须返回一个封装了当前聚合计算状态的对象。
  • merge():Hive决定要合并“一个部分”和“另一个部分”时会调用该方法。该方法接受一个对象作为输入,这个对象的类型必须和 terminatePartial() 方法的返回类型一致。这个计算最值的示例比较特殊,因为不同部分进行聚合计算的处理逻辑和一个新值参加聚合计算时一致(计算平均值就不能这样写了),所以 merge() 方法可以直接使用 iterate() 方法。
  • terminate():Hive 需要最终聚合结果时会调用该方法。

注册函数

将该工程打包得到 hiveudaf-maximum.jar ,上传到 HDFS,然后注册这个函数(这里注册为临时函数)。

1
2
3
4
5
6
7
8
> CREATE TEMPORARY FUNCTION maximum AS 'moe.sannaha.hiveudaf.Maximum' USING JAR 'hdfs://node01:8020/udf/hiveudaf-maximum.jar';

> SELECT maximum(score) FROM int_records;
+------+--+
| _c0 |
+------+--+
| 99 |
+------+--+

计算函数的处理流程:

包含UDAF部分结果的数据流

计算均值

上个示例有一个特别的现象:不同部分进行聚合计算的处理逻辑和一个新值参加聚合计算时一致,即 merge() 方法里可以直接调用 iterate() 的处理逻辑。对于更复杂的聚合函数,比如计算平均值的 UDAF,就需要针对性地编写 merge()terminate() 的处理逻辑。因为从数学的角度来看,不能简单地把两个部分的均值求和去得到最终的均值(类似 MapReduce 的 Combiner 步骤)。

编写代码

创建 Mean 类,继承 org.apache.hadoop.hive.ql.exec.UDAF类,且包含一个或多个实现了 org.apache.hadoop.hive.ql.exec.UDAFEvaluator 的静态类,在静态类中编写计算函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
//import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;

public class Mean extends UDAF {
public static class MeanDoubleUDAFEvaluator implements UDAFEvaluator {
//封装传入数值之和以及个数
public static class PartialResult {
double sum;
long count;
}

private PartialResult partialResult;

//初始化计算函数并重设它的内部状态
@Override
public void init() {
partialResult = null;
}

//每次对一个新值进行聚合函数计算时都会调用该方法,接受的参数和在Hive中调用函数的参数对应
//如果输入值合法,返回true
public boolean iterate(Double value) {
if (value == null) {
return true;
}
if (partialResult == null) {
partialResult = new PartialResult();
}
//对数值进行累加求和,个数自增
partialResult.sum += value;
partialResult.count++;
return true;
}

//Hive需要部分聚合结果时会调用该方法
//这个方法必须返回一个封装了聚合计算当前状态的对象
public PartialResult terminatePartial() {
return partialResult;
}

//Hive决定要合并“一个部分”和“另一个部分”时会调用该方法
//在这里对不同部分(Evaluator instance)的“部分和(parital sum)”和“部分计数值(parital count)”分别进行求和
public boolean merge(PartialResult other) {
if (other == null) {
return true;
}
if (partialResult == null) {
partialResult = new PartialResult();
}
partialResult.sum += other.sum;
partialResult.count += other.count;
return true;
}

//Hive需要最终聚合结果时会调用该方法
//在这里进行平均值的计算
public Double terminate() {
if (partialResult == null) {
return null;
}
return partialResult.sum / partialResult.count;
}
}
}

编写 UDF 函数时不能使用 org.apache.hadoop.io.DoubleWritable!

注意这里的 `DoubleWritable` 需要使用 `org.apache.hadoop.hive.serde2.io.DoubleWritable`,该类继承自 `org.apache.hadoop.io.DoubleWritable`,如果直接使用后者,则在调用函数执行查询时会报错:
1
2
> select mean(s_score) from records;
Error: Error while compiling statement: FAILED: NoMatchingMethodException No matching method for class moe.sannaha.hiveudaf.Mean with (double). Possible choices: _FUNC_(struct<value:double>)

官方文档给出的说明是:hadoop.hive.serde2.io.DoubleWritable 类是在 Hadoop 版本的 DoubleWritable 类(即 hadoop.io.DoubleWritable)可用之前创建的,并且需要保留该类以实现第三方 UDF / SerDes 的向后兼容性。我们应该考虑删除此类,以便在下一个主要版本中直接使用 Hadoop 版本的。

目前 hadoop.hive.serde2.io 这个包下的类如下图所示:

hive.serde2.io.png

注册函数

将该工程打包得到 hiveudaf-mean.jar ,上传到 HDFS,然后注册这个函数(这里注册为临时函数)。

1
> CREATE TEMPORARY FUNCTION mean AS 'moe.sannaha.hiveudaf.Mean' USING JAR 'hdfs://node01:8020/udf/hiveudaf-mean.jar';

测试函数

1
2
3
4
5
6
> select mean(score) from double_records;
+---------+--+
| _c0 |
+---------+--+
| 86.375 |
+---------+--+

创建UDTF

列转行

创建一个名为 explosion 的 UDTF,将某列中的数据根据指定分隔符切割,并将切割后得到的多行数据返回。

编写代码

创建 Explosion 类,继承 org.apache.hadoop.hive.ql.udf.generic.GenericUDTF,实现 initialize()process()close() 三个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package moe.sannaha.hiveudtf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;

public class Explosion extends GenericUDTF {

private ArrayList<String> outList = new ArrayList<String>();

@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
// 1.定义输出数据的列名和类型
ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

// 2.添加输出数据的列名和类型
fieldNames.add("Column2Row");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}

public void process(Object[] args) throws HiveException {
// 1.获取原始数据
String data = args[0].toString();
// 2.获取分隔符
String splitKey = args[1].toString();
// 3.将原始数据使用分隔符进行切分
String[] fields = data.split(splitKey);
// 4.遍历切分结果,并写出
for (String field : fields) {
outList.clear();
outList.add(field);
// 写出
forward(outList);
}
}

public void close() throws HiveException {

}
}

注册函数

将该工程打包得到 hiveudtf-explosion.jar ,上传到 HDFS,注册这个函数:

1
CREATE TEMPORARY FUNCTION explosion AS 'moe.sannaha.hiveudtf.Explosion' USING JAR 'hdfs://devcdh1.cdh.com:8020/hdfsdata/hiveudtf-explosion.jar';

测试函数

准备测试数据:

udtfdata.csv
1
2
3
1	AHK,BigData,CDH
2 Flink,Flume,GFW,Git
3 HBase,Hadoop,Hive

创建测试表,加载数据并测试函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- 创建测试表
> create table udtftest(id int,info string) row format delimited fields terminated by '\t';
-- 加载数据
> load data local inpath '/data/udtfdata.csv' into table udtftest;
-- 测试函数
> select explosion(info,',') from udtftest where id in (1,2);
Column2Row
AHK
BigData
CDH
Flink
Flume
GFW
Git

参考资料
Creating Custom UDFs
Writing GenericUDAFs: A Tutorial
org.apache.hadoop.hive.serde2.io.DoubleWritable
org.apache.hadoop.io.DoubleWritable