tonglin0325的个人主页

Spark学习笔记——键值对操作

键值对 RDD是 Spark 中许多操作所需要的常见数据类型

键值对 RDD 通常用来进行聚合计算。我们一般要先通过一些初始** ETL(抽取、转化、装载)**操作来将数据转化为键值对形式。

Spark 为包含键值对类型的 RDD 提供了一些专有的操作。

全文 >>

Spark学习笔记——RDD编程

1.RDD——弹性分布式数据集(Resilient Distributed Dataset)

RDD是一个分布式的元素集合,在Spark中,对数据的操作就是创建RDD转换已有的RDD调用RDD操作进行求值

Spark 中的 RDD 就是一个不可变的分布式对象集合。每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。

1
2
3
4
5
6
7
8
9
10
11
object WordCount {
def main(args: Array[String]) {
val inputFile = "file:///home/common/coding/coding/Scala/word-count/test.segmented"
val conf = new SparkConf().setAppName("WordCount").setMaster("local")    #创建一个SparkConf对象来配置应用<br>    #集群URL:告诉Spark连接到哪个集群,local是单机单线程,无需连接到集群,应用名:在集群管理器的用户界面方便找到应用
val sc = new SparkContext(conf)        #然后基于这SparkConf创建一个SparkContext对象
val textFile = sc.textFile(inputFile)    #读取输入的数据
val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)  #切分成单词,转换成键值对并计数
wordCount.foreach(println)
}
}

创建一个RDD

1
2
val textFile = sc.textFile(inputFile)

或者

1
2
val lines = sc.parallelize(List("pandas", "i like pandas"))

全文 >>

Scala学习笔记——函数和闭包

1.本地函数

可以在一个方法内再次定义一个方法,这个方法就是外部方法的内部私有方法,省略了private关键字

2.头等函数

1
2
3
var increase = (x: Int) => x + 1
System.out.println(increase(10))

集合类的foreach方法

1
2
3
var list1 = List(1, 2)
list1.foreach((x: Int) => println(x))

全文 >>

Spark学习笔记——在远程机器中运行WordCount

1.通过realy机器登录relay-shell

1
2
ssh XXX@XXX

2.登录了跳板机之后,连接可以用的机器

1
2
XXXX.bj

3.在本地的idea生成好程序的jar包(word-count_2.11-1.0.jar)之后,把jar包需要put到远程机器的hdfs文件系统中的文件通过scp命令从开发机传到远程的机器中

1
2
scp 开发机用户名@开发机ip地址:/home/XXXXX/文件 .    #最后一个.表示cd的根目录下

全文 >>

Ubuntu下安装HBase

1.在清华镜像站点下载hbase的安装文件,选择的是stable的版本,版本号是hbase-1.2.5/

2.解压放在/usr/local的目录下

3.修改权限

1
2
sudo chown -R hduser hadoop hbase-1.2.5/

4.修改文件夹的名称为hbase

5.在~/.bashrc下添加,之后source一下

1
2
export PATH=$PATH:/usr/local/hbase/bin

或者在 /etc/profile中添加

1
2
3
export HBASE_HOME=/usr/local/hbase
export PATH=${HBASE_HOME}/bin:$PATH

6.修改文件夹的权限

1
2
3
cd /usr/local
sudo chown -R hadoop ./hbase

7.测试一下是否安装成功

1
2
3
hbase version
HBase 1.2.5...

全文 >>

Scala学习笔记——内建控制结构

Scala的内建控制结构包括:if、while、for、try、match和函数调用

1.if表达式

1
2
3
4
5
6
7
8
9
10
11
12
13
//常见的写法
var filename = "name"
if (!args.isEmpty)
filename = args(0)

//比较简洁的写法
var filename1 =
if (!args.isEmpty) args(0)
else "name"

//更简洁的写法,不要有中间变量
println(if(!args.isEmpty) args(0) else "name")

全文 >>

Scala学习笔记——函数式对象

用创建一个函数式对象(类Rational)的过程来说明

类Rational是一种表示有理数(Rational number)的类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package com.scala.first

