tonglin0325's Homepage

Spark Study Notes: Reading and Writing Avro

1. Reading Avro files with the DataFrame API

https://sparkbyexamples.com/spark/read-write-avro-file-spark-dataframe/

Add the Maven dependencies below. Since Spark 2.4.0 the Apache spark-avro package can be used; earlier versions had to rely on the Databricks spark-avro package.

<!--avro-->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.0</version>
    <exclusions>
        <exclusion>
            <artifactId>jackson-databind</artifactId>
            <groupId>com.fasterxml.jackson.core</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.12.1</version>
</dependency>
<!--spark-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.0.cloudera2</version>
    <exclusions>
        <exclusion>
            <artifactId>avro</artifactId>
            <groupId>org.apache.avro</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.0.cloudera2</version>
    <exclusions>
        <exclusion>
            <artifactId>avro</artifactId>
            <groupId>org.apache.avro</groupId>
        </exclusion>
        <exclusion>
            <artifactId>parquet-jackson</artifactId>
            <groupId>com.twitter</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>2.4.0.cloudera2</version>
</dependency>

Read the Avro file to get a DataFrame:

import java.io.File

import org.apache.avro.Schema
import org.apache.spark.sql.SparkSession

object SparkAvroDemo {

  def main(args: Array[String]) {

    val sparkSession = SparkSession.builder()
      .master("local")
      .appName("spark session example")
      .getOrCreate()

    // Parse the Avro schema from the .avsc file
    val schemaAvro = new Schema.Parser().parse(new File("src/main/avro/kst.avsc"))

    // Read the Avro file, supplying the reader schema explicitly
    val df = sparkSession.read
      .format("avro")
      .option("avroSchema", schemaAvro.toString)
      .load("file:///Users/lintong/coding/java/interview/bigdata/data/000000_0")

    df.show()

  }

}
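
Writing goes through the same data source. A minimal sketch that writes the DataFrame back out as Avro (the output path is hypothetical):

df.write
  .format("avro")
  .mode("overwrite")
  .save("file:///tmp/kst_avro_out") // hypothetical output directory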


Scala throws Exception in thread "main" java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)V

Running Spark in IDEA fails with the following error:

Exception in thread "main" java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)V
at org.apache.spark.SparkConf$DeprecatedConfig.<init>(SparkConf.scala:799)
at org.apache.spark.SparkConf$.<init>(SparkConf.scala:596)
at org.apache.spark.SparkConf$.<clinit>(SparkConf.scala)
at org.apache.spark.SparkConf.set(SparkConf.scala:94)
at org.apache.spark.SparkConf.set(SparkConf.scala:83)
at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$1(SparkSession.scala:916)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:916)

This is caused by a Scala version mismatch: the project should use Scala 2.12, but IDEA was configured with Scala 2.11.
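
A quick way to confirm which Scala version is actually on the classpath is to print it at runtime; a minimal sketch:

object ScalaVersionCheck {
  def main(args: Array[String]): Unit = {
    // Prints e.g. "version 2.12.10"; if this disagrees with the _2.11/_2.12
    // suffix of the Spark artifacts in the pom, the mismatch is confirmed.
    println(scala.util.Properties.versionString)
  }
}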


Converting a Parquet schema to an Avro schema

1. Add the dependencies

<!--parquet-->
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.10.0</version>
</dependency>
<!--hadoop-->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version>
</dependency>

2. Read the Parquet schema from the Parquet file's footer

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

Configuration config = new Configuration();
Path parquetPath = new Path("file:///Users/lintong/Downloads/xxxx.snappy.parquet");
// The schema lives in the file footer, so only metadata is read here
ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(parquetPath, config));
MessageType parquetSchema = reader.getFooter().getFileMetaData().getSchema();
System.out.println(parquetSchema);
reader.close();

Output:

message TestSerializer {
  optional binary string1 (UTF8);
  optional int32 int1;
  optional int32 tinyint1;
  optional int32 smallint1;
  optional int64 bigint1;
  optional boolean boolean1;
  optional double float1;
  optional double double1;
  optional group list1 (LIST) {
    repeated binary array (UTF8);
  }
  optional group map1 (LIST) {
    repeated group array {
      optional binary key (UTF8);
      optional int32 value;
    }
  }
  optional group struct1 {
    optional int32 sInt;
    optional boolean sBoolean;
    optional binary sString (UTF8);
  }
  optional binary enum1 (UTF8);
  optional int32 nullableint;
}

3. Convert the Parquet schema to an Avro schema

import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.avro.Schema;

Schema avroSchema = new AvroSchemaConverter(config).convert(parquetSchema);
System.out.println(avroSchema);

Output:

{
  "type": "record",
  "name": "TestSerializer",
  "fields": [
    { "name": "string1", "type": ["null", "string"], "default": null },
    { "name": "int1", "type": ["null", "int"], "default": null },
    { "name": "tinyint1", "type": ["null", "int"], "default": null },
    { "name": "smallint1", "type": ["null", "int"], "default": null },
    { "name": "bigint1", "type": ["null", "long"], "default": null },
    { "name": "boolean1", "type": ["null", "boolean"], "default": null },
    { "name": "float1", "type": ["null", "double"], "default": null },
    { "name": "double1", "type": ["null", "double"], "default": null },
    {
      "name": "list1",
      "type": ["null", { "type": "array", "items": "string" }],
      "default": null
    },
    {
      "name": "map1",
      "type": ["null", {
        "type": "array",
        "items": {
          "type": "record",
          "name": "array",
          "fields": [
            { "name": "key", "type": ["null", "string"], "default": null },
            { "name": "value", "type": ["null", "int"], "default": null }
          ]
        }
      }],
      "default": null
    },
    {
      "name": "struct1",
      "type": ["null", {
        "type": "record",
        "name": "struct1",
        "fields": [
          { "name": "sInt", "type": ["null", "int"], "default": null },
          { "name": "sBoolean", "type": ["null", "boolean"], "default": null },
          { "name": "sString", "type": ["null", "string"], "default": null }
        ]
      }],
      "default": null
    },
    { "name": "enum1", "type": ["null", "string"], "default": null },
    { "name": "nullableint", "type": ["null", "int"], "default": null }
  ]
}
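
AvroSchemaConverter also works in the opposite direction, which makes it easy to verify the round trip; a minimal sketch reusing the config and avroSchema variables from above:

// Convert the Avro schema back into a Parquet MessageType
MessageType backToParquet = new AvroSchemaConverter(config).convert(avroSchema);
System.out.println(backToParquet);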

Reference: https://stackoverflow.com/questions/54159454/how-to-convert-parquet-schema-to-avro-in-java-scala

Installing LZO on CDH 5.16

1. In the CDH admin page, go to Parcels and download GPLEXTRAS

lintong@master:/opt/cloudera/parcel-repo$ ls | grep GPLEXTRAS
GPLEXTRAS-5.16.2-1.cdh5.16.2.p0.8-xenial.parcel
GPLEXTRAS-5.16.2-1.cdh5.16.2.p0.8-xenial.parcel.sha1

Rename the .sha1 file to .sha:

sudo mv GPLEXTRAS-5.16.2-1.cdh5.16.2.p0.8-xenial.parcel.sha1 GPLEXTRAS-5.16.2-1.cdh5.16.2.p0.8-xenial.parcel.sha

If a parcel's hash file does not exist, it can be generated like this:

sha1sum ./SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012-xenial.parcel | cut -d ' ' -f 1 > SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012-xenial.parcel.sha1

2. Distribute and activate the parcel in the UI


Flink Study Notes: Flink State

Flink has two basic types of state: managed state (Managed State) and raw state (Raw State). The difference between the two: Managed State is managed by Flink, which takes care of storage, recovery, and optimization, while Raw State is managed by the developer, who must handle serialization themselves.
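
A minimal sketch of managed keyed state, assuming a keyed stream of strings (the class name and state name are hypothetical):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Counts how many events have been seen per key; Flink stores, checkpoints,
// and restores the ValueState on our behalf (that is what "managed" means).
public class CountFunction extends RichFlatMapFunction<String, Long> {

    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("count", Long.class);
        countState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(String value, Collector<Long> out) throws Exception {
        Long current = countState.value(); // null on first access for a key
        current = (current == null) ? 1L : current + 1;
        countState.update(current);
        out.collect(current);
    }
}

It would be applied on a keyed stream, e.g. stream.keyBy(v -> v).flatMap(new CountFunction()).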


MyBatis Study Notes: Caching

By default, MyBatis enables only the first-level cache, which is scoped to a single SqlSession.

To enable the second-level cache, add the following to the XML configuration file:

<cache/>

In addition, the returned POJO objects must implement the Serializable interface.
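
Such a POJO could look like this; a minimal sketch (the User class and its fields are hypothetical):

import java.io.Serializable;

// Second-level cache entries may be serialized (e.g. written to disk),
// so cached result objects must implement Serializable.
public class User implements Serializable {
    private static final long serialVersionUID = 1L;
    private Long id;
    private String name;
    // getters and setters omitted
}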


Spring Boot Study Notes: Caching

Spring Boot can use Ehcache or Redis as its cache.

1. Ehcache caching

Reference: SpringBoot学习-(十八)SpringBoot整合EhCache

Add the dependencies: the cache starter plus ehcache

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<dependency>
    <groupId>net.sf.ehcache</groupId>
    <artifactId>ehcache</artifactId>
</dependency>

Add the configuration file ehcache.xml (if it is not at the classpath root, point Spring Boot at it with the spring.cache.ehcache.config property):

<?xml version="1.0" encoding="UTF-8"?>
<ehcache xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://ehcache.org/ehcache.xsd">

    <!--timeToIdleSeconds: destroy a cache entry after it has been idle for n seconds -->
    <!--timeToLiveSeconds: destroy a cache entry after it has lived for n seconds -->
    <!-- Cache configuration:
         name: cache name.
         maxElementsInMemory: maximum number of cached elements in memory.
         eternal: whether entries are valid forever; once set, the timeouts are ignored.
         timeToIdleSeconds: allowed idle time before an entry expires (seconds). Only used when eternal=false. Optional; default 0, meaning unlimited idle time.
         timeToLiveSeconds: allowed lifetime before an entry expires (seconds), measured from creation time. Only used when eternal=false. Default 0, meaning unlimited lifetime.
         overflowToDisk: when the number of in-memory entries reaches maxElementsInMemory, Ehcache writes entries to disk.
         diskSpoolBufferSizeMB: buffer size of the DiskStore (disk cache). Default 30 MB. Each cache should have its own buffer.
         maxElementsOnDisk: maximum number of entries on disk.
         diskPersistent: whether the disk store persists between restarts of the Virtual Machine. The default value is false.
         diskExpiryThreadIntervalSeconds: interval of the disk expiry thread; default 120 seconds.
         memoryStoreEvictionPolicy: eviction policy applied when maxElementsInMemory is reached. The default is LRU (least recently used); FIFO (first in, first out) and LFU (less frequently used) are also available.
         clearOnFlush: whether to clear when the memory store is at its maximum. -->

    <!-- Disk cache location -->
    <diskStore path="java.io.tmpdir"/>

    <!-- Default cache -->
    <defaultCache
            maxElementsInMemory="10000"
            eternal="false"
            timeToIdleSeconds="120"
            timeToLiveSeconds="120"
            maxElementsOnDisk="10000000"
            diskExpiryThreadIntervalSeconds="120"
            memoryStoreEvictionPolicy="LRU">
        <persistence strategy="localTempSwap"/>
    </defaultCache>

    <!-- Test cache -->
    <cache name="GoodsType"
           eternal="false"
           timeToIdleSeconds="2400"
           timeToLiveSeconds="2400"
           maxEntriesLocalHeap="10000"
           maxEntriesLocalDisk="10000000"
           diskExpiryThreadIntervalSeconds="120"
           overflowToDisk="false"
           memoryStoreEvictionPolicy="LRU">
    </cache>

</ehcache>

Add the @EnableCaching annotation (typically on the Spring Boot application class) to turn on caching.
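
With caching enabled, methods opt in with @Cacheable. A minimal sketch, assuming a hypothetical service class whose results go into the GoodsType cache defined above:

import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;

@Service
public class GoodsTypeService {

    // Cached in the "GoodsType" cache from ehcache.xml; repeated calls
    // with the same id return the cached value instead of re-executing.
    @Cacheable(value = "GoodsType", key = "#id")
    public String findById(Long id) {
        return "goods-type-" + id; // stand-in for a real database lookup
    }
}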
