spark所支持的文件格式
spark所支持的文件格式
键值对 RDD是 Spark 中许多操作所需要的常见数据类型
键值对 RDD 通常用来进行聚合计算。我们一般要先通过一些初始** ETL(抽取、转化、装载)**操作来将数据转化为键值对形式。
Spark 为包含键值对类型的 RDD 提供了一些专有的操作。
1.RDD——弹性分布式数据集(Resilient Distributed Dataset)
RDD是一个分布式的元素集合,在Spark中,对数据的操作就是创建RDD、转换已有的RDD和调用RDD操作进行求值。
Spark 中的 RDD 就是一个不可变的分布式对象集合。每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。
1 | object WordCount { |
创建一个RDD
1 | val textFile = sc.textFile(inputFile) |
或者
1 | val lines = sc.parallelize(List("pandas", "i like pandas")) |
1.本地函数
可以在一个方法内再次定义一个方法,这个方法就是外部方法的内部私有方法,省略了private关键字
2.头等函数
1 | var increase = (x: Int) => x + 1 |
集合类的foreach方法
1 | var list1 = List(1, 2) |
1.通过realy机器登录relay-shell
1 | ssh XXX@XXX |
2.登录了跳板机之后,连接可以用的机器
1 | XXXX.bj |
3.在本地的idea生成好程序的jar包(word-count_2.11-1.0.jar)之后,把jar包和需要put到远程机器的hdfs文件系统中的文件通过scp命令从开发机传到远程的机器中
1 | scp 开发机用户名@开发机ip地址:/home/XXXXX/文件 . #最后一个.表示cd的根目录下 |
1.在清华镜像站点下载hbase的安装文件,选择的是stable的版本,版本号是hbase-1.2.5/
2.解压放在/usr/local的目录下
3.修改权限
1 | sudo chown -R hduser hadoop hbase-1.2.5/ |
4.修改文件夹的名称为hbase
5.在~/.bashrc下添加,之后source一下
1 | export PATH=$PATH:/usr/local/hbase/bin |
或者在 /etc/profile中添加
1 | export HBASE_HOME=/usr/local/hbase |
6.修改文件夹的权限
1 | cd /usr/local |
7.测试一下是否安装成功
1 | hbase version |
2.删除三个文件夹: SogouPY, SogouPY.users, sogou-qimpanel
然后重启输入法
Scala的内建控制结构包括:if、while、for、try、match和函数调用
1.if表达式
1 | //常见的写法 |
用创建一个函数式对象(类Rational)的过程来说明
类Rational是一种表示有理数(Rational number)的类
1 | package com.scala.first |
**1.**去清华的镜像站点下载文件spark-2.1.0-bin-without-hadoop.tgz,不要下spark-2.1.0-bin-hadoop2.7.tgz
2.把文件解压到/usr/local目录下,解压之后的效果,Hadoop和Spark都在Hadoop用户下
下面的操作都在Hadoop用户下
1 | drwxrwxrwx 13 hadoop hadoop 4096 4月<!--more--> |
添加Hadoop用户和用户组
1 | $ sudo addgroup hadoop |
然后修改文件夹的用户,用户组以及权限
1 | sudo chown -R hduser:hadoop spark-2.1.0-bin-without-hadoop |
Hadoop文件夹如果权限不对的话,也需要修改
**3.**在/etc/profile下添加路径
1 | export SPARK_HOME=/usr/local/spark-2.1.0-bin-without-hadoop |
**4.**还需要修改Spark的配置文件spark-env.sh
1 | cd /usr/local/spark-2.1.0-bin-without-hadoop |
添加如下
1 | export SPARK_DIST_CLASSPATH=$(/home/lintong/software/apache/hadoop-2.9.1/bin/hadoop classpath) |
(以上可以参考厦门大学林子雨老师的教程——Spark2.1.0入门:Spark的安装和使用),有些教程坑无数
**5.**在~/coding/coding/Scala/word-count路径下准备一个文本文件,比如test.segmented文件
**6.**在该目录下,在终端运行 spark-shell
创建一个RDD
1 | scala> val textFile = sc.textFile("file:///home/common/coding/coding/Scala/word-count/test.segmented") |
保存RDD成文件
1 | textFile.saveAsTextFile("file:///home/common/coding/coding/Scala/word-count/writeback") |
这时候会发现在文件夹目录下多了writeback目录,目录下是这么几个文件
现在,我们建立hdfs文件夹,来把 test.segmented 文件放进我们的hdfs文件夹中
首先,启动Hadoop的HDFS组件,因为没有用到MapReduce组件,所以没有必要启动MapReducen或者YARN
1 | cd /usr/local/hadoop |
在HDFS文件系统中,建立文件夹
1 | ./bin/hdfs dfs -mkdir -p /user/hadoop |
使用命令查看一下HDFS文件系统中的目录和文件
在Hadoop文件夹下运行命令
1 | ./bin/hdfs dfs -ls . #或者 |
或者直接
1 | hadoop fs -ls /user/hadoop #或者 |
把刚刚的 test.segmented** 文件上传到分布式文件系统HDFS**中(放到hadoop用户目录下)
1 | hadoop fs -put /home/common/coding/coding/Java/WordCount/input/test.segmented . |
再次查看一下
1 | hadoop@master:~$ hadoop fs -ls /user/hadoop |
如果需要删除
1 | hadoop fs -rm /user/hadoop/test.segmented |
查看一个文件的大小
1 | hadoop fs -du -h /logs/xxxx |
现在**回到 **spark-shell 窗口,编写代码从HDFS文件系统加载 test.segmented 文件
并打印文件中的第一行内容
1 | scala> val textFile = sc.textFile("hdfs://master:9000/user/hadoop/test.segmented") |
如果是单机的话,其中下面两条语句和上面第一条语句是一样的,但是如果是Hadoop伪分布式或者分布式的话,就不行
1 | val textFile = sc.textFile("/user/hadoop/test.segmented") |
再次把textFile写回到HDFS文件系统中
1 | textFile.saveAsTextFile("hdfs://master:9000/user/hadoop/writeback") |
再次查看
1 | hadoop@master:~$ hadoop fs -ls /user/hadoop |
如果进入writeback文件夹中查看的话,可以看到里面的文件的内容和test.segmented中的是一样的
1 | hadoop@master:~$ hadoop fs -ls /user/hadoop/writeback |
1 | hadoop@master:~$ hadoop fs -cat /user/hadoop/writeback/part-00000 |
1 | hadoop@master:~$ hadoop fs -cat /user/hadoop/writeback/part-00001 |
现在进入WordCount阶段,再次进入 Spark-shell 中
1 | val textFile = sc.textFile("hdfs://master:9000/user/hadoop/test.segmented") |
输出
1 | res6: Array[(String, Int)] = Array((ee,3), (aa,6), (gg,1), (dd,1), (hh,1), (ff,2), (bb,3), (cc,3)) |
在spark-shell下面运行成功之后,就需要试着在idea里面建立一个工程来运行这段代码
在idea下面建立一个Scala的工程,构建的方式选择是sbt
由于本机的Scala的版本是2.11.8
所以在project structure里面设置成2.11.8
接着在build.sbt里面写
1 | name := "word-count" |
注意里面的scalaVersion如果是2.11.X的话,sbt就会去拉spark-core_2.11-2.1.0的包
可以去公司的私服nexus里面去看看有没有这个包
然后在WordCount.scala文件中写入我们的代码
注意如果是setMaster(“local”)的话,需要在/etc/hosts中设置127.0.1.1,然后取消192.168.0.1
1 | import org.apache.spark.SparkContext |
然后在sbt中refresh,进行拉包,拉包的过程是无比缓慢的
拉好了之后运行的结果
和在 spark-shell中运行的结果是一致的
在林子雨老师的教程中,Spark2.1.0入门:第一个Spark应用程序:WordCount
最后是将整个应用程序打包成JAR,然后通过 spark-submit 提交到 Spark 中运行
做法是在idea的终端中,对代码进行打包
1 | common@master:~/coding/coding/Scala/word-count$ sbt package |
生成的jar包位置在
1 | /home/common/coding/coding/Scala/word-count/target/scala-2.11 |
最后通过 spark-submit 运行程序,将jar包通过这个命令提交到 Spark 中运行
1 | common@master:~/coding/coding/Scala/word-count$ spark-submit --class "WordCount" /home/common/coding/coding/Scala/word-count/target/scala-2.11/word-count_2.11-1.0.jar |
运行结果
在执行spark任务的时候,如果遇到
1 | 报如下错误: |
在spark的配置文件 conf/spark-env.sh 中添加
1 | export HADOOP_HOME=/home/lintong/software/apache/hadoop-2.9.1 |
spark集群安装参考:spark 集群搭建 详细步骤
主要是配置slave文件和spark-env文件
集群内容spark-env文件,其中xxx是spark web ui的端口
1 | export SPARK_DIST_CLASSPATH=$(/usr/bin/hadoop classpath) |
spark-shell查看当前spark的配置
1 | scala> spark.sql("set").filter("key rlike 'metastore|jdo'").rdd.take(100).foreach(println(_)) |