tonglin0325的个人主页

Spark学习笔记——构建基于Spark的推荐引擎

推荐模型

推荐模型的种类分为:

1.基于内容的过滤:基于内容的过滤利用物品的内容或是属性信息以及某些相似度定义,来求出与该物品类似的物品。

2.协同过滤:协同过滤是一种借助众包智慧的途径。它利用大量已有的用户偏好来估计用户对其未接触过的物品的喜好程度。其内在思想是相似度的定义。

在基于用户的方法的中,如果两个用户表现出相似的偏好(即对相同物品的偏好大体相同),那就认为他们的兴趣类似。

同样也可以借助基于物品的方法来做推荐。这种方法通常根据现有用户对物品的偏好或是评级情况,来计算物品之间的某种相似度。

3.矩阵分解:

3.1. 显式矩阵分解

例如我们可以得到多个用户对多部电影的评级的数据,这样我们就可以得到一个用户—电影评级的矩阵

我们所得到的这个矩阵是稀疏的,假设得到的**“用户—物品”矩阵的维度为U×I,我们需要对其进行降维,然后得到一个表示用户的U×k维矩阵和一个表示物品的k×I维矩阵**。

**要计算给定用户对某个物品的预计评级 **:

只需要从用户因子矩阵和物品因子矩阵分别选取相应的行(用户因子向量)与列(物品因子向量),然后计算两者的点积即可

对于物品之间相似度的计算,可以用最近邻模型中用到的相似度衡量方法。不同的是,这里可以直接利用物品因子向量,将相似度计算转换为对两物品因子向量之间相似度的计算

全文 >>

Spark学习笔记——基于MLlib的机器学习

使用MLlib库中的机器学习算法对垃圾邮件进行分类

分类的垃圾邮件的如图中分成4个文件夹,两个文件夹是训练集合,两个文件夹是测试集合

build.sbt文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
name := "spark-first"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.11" % "2.1.0",
"org.apache.hadoop" % "hadoop-common" % "2.7.2",
"mysql" % "mysql-connector-java" % "5.1.31",
"org.apache.spark" %% "spark-sql" % "2.1.0",
"org.apache.spark" %% "spark-streaming" % "2.1.0",
"org.apache.spark" % "spark-mllib_2.11" % "2.1.0"
)

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import org.apache.hadoop.io.{IntWritable, LongWritable, MapWritable, Text}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.sql.SQLContext
import java.util.Properties

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

/**
* Created by common on 17-4-6.
*/
object SparkRDD {

def main(args: Array[String]) {
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)

val spam = sc.textFile("input/email/spam")
val normal = sc.textFile("input/email/ham")

// 创建一个HashingTF实例来把邮件文本映射为包含10000个特征的向量
val tf = new HashingTF(numFeatures = 10000)
// 各邮件都被切分为单词,每个单词被映射为一个特征
val spamFeatures = spam.map(email => tf.transform(email.split(" ")))
val normalFeatures = normal.map(email => tf.transform(email.split(" ")))
// 创建LabeledPoint数据集分别存放阳性(垃圾邮件)和阴性(正常邮件)的例子
val positiveExamples = spamFeatures.map(features => LabeledPoint(1, features))
val negativeExamples = normalFeatures.map(features => LabeledPoint(0, features))
val trainingData = positiveExamples.union(negativeExamples)
trainingData.cache() // 因为逻辑回归是迭代算法,所以缓存训练数据RDD
// 使用SGD算法运行逻辑回归
val model = new LogisticRegressionWithSGD().run(trainingData)
// 以阳性(垃圾邮件)和阴性(正常邮件)的例子分别进行测试
val posTest = tf.transform(
"Experience with BiggerPenis Today! Grow 3-inches more ...".split(" "))
val negTest = tf.transform(
"That is cold. Is there going to be a retirement party? ...".split(" "))
println("Prediction for positive test example: " + model.predict(posTest))
println("Prediction for negative test example: " + model.predict(negTest))

}
}

 

结果

全文 >>

Spark学习笔记——Spark Streaming

