tonglin0325's Homepage

Hive Study Notes: UDF Development

To implement a UDF you can either extend org.apache.hadoop.hive.ql.exec.UDF or extend org.apache.hadoop.hive.ql.udf.generic.GenericUDF.

1. Extending UDF. Reference:

https://docs.microsoft.com/en-us/azure/hdinsight/hadoop/apache-hadoop-hive-java-udf

Add the dependencies:

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.1.0-cdh5.16.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0-cdh5.16.2</version>
</dependency>

Code:

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Description(
        name = "hello",
        value = "_FUNC_(str) - from the input string "
                + "returns the value that is \"Hello $str\"",
        extended = "Example:\n"
                + "  > SELECT _FUNC_(str) FROM src;"
)
public class MyUDF extends UDF {

    private static final Logger logger = LoggerFactory.getLogger(MyUDF.class);

    public String evaluate(String str) {
        try {
            return "Hello " + str;
        } catch (Exception e) {
            // log the failure instead of printing the stack trace
            logger.error("evaluate failed", e);
            return "ERROR";
        }
    }

}
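
For the second approach mentioned at the top (extending GenericUDF), a minimal hedged sketch with the same behavior might look like the following; the class name MyGenericUDF is only an illustration, not part of the original article:

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class MyGenericUDF extends GenericUDF {

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // expect exactly one argument and declare a String return type
        if (arguments.length != 1) {
            throw new UDFArgumentLengthException("hello() takes exactly one argument");
        }
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        Object arg = arguments[0].get();
        // GenericUDF lets you handle NULL explicitly instead of relying on reflection
        return arg == null ? null : "Hello " + arg.toString();
    }

    @Override
    public String getDisplayString(String[] children) {
        return "hello(" + children[0] + ")";
    }
}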

When packaging, note that all dependency jars must be bundled into the jar (for example a jar-with-dependencies built with the Maven assembly plugin, matching the jar name used below), and the jar then needs to be uploaded to HDFS or S3.

hive> add jar hdfs:///user/hive/udf/bigdata-1.0-SNAPSHOT-jar-with-dependencies.jar;
converting to local hdfs:///user/hive/udf/bigdata-1.0-SNAPSHOT-jar-with-dependencies.jar
Added [/tmp/5aa66ab6-35ab-45d5-bef1-5acc79d16b23_resources/bigdata-1.0-SNAPSHOT-jar-with-dependencies.jar] to class path
Added resources: [hdfs:///user/hive/udf/bigdata-1.0-SNAPSHOT-jar-with-dependencies.jar]
hive> create temporary function my_lower as "com.bigdata.hive.MyUDF";
OK
Time taken: 0.073 seconds
hive> select my_lower("123");
OK
Hello 123
Time taken: 0.253 seconds, Fetched: 1 row(s)

List the jars:

hive> list jar;
/tmp/5aa66ab6-35ab-45d5-bef1-5acc79d16b23_resources/bigdata-1.0-SNAPSHOT-jar-with-dependencies.jar

Delete the jar:

hive> delete jar /tmp/5aa66ab6-35ab-45d5-bef1-5acc79d16b23_resources/bigdata-1.0-SNAPSHOT-jar-with-dependencies.jar;

List the functions:

hive> show functions like '*lower';
OK
lower
Time taken: 0.016 seconds, Fetched: 1 row(s)

Drop the function:

hive> drop function if exists my_lower;
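
Besides temporary functions, Hive 0.13+ can also register a permanent function backed by a jar on HDFS; a hedged sketch reusing the class and jar path from above (the function name my_hello is only an illustration):

hive> create function my_hello as "com.bigdata.hive.MyUDF" using jar "hdfs:///user/hive/udf/bigdata-1.0-SNAPSHOT-jar-with-dependencies.jar";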


Spark Study Notes: Reading and Writing Avro

1. Reading an Avro file with the DataFrame API. Reference:

https://sparkbyexamples.com/spark/read-write-avro-file-spark-dataframe/

Add the Maven dependencies. Since Spark 2.4.0 you can use Apache's spark-avro package; earlier versions need Databricks' spark-avro package.

<!--avro-->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.0</version>
    <exclusions>
        <exclusion>
            <artifactId>jackson-databind</artifactId>
            <groupId>com.fasterxml.jackson.core</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.12.1</version>
</dependency>
<!--spark-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.0.cloudera2</version>
    <exclusions>
        <exclusion>
            <artifactId>avro</artifactId>
            <groupId>org.apache.avro</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.0.cloudera2</version>
    <exclusions>
        <exclusion>
            <artifactId>avro</artifactId>
            <groupId>org.apache.avro</groupId>
        </exclusion>
        <exclusion>
            <artifactId>parquet-jackson</artifactId>
            <groupId>com.twitter</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>2.4.0.cloudera2</version>
</dependency>

Read the Avro file to get a DataFrame:

import java.io.File

import org.apache.avro.Schema
import org.apache.spark.sql.SparkSession

object SparkAvroDemo {

  def main(args: Array[String]) {

    val sparkSession = SparkSession.builder()
      .master("local")
      .appName("spark session example")
      .getOrCreate()

    val schemaAvro = new Schema.Parser().parse(new File("src/main/avro/kst.avsc"))

    val df = sparkSession.read
      .format("avro")
      .option("avroSchema", schemaAvro.toString)
      .load("file:///Users/lintong/coding/java/interview/bigdata/data/000000_0")

    df.show()

  }

}
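
The section title also covers writing Avro; a minimal hedged sketch of the write side, which could be added inside main right after df.show() (the output path is only an illustration):

// write the DataFrame back out in Avro format (example path)
df.write
  .format("avro")
  .mode("overwrite")
  .save("file:///tmp/avro_output")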


Scala error: Exception in thread "main" java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)V

Running Spark in IDEA fails with the following error:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Exception in thread "main" java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)V
at org.apache.spark.SparkConf$DeprecatedConfig.<init>(SparkConf.scala:799)
at org.apache.spark.SparkConf$.<init>(SparkConf.scala:596)
at org.apache.spark.SparkConf$.<clinit>(SparkConf.scala)
at org.apache.spark.SparkConf.set(SparkConf.scala:94)
at org.apache.spark.SparkConf.set(SparkConf.scala:83)
at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$1(SparkSession.scala:916)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:916)

This happens because the project should use Scala 2.12, but Scala 2.11 is configured in IDEA; the Scala SDK version has to match the Scala version the Spark artifacts were built for (the _2.11 / _2.12 artifact suffix).


Converting a Parquet schema to an Avro schema

1. Add the dependencies

<!--parquet-->
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.10.0</version>
</dependency>
<!--hadoop-->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version>
</dependency>

2. Read the Parquet schema from the Parquet file footer

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.schema.MessageType;

Configuration config = new Configuration();
Path parquetPath = new Path("file:///Users/lintong/Downloads/xxxx.snappy.parquet");
ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(parquetPath, config));
MessageType parquetSchema = reader.getFooter().getFileMetaData().getSchema();
System.out.println(parquetSchema);

Output:

message TestSerializer {
  optional binary string1 (UTF8);
  optional int32 int1;
  optional int32 tinyint1;
  optional int32 smallint1;
  optional int64 bigint1;
  optional boolean boolean1;
  optional double float1;
  optional double double1;
  optional group list1 (LIST) {
    repeated binary array (UTF8);
  }
  optional group map1 (LIST) {
    repeated group array {
      optional binary key (UTF8);
      optional int32 value;
    }
  }
  optional group struct1 {
    optional int32 sInt;
    optional boolean sBoolean;
    optional binary sString (UTF8);
  }
  optional binary enum1 (UTF8);
  optional int32 nullableint;
}

3. Convert the Parquet schema to an Avro schema

import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.avro.Schema;

Schema avroSchema = new AvroSchemaConverter(config).convert(parquetSchema);
System.out.println(avroSchema);

Output:

{
"type":"record",
"name":"TestSerializer",
"fields":[
{
"name":"string1",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"int1",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"tinyint1",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"smallint1",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"bigint1",
"type":[
"null",
"long"
],
"default":null
},
{
"name":"boolean1",
"type":[
"null",
"boolean"
],
"default":null
},
{
"name":"float1",
"type":[
"null",
"double"
],
"default":null
},
{
"name":"double1",
"type":[
"null",
"double"
],
"default":null
},
{
"name":"list1",
"type":[
"null",
{
"type":"array",
"items":"string"
}
],
"default":null
},
{
"name":"map1",
"type":[
"null",
{
"type":"array",
"items":{
"type":"record",
"name":"array",
"fields":[
{
"name":"key",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"value",
"type":[
"null",
"int"
],
"default":null
}
]
}
}
],
"default":null
},
{
"name":"struct1",
"type":[
"null",
{
"type":"record",
"name":"struct1",
"fields":[
{
"name":"sInt",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"sBoolean",
"type":[
"null",
"boolean"
],
"default":null
},
{
"name":"sString",
"type":[
"null",
"string"
],
"default":null
}
]
}
],
"default":null
},
{
"name":"enum1",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"nullableint",
"type":[
"null",
"int"
],
"default":null
}
]
}
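
AvroSchemaConverter also supports the opposite direction; a small hedged sketch that turns the Avro schema from step 3 back into a Parquet MessageType (the variable name parquetFromAvro is only an illustration):

// convert the Avro schema back into a Parquet MessageType
MessageType parquetFromAvro = new AvroSchemaConverter(config).convert(avroSchema);
System.out.println(parquetFromAvro);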

Reference: https://stackoverflow.com/questions/54159454/how-to-convert-parquet-schema-to-avro-in-java-scala

Installing LZO on CDH 5.16

1. On the CDH admin page, go to Parcels and download GPLEXTRAS

lintong@master:/opt/cloudera/parcel-repo$ ls | grep GPLEXTRAS
GPLEXTRAS-5.16.2-1.cdh5.16.2.p0.8-xenial.parcel
GPLEXTRAS-5.16.2-1.cdh5.16.2.p0.8-xenial.parcel.sha1

Rename the .sha1 extension to .sha:

sudo mv GPLEXTRAS-5.16.2-1.cdh5.16.2.p0.8-xenial.parcel.sha1 GPLEXTRAS-5.16.2-1.cdh5.16.2.p0.8-xenial.parcel.sha

If a parcel's hash file is missing, it can be generated like this:

sha1sum ./SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012-xenial.parcel | cut -d ' ' -f 1 > SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012-xenial.parcel.sha1

2. Distribute and activate the parcel in the UI


Flink Study Notes: Flink State

Flink has two basic kinds of state: managed state and raw state. The difference: managed state is handled by Flink, which takes care of storage, recovery, and optimization, while raw state is managed by the developer, who has to handle serialization.
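
As an illustration of managed keyed state, a minimal hedged sketch using Flink's ValueState (not from the original article; the per-key counting logic and class name are only examples):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// counts events per key with managed ValueState; Flink stores and restores the state
// (must be used on a keyed stream, e.g. stream.keyBy(...).flatMap(new CountFunction()))
public class CountFunction extends RichFlatMapFunction<String, Long> {

    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(String value, Collector<Long> out) throws Exception {
        Long current = countState.value();
        long next = (current == null ? 0L : current) + 1;
        countState.update(next);
        out.collect(next);
    }
}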

MyBatis Study Notes: Caching

By default, MyBatis only enables the first-level cache, which is scoped to a single SqlSession.

To enable the second-level cache, add the following to the mapper XML configuration file:

<cache/>

In addition, the returned POJO objects are required to implement the Serializable interface.
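
A minimal hedged sketch of such a POJO (the class name and fields are only illustrative):

import java.io.Serializable;

// objects stored in the second-level cache must be serializable
public class User implements Serializable {

    private static final long serialVersionUID = 1L;

    private Long id;
    private String name;

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}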
