tonglin0325的个人主页

Spark学习笔记——Spark Streaming

许多应用需要即时处理收到的数据,例如用来实时追踪页面访问统计的应用、训练机器学习模型的应用, 还有自动检测异常的应用。Spark Streaming 是 Spark 为这些应用而设计的模型。它允许用户使用一套和批处理非常接近的 API 来编写流式计算应用,这样就可以大量重用批处理应用的技术甚至代码。
Spark Streaming 使用离散化流( discretized stream)作为抽象表示, 叫作 DStream。 DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。DStream 可以从各种输入源创建,比如 Flume、 Kafka 或者 HDFS。创建出来的 DStream 支持两种操作,一种是转化操作( transformation)会生成一个新的DStream,另一种是输出操作( output operation)可以把数据写入外部系统中。DStream提供了许多与 RDD 所支持的操作相类似的操作支持,还增加了与时间相关的新操作,比如滑动窗口。

build.sbt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
name := "spark-first"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.11" % "2.1.0",
"org.apache.hadoop" % "hadoop-common" % "2.7.2",
"mysql" % "mysql-connector-java" % "5.1.31",
"org.apache.spark" %% "spark-sql" % "2.1.0",
"org.apache.spark" %% "spark-streaming" % "2.1.0"

)

全文 >>

Scala学习笔记——断言和单元测试

1.断言

**assert(conditon)**将在条件不成立的时候,抛出assertionError

**assert(conditon,explanation)**讲在条件不成立的时候,抛出explanation作为说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.scala.first

/**
* Created by common on 17-4-19.
*/
object Assert {

def main(args: Array[String]): Unit = {
val a = new Assert()
a.above1(0)

}

}

class Assert {
val value = 1

def above(that: Int): Unit = {
val thatVal = that
val thisVal = this.value
//如果条件不满足,Exception in thread "main" java.lang.AssertionError: assertion failed
assert(thatVal == thisVal)
}

//另一种断言
//如果条件不满足,Exception in thread "main" java.lang.AssertionError: assertion failed
def above1(that: Int): Unit = {
{
val thatVal = that
val thisVal = this.value
} ensuring(that == this.value)

}
}

全文 >>

Scala学习笔记——样本类和模式匹配

1.样本类

在申明的类前面加上一个case修饰符,带有这种修饰符的类被称为样本类(case class)。

被申明为样本类的类的特点:1.会添加和类名一致的工厂方法;2.样本类参数列表中的所有参数隐式获得了val前缀,因此它被当做字段维护;3.编译器被这个样本类添加了toString、hashcode、equals方法的实现;4.支持了模式匹配

全文 >>

Spark学习笔记——读写HBase

1.首先在HBase中建立一张表,名字为student

参考 Hbase学习笔记——基本CRUD操作

一个cell的值,取决于Row,Column family,Column Qualifier和Timestamp

HBase表结构

2.往HBase中写入数据,写入的时候,需要写family和column

build.sbt

1
2
3
4
5
6
7
8
9
10
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.1.0",
"mysql" % "mysql-connector-java" % "5.1.31",
"org.apache.spark" %% "spark-sql" % "2.1.0",
"org.apache.hbase" % "hbase-common" % "1.3.0",
"org.apache.hbase" % "hbase-client" % "1.3.0",
"org.apache.hbase" % "hbase-server" % "1.3.0",
"org.apache.hbase" % "hbase" % "1.2.1"
)

在hbaseshell中写数据的时候,写的是String,但是在idea中写代码的话,如果写的是int类型的,就会出现\x00…的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import java.util.Properties

import com.google.common.collect.Lists
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Get, Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf

/**
* Created by mi on 17-4-11.
*/

case class resultset(name: String,
info: String,
summary: String)

case class IntroItem(name: String, value: String)


case class BaikeLocation(name: String,
url: String = "",
info: Seq[IntroItem] = Seq(),
summary: Option[String] = None)

case class MewBaikeLocation(name: String,
url: String = "",
info: Option[String] = None,
summary: Option[String] = None)