/**
* Created by common on 17-4-3.
*/
object Rational {
def main(args: Array[String]) {

var r1 = new Rational(1, 2)
var r2 = new Rational(1)
System.out.println(r1.toString)
System.out.println(r1.add(r2).toString)
var r3 = new Rational(2, 2)
System.out.println(r3)
System.out.println(r1 + r3)
}
}

class Rational(n: Int, d: Int) {
//检查先决条件,不符合先决条件将抛出IllegalArgumentException
require(d != 0)
//最大公约数
private val g = gcd(n.abs, d.abs)

private def gcd(a: Int, b: Int): Int = {
if (b == 0) a else gcd(b, a % b)
}

//进行约分
val numer: Int = n / g
val denom: Int = d / g

//辅助构造器
def this(n: Int) = this(n, 1)

//定义操作符
def +(that: Rational): Rational = {
new Rational(
numer * that.denom + that.numer * denom,
denom * that.denom
)
}

//方法重载
def +(i: Int): Rational = {
new Rational(
numer + i * denom, denom
)
}

def *(that: Rational): Rational = {
new Rational(
numer * that.numer,
denom * that.denom
)
}

//方法重载
override def toString = numer + "/" + denom

//定义方法
def add(that: Rational): Rational = {
new Rational(
numer * that.denom + that.numer * denom,
denom * that.denom
)
}

//定义方法,自指向this可写可不写
def lessThan(that: Rational): Boolean = {
this.numer * that.denom < that.numer * this.denom
}


}

全文 >>

Spark学习笔记——安装和WordCount

**1.**去清华的镜像站点下载文件spark-2.1.0-bin-without-hadoop.tgz,不要下spark-2.1.0-bin-hadoop2.7.tgz

2.把文件解压到/usr/local目录下,解压之后的效果,Hadoop和Spark都在Hadoop用户

下面的操作都在Hadoop用户下

1
2
3
drwxrwxrwx 13 hadoop hadoop 4096 4月<!--more-->
&nbsp;&nbsp; 4 11:50 spark-2.1.0-bin-without-hadoop/

 添加Hadoop用户和用户组

1
2
3
4
$ sudo addgroup hadoop
$ sudo adduser --ingroup hadoop hadoop
$ sudo adduser hadoop sudo

 然后修改文件夹的用户,用户组以及权限

1
2
3
sudo chown -R hduser:hadoop spark-2.1.0-bin-without-hadoop
sudo chmod 777 hadoop/

 Hadoop文件夹如果权限不对的话,也需要修改

**3.**在/etc/profile下添加路径

1
2
3
export SPARK_HOME=/usr/local/spark-2.1.0-bin-without-hadoop
export PATH=${SPARK_HOME}/bin:$PATH

**4.**还需要修改Spark的配置文件spark-env.sh

1
2
3
cd /usr/local/spark-2.1.0-bin-without-hadoop
cp ./conf/spark-env.sh.template ./conf/spark-env.sh

添加如下

1
2
export SPARK_DIST_CLASSPATH=$(/home/lintong/software/apache/hadoop-2.9.1/bin/hadoop classpath)

 (以上可以参考厦门大学林子雨老师的教程——Spark2.1.0入门:Spark的安装和使用),有些教程坑无数

**5.**在~/coding/coding/Scala/word-count路径下准备一个文本文件,比如test.segmented文件

**6.**在该目录下,在终端运行 spark-shell

创建一个RDD

1
2
scala> val textFile = sc.textFile("file:///home/common/coding/coding/Scala/word-count/test.segmented")

保存RDD成文件

1
2
textFile.saveAsTextFile("file:///home/common/coding/coding/Scala/word-count/writeback")

 这时候会发现在文件夹目录下多了writeback目录,目录下是这么几个文件

 现在,我们建立hdfs文件夹,来把 test.segmented 文件放进我们的hdfs文件夹中

首先,启动Hadoop的HDFS组件,因为没有用到MapReduce组件,所以没有必要启动MapReducen或者YARN

1
2
3
cd /usr/local/hadoop
./sbin/start-dfs.sh

 在HDFS文件系统中,建立文件夹

1
2
./bin/hdfs dfs -mkdir -p /user/hadoop

使用命令查看一下HDFS文件系统中的目录和文件

