tonglin0325's Home Page

Hive Study Notes: UDF Development

To implement a UDF, you can extend either org.apache.hadoop.hive.ql.exec.UDF or org.apache.hadoop.hive.ql.udf.generic.GenericUDF.

1. Extending UDF. Reference:

https://docs.microsoft.com/en-us/azure/hdinsight/hadoop/apache-hadoop-hive-java-udf

Add the dependencies:

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.1.0-cdh5.16.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0-cdh5.16.2</version>
</dependency>

Code:

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Description(
        name = "hello",
        value = "_FUNC_(str) - from the input string "
                + "returns the value that is \"Hello $str\" ",
        extended = "Example:\n"
                + " > SELECT _FUNC_(str) FROM src;"
)
public class MyUDF extends UDF {

    private static final Logger logger = LoggerFactory.getLogger(MyUDF.class);

    public String evaluate(String str) {
        try {
            return "Hello " + str;
        } catch (Exception e) {
            // log and fall back to a sentinel value
            logger.error("evaluate failed", e);
            return "ERROR";
        }
    }

}
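
For comparison, the same function written against the GenericUDF API might look roughly like the sketch below. This is a minimal, untested sketch based on the standard initialize/evaluate/getDisplayString contract; the class name HelloGenericUDF is made up for illustration.

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

// Hypothetical GenericUDF equivalent of MyUDF: returns "Hello <input>".
public class HelloGenericUDF extends GenericUDF {

    private StringObjectInspector inputOI;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length != 1) {
            throw new UDFArgumentLengthException("hello() takes exactly one argument");
        }
        if (!(arguments[0] instanceof StringObjectInspector)) {
            throw new UDFArgumentException("hello() expects a string argument");
        }
        inputOI = (StringObjectInspector) arguments[0];
        // The return type is a plain Java string.
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        Object arg = arguments[0].get();
        if (arg == null) {
            return null;
        }
        return "Hello " + inputOI.getPrimitiveJavaObject(arg);
    }

    @Override
    public String getDisplayString(String[] children) {
        return "hello(" + children[0] + ")";
    }

}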

Note when packaging that all dependency jars must be bundled into the jar; then upload the jar to HDFS or S3.

hive> add jar hdfs:///user/hive/udf/bigdata-1.0-SNAPSHOT-jar-with-dependencies.jar;
converting to local hdfs:///user/hive/udf/bigdata-1.0-SNAPSHOT-jar-with-dependencies.jar
Added [/tmp/5aa66ab6-35ab-45d5-bef1-5acc79d16b23_resources/bigdata-1.0-SNAPSHOT-jar-with-dependencies.jar] to class path
Added resources: [hdfs:///user/hive/udf/bigdata-1.0-SNAPSHOT-jar-with-dependencies.jar]
hive> create temporary function my_lower as "com.bigdata.hive.MyUDF";
OK
Time taken: 0.073 seconds
hive> select my_lower("123");
OK
Hello 123
Time taken: 0.253 seconds, Fetched: 1 row(s)

Viewing the loaded jars


Hudi Study Notes: Syncing the Hive Metastore

If you are using Flink SQL and want the table to be synced to the Hive metastore, you only need to add the hive_sync options to the Flink SQL CREATE TABLE statement, as follows:

'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://xxx:9083',
'hive_sync.table'='hudi_xxxx_table',
'hive_sync.db'='default',
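
For context, these options sit alongside the usual Hudi connector options inside the CREATE TABLE statement. The sketch below shows roughly where they go, executed here through the Java Table API; the table name, columns, storage path, and metastore URI are placeholders, and it assumes a working hudi-flink bundle is already on the classpath.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HudiHiveSyncExample {

    public static void main(String[] args) {
        // Streaming table environment; the Hudi Flink bundle must be on the classpath.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Placeholder schema, path and metastore URI; adjust to your environment.
        tEnv.executeSql(
            "CREATE TABLE hudi_xxxx_table (\n" +
            "  id BIGINT,\n" +
            "  name STRING,\n" +
            "  ts TIMESTAMP(3),\n" +
            "  `dt` STRING,\n" +
            "  PRIMARY KEY (id) NOT ENFORCED\n" +
            ") PARTITIONED BY (`dt`) WITH (\n" +
            "  'connector' = 'hudi',\n" +
            "  'path' = 'hdfs:///warehouse/hudi_xxxx_table',\n" +
            "  'table.type' = 'MERGE_ON_READ',\n" +
            "  'hive_sync.enable' = 'true',\n" +
            "  'hive_sync.mode' = 'hms',\n" +
            "  'hive_sync.metastore.uris' = 'thrift://xxx:9083',\n" +
            "  'hive_sync.table' = 'hudi_xxxx_table',\n" +
            "  'hive_sync.db' = 'default'\n" +
            ")");
    }

}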

If the table cannot be created properly, or only the _ro table is created, with an error like the following:

org.apache.hudi.exception.HoodieException: Got runtime exception when hive syncing hudi_xxxx_table
at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:145) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doSyncHive(StreamWriteOperatorCoordinator.java:335) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_372]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_372]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_372]
Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Failed to sync partitions for table hudi_xxxx_table_ro
at org.apache.hudi.hive.HiveSyncTool.syncPartitions(HiveSyncTool.java:341) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:232) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at org.apache.hudi.hive.HiveSyncTool.doSync(HiveSyncTool.java:158) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:142) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
... 5 more
Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Failed to get all partitions for table default.hudi_xxxx_table_ro
at org.apache.hudi.hive.HoodieHiveSyncClient.getAllPartitions(HoodieHiveSyncClient.java:180) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at org.apache.hudi.hive.HiveSyncTool.syncPartitions(HiveSyncTool.java:317) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:232) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at org.apache.hudi.hive.HiveSyncTool.doSync(HiveSyncTool.java:158) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:142) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
... 5 more
Caused by: org.apache.hadoop.hive.metastore.api.NoSuchObjectException: @hive#default.hudi_xxxx_table_ro table not found
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partitions_result$get_partitions_resultStandardScheme.read(ThriftHiveMetastore.java) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partitions_result$get_partitions_resultStandardScheme.read(ThriftHiveMetastore.java) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partitions_result.read(ThriftHiveMetastore.java) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partitions(ThriftHiveMetastore.java:2958) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partitions(ThriftHiveMetastore.java:2943) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitions(HiveMetaStoreClient.java:1368) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitions(HiveMetaStoreClient.java:1362) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at sun.reflect.GeneratedMethodAccessor125.invoke(Unknown Source) ~[?:?]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_372]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:212) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at com.sun.proxy.$Proxy92.listPartitions(Unknown Source) ~[?:?]
at sun.reflect.GeneratedMethodAccessor125.invoke(Unknown Source) ~[?:?]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_372]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2773) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at com.sun.proxy.$Proxy92.listPartitions(Unknown Source) ~[?:?]
at org.apache.hudi.hive.HoodieHiveSyncClient.getAllPartitions(HoodieHiveSyncClient.java:175) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at org.apache.hudi.hive.HiveSyncTool.syncPartitions(HiveSyncTool.java:317) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:232) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at org.apache.hudi.hive.HiveSyncTool.doSync(HiveSyncTool.java:158) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:142) ~[blob_p-8126a52f21d07b448344e4277f6fb0837c921987-71810e5577eef736251ddb420010bd50:?]
... 5 more

The cause is that the correct hudi-flink-bundle jar was not used; a working jar needs to be built by compiling and packaging the Hudi project yourself. Reference: Flink SQL操作Hudi并同步Hive使用总结

Steps to build the hudi-flink-bundle jar yourself:

1. Clone the Hudi project with git and check out the branch of the Hudi version you are using, e.g. 0.13.0:

git clone git@github.com:apache/hudi.git
git checkout release-0.13.0

2. Build the hudi-flink-bundle jar. Here the Hive metastore is Hive 2 and the Flink version is 1.16.0:

mvn clean package -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2 -Dflink1.16 -Dscala-2.12 -Dspark3


Java: NIO

1. Buffers

import java.nio.IntBuffer;

// Function : IntBuffer_demo
public class IntBuffer_demo {

    public static void main(String[] args) {
        IntBuffer buf = IntBuffer.allocate(10); // allocate a buffer of size 10
        System.out.print("1. position, limit and capacity before writing: ");
        System.out.println("position=" + buf.position() + ", limit=" + buf.limit() + ", capacity=" + buf.capacity());
        int temp[] = {3, 5, 7};                 // an int array
        buf.put(3);                             // write a single value into the buffer
        buf.put(temp);                          // write an array of values into the buffer
        System.out.print("2. position, limit and capacity after writing: ");
        System.out.println("position=" + buf.position() + ", limit=" + buf.limit() + ", capacity=" + buf.capacity());
        buf.flip();                             // reset the buffer for reading (flip the pointers)
        System.out.print("3. position, limit and capacity before reading: ");
        System.out.println("position=" + buf.position() + ", limit=" + buf.limit() + ", capacity=" + buf.capacity());
        while (buf.hasRemaining()) {
            int x = buf.get();
            System.out.print(x + "、");
        }
    }

}
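
For reference, the position/limit/capacity transitions in the demo above are deterministic; the compact sketch below spells them out as assertions (run with java -ea so that assertions are enabled).

import java.nio.IntBuffer;

public class IntBufferStateCheck {

    public static void main(String[] args) {
        IntBuffer buf = IntBuffer.allocate(10);
        // Freshly allocated: position=0, limit=capacity=10.
        assert buf.position() == 0 && buf.limit() == 10 && buf.capacity() == 10;

        buf.put(3);
        buf.put(new int[]{3, 5, 7});
        // Four ints written: position=4, limit still 10.
        assert buf.position() == 4 && buf.limit() == 10;

        buf.flip();
        // flip(): limit becomes the old position, position resets to 0.
        assert buf.position() == 0 && buf.limit() == 4;

        StringBuilder out = new StringBuilder();
        while (buf.hasRemaining()) {
            out.append(buf.get()).append("、");
        }
        System.out.println(out); // prints 3、3、5、7、
    }

}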

 

 

Creating a sub-buffer:

import java.nio.IntBuffer;

// Function : IntBuffer_demo
public class IntBuffer_demo {

    public static void main(String[] args) {
        IntBuffer buf = IntBuffer.allocate(10); // allocate a buffer of size 10
        IntBuffer sub = null;                   // the sub-buffer
        for (int i = 0; i < 10; i++) {
            buf.put(2 * i + 1);
        }
        buf.position(2);
        buf.limit(6);
        sub = buf.slice();                      // create a sub-buffer that shares the same storage
        for (int i = 0; i < sub.capacity(); i++) {
            int temp = sub.get(i);
            sub.put(temp - 1);
        }
        buf.flip();                             // reset the main buffer
        buf.limit(buf.capacity());              // restore the limit to the capacity
        System.out.println("Contents of the main buffer:");
        while (buf.hasRemaining()) {
            int x = buf.get();                  // read the current element
            System.out.print(x + "、");
        }
    }

}

 


Spark Study Notes: Reading and Writing Avro

1. Reading Avro files with the DataFrame API

https://sparkbyexamples.com/spark/read-write-avro-file-spark-dataframe/

Maven dependencies. Since Spark 2.4.0 you can use Apache's spark-avro package; before that you had to use Databricks' spark-avro package.

<!--avro-->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.0</version>
    <exclusions>
        <exclusion>
            <artifactId>jackson-databind</artifactId>
            <groupId>com.fasterxml.jackson.core</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.12.1</version>
</dependency>
<!--spark-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.0.cloudera2</version>
    <exclusions>
        <exclusion>
            <artifactId>avro</artifactId>
            <groupId>org.apache.avro</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.0.cloudera2</version>
    <exclusions>
        <exclusion>
            <artifactId>avro</artifactId>
            <groupId>org.apache.avro</groupId>
        </exclusion>
        <exclusion>
            <artifactId>parquet-jackson</artifactId>
            <groupId>com.twitter</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>2.4.0.cloudera2</version>
</dependency>

Read an Avro file into a DataFrame:

import java.io.File

import org.apache.avro.Schema
import org.apache.spark.sql.SparkSession

object SparkAvroDemo {

  def main(args: Array[String]) {

    val sparkSession = SparkSession.builder()
      .master("local")
      .appName("spark session example")
      .getOrCreate()

    val schemaAvro = new Schema.Parser().parse(new File("src/main/avro/kst.avsc"))

    val df = sparkSession.read
      .format("avro")
      .option("avroSchema", schemaAvro.toString)
      .load("file:///Users/lintong/coding/java/interview/bigdata/data/000000_0")

    df.show()

  }

}

 

 

Writing Avro files with the DataFrame API:

df.write.format("avro").save("person.avro")

 

2. Reading Avro files in Spark with the Hadoop API

import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.spark.sql.SparkSession

// test_serializer is the Avro-generated record class for the file's schema
val sparkSession = SparkSession.builder()
  .master("local")
  .appName("spark session example")
  .getOrCreate()

val sc = sparkSession.sparkContext
val conf = sc.hadoopConfiguration
val path = "file:///Users/lintong/coding/java/interview/bigdata/data/000000_0"
val rdd = sc.newAPIHadoopFile(
  path,
  classOf[AvroKeyInputFormat[test_serializer]],
  classOf[AvroKey[test_serializer]],
  classOf[NullWritable],
  conf
).map(_._1.datum())
rdd.foreach(println(_))


Scala error: Exception in thread "main" java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)V

Running Spark in IDEA throws the following error:

Exception in thread "main" java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)V
at org.apache.spark.SparkConf$DeprecatedConfig.<init>(SparkConf.scala:799)
at org.apache.spark.SparkConf$.<init>(SparkConf.scala:596)
at org.apache.spark.SparkConf$.<clinit>(SparkConf.scala)
at org.apache.spark.SparkConf.set(SparkConf.scala:94)
at org.apache.spark.SparkConf.set(SparkConf.scala:83)
at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$1(SparkSession.scala:916)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:916)

This is caused by Scala 2.11 being configured in IDEA when Scala 2.12 should be used.

Changing it to 2.12 resolves the issue.

Converting a Parquet Schema to an Avro Schema

1. Add the dependencies

<!--parquet-->
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.10.0</version>
</dependency>
<!--hadoop-->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version>
</dependency>

2. Read the Parquet schema from the Parquet file's footer

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.schema.MessageType;

Configuration config = new Configuration();
Path parquetPath = new Path("file:///Users/lintong/Downloads/xxxx.snappy.parquet");
ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(parquetPath, config));
MessageType parquetSchema = reader.getFooter().getFileMetaData().getSchema();
System.out.println(parquetSchema);

Output:

message TestSerializer {
  optional binary string1 (UTF8);
  optional int32 int1;
  optional int32 tinyint1;
  optional int32 smallint1;
  optional int64 bigint1;
  optional boolean boolean1;
  optional double float1;
  optional double double1;
  optional group list1 (LIST) {
    repeated binary array (UTF8);
  }
  optional group map1 (LIST) {
    repeated group array {
      optional binary key (UTF8);
      optional int32 value;
    }
  }
  optional group struct1 {
    optional int32 sInt;
    optional boolean sBoolean;
    optional binary sString (UTF8);
  }
  optional binary enum1 (UTF8);
  optional int32 nullableint;
}

3. Convert the Parquet schema to an Avro schema

import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.avro.Schema;

Schema avroSchema = new AvroSchemaConverter(config).convert(parquetSchema);
System.out.println(avroSchema);

Output:

{
"type":"record",
"name":"TestSerializer",
"fields":[
{
"name":"string1",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"int1",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"tinyint1",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"smallint1",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"bigint1",
"type":[
"null",
"long"
],
"default":null
},
{
"name":"boolean1",
"type":[
"null",
"boolean"
],
"default":null
},
{
"name":"float1",
"type":[
"null",
"double"
],
"default":null
},
{
"name":"double1",
"type":[
"null",
"double"
],
"default":null
},
{
"name":"list1",
"type":[
"null",
{
"type":"array",
"items":"string"
}
],
"default":null
},
{
"name":"map1",
"type":[
"null",
{
"type":"array",
"items":{
"type":"record",
"name":"array",
"fields":[
{
"name":"key",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"value",
"type":[
"null",
"int"
],
"default":null
}
]
}
}
],
"default":null
},
{
"name":"struct1",
"type":[
"null",
{
"type":"record",
"name":"struct1",
"fields":[
{
"name":"sInt",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"sBoolean",
"type":[
"null",
"boolean"
],
"default":null
},
{
"name":"sString",
"type":[
"null",
"string"
],
"default":null
}
]
}
],
"default":null
},
{
"name":"enum1",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"nullableint",
"type":[
"null",
"int"
],
"default":null
}
]
}
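
The conversion also works in the opposite direction: AvroSchemaConverter can turn an Avro schema back into a Parquet MessageType. A minimal sketch, assuming the config and avroSchema variables from the snippets above:

import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;

// Reverse direction: Avro schema -> Parquet MessageType.
MessageType backToParquet = new AvroSchemaConverter(config).convert(avroSchema);
System.out.println(backToParquet);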


Installing LZO on CDH 5.16

1. In the CDH admin UI, go to Parcels and download GPLEXTRAS

lintong@master:/opt/cloudera/parcel-repo$ ls | grep GPLEXTRAS
GPLEXTRAS-5.16.2-1.cdh5.16.2.p0.8-xenial.parcel
GPLEXTRAS-5.16.2-1.cdh5.16.2.p0.8-xenial.parcel.sha1

Rename .sha1 to .sha:

sudo mv GPLEXTRAS-5.16.2-1.cdh5.16.2.p0.8-xenial.parcel.sha1 GPLEXTRAS-5.16.2-1.cdh5.16.2.p0.8-xenial.parcel.sha

If a parcel's hash file does not exist, it can be generated like this:

sha1sum ./SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012-xenial.parcel | cut -d ' ' -f 1 > SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012-xenial.parcel.sha1

2. Distribute and activate the parcel in the UI.

Flink Study Notes: Flink State

Flink has two basic kinds of state: managed state (Managed State) and raw state (Raw State). The difference between them: Managed State is managed by Flink, which takes care of storage, recovery, and optimization, while Raw State is managed by the developer, who has to handle serialization.

| | Managed State | Raw State |
| --- | --- | --- |
| State management | Managed by the Flink runtime: stored, restored, and rescaled automatically | Managed by the user |
| State data structures | Flink provides common data structures, e.g. ListState, MapState | A byte array: byte[] |
| Applicable scenarios | The vast majority of Flink operators | All operators |

A ListState use case: implementing a topN program with Flink.

Managed State comes in two flavors: Keyed State and Operator State.

In Flink, Keyed State is bound to a specific key in the data stream. This means Flink maintains a state instance for every key in the stream. When a new element is processed, Flink sets the state's key to that element's key and then calls the corresponding processing function. A typical use of Keyed State is computing a rolling aggregate per key (see the sketch after the tables). Reference: Flink系列 13. 介绍Flink中的Operator State 和 Keyed State

Operator State can be used by all operators. It is typically used in sources, for example FlinkKafkaConsumer. Each operator state is bound to one parallel operator instance. Each parallel instance of the Kafka consumer maintains a map of topic partitions and offsets as its operator state. Official docs: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/state/#operator-state

| | Operator State | Keyed State |
| --- | --- | --- |
| Applicable operators | All operators; commonly used in sources such as FlinkKafkaConsumer | Only operators on a KeyedStream |
| State assignment | One state per parallel subtask of an operator | One state per key; an operator that processes multiple keys accesses the corresponding multiple states |
| How to create and access | Implement the CheckpointedFunction or the (deprecated) ListCheckpointed interface | Extend a RichFunction and access the state through its RuntimeContext |
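
As an illustration of Keyed State, below is a minimal sketch of the per-key rolling aggregation mentioned above, implemented as a rolling sum with ValueState in a RichFlatMapFunction. The class and state names (RollingSum, "rolling-sum") are made up for illustration.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Keeps a rolling sum per key; the state is scoped to the current key automatically.
public class RollingSum extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> sumState;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("rolling-sum", Long.class);
        sumState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = sumState.value();          // null on the first element for this key
        long sum = (current == null ? 0L : current) + value.f1;
        sumState.update(sum);
        out.collect(Tuple2.of(value.f0, sum));
    }

}

On a keyed stream it would be applied as, for example, stream.keyBy(t -> t.f0).flatMap(new RollingSum()).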


Java: Java Networking

1. IP and InetAddress

import java.net.InetAddress;

public class InetAddress_demo {

    public static void main(String[] args) throws Exception {
        InetAddress locAdd = null;  // InetAddress for the local host
        InetAddress remAdd = null;  // InetAddress for a remote host

        locAdd = InetAddress.getLocalHost();             // get the local host's InetAddress
        remAdd = InetAddress.getByName("www.baidu.com"); // resolve a remote host by name

        System.out.println("Local IP address: " + locAdd.getHostAddress());
        System.out.println("Remote IP address: " + remAdd.getHostAddress());
        System.out.println("Local host reachable: " + locAdd.isReachable(5000));
    }

}

2. URLConnection, URLEncoder, and URLDecoder

1. Reading content with URL

import java.io.InputStream;
import java.net.URL;
import java.util.Scanner;

public class URL_demo {

    public static void main(String[] args) throws Exception {
        URL url = new URL("http", "10.108.84.220", 8080, "/Hello/Hello.html"); // the URL to read
        InputStream input = url.openStream();   // open an input stream on the URL's content
        Scanner scan = new Scanner(input);
        scan.useDelimiter("\n");                // read line by line
        while (scan.hasNext()) {
            System.out.println(scan.next());
        }
    }

}
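
The section heading also mentions URLEncoder and URLDecoder; below is a minimal sketch of encoding and decoding a query-string value. The sample text is arbitrary, and the class name follows the naming style of the demos above.

import java.net.URLDecoder;
import java.net.URLEncoder;

public class URLEncoder_demo {

    public static void main(String[] args) throws Exception {
        String keyword = "hello world 你好";                       // arbitrary sample text
        String encoded = URLEncoder.encode(keyword, "UTF-8");      // e.g. hello+world+%E4%BD%A0%E5%A5%BD
        String decoded = URLDecoder.decode(encoded, "UTF-8");      // back to the original string
        System.out.println("Encoded: " + encoded);
        System.out.println("Decoded: " + decoded);
    }

}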

 


MyBatis Study Notes: Caching

By default, MyBatis enables only the first-level cache, which is scoped to a single SqlSession.

To enable the second-level cache, add the following to the mapper XML configuration file:

<cache/>

In addition, the POJO objects being returned must implement the Serializable interface.
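
For illustration, a hypothetical User POJO that satisfies this requirement might look like this (the class and fields are made up):

import java.io.Serializable;

// POJOs returned by mappers with <cache/> enabled must be serializable,
// because the second-level cache may serialize cached results.
public class User implements Serializable {

    private static final long serialVersionUID = 1L;

    private Long id;
    private String name;

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

}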