object MysqlOpt {

def main(args: Array[String]): Unit = {

// 本地模式运行,便于测试
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
// 创建 spark context
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

//定义数据库和表信息
val url = "jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&characterEncoding=UTF-8"
val table = "baike_pages"

// 读取Hbase文件,在hbase的/usr/local/hbase/conf/hbase-site.xml中写的地址
val hbasePath = "file:///usr/local/hbase/hbase-tmp"

// 创建hbase configuration
val hBaseConf = HBaseConfiguration.create()
hBaseConf.set(TableInputFormat.INPUT_TABLE, "student")

// 初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!
val jobConf = new JobConf(hBaseConf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "student")

val indataRDD = sc.makeRDD(Array("1,99,98","2,97,96","3,95,94"))

val rdd = indataRDD.map(_.split(',')).map{arr=>{
/*一个Put对象就是一行记录,在构造方法中指定主键
* 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换
* Put.add方法接收三个参数:列族,列名,数据
*/
val put = new Put(Bytes.toBytes(arr(0)))
put.add(Bytes.toBytes("course"),Bytes.toBytes("math"),Bytes.toBytes(arr(1)))
put.add(Bytes.toBytes("course"),Bytes.toBytes("english"),Bytes.toBytes(arr(2)))
//转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset
(new ImmutableBytesWritable, put)
}}

rdd.saveAsHadoopDataset(jobConf)

sc.stop()

}

}

全文 >>

Spark学习笔记——读写HDFS

使用Spark读写HDFS中的parquet文件

文件夹中的parquet文件

build.sbt文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
name := "spark-hbase"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.1.0",
"mysql" % "mysql-connector-java" % "5.1.31",
"org.apache.spark" %% "spark-sql" % "2.1.0",
"org.apache.hbase" % "hbase-common" % "1.3.0",
"org.apache.hbase" % "hbase-client" % "1.3.0",
"org.apache.hbase" % "hbase-server" % "1.3.0",
"org.apache.hbase" % "hbase" % "1.2.1"
)

全文 >>

Scala学习笔记——简化代码、柯里化、继承、特质

1.简化代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.scala.first

import java.io.File
import javax.management.Query

/**
* Created by common on 17-4-5.
*/
object FileMatcher {

def main(args: Array[String]) {

for (file <- filesHere)
println(file)

println()


for (file <- filesMatching("src", _.endsWith(_)))
println(file)

for (file <- filesEnding("src"))
println(file)

}

private def filesHere = (new File(".")).listFiles

//matcher是传入一个函数,返回boolean值,比如_.endsWith(_)
private def filesMatching(query: String, matcher: (String, String) => Boolean) = {
for (file <- filesHere; if matcher(file.getName, query)) yield file
}

//上面的函数不够简洁,下面是更加简洁的定义
private def filesMatch(matcher: String => Boolean) = {
for (file <- filesHere; if matcher(file.getName)) yield file
}

//然后可以定义使用不同matcher()的方法
def filesEnding(query: String) = {
filesMatch(_.endsWith(query))
}

//使用exists来简化代码
def containsOdd(nums: List[Int]): Boolean = {
nums.exists(_ % 2 == 1)
}

def containsNeg(nums: List[Int]): Boolean = {
nums.exists(_ < 0)
}

}

全文 >>

Spark学习笔记——读写MySQL

1.使用Spark读取MySQL中某个表中的信息

build.sbt文件

1
2
3
4
5
6
7
8
9
10
11
12
name := "spark-hbase"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.1.0",
"mysql" % "mysql-connector-java" % "5.1.31",
"org.apache.spark" %% "spark-sql" % "2.1.0"
)

全文 >>

Spark学习笔记——Spark on YARN

Spark运行的时候,采用的是主从结构,有一个节点负责中央协调, 调度各个分布式工作节点。这个中央协调节点被称为驱动器( Driver) 节点。与之对应的工作节点被称为执行器( executor) 节点

所有的 Spark 程序都遵循同样的结构:程序从输入数据创建一系列 RDD, 再使用转化操作派生出新的 RDD,最后使用行动操作收集或存储结果 RDD 中的数据。

1.驱动器节点:

Spark 驱动器是执行你的程序中的 main() 方法的进程。它执行用户编写的用来创建 SparkContext、创建 RDD,以及进行 RDD 的转化操作和行动操作的代码。其实,当你启动 Spark shell 时,你就启动了一个 Spark 驱动器程序

驱动器程序在 Spark 应用中有下述两个职责:1.把用户程序转为任务 2.为执行器节点调度任务

2.执行器节点:

Spark 执行器节点是一种工作进程,负责在 Spark 作业中运行任务,任务间相互独立。 Spark 应用启动时, 执行器节点就被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。

执行器进程有两大作用: 第一,它们负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程; 第二,它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。

3.集群管理器:

驱动器节点和执行器节点是如何启动的呢? Spark 依赖于集群管理器来启动执行器节点,而在某些特殊情况下,也依赖集群管理器来启动驱动器节点。

全文 >>