许多应用需要即时处理收到的数据,例如用来实时追踪页面访问统计的应用、训练机器学习模型的应用, 还有自动检测异常的应用。Spark Streaming 是 Spark 为这些应用而设计的模型。它允许用户使用一套和批处理非常接近的 API 来编写流式计算应用,这样就可以大量重用批处理应用的技术甚至代码。
Spark Streaming 使用离散化流( discretized stream)作为抽象表示, 叫作 DStream。 DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。DStream 可以从各种输入源创建,比如 Flume、 Kafka 或者 HDFS。创建出来的 DStream 支持两种操作,一种是转化操作( transformation)会生成一个新的DStream,另一种是输出操作( output operation)可以把数据写入外部系统中。DStream提供了许多与 RDD 所支持的操作相类似的操作支持,还增加了与时间相关的新操作,比如滑动窗口。

build.sbt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
name := "spark-first"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.11" % "2.1.0",
"org.apache.hadoop" % "hadoop-common" % "2.7.2",
"mysql" % "mysql-connector-java" % "5.1.31",
"org.apache.spark" %% "spark-sql" % "2.1.0",
"org.apache.spark" %% "spark-streaming" % "2.1.0"

)

 

代码,使用Spark Streaming对端口发过来的数据进行词频统计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import org.apache.hadoop.io.{IntWritable, LongWritable, MapWritable, Text}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark._

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds

/**
* Created by common on 17-4-6.
*/
object SparkRDD {

def main(args: Array[String]) {
val conf = new SparkConf().setAppName("WordCount").setMaster("local")

// Spark streaming
// 从SparkConf创建StreamingContext并指定1秒钟的批处理大小
val ssc = new StreamingContext(conf, Seconds(1))
// 连接到本地机器7777端口上后,使用收到的数据创建DStream
val lines = ssc.socketTextStream("localhost", 7777)
// 对每一行数据执行Split操作
val words = lines.flatMap(_.split(" "))
// 统计word的数量
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// 输出结果
wordCounts.print()
ssc.start() // 开始
ssc.awaitTermination() // 计算完毕退出
}
}

 

首先在终端中运行命令,向7777端口发送数据

1
2
nc -l 7777

nc命令参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
-g<网关>:设置路由器跃程通信网关,最多设置8个; 
-G<指向器数目>:设置来源路由指向器,其数值为4的倍数;
-h:在线帮助;
-i<延迟秒数>:设置时间间隔,以便传送信息及扫描通信端口;
-l:使用监听模式,监控传入的资料;
-n:直接使用ip地址,而不通过域名服务器;
-o<输出文件>:指定文件名称,把往来传输的数据以16进制字码倾倒成该文件保存;
-p<通信端口>:设置本地主机使用的通信端口;
-r:指定源端口和目的端口都进行随机的选择;
-s<来源位址>:设置本地主机送出数据包的IP地址;
-u:使用UDP传输协议;
-v:显示指令执行过程;
-w<超时秒数>:设置等待连线的时间;
-z:使用0输入/输出模式,只在扫描通信端口时使用。

然后运行Spark Streaming程序

接着在终端中输入

1
2
3
Hello World 1 #回车
Hello World 2

中断程序,在Spark Streaming输出看见

全文 >>

Scala学习笔记——断言和单元测试

1.断言

**assert(conditon)**将在条件不成立的时候,抛出assertionError

**assert(conditon,explanation)**讲在条件不成立的时候,抛出explanation作为说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.scala.first

/**
* Created by common on 17-4-19.
*/
object Assert {

def main(args: Array[String]): Unit = {
val a = new Assert()
a.above1(0)

}

}

class Assert {
val value = 1

def above(that: Int): Unit = {
val thatVal = that
val thisVal = this.value
//如果条件不满足,Exception in thread "main" java.lang.AssertionError: assertion failed
assert(thatVal == thisVal)
}

//另一种断言
//如果条件不满足,Exception in thread "main" java.lang.AssertionError: assertion failed
def above1(that: Int): Unit = {
{
val thatVal = that
val thisVal = this.value
} ensuring(that == this.value)

}
}

 

2.单元测试

Scala中提供了多种单元测试的方法,比如ScalaTest

ScalaTest提供了多种单元测试的方法,最简单的就是创建org.scalatest.suite类,并在这些类中定义测试方法

如果cmd+shift+T的快捷键无效的话,在需要测试的类上右键,Go to Test,创建一个测试类

