tonglin0325's personal homepage

Spark Study Notes: Machine Learning with MLlib

Classifying spam email with the machine learning algorithms in the MLlib library

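As shown in the figure, the emails are split into 4 folders: two hold the training set and two hold the test set. The code below reads only the two training folders, input/email/spam and input/email/ham.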

build.sbt file

name := "spark-first"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0",
  "org.apache.hadoop" % "hadoop-common" % "2.7.2",
  "mysql" % "mysql-connector-java" % "5.1.31",
  "org.apache.spark" %% "spark-sql" % "2.1.0",
  "org.apache.spark" %% "spark-streaming" % "2.1.0",
  "org.apache.spark" %% "spark-mllib" % "2.1.0"
)

Code

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.regression.LabeledPoint

/**
 * Created by common on 17-4-6.
 */
object SparkRDD {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SpamClassifier").setMaster("local")
    val sc = new SparkContext(conf)

    // textFile yields one record per line, so every line is treated as one email
    val spam = sc.textFile("input/email/spam")
    val normal = sc.textFile("input/email/ham")

    // Create a HashingTF instance that maps email text to vectors of 10,000 features
    val tf = new HashingTF(numFeatures = 10000)
    // Each email is split into words, and each word is mapped to one feature
    val spamFeatures = spam.map(email => tf.transform(email.split(" ")))
    val normalFeatures = normal.map(email => tf.transform(email.split(" ")))
    // Build LabeledPoint datasets for the positive (spam) and negative (normal email) examples
    val positiveExamples = spamFeatures.map(features => LabeledPoint(1, features))
    val negativeExamples = normalFeatures.map(features => LabeledPoint(0, features))
    val trainingData = positiveExamples.union(negativeExamples)
    trainingData.cache() // Logistic regression is iterative, so cache the training RDD
    // Run logistic regression trained with SGD
    val model = new LogisticRegressionWithSGD().run(trainingData)
    // Test with one positive (spam) and one negative (normal email) example
    val posTest = tf.transform(
      "Experience with BiggerPenis Today! Grow 3-inches more ...".split(" "))
    val negTest = tf.transform(
      "That is cold. Is there going to be a retirement party? ...".split(" "))
    println("Prediction for positive test example: " + model.predict(posTest))
    println("Prediction for negative test example: " + model.predict(negTest))

    sc.stop()
  }
}
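To see roughly what HashingTF and LogisticRegressionWithSGD do internally, here is a minimal plain-Scala sketch that needs no Spark. It is an illustration, not MLlib's implementation: `String.hashCode` is an assumed stand-in for Spark's hash function (so feature indices will differ from Spark's), training is simple per-example SGD with no intercept and no regularization, and the names `SpamSketch`, `transform`, `train` and `predict` are hypothetical.

```scala
import scala.math.exp

// Plain-Scala sketch of the hashing trick plus logistic regression trained with SGD.
// Hypothetical names; String.hashCode is an assumed stand-in for Spark's hash function.
object SpamSketch {
  val numFeatures = 1000

  // Hashing trick: word -> bucket (hash mod numFeatures); the bucket holds the term frequency
  def transform(words: Iterable[String]): Array[Double] = {
    val v = Array.fill(numFeatures)(0.0)
    for (w <- words) v(Math.floorMod(w.hashCode, numFeatures)) += 1.0
    v
  }

  def sigmoid(z: Double): Double = 1.0 / (1.0 + exp(-z))

  def dot(w: Array[Double], x: Array[Double]): Double =
    w.indices.map(i => w(i) * x(i)).sum

  // Per-example SGD on the logistic loss (no intercept, no regularization)
  def train(data: Seq[(Double, Array[Double])], iterations: Int, stepSize: Double): Array[Double] = {
    val w = Array.fill(numFeatures)(0.0)
    for (_ <- 1 to iterations; (label, x) <- data) {
      val err = sigmoid(dot(w, x)) - label // derivative of the log-loss w.r.t. the margin
      for (i <- w.indices) w(i) -= stepSize * err * x(i)
    }
    w
  }

  // Predict 1.0 (spam) when the estimated probability exceeds 0.5
  def predict(w: Array[Double], x: Array[Double]): Double =
    if (sigmoid(dot(w, x)) > 0.5) 1.0 else 0.0
}
```

Usage mirrors the Spark version: build `(1.0, transform(email.split(" ")))` pairs for spam and `(0.0, ...)` pairs for normal mail, call `train(data, 100, 0.5)`, then `predict(w, transform(...))`. The hashing trick trades occasional collisions (two words sharing a bucket) for a fixed-size vector that needs no vocabulary.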


Spark Study Notes: Spark Streaming

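Many applications need to process data as soon as it arrives, for example applications that track page-view statistics in real time, applications that train machine learning models, and applications that detect anomalies automatically. Spark Streaming is Spark's model for these applications. It lets users write streaming computations with an API that is very close to the batch-processing API, so much of the technique, and even the code, of batch applications can be reused.
Spark Streaming uses the discretized stream as its abstraction, called a DStream. A DStream is a sequence of data received over time. Internally, the data received in each time interval is stored as an RDD, and a DStream is the sequence of these RDDs (hence the name "discretized"). DStreams can be created from various input sources, such as Flume, Kafka, or HDFS. Once created, a DStream supports two kinds of operations: transformations, which produce a new DStream, and output operations, which write data to external systems. DStreams support many of the same operations as RDDs, and add new time-related operations such as sliding windows.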
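The "sequence of RDDs" picture can be modeled in plain Scala. In this sketch (hypothetical names, not the Spark Streaming API) a stream is a `Vector` of batches, one per time interval, where each `Seq` stands in for that interval's RDD; a transformation is applied batch by batch, and a sliding window combines the most recent batches. Window and slide lengths are counted in batches, mirroring Spark's rule that both must be multiples of the batch interval.

```scala
// Plain-Scala model of a discretized stream (not the Spark Streaming API).
object DStreamSketch {
  // One element per time interval; each Seq stands in for that interval's RDD
  type Batches[A] = Vector[Seq[A]]

  // A transformation applies the same function to every batch and yields a new stream
  def map[A, B](stream: Batches[A])(f: A => B): Batches[B] =
    stream.map(_.map(f))

  // Per-interval word counts, similar to flatMap + map + reduceByKey on each batch
  def countWords(stream: Batches[String]): Vector[Map[String, Int]] =
    stream.map { batch =>
      batch.flatMap(_.split(" ")).groupBy(identity).map { case (word, ws) => (word, ws.size) }
    }

  // Sliding window: every `slideLen` batches, emit the data of the last `windowLen` batches.
  // The first windows contain fewer batches because less history exists yet.
  def window[A](stream: Batches[A], windowLen: Int, slideLen: Int): Batches[A] =
    (slideLen to stream.size by slideLen).toVector.map { end =>
      stream.slice(math.max(0, end - windowLen), end).flatten
    }
}
```

For example, with four one-element batches `1, 2, 3, 4`, a window of 2 sliding by 1 yields `[1]`, `[1, 2]`, `[2, 3]`, `[3, 4]`.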

build.sbt

name := "spark-first"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0",
  "org.apache.hadoop" % "hadoop-common" % "2.7.2",
  "mysql" % "mysql-connector-java" % "5.1.31",
  "org.apache.spark" %% "spark-sql" % "2.1.0",
  "org.apache.spark" %% "spark-streaming" % "2.1.0"
)


Scala Study Notes: Assertions and Unit Testing

1. Assertions

**assert(condition)** throws an AssertionError when the condition does not hold.

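**assert(condition, explanation)** throws an AssertionError when the condition does not hold, with the explanation appended to the error message ("assertion failed: " followed by the explanation).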

package com.scala.first

/**
 * Created by common on 17-4-19.
 */
object Assert {

  def main(args: Array[String]): Unit = {
    val a = new Assert()
    // value is 1, so above1(0) fails its check and throws an AssertionError
    a.above1(0)

  }

}

class Assert {
  val value = 1

  def above(that: Int): Unit = {
    val thatVal = that
    val thisVal = this.value
    // If the condition does not hold: Exception in thread "main" java.lang.AssertionError: assertion failed
    assert(thatVal == thisVal)
  }

  // Another form of assertion: ensuring checks the condition after the block has run
  // If the condition does not hold: Exception in thread "main" java.lang.AssertionError: assertion failed
  def above1(that: Int): Unit = {
    {
      val thatVal = that
      val thisVal = this.value
    } ensuring(that == this.value)

  }
}
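A small self-contained sketch of both forms, with hypothetical names. `assert(condition, explanation)` puts the explanation after "assertion failed: " in the error message, and the predicate form of `ensuring` checks the value a block returns before handing it back. The `Int.MinValue` case fails because negating it overflows back to a negative number.

```scala
object AssertSketch {
  // assert with an explanation: on failure, the AssertionError message is
  // "assertion failed: " followed by the explanation
  def checkPositive(x: Int): Int = {
    assert(x > 0, s"expected a positive number but got $x")
    x
  }

  // ensuring checks a predicate on the block's result before returning it;
  // absValue(Int.MinValue) fails because -Int.MinValue overflows to Int.MinValue
  def absValue(x: Int): Int = {
    if (x < 0) -x else x
  } ensuring (_ >= 0)

  // Runs a block and returns the AssertionError message, if one was thrown
  def failureMessage(block: => Any): Option[String] =
    try { block; None }
    catch { case e: AssertionError => Some(e.getMessage) }
}
```

Compared with the `above1` version, passing a function to `ensuring` lets the check refer to the result itself instead of repeating the inputs.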


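Scala Study Notes: Case Classes and Pattern Matching

1. Case classes

Adding the case modifier in front of a class declaration makes it a case class.

A class declared as a case class has these properties: 1. a factory method with the same name as the class is added; 2. every parameter in its parameter list implicitly gets a val prefix, so it is kept as a field; 3. the compiler adds implementations of toString, hashCode, and equals to the class; 4. it supports pattern matching.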
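A short example of the four properties above, using a hypothetical `Shape` hierarchy: instances are created without `new`, constructor parameters are readable fields, `toString`/`equals`/`hashCode` work by value, and `match` destructures the instances. Declaring the parent trait `sealed` is an extra choice made here; it lets the compiler warn when a match misses a case.

```scala
object CaseClassSketch {
  sealed trait Shape
  case class Circle(radius: Double) extends Shape
  case class Rect(width: Double, height: Double) extends Shape

  // Pattern matching destructures a case class through its generated extractor
  def area(s: Shape): Double = s match {
    case Circle(r)  => math.Pi * r * r
    case Rect(w, h) => w * h
  }

  // Guards refine a pattern; order matters, so the square case comes first
  def describe(s: Shape): String = s match {
    case Rect(w, h) if w == h => s"square of side $w"
    case Rect(_, _)           => "rectangle"
    case Circle(_)            => "circle"
  }
}
```

For instance, `Rect(2, 3)` prints as `Rect(2.0,3.0)` and is equal to any other `Rect(2, 3)`.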


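Spark Study Notes: Reading and Writing HBase

1. First, create a table named student in HBase

See the earlier post HBase Study Notes: Basic CRUD Operations

The value of a cell is determined by its Row, Column family, Column Qualifier, and Timestamp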

HBase table structure
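The addressing rule above (a value is located by row, column family, qualifier and timestamp) can be modeled as a map keyed by those four coordinates. This is a toy in-memory model with hypothetical names, not the HBase client API; it mimics how a read that gives no timestamp returns the newest version of a column.

```scala
object HBaseCellModel {
  // The four coordinates that locate one cell value
  final case class CellKey(row: String, family: String, qualifier: String, timestamp: Long)

  final class Table {
    private var cells = Map.empty[CellKey, String]

    def put(row: String, family: String, qualifier: String, timestamp: Long, value: String): Unit =
      cells += CellKey(row, family, qualifier, timestamp) -> value

    // Read one specific version
    def getAt(row: String, family: String, qualifier: String, timestamp: Long): Option[String] =
      cells.get(CellKey(row, family, qualifier, timestamp))

    // Read without a timestamp: return the newest version of the column
    def get(row: String, family: String, qualifier: String): Option[String] = {
      val versions = cells.toSeq.collect {
        case (k, v) if k.row == row && k.family == family && k.qualifier == qualifier => (k.timestamp, v)
      }
      if (versions.isEmpty) None else Some(versions.maxBy(_._1)._2)
    }
  }
}
```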

2. Write data into HBase. Every write must specify both the column family and the column (qualifier).

build.sbt

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.1.0",
"mysql" % "mysql-connector-java" % "5.1.31",
"org.apache.spark" %% "spark-sql" % "2.1.0",
"org.apache.hbase" % "hbase-common" % "1.3.0",
"org.apache.hbase" % "hbase-client" % "1.3.0",
"org.apache.hbase" % "hbase-server" % "1.3.0",
"org.apache.hbase" % "hbase" % "1.2.1"
)

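When you write data in the HBase shell, the values you type are Strings. In code written in IDEA, however, passing an Int to Bytes.toBytes stores its 4-byte binary form, so the shell displays values like \x00…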
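The difference comes from the encoding. The sketch below uses `java.nio.ByteBuffer`, assuming that `putInt` produces the same 4 big-endian bytes as HBase's `Bytes.toBytes(int)`, plus a hypothetical `toStringBinary` helper that approximates how the shell escapes non-printable bytes. The Int 99 becomes the bytes 00 00 00 63, displayed as `\x00\x00\x00c`, while the String "99" is just the two printable bytes `99`.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object BytesSketch {
  // 4-byte big-endian encoding, assumed equivalent to HBase's Bytes.toBytes(int)
  def intToBytes(i: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(i).array()

  // UTF-8 encoding, as used for Strings
  def stringToBytes(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)

  // Reads the 4 bytes back as an Int
  def bytesToInt(b: Array[Byte]): Int = ByteBuffer.wrap(b).getInt

  // Hypothetical approximation of the shell's display: printable ASCII as-is, other bytes as \xNN
  def toStringBinary(b: Array[Byte]): String =
    b.map { byte =>
      val ch = byte & 0xff
      if (ch >= ' ' && ch <= '~' && ch != '\\') ch.toChar.toString
      else "\\x%02X".format(ch)
    }.mkString
}
```

So if numbers are meant to be read in the shell, converting them to String before `Bytes.toBytes` (as the code below does with `arr(1)`) keeps them readable; storing the Int form is more compact but must be decoded back with the matching Int reader.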

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf

/**
 * Created by mi on 17-4-11.
 */
object MysqlOpt {

  def main(args: Array[String]): Unit = {

    // Run in local mode for easy testing
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    // Create the Spark context
    val sc = new SparkContext(conf)

    // Create the HBase configuration
    val hBaseConf = HBaseConfiguration.create()
    // INPUT_TABLE is only read by TableInputFormat, i.e. when reading the table back
    hBaseConf.set(TableInputFormat.INPUT_TABLE, "student")

    // Initialize the JobConf; TableOutputFormat must come from the org.apache.hadoop.hbase.mapred package!
    val jobConf = new JobConf(hBaseConf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "student")

    // Each record: row key, math score, english score
    val indataRDD = sc.makeRDD(Array("1,99,98", "2,97,96", "3,95,94"))

    val rdd = indataRDD.map(_.split(',')).map { arr =>
      /* One Put object is one row; the row key is given in the constructor.
       * All inserted data must be converted with org.apache.hadoop.hbase.util.Bytes.toBytes.
       * Put.addColumn takes three arguments: column family, column qualifier, value
       * (it replaces Put.add, which is deprecated in HBase 1.x).
       */
      val put = new Put(Bytes.toBytes(arr(0)))
      put.addColumn(Bytes.toBytes("course"), Bytes.toBytes("math"), Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("course"), Bytes.toBytes("english"), Bytes.toBytes(arr(2)))
      // Must be an RDD[(ImmutableBytesWritable, Put)] to call saveAsHadoopDataset
      (new ImmutableBytesWritable, put)
    }

    rdd.saveAsHadoopDataset(jobConf)

    sc.stop()
  }

}


Spark Study Notes: Reading and Writing HDFS

Reading and writing parquet files in HDFS with Spark

The parquet files in the folder

build.sbt file

name := "spark-hbase"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.1.0",
"mysql" % "mysql-connector-java" % "5.1.31",
"org.apache.spark" %% "spark-sql" % "2.1.0",
"org.apache.hbase" % "hbase-common" % "1.3.0",
"org.apache.hbase" % "hbase-client" % "1.3.0",
"org.apache.hbase" % "hbase-server" % "1.3.0",
"org.apache.hbase" % "hbase" % "1.2.1"
)


Scala Study Notes: Simplifying Code, Currying, Inheritance, and Traits

1. Simplifying code

package com.scala.first

import java.io.File

/**
 * Created by common on 17-4-5.
 */
object FileMatcher {

  def main(args: Array[String]): Unit = {

    for (file <- filesHere)
      println(file)

    println()

    // Files in the current directory whose names end with "src"
    for (file <- filesMatching("src", _.endsWith(_)))
      println(file)

    for (file <- filesEnding("src"))
      println(file)

  }

  private def filesHere = (new File(".")).listFiles

  // matcher is a function passed in that returns a Boolean, for example _.endsWith(_)
  private def filesMatching(query: String, matcher: (String, String) => Boolean) = {
    for (file <- filesHere; if matcher(file.getName, query)) yield file
  }

  // The function above is not concise enough; here is a more concise definition
  private def filesMatch(matcher: String => Boolean) = {
    for (file <- filesHere; if matcher(file.getName)) yield file
  }

  // Methods that use different matchers can then be defined on top of it
  def filesEnding(query: String) = {
    filesMatch(_.endsWith(query))
  }

  // Use exists to simplify code (!= 0 also catches negative odd numbers, where % 2 is -1)
  def containsOdd(nums: List[Int]): Boolean = {
    nums.exists(_ % 2 != 0)
  }

  def containsNeg(nums: List[Int]): Boolean = {
    nums.exists(_ < 0)
  }

}
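The same refactoring can go one step further with currying, which the title of this note mentions. In this hypothetical `MatcherSketch`, the first parameter list fixes the candidate names (a plain `Seq[String]` instead of the file system, so it runs anywhere) and the second takes the matcher, so each new search is a one-liner.

```scala
object MatcherSketch {
  // Curried: the first parameter list takes the names, the second the matcher
  def namesMatching(names: Seq[String])(matcher: String => Boolean): Seq[String] =
    for (name <- names if matcher(name)) yield name

  // Each specialized search only supplies a different matcher
  def namesEnding(names: Seq[String], query: String): Seq[String] =
    namesMatching(names)(_.endsWith(query))

  def namesContaining(names: Seq[String], query: String): Seq[String] =
    namesMatching(names)(_.contains(query))
}
```

Partially applying only the first list, as in `namesMatching(names) _`, gives a function that just waits for the matcher.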


Spark Study Notes: Reading and Writing MySQL

1. Reading a table in MySQL with Spark

build.sbt file

name := "spark-hbase"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.1.0",
"mysql" % "mysql-connector-java" % "5.1.31",
"org.apache.spark" %% "spark-sql" % "2.1.0"
)