在Hadoop文件夹下运行命令

1
2
3
./bin/hdfs dfs -ls .       #或者
./bin/hdfs dfs -ls /user/hadoop

 或者直接

1
2
3
hadoop fs -ls /user/hadoop    #或者
hadoop fs -ls .

 把刚刚的 test.segmented** 文件上传到分布式文件系统HDFS**中(放到hadoop用户目录下)

1
2
hadoop fs -put /home/common/coding/coding/Java/WordCount/input/test.segmented .

 再次查看一下

1
2
3
4
5
hadoop@master:~$ hadoop fs -ls /user/hadoop
Found 2 items
drwxr-xr-x - hadoop supergroup 0 2017-04-03 16:18 /user/hadoop/QuasiMonteCarlo_1491207499210_758373570
-rw-r--r-- 1 hadoop supergroup 59 2017-04-03 16:43 /user/hadoop/test.segmented

如果需要删除

1
2
hadoop fs -rm /user/hadoop/test.segmented

查看一个文件的大小

1
2
hadoop fs -du -h /logs/xxxx

现在**回到 **spark-shell 窗口,编写代码从HDFS文件系统加载 test.segmented 文件

并打印文件中的第一行内容

1
2
3
4
5
6
scala> val textFile = sc.textFile("hdfs://master:9000/user/hadoop/test.segmented")
textFile: org.apache.spark.rdd.RDD[String] = hdfs://master:9000/user/hadoop/test.segmented MapPartitionsRDD[1] at textFile at <console>:24

scala> textFile.first()
res0: String = aa bb aa

 如果是单机的话,其中下面两条语句和上面第一条语句是一样的,但是如果是Hadoop伪分布式或者分布式的话,就不行

1
2
val textFile = sc.textFile("/user/hadoop/test.segmented")

 再次把textFile写回到HDFS文件系统中

1
2
textFile.saveAsTextFile("hdfs://master:9000/user/hadoop/writeback")

再次查看

1
2
3
4
5
6
hadoop@master:~$ hadoop fs -ls /user/hadoop
Found 3 items
drwxr-xr-x - hadoop supergroup 0 2017-04-03 16:18 /user/hadoop/QuasiMonteCarlo_1491207499210_758373570
-rw-r--r-- 1 hadoop supergroup 59 2017-04-03 16:43 /user/hadoop/test.segmented
drwxr-xr-x - hadoop supergroup 0 2017-04-03 17:10 /user/hadoop/writeback

如果进入writeback文件夹中查看的话,可以看到里面的文件的内容和test.segmented中的是一样的

1
2
3
4
5
6
hadoop@master:~$ hadoop fs -ls /user/hadoop/writeback
Found 3 items
-rw-r--r-- 3 hadoop supergroup 0 2017-04-03 17:10 /user/hadoop/writeback/_SUCCESS
-rw-r--r-- 3 hadoop supergroup 36 2017-04-03 17:10 /user/hadoop/writeback/part-00000
-rw-r--r-- 3 hadoop supergroup 24 2017-04-03 17:10 /user/hadoop/writeback/part-00001

1
2
3
4
5
6
hadoop@master:~$ hadoop fs -cat /user/hadoop/writeback/part-00000
aa bb aa
bb aa aa
cc bb ee
dd ee cc

1
2
3
4
5
6
7
8
9
10
hadoop@master:~$ hadoop fs -cat /user/hadoop/writeback/part-00001
aa
cc
ee
ff
ff
gg
hh
aa

 

现在进入WordCount阶段,再次进入 Spark-shell

1
2
3
4
val textFile = sc.textFile("hdfs://master:9000/user/hadoop/test.segmented")
val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCount.collect()

 输出

1
2
res6: Array[(String, Int)] = Array((ee,3), (aa,6), (gg,1), (dd,1), (hh,1), (ff,2), (bb,3), (cc,3))

 

 在spark-shell下面运行成功之后,就需要试着在idea里面建立一个工程来运行这段代码

在idea下面建立一个Scala的工程,构建的方式选择是sbt

 

由于本机的Scala的版本是2.11.8

所以在project structure里面设置成2.11.8

接着在build.sbt里面写