Scala学习笔记——样本类和模式匹配

1.样本类

在申明的类前面加上一个case修饰符,带有这种修饰符的类被称为样本类(case class)。

被申明为样本类的类的特点:1.会添加和类名一致的工厂方法;2.样本类参数列表中的所有参数隐式获得了val前缀,因此它被当做字段维护;3.编译器被这个样本类添加了toString、hashcode、equals方法的实现;4.支持了模式匹配

 

2.模式匹配

一个模式匹配包含了一系列备选项,每个都开始于关键字case。每个备选项都包含了一个模式及一到多个表达式,它们将在模式匹配过程中被计算。

其中**箭头符号=>**隔开了模式和表达式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.scala.first

/**
* Created by common on 17-4-19.
*/
object CaseClass {

def main(args: Array[String]): Unit = {
println(cal("+"))

prtList(List(0, 1))

println(prtType("abc"))
println(prtType(Map(1 -> 1)))
}

def cal(exp: String): Int = {
val add = "+"
val result = 1
exp match {
case "+" => result + 2 //常量模式仅仅匹配自身
case "-" => result - 2
case "*" => result * 2
case "/" => result / 2
case add => result + 2 //变量模式可以匹配任意对象
case _ => result //通配模式匹配任意对象
}
}

//序列模式
def prtList(list: List[Int]) = list match {
case List(0, _, _) => println("this is a list") //序列模式,可以匹配List或者Array
case List(1, _*) => println("this is a list, too") //匹配一个不指定长度的序列
case _ => println("other")
}

//元组模式
def prtTuple(tup: Any) = tup match {
case (0, _, _) => println("this is a tuple") //元组模式
case _ => println("other")
}

//类型模式,可以用在类型测试和类型转换
def prtType(x: Any) = x match {
case s: String => s.length
case m: Map[_, _] => m.size
case _ => 1
}

}

 

Scala学习笔记——类型

1.Option类型

Option类型可以有两种类型,一种是Some(x),一种是None对象

比如Scala的Map的get方法发现了指定键,返回Some(x),没有发现,返回None对象

 

2.列表

List类型中的所有元素都具有相同的类型

空列表的类型为**List[Nothing]**。对于任意类型T的List[T],List[Nothing]都是其子类。

1
2
3
val list = List[String]("1","2","3")
val list1 = "1"::"2"::"3"::Nil //所有的列表都是由两个基础构造块Nil和::构造出来的,Nil表示空列表

 

列表的基本操作

head  返回列表的第一个元素,仅能作用在非空列表上

tail  返回除第一个之外所有元素组成的列表,仅能作用在非空列表上

isEmpty  判断是否为空

 

全文 >>

Spark学习笔记——读写HBase

1.首先在HBase中建立一张表,名字为student

参考 Hbase学习笔记——基本CRUD操作

一个cell的值,取决于Row,Column family,Column Qualifier和Timestamp

HBase表结构

2.往HBase中写入数据,写入的时候,需要写family和column

build.sbt

1
2
3
4
5
6
7
8
9
10
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.1.0",
"mysql" % "mysql-connector-java" % "5.1.31",
"org.apache.spark" %% "spark-sql" % "2.1.0",
"org.apache.hbase" % "hbase-common" % "1.3.0",
"org.apache.hbase" % "hbase-client" % "1.3.0",
"org.apache.hbase" % "hbase-server" % "1.3.0",
"org.apache.hbase" % "hbase" % "1.2.1"
)

在hbaseshell中写数据的时候,写的是String,但是在idea中写代码的话,如果写的是int类型的,就会出现\x00…的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import java.util.Properties

import com.google.common.collect.Lists
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Get, Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf

/**
* Created by mi on 17-4-11.
*/

case class resultset(name: String,
info: String,
summary: String)

case class IntroItem(name: String, value: String)


case class BaikeLocation(name: String,
url: String = "",
info: Seq[IntroItem] = Seq(),
summary: Option[String] = None)

case class MewBaikeLocation(name: String,
url: String = "",
info: Option[String] = None,
summary: Option[String] = None)


