tonglin0325的个人主页

Spark学习笔记——读写MySQL

1.使用Spark读取MySQL中某个表中的信息

build.sbt文件

1
2
3
4
5
6
7
8
9
10
11
12
name := "spark-hbase"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.1.0",
"mysql" % "mysql-connector-java" % "5.1.31",
"org.apache.spark" %% "spark-sql" % "2.1.0"
)

 Mysql.scala文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
import java.util.Properties


/**
* Created by mi on 17-4-11.
*/

case class resultset(name: String,
info: String,
summary: String)

object MysqlOpt {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

//定义数据库和表信息
val url = "jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&characterEncoding=UTF-8"
val table = "baike_pages"

//读MySQL的方法1
val reader = sqlContext.read.format("jdbc")
reader.option("url", url)
reader.option("dbtable", table)
reader.option("driver", "com.mysql.jdbc.Driver")
reader.option("user", "root")
reader.option("password", "XXX")
val df = reader.load()
df.show()

//读MySQL的方法2
// val jdbcDF = sqlContext.read.format("jdbc").options(
// Map("url"->"jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&characterEncoding=UTF-8",
// "dbtable"->"(select name,info,summary from baike_pages) as some_alias",
// "driver"->"com.mysql.jdbc.Driver",
// "user"-> "root",
// //"partitionColumn"->"day_id",
// "lowerBound"->"0",
// "upperBound"-> "1000",
// //"numPartitions"->"2",
// "fetchSize"->"100",
// "password"->"XXX")).load()
// jdbcDF.show()

}
}

 输出

 

2.使用Spark写MySQL中某个表中的信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
import java.util.Properties


/**
* Created by mi on 17-4-11.
*/

case class resultset(name: String,
info: String,
summary: String)

object MysqlOpt {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

//定义数据库和表信息
val url = "jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&characterEncoding=UTF-8"
val table = "baike_pages"

//写MySQL的方法1
val list = List(
resultset("名字1", "标题1", "简介1"),
resultset("名字2", "标题2", "简介2"),
resultset("名字3", "标题3", "简介3"),
resultset("名字4", "标题4", "简介4")
)
val jdbcDF = sqlContext.createDataFrame(list)
jdbcDF.collect().take(20).foreach(println)
// jdbcDF.rdd.saveAsTextFile("/home/mi/coding/coding/Scala/spark-hbase/output")
val prop = new Properties()
prop.setProperty("user", "root")
prop.setProperty("password", "123456")
//jdbcDF.write.mode(SaveMode.Overwrite).jdbc(url,"baike_pages",prop)
jdbcDF.write.mode(SaveMode.Append).jdbc(url, "baike_pages", prop)


}
}

 

HBase学习笔记——基本CRUD操作

进入HBase的安装目录,****启动HBase

1
2
bin/start-hbase.sh

打开shell命令行模式

1
2
bin/hbase shell

关闭HBase

1
2
bin/stop-hbase.sh

 

一个cell的值,取决于Row,Column family,Column Qualifier和Timestamp

HBase表结构

 

1.查看当前用户

1
2
3
hbase(main):001:0> whoami
hbase/master@HADOOP.COM (auth:KERBEROS)

2. HBase中创建表,这里面的name,sex,age,dept,course都是column-family

1
2
create 'student','name','sex','age','dept','course'

3.列出表

1
2
3
4
5
6
hbase(main):005:0> list
TABLE
0 row(s) in 0.0170 seconds

=> []

4.HBase中添加数据,当添加了数据之后,就有了column,‘1000’是ROW

1
2
3
put 'student','1000','name','XiaoMing'  #这么写的话,family为name,column为空
put 'student','1000','course:math','99'  #这么写的话,family为course,column为math

全文 >>

Spark学习笔记——Spark on YARN

Spark运行的时候,采用的是主从结构,有一个节点负责中央协调, 调度各个分布式工作节点。这个中央协调节点被称为驱动器( Driver) 节点。与之对应的工作节点被称为执行器( executor) 节点

所有的 Spark 程序都遵循同样的结构:程序从输入数据创建一系列 RDD, 再使用转化操作派生出新的 RDD,最后使用行动操作收集或存储结果 RDD 中的数据。

1.驱动器节点:

Spark 驱动器是执行你的程序中的 main() 方法的进程。它执行用户编写的用来创建 SparkContext、创建 RDD,以及进行 RDD 的转化操作和行动操作的代码。其实,当你启动 Spark shell 时,你就启动了一个 Spark 驱动器程序

驱动器程序在 Spark 应用中有下述两个职责:1.把用户程序转为任务 2.为执行器节点调度任务

2.执行器节点:

Spark 执行器节点是一种工作进程,负责在 Spark 作业中运行任务,任务间相互独立。 Spark 应用启动时, 执行器节点就被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。

执行器进程有两大作用: 第一,它们负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程; 第二,它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。

3.集群管理器:

驱动器节点和执行器节点是如何启动的呢? Spark 依赖于集群管理器来启动执行器节点,而在某些特殊情况下,也依赖集群管理器来启动驱动器节点。

 

Spark架构

1
2
http://spark.apache.org/docs/latest/cluster-overview.html

转自  

1
2
https://zhuanlan.zhihu.com/p/91143069

全文 >>

Spark学习笔记——数据读取和保存

spark所支持的文件格式

 

1.文本文件

在 Spark 中读写文本文件很容易。

当我们将一个文本文件读取为** RDD** 时,输入的每一行 都会成为 RDD 的 一个元素

也可以将多个完整的文本文件一次性读取为一个 pair RDD, 其中键是文件名,值是文件内容

 在 Scala 中读取一个文本文件