1
2
3
4
5
6
7
8
name := "word-count"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"

 注意里面的scalaVersion如果是2.11.X的话,sbt就会去拉spark-core_2.11-2.1.0的包

可以去公司的私服nexus里面去看看有没有这个包

然后在WordCount.scala文件中写入我们的代码

注意如果是setMaster(“local”)的话,需要在/etc/hosts中设置127.0.1.1,然后取消192.168.0.1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
/**
* Created by common on 17-4-3.
*/


object WordCount {
def main(args: Array[String]) {
val inputFile = "file:///home/common/coding/coding/Scala/word-count/test.segmented"
val conf = new SparkConf().setAppName("WordCount").setMaster("local")    #创建一个SparkConf对象来配置应用<br />    #集群URL:告诉Spark连接到哪个集群,local是单机单线程,无需连接到集群,应用名:在集群管理器的用户界面方便找到应用
val sc = new SparkContext(conf)        #然后基于这SparkConf创建一个SparkContext对象
val textFile = sc.textFile(inputFile)    #读取输入的数据
val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)  #切分成单词,转换成键值对并计数
wordCount.foreach(println)
}
}

然后在sbt中refresh,进行拉包,拉包的过程是无比缓慢的

拉好了之后运行的结果

和在 spark-shell中运行的结果是一致的

 

在林子雨老师的教程中,Spark2.1.0入门:第一个Spark应用程序:WordCount

最后是将整个应用程序打包成JAR,然后通过 spark-submit 提交到 Spark 中运行

做法是在idea的终端中,对代码进行打包

1
2
3
4
5
6
7
8
common@master:~/coding/coding/Scala/word-count$ sbt package
[info] Loading project definition from /home/common/coding/coding/Scala/word-count/project
[info] Set current project to word-count (in build file:/home/common/coding/coding/Scala/word-count/)
[info] Compiling 1 Scala source to /home/common/coding/coding/Scala/word-count/target/scala-2.11/classes...
[info] Packaging /home/common/coding/coding/Scala/word-count/target/scala-2.11/word-count_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 6 s, completed 2017-4-4 18:02:13

 生成的jar包位置在

1
2
/home/common/coding/coding/Scala/word-count/target/scala-2.11

 最后通过 spark-submit 运行程序,将jar包通过这个命令提交到 Spark 中运行

1
2
common@master:~/coding/coding/Scala/word-count$ spark-submit --class "WordCount"  /home/common/coding/coding/Scala/word-count/target/scala-2.11/word-count_2.11-1.0.jar

 运行结果

 

在执行spark任务的时候,如果遇到

1
2
3
报如下错误:
Exception in thread "main" java.lang.Exception: When running with master 'yarn-client' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment

在spark的配置文件 conf/spark-env.sh 中添加

1
2
3
4
export HADOOP_HOME=/home/lintong/software/apache/hadoop-2.9.1
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
PATH=$PATH:$HIVE_HOME/bin:$HADOOP_HOME/bin

 

spark集群安装参考:spark 集群搭建 详细步骤

主要是配置slave文件和spark-env文件

集群内容spark-env文件,其中xxx是spark web ui的端口

1
2
3
4
5
6
7
8
9
10
11
12
export SPARK_DIST_CLASSPATH=$(/usr/bin/hadoop classpath)

#export JAVA_HOME=/usr/lib/jvm/java-8-oracle
export SCALA_HOME=/home/dl/packages/scala-2.11.8

export SPARK_MASTER_WEBUI_PORT=xxxx

export HADOOP_HOME=/opt/cloudera/parcels/CDH-5.12.0-1.cdh5.12.0.p0.29/lib/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HIVE_HOME=/opt/cloudera/parcels/CDH-5.12.0-1.cdh5.12.0.p0.29/lib/hive
PATH=$PATH:$HIVE_HOME/bin:$HADOOP_HOME/bin

 

spark-shell查看当前spark的配置

1
2
3
scala> spark.sql("set").filter("key rlike 'metastore|jdo'").rdd.take(100).foreach(println(_))
[spark.sql.hive.metastore.sharedPrefixes,com.amazonaws.services.dynamodbv2]