Hive 提供了一些内置函数,如果无法满足需求,还可以编写用户自定义函数(UDF),并在查询中调用这些函数。
内置函数 在 Beeline 或 CLI 中,使用以下命令查看 Hive 内置函数及使用方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 > show functions ; +-------------------------+--+ | tab_name | +-------------------------+--+ | ! | | abs | | upper | | 下略 | +-------------------------+--+ > desc function abs; +-------------------------------------------+--+ | tab_name | +-------------------------------------------+--+ | abs(x) - returns the absolute value of x | +-------------------------------------------+--+ > desc function extended abs; +-------------------------------------------+--+ | tab_name | +-------------------------------------------+--+ | abs(x) - returns the absolute value of x | | Example: | | > SELECT abs(0) FROM src LIMIT 1; | | 0 | | > SELECT abs(-5) FROM src LIMIT 1; | | 5 | +-------------------------------------------+--+
用户定义函数 用户定义函数必须用 Java 语言编写,因为 Hive 本身也是用 Java 写的。对于其他编程语言,可以考虑使用 SELECT TRANSFORM
查询,让数据流经用户定义的脚本。
Hive 有三种用户定义函数:
UDF(User-Defined-Function):作用于单个数据行,且产生一个数据行作为输出。一进一出 。
UDAF(User-Defined Aggregation Function):接受多个输入数据行,并产生一个数据行作为输出。多进一出 。
UDTF(User-Defined Table-Generating Functions):作用于单个数据行,且产生多个数据行(即一个表)作为输出。一进多出 。
从使用方式上来看,大多数函数(比如数学函数和字符串函数)和 UDF 类似(除了不是用户定义的);聚合函数(比如 COUNT 和 MAX)和 UDAF 类似;带 UDTF 的 SELECT 在使用时有一些限制,相对来说较少使用。
创建UDF UDF 创建步骤:
继承 org.apache.hadoop.hive.ql.UDF
。
实现 evaluate
函数。evaluate
函数支持重载。
在 Hive Shell 中注册函数:
1 2 3 4 5 6 7 8 ADD jar linux_jar_pathCREATE [TEMPORARY] FUNCTION [dbname.]function_name AS class_name;CREATE [TEMPORARY] FUNCTION [dbname.]function_name AS class_name USING JAR jar_path;DROP [TEMPORARY] FUNCTION [if exists ] [dbname.]function_name;
注意:UDF 必须有返回类型,可以返回 null
,但返回类型不能是 void
。
创建一个名为 strip
的 UDF 函数,实现和 Hive 内置的 trim
函数一样功能:删除字符串首尾的“空白字符”或“指定字符集中的字符”。
创建工程 创建 Maven 工程,配置 pom.xml
:
pom.xml 1 2 3 4 5 6 7 <dependencies > <dependency > <groupId > org.apache.hive</groupId > <artifactId > hive-exec</artifactId > <version > 2.1.1</version > </dependency > </dependencies >
编写代码 创建 Strip
类,继承 org.apache.hadoop.hive.ql.exec.UDF
类,实现 evaluate()
方法(支持重载 )。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import org.apache.commons.lang.StringUtils;import org.apache.hadoop.hive.ql.exec.UDF;import org.apache.hadoop.io.Text;public class Strip extends UDF { private Text result = new Text(); public Text evaluate (Text str) { if (str == null ) { return null ; } result.set(StringUtils.strip(str.toString())); return result; } public Text evaluate (Text str, String stripChars) { if (str == null ) { return null ; } result.set(StringUtils.strip(str.toString(), stripChars)); return result; } }
一个 UDF 必须满足下面两个条件:
一个 UDF 必须是org.apache.hadoop.hive.ql.exec.UDF
的子类。
一个 UDF 必须实现 evaluate()
方法。
其中, evaluate()
方法并非由接口定义,因为它接受的参数个数、参数的数据类型及其返回值的数据类型都是不确定的。Hive 会检查 UDF,看能否找到和函数调用相匹配的 evaluate()
方法。
这个 Strip 类有两个 evaluate()
方法,分别用来删除字符串首尾的空白字符或指定字符集中的字符。实际的字符处理由 org.apache.commons.lang.StringUtils
这个类来完成。
注册函数 将该工程打包得到 hiveudf-strip.jar
,上传到 HDFS,然后在 metastore 中注册这个函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 > CREATE FUNCTION strip AS 'moe.sannaha.hiveudf.Strip' USING JAR 'hdfs://node01:8020/udf/hiveudf-strip.jar' ;> SELECT strip(' sannaha ' ) FROM dual;+ | _c0 | + | sannaha | + > SELECT strip('sannaha' ,'sa' ) FROM dual;+ | _c0 | + | nnah | + > DROP FUNCTION strip;
Hive 中并没有像 Oracle 中 dual 这样的虚表,调用一些与表数据无关的函数时可能会有所不便。可以自行创建一个 dual 表来实现相似的用法。dual.txt
文件中保存任意一个字符串即可。
1 2 3 CREATE TABLE dual (dual STRING);load data local inpath '/data/dual.txt' overwrite into table dual; select rand() from dual;
创建UDAF Hive 允许使用两种类型的 UDAF:简单的(simple)和通用的(generic)。简单 UDAF 很容易编写,但是由于使用了 Java Reflection,因此会导致性能下降,并且不允许使用可变长度参数列表之类的功能;通用 UDAF 允许这些功能,但编写起来可能不像简单 UDAF 那样直观。
这里只讨论简单 UDAF,通用 UDAF 的例子请参考官方的教程。
计算最大值 创建一个名为 maximum
的简单 UDAF,实现和 Hive 内置的 max
函数的功能——返回最大值。
编写代码 创建 Maximum
类,继承 org.apache.hadoop.hive.ql.exec.UDAF
类,且包含一个或多个实现了 org.apache.hadoop.hive.ql.exec.UDAFEvaluator
的静态类,在静态类中编写计算函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 import org.apache.hadoop.hive.ql.exec.UDAF;import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;import org.apache.hadoop.io.IntWritable;public class Maximum extends UDAF { public static class MaximumIntUDAFEvaluator implements UDAFEvaluator { private IntWritable result; @Override public void init () { result = null ; } public boolean iterate (IntWritable value) { if (value == null ) { return true ; } if (result == null ) { result = new IntWritable(value.get()); } else { result.set(Math.max(result.get(), value.get())); } return true ; } public IntWritable terminatePartial () { return result; } public boolean merge (IntWritable other) { return iterate(other); } public IntWritable terminate () { return result; } } }
一个计算函数必须实现以下5个方法:
init()
:初始化计算函数并重设它的内部状态。
iterate()
:每次对一个新值进行聚合函数计算时都会调用该方法,接受的参数和在 Hive 中调用函数的参数对应。如果输入值合法,返回true。
terminatePartial()
:Hive 需要一个“部分的聚合结果(partial result)”时会调用该方法,这个方法必须返回一个封装了当前聚合计算状态的对象。
merge()
:Hive决定要合并“一个部分”和“另一个部分”时会调用该方法。该方法接受一个对象作为输入,这个对象的类型必须和 terminatePartial()
方法的返回类型一致。这个计算最值 的示例比较特殊,因为不同部分进行聚合计算的处理逻辑和一个新值参加聚合计算时一致(计算平均值 就不能这样写了),所以 merge()
方法可以直接使用 iterate()
方法。
terminate()
:Hive 需要最终聚合结果时会调用该方法。
注册函数 将该工程打包得到 hiveudaf-maximum.jar
,上传到 HDFS,然后注册这个函数(这里注册为临时函数)。
1 2 3 4 5 6 7 8 > CREATE TEMPORARY FUNCTION maximum AS 'moe.sannaha.hiveudaf.Maximum' USING JAR 'hdfs://node01:8020/udf/hiveudaf-maximum.jar' ; > SELECT maximum(score) FROM int_records; +------+--+ | _c0 | +------+--+ | 99 | +------+--+
计算函数的处理流程:
计算均值 上个示例有一个特别的现象:不同部分进行聚合计算的处理逻辑和一个新值参加聚合计算时一致,即 merge()
方法里可以直接调用 iterate()
的处理逻辑。对于更复杂的聚合函数,比如计算平均值的 UDAF,就需要针对性地编写 merge()
和 terminate()
的处理逻辑。因为从数学的角度来看,不能简单地把两个部分的均值求和去得到最终的均值(类似 MapReduce 的 Combiner 步骤)。
编写代码 创建 Mean
类,继承 org.apache.hadoop.hive.ql.exec.UDAF
类,且包含一个或多个实现了 org.apache.hadoop.hive.ql.exec.UDAFEvaluator
的静态类,在静态类中编写计算函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 import org.apache.hadoop.hive.ql.exec.UDAF;import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;import org.apache.hadoop.hive.serde2.io.DoubleWritable;public class Mean extends UDAF { public static class MeanDoubleUDAFEvaluator implements UDAFEvaluator { public static class PartialResult { double sum; long count; } private PartialResult partialResult; @Override public void init () { partialResult = null ; } public boolean iterate (Double value) { if (value == null ) { return true ; } if (partialResult == null ) { partialResult = new PartialResult(); } partialResult.sum += value; partialResult.count++; return true ; } public PartialResult terminatePartial () { return partialResult; } public boolean merge (PartialResult other) { if (other == null ) { return true ; } if (partialResult == null ) { partialResult = new PartialResult(); } partialResult.sum += other.sum; partialResult.count += other.count; return true ; } public Double terminate () { if (partialResult == null ) { return null ; } return partialResult.sum / partialResult.count; } } }
编写 UDF 函数时不能使用 org.apache.hadoop.io.DoubleWritable!
注意这里的 `DoubleWritable` 需要使用 `org.apache.hadoop.hive.serde2.io.DoubleWritable`,该类继承自 `org.apache.hadoop.io.DoubleWritable`,如果直接使用后者,则在调用函数执行查询时会报错:
1 2 > select mean(s_score) from records; Error: Error while compiling statement: FAILED: NoMatchingMethodException No matching method for class moe.sannaha.hiveudaf.Mean with (double). Possible choices: _FUNC_(struct<value:double>)
官方文档给出的说明是:hadoop.hive.serde2.io.DoubleWritable
类是在 Hadoop 版本的 DoubleWritable
类(即 hadoop.io.DoubleWritable
)可用之前创建的,并且需要保留该类以实现第三方 UDF / SerDes 的向后兼容性。我们应该考虑删除此类,以便在下一个主要版本中直接使用 Hadoop 版本的。
目前 hadoop.hive.serde2.io
这个包下的类如下图所示:
注册函数 将该工程打包得到 hiveudaf-mean.jar
,上传到 HDFS,然后注册这个函数(这里注册为临时函数)。
1 > CREATE TEMPORARY FUNCTION mean AS 'moe.sannaha.hiveudaf.Mean' USING JAR 'hdfs://node01:8020/udf/hiveudaf-mean.jar' ;
测试函数 1 2 3 4 5 6 > select mean(score) from double_records;+ | _c0 | + | 86.375 | +
创建UDTF 列转行 创建一个名为 explosion
的 UDTF,将某列中的数据根据指定分隔符切割,并将切割后得到的多行数据返回。
编写代码 创建 Explosion
类,继承 org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
,实现 initialize()
、process()
和 close()
三个方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 package moe.sannaha.hiveudtf;import org.apache.hadoop.hive.ql.exec.UDFArgumentException;import org.apache.hadoop.hive.ql.metadata.HiveException;import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;import java.util.ArrayList;public class Explosion extends GenericUDTF { private ArrayList<String> outList = new ArrayList<String>(); @Override public StructObjectInspector initialize (StructObjectInspector argOIs) throws UDFArgumentException { ArrayList<String> fieldNames = new ArrayList<String>(); ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(); fieldNames.add("Column2Row" ); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); } public void process (Object[] args) throws HiveException { String data = args[0 ].toString(); String splitKey = args[1 ].toString(); String[] fields = data.split(splitKey); for (String field : fields) { outList.clear(); outList.add(field); forward(outList); } } public void close () throws HiveException { } }
注册函数 将该工程打包得到 hiveudtf-explosion.jar
,上传到 HDFS,注册这个函数:
1 CREATE TEMPORARY FUNCTION explosion AS 'moe.sannaha.hiveudtf.Explosion' USING JAR 'hdfs://devcdh1.cdh.com:8020/hdfsdata/hiveudtf-explosion.jar' ;
测试函数 准备测试数据:
udtfdata.csv 1 2 3 1 AHK,BigData,CDH 2 Flink,Flume,GFW,Git 3 HBase,Hadoop,Hive
创建测试表,加载数据并测试函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 > create table udtftest(id int ,info string) row format delimited fields terminated by '\t' ;> load data local inpath '/data/udtfdata.csv' into table udtftest;> select explosion(info,',' ) from udtftest where id in (1 ,2 );Column2Row AHK BigData CDH Flink Flume GFW Git
参考资料 Creating Custom UDFs Writing GenericUDAFs: A Tutorial org.apache.hadoop.hive.serde2.io.DoubleWritable org.apache.hadoop.io.DoubleWritable