1
2
3
val inputFile = "file:///home/common/coding/coding/Scala/word-count/test.segmented"
val textFile = sc.textFile(inputFile)

 在 Scala 中读取给定目录中的所有文件

1
2
val input = sc.wholeTextFiles("file:///home/common/coding/coding/Scala/word-count")

 保存文本文件,Spark 将传入的路径作为目录对待,会在那个目录下输出多个文件

1
2
3
textFile.saveAsTextFile("file:///home/common/coding/coding/Scala/word-count/writeback")
//textFile.repartition(1).saveAsTextFile 就能保存成一个文件

全文 >>

Spark学习笔记——键值对操作

键值对 RDD是 Spark 中许多操作所需要的常见数据类型

键值对 RDD 通常用来进行聚合计算。我们一般要先通过一些初始** ETL(抽取、转化、装载)**操作来将数据转化为键值对形式。

Spark 为包含键值对类型的 RDD 提供了一些专有的操作。

 

1.创建Pair RDD

1
2
3
4
5
6
7
8
9
10
11
    val input = sc.parallelize(List(1, 2, 3, 4))
  val pairs = input.map(x => (x+1, x))
for (pair <- pairs){
println(pair)
}
  //输出
(2,1)
(3,2)
(4,3)
(5,4)

 

2.Pair RDD的转化操作

Pair RDD 可以使用所有标准 RDD 上的可用的转化操作。

Pair RDD也支持RDD所支持的函数

1
2
pairs.filter{case (key, value) => value.length < 20}

全文 >>

Spark学习笔记——RDD编程

1.RDD——弹性分布式数据集(Resilient Distributed Dataset)

RDD是一个分布式的元素集合,在Spark中,对数据的操作就是创建RDD转换已有的RDD调用RDD操作进行求值

Spark 中的 RDD 就是一个不可变的分布式对象集合。每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。

1
2
3
4
5
6
7
8
9
10
11
object WordCount {
def main(args: Array[String]) {
val inputFile = "file:///home/common/coding/coding/Scala/word-count/test.segmented"
val conf = new SparkConf().setAppName("WordCount").setMaster("local")    #创建一个SparkConf对象来配置应用<br>    #集群URL:告诉Spark连接到哪个集群,local是单机单线程,无需连接到集群,应用名:在集群管理器的用户界面方便找到应用
val sc = new SparkContext(conf)        #然后基于这SparkConf创建一个SparkContext对象
val textFile = sc.textFile(inputFile)    #读取输入的数据
val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)  #切分成单词,转换成键值对并计数
wordCount.foreach(println)
}
}

创建一个RDD

1
2
val textFile = sc.textFile(inputFile)

或者

1
2
val lines = sc.parallelize(List("pandas", "i like pandas"))

 

RDD支持两种类型的操作: **转化操作(transformation)行动操作(action)**。

转化操作,是返回一个新的RDD的操作:

filter()函数

1
2
val RDD = textFile.filter(line => line.contains("Hadoop"))

全文 >>

Scala学习笔记——函数和闭包

1.本地函数

可以在一个方法内再次定义一个方法,这个方法就是外部方法的内部私有方法,省略了private关键字

2.头等函数

1
2
3
var increase = (x: Int) => x + 1
System.out.println(increase(10))

集合类的foreach方法

1
2
3
var list1 = List(1, 2)
list1.foreach((x: Int) => println(x))

 集合类的filter方法

1
2
list1.filter((x: Int) => x > 1)

 

3.函数字面量的短格式,使得函数写的更加简洁

1
2
3
//函数字面量的短格式
list1.filter(x => x > 1)

 

4.占位符语法,如果想让函数字面量更加简洁,可以把下划线当做一个或更多参数的占位符

1
2
3
//使用占位符语法
System.out.println(list1.filter(_ > 1))

全文 >>

Spark学习笔记——在远程机器中运行WordCount

1.通过realy机器登录relay-shell

1
2
ssh XXX@XXX

2.登录了跳板机之后,连接可以用的机器

1
2
XXXX.bj

3.在本地的idea生成好程序的jar包(word-count_2.11-1.0.jar)之后,把jar包需要put到远程机器的hdfs文件系统中的文件通过scp命令从开发机传到远程的机器中

1
2
scp 开发机用户名@开发机ip地址:/home/XXXXX/文件 .    #最后一个.表示cd的根目录下

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
object WordCount {
def main(args: Array[String]) {
// val inputFile = "file:///home/mi/coding/coding/Scala/word-count/input/README.txt"
// val inputFile = "file://README.txt"
val inputFile = "/user/XXXX/lintong/README.txt"
val conf = new SparkConf().setAppName("WordCount").setMaster("yarn-client")
val sc = new SparkContext(conf)
val textFile = sc.textFile(inputFile)
val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
// wordCount.foreach(println)
// wordCount.saveAsTextFile("file:///home/mi/coding/coding/Scala/word-count/output/READMEOUT.txt")
wordCount.saveAsTextFile("/user/XXXX/lintong/READMEOUT.txt")
}
}

 

4.通过put命令将远程机器中的txt文件,传到远程机器的hdfs文件系统

全文 >>

Ubuntu下安装HBase

1.在清华镜像站点下载hbase的安装文件,选择的是stable的版本,版本号是hbase-1.2.5/

2.解压放在/usr/local的目录下

3.修改权限

1
2
sudo chown -R hduser hadoop hbase-1.2.5/

4.修改文件夹的名称为hbase

5.在~/.bashrc下添加,之后source一下

1
2
export PATH=$PATH:/usr/local/hbase/bin

或者在 /etc/profile中添加

1
2
3
export HBASE_HOME=/usr/local/hbase
export PATH=${HBASE_HOME}/bin:$PATH

6.修改文件夹的权限

全文 >>