object MysqlOpt {

def main(args: Array[String]): Unit = {

// 本地模式运行,便于测试
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
// 创建 spark context
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

//定义数据库和表信息
val url = "jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&amp;characterEncoding=UTF-8"
val table = "baike_pages"

// 读取Hbase文件,在hbase的/usr/local/hbase/conf/hbase-site.xml中写的地址
val hbasePath = "file:///usr/local/hbase/hbase-tmp"

// 创建hbase configuration
val hBaseConf = HBaseConfiguration.create()
hBaseConf.set(TableInputFormat.INPUT_TABLE, "student")

// 初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!
val jobConf = new JobConf(hBaseConf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "student")

val indataRDD = sc.makeRDD(Array("1,99,98","2,97,96","3,95,94"))

val rdd = indataRDD.map(_.split(',')).map{arr=>{
/*一个Put对象就是一行记录,在构造方法中指定主键
* 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换
* Put.add方法接收三个参数:列族,列名,数据
*/
val put = new Put(Bytes.toBytes(arr(0)))
put.add(Bytes.toBytes("course"),Bytes.toBytes("math"),Bytes.toBytes(arr(1)))
put.add(Bytes.toBytes("course"),Bytes.toBytes("english"),Bytes.toBytes(arr(2)))
//转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset
(new ImmutableBytesWritable, put)
}}

rdd.saveAsHadoopDataset(jobConf)

sc.stop()

}

}

 

 

 

3.从Hbase中读取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import java.util.Properties

import com.google.common.collect.Lists
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Get, Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf

/**
* Created by mi on 17-4-11.
*/

case class resultset(name: String,
info: String,
summary: String)

case class IntroItem(name: String, value: String)


case class BaikeLocation(name: String,
url: String = "",
info: Seq[IntroItem] = Seq(),
summary: Option[String] = None)

case class MewBaikeLocation(name: String,
url: String = "",
info: Option[String] = None,
summary: Option[String] = None)


object MysqlOpt {

def main(args: Array[String]): Unit = {

// 本地模式运行,便于测试
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
// 创建 spark context
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

//定义数据库和表信息
val url = "jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&amp;characterEncoding=UTF-8"
val table = "baike_pages"

// 读取Hbase文件,在hbase的/usr/local/hbase/conf/hbase-site.xml中写的地址
val hbasePath = "file:///usr/local/hbase/hbase-tmp"

// 创建hbase configuration
val hBaseConf = HBaseConfiguration.create()
hBaseConf.set(TableInputFormat.INPUT_TABLE, "student")

// 从数据源获取数据并转化成rdd
val hBaseRDD = sc.newAPIHadoopRDD(hBaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

println(hBaseRDD.count())

// 将数据映射为表 也就是将 RDD转化为 dataframe schema
hBaseRDD.foreach{case (_,result) =>{
//获取行键
val key = Bytes.toString(result.getRow)
//通过列族和列名获取列
val math = Bytes.toString(result.getValue("course".getBytes,"math".getBytes))
println("Row key:"+key+" Math:"+math)
}}

sc.stop()

}


}

 输出

1
2
3
4
5
6
7
8
Row key:    Math:99
Row key:  Math:97
Row key:  Math:95
Row key:1 Math:99
Row key:1000 Math:99
Row key:2 Math:97
Row key:3 Math:95

 

全文 >>

Spark学习笔记——读写HDFS

使用Spark读写HDFS中的parquet文件

文件夹中的parquet文件

build.sbt文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
name := "spark-hbase"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.1.0",
"mysql" % "mysql-connector-java" % "5.1.31",
"org.apache.spark" %% "spark-sql" % "2.1.0",
"org.apache.hbase" % "hbase-common" % "1.3.0",
"org.apache.hbase" % "hbase-client" % "1.3.0",
"org.apache.hbase" % "hbase-server" % "1.3.0",
"org.apache.hbase" % "hbase" % "1.2.1"
)

 

Scala实现方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import java.util.Properties

import com.google.common.collect.Lists
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat


/**
* Created by mi on 17-4-11.
*/

case class resultset(name: String,
info: String,
summary: String)

case class IntroItem(name: String, value: String)


case class BaikeLocation(name: String,
url: String = "",
info: Seq[IntroItem] = Seq(),
summary: Option[String] = None)

case class MewBaikeLocation(name: String,
url: String = "",
info: Option[String] = None,
summary: Option[String] = None)


object MysqlOpt {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

//定义数据库和表信息
val url = "jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&amp;characterEncoding=UTF-8"
val table = "baike_pages"

//读取parquetFile,并写入Mysql
val sparkSession = SparkSession.builder()
.master("local")
.appName("spark session example")
.getOrCreate()
val parquetDF = sparkSession.read.parquet("/home/mi/coding/coding/baikeshow_data/baikeshow")
// parquetDF.collect().take(20).foreach(println)
//parquetDF.show()

//BaikeLocation是读取的parquet文件中的case class
val ds = parquetDF.as[BaikeLocation].map { line =>
//把info转换为新的case class中的类型String
val info = line.info.map(item => item.name + ":" + item.value).mkString(",")
//注意需要把字段放在一个case class中,不然会丢失列信息
MewBaikeLocation(name = line.name, url = line.url, info = Some(info), summary = line.summary)
}.cache()

ds.show()
// ds.take(2).foreach(println)

//写入Mysql
// val prop = new Properties()
// prop.setProperty("user", "root")
// prop.setProperty("password", "123456")
// ds.write.mode(SaveMode.Append).jdbc(url, "baike_location", prop)

//写入parquetFile
ds.repartition(10).write.parquet("/home/mi/coding/coding/baikeshow_data/baikeshow1")

}

}

 

df.show打印出来的信息,如果没放在一个case class中的话,name,url,info,summary这列信息会变成1,2,3,4

使用spark-shell查看写回去的parquet文件的信息

1
2
3
4
5
6
7
8
#进入spark-shell
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val path = "file:///home/mi/coding/coding/baikeshow_data/baikeshow1"
val df = sqlContext.parquetFile(path)
df.show
df.count

 

如果只想显示某一列的话,可以这么做

1
2
df.select("title").take(100).foreach(println)  //只显示title这一列的信息

 

全文 >>

Scala学习笔记——简化代码、柯里化、继承、特质

1.简化代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.scala.first

import java.io.File
import javax.management.Query

/**
* Created by common on 17-4-5.
*/
object FileMatcher {

def main(args: Array[String]) {

for (file <- filesHere)
println(file)

println()


for (file <- filesMatching("src", _.endsWith(_)))
println(file)

for (file <- filesEnding("src"))
println(file)

}

private def filesHere = (new File(".")).listFiles

//matcher是传入一个函数,返回boolean值,比如_.endsWith(_)
private def filesMatching(query: String, matcher: (String, String) => Boolean) = {
for (file <- filesHere; if matcher(file.getName, query)) yield file
}

//上面的函数不够简洁,下面是更加简洁的定义
private def filesMatch(matcher: String => Boolean) = {
for (file <- filesHere; if matcher(file.getName)) yield file
}

//然后可以定义使用不同matcher()的方法
def filesEnding(query: String) = {
filesMatch(_.endsWith(query))
}

//使用exists来简化代码
def containsOdd(nums: List[Int]): Boolean = {
nums.exists(_ % 2 == 1)
}

def containsNeg(nums: List[Int]): Boolean = {
nums.exists(_ < 0)
}

}

 输出是

1
2
3
4
5
6
7
8
9
10
./.idea
./build.sbt
./target
./input
./project
./src

./src
./src

 

2.柯里化

1
2
3
4
5
6
7
8
//柯里化,可以看成是两个函数,y是第一个函数sum(x: Int)的参数
def sum(x: Int)(y: Int) = {
x + y
}

//println(sum2(2))的输出是4
def sum2 = sum(2)_

 

3.继承

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.scala.first

/**
* Created by common on 17-4-16.
*/
object AbstractClass {

def main(args: Array[String]): Unit = {
val ae = new ArrayElement(Array("hello", "world"))
println(ae.width)


val list1 = List(1,2,3,4)
val list2 = List(1,2,3,4,5)
//Scala中同时遍历两个list,如果长度不一样会截去
for((line1,line2) <- list1 zip list2){
println(line1+line2)
}
}

}

//定义一个抽象类
abstract class Element {

//定义无参方法,抽象成员
def contents: Array[String]

def height: Int = contents.length

def width: Int = if (height == 0) 0 else contents(0).length

}

//扩展类,继承了上面的抽象类,需要实现抽象类中的方法
class ArrayElement(content: Array[String]) extends Element {
def contents: Array[String] = content
}

//更加简洁的写法,contents和Element中的contents保持一致
class ArrayElement2(val contents: Array[String]) extends Element

//另一个例子
class cat {
//确保一个成员不被子类重写,需要把其定义成final
// final val dangerous = false
val dangerous = false
}

class tiger(override val dangerous: Boolean, private var age: Int) extends cat

 

4.特质

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.scala.first

/**
* Created by common on 17-4-17.
*/
object Trait {

def main(args: Array[String]): Unit = {
val person = new Person

//变量an可以初始化为任何混入了特质的类的对象,person对象包含了Animal特质
val an: Animal = person
println(an.toString)

val p = new PP()
//输出People特质中的内容
p.say()

}

}

trait Animal {

def run(): Unit = {
println("Animal can run")
}
}

//具有特质Animal
class Person extends Animal {
override def toString = "can say"
}

//具有特质Animal
class Tiger extends Animal {
override def toString = "can run fast"
}

//一个类只能继承一个父类。但是能混入多个特质
//特征的作用:
//1.把胖接口转换成瘦接口
//2.为类提供可堆叠的改变
class Live

trait HasLeg

//注意不能给特质中传递任何的参数
class PersonTiger extends Live with Animal with HasLeg {
println("混入了多个特质")
}

class P {
def say(): Unit = {
println("I am a people")
}
}

//特质在抽象方法中的动态绑定
trait People extends P {
abstract override def say(): Unit = {
println("I am a trait people")
}
}

class PP extends People{
//输出People特质中的内容
}

 

全文 >>

Spark学习笔记——读写MySQL

1.使用Spark读取MySQL中某个表中的信息

build.sbt文件

1
2
3
4
5
6
7
8
9
10
11
12
name := "spark-hbase"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.1.0",
"mysql" % "mysql-connector-java" % "5.1.31",
"org.apache.spark" %% "spark-sql" % "2.1.0"
)

 Mysql.scala文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
import java.util.Properties


/**
* Created by mi on 17-4-11.
*/

case class resultset(name: String,
info: String,
summary: String)

object MysqlOpt {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

//定义数据库和表信息
val url = "jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&amp;characterEncoding=UTF-8"
val table = "baike_pages"

//读MySQL的方法1
val reader = sqlContext.read.format("jdbc")
reader.option("url", url)
reader.option("dbtable", table)
reader.option("driver", "com.mysql.jdbc.Driver")
reader.option("user", "root")
reader.option("password", "XXX")
val df = reader.load()
df.show()

//读MySQL的方法2
// val jdbcDF = sqlContext.read.format("jdbc").options(
// Map("url"->"jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&amp;characterEncoding=UTF-8",
// "dbtable"->"(select name,info,summary from baike_pages) as some_alias",
// "driver"->"com.mysql.jdbc.Driver",
// "user"-> "root",
// //"partitionColumn"->"day_id",
// "lowerBound"->"0",
// "upperBound"-> "1000",
// //"numPartitions"->"2",
// "fetchSize"->"100",
// "password"->"XXX")).load()
// jdbcDF.show()

}
}

 输出

 

2.使用Spark写MySQL中某个表中的信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
import java.util.Properties


/**
* Created by mi on 17-4-11.
*/

case class resultset(name: String,
info: String,
summary: String)

object MysqlOpt {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

//定义数据库和表信息
val url = "jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&amp;characterEncoding=UTF-8"
val table = "baike_pages"

//写MySQL的方法1
val list = List(
resultset("名字1", "标题1", "简介1"),
resultset("名字2", "标题2", "简介2"),
resultset("名字3", "标题3", "简介3"),
resultset("名字4", "标题4", "简介4")
)
val jdbcDF = sqlContext.createDataFrame(list)
jdbcDF.collect().take(20).foreach(println)
// jdbcDF.rdd.saveAsTextFile("/home/mi/coding/coding/Scala/spark-hbase/output")
val prop = new Properties()
prop.setProperty("user", "root")
prop.setProperty("password", "123456")
//jdbcDF.write.mode(SaveMode.Overwrite).jdbc(url,"baike_pages",prop)
jdbcDF.write.mode(SaveMode.Append).jdbc(url, "baike_pages", prop)


}
}