tonglin0325的个人主页

Ubuntu下安装Solr

1.在清华开源软件镜像站或者http://www.us.apache.org/dist/

下载Solr的安装包,我下载的是solr-6.5.1.tgz

2.解压并移动到/usr/local目录下

3.安装Solr需要安装Java环境,假设Java环境是安装好的

4.解压solr-6.5.1.tgz目录中的install_solr_service.sh文件

1
2
tar zxvf solr-6.5.1.tgz solr-6.5.1/bin/install_solr_service.sh --strip-components=2

5.运行这个脚本,进行安装

1
2
sudo bash ./install_solr_service.sh solr-6.5.1.tgz

6.如果安装失败的话,使用下面的命令删除Solr,然后重新安装

1
2
3
4
5
6
7
8
sudo service solr stop
sudo rm -r /var/solr
sudo rm -r /opt/solr-6.5.1
sudo rm -r /opt/solr
sudo rm /etc/init.d/solr
sudo deluser --remove-home solr
sudo deluser --group solr

全文 >>

Spark学习笔记——文本处理技术

1.建立TF-IDF模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{SparseVector => SV}
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.feature.IDF

/**
* Created by common on 17-5-6.
*/
object TFIDF {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)

// val path = "hdfs://master:9000/user/common/20Newsgroups/20news-bydate-train/*"
val path = "file:///media/common/工作/kaggle/test/*"
val rdd = sc.wholeTextFiles(path)

// 提取文本信息
val text = rdd.map { case (file, text) => text }
// print(text.count())

val regex = """[^0-9]*""".r

// 排除停用词
val stopwords = Set(
"the", "a", "an", "of", "or", "in", "for", "by", "on", "but", "is", "not",
"with", "as", "was", "if",
"they", "are", "this", "and", "it", "have", "from", "at", "my",
"be", "that", "to"
)

// 以使用正则表达切分原始文档来移除这些非单词字符
val nonWordSplit = text.flatMap(t =>
t.split("""\W+""").map(_.toLowerCase))

// 过滤掉数字和包含数字的单词
val filterNumbers = nonWordSplit.filter(token =>
regex.pattern.matcher(token).matches)

// 基于出现的频率,排除很少出现的单词,需要先计算一遍整个测试集
val tokenCounts = filterNumbers.map(t => (t, 1)).reduceByKey(_ + _)
val rareTokens = tokenCounts.filter { case (k, v) => v < 2 }.map {
case (k, v) => k
}.collect.toSet

// 每一个文档的预处理函数
def tokenize(line: String): Seq[String] = {
line.split("""\W+""")
.map(_.toLowerCase)
.filter(token => regex.pattern.matcher(token).matches)
.filterNot(token => stopwords.contains(token))
.filterNot(token => rareTokens.contains(token))
.filter(token => token.size >= 2) //删除只有一个字母的单词
.toSeq
}

// 每一篇文档经过预处理之后,每一个文档成为一个Seq[String]
val tokens = text.map(doc => tokenize(doc)).cache()

println(tokens.distinct.count)
// 第一篇文档第一部分分词之后的结果
println(tokens.first())
println(tokens.first().length)

// 生成2^18维的特征
val dim = math.pow(2, 18).toInt
val hashingTF = new HashingTF(dim)

// HashingTF 的 transform 函数把每个输入文档(即词项的序列)映射到一个MLlib的Vector对象
val tf = hashingTF.transform(tokens)
// tf的长度是文档的个数,对应的是文档和维度的矩阵
tf.cache

// 取得第一个文档的向量
val v = tf.first.asInstanceOf[SV]
println(v.size)
// v.value和v.indices的长度相等,value是词频,indices是词频非零的下标
println(v.values.size)
println(v.indices.size)
println(v.values.toSeq)
println(v.indices.take(10).toSeq)

// 对每个单词计算逆向文本频率
val idf = new IDF().fit(tf)
// 转换词频向量为TF-IDF向量
val tfidf = idf.transform(tf)
val v2 = tfidf.first.asInstanceOf[SV]
println(v2.values.size)
println(v2.values.take(10).toSeq)
println(v2.indices.take(10).toSeq)

// 计算整个文档的TF-IDF最小和最大权值
val minMaxVals = tfidf.map { v =>
val sv = v.asInstanceOf[SV]
(sv.values.min, sv.values.max)
}
val globalMinMax = minMaxVals.reduce { case ((min1, max1),
(min2, max2)) =>
(math.min(min1, min2), math.max(max1, max2))
}
println(globalMinMax)

// 比较几个单词的TF-IDF权值
val common = sc.parallelize(Seq(Seq("you", "do", "we")))
val tfCommon = hashingTF.transform(common)
val tfidfCommon = idf.transform(tfCommon)
val commonVector = tfidfCommon.first.asInstanceOf[SV]
println(commonVector.values.toSeq)

val uncommon = sc.parallelize(Seq(Seq("telescope", "legislation","investment")))
val tfUncommon = hashingTF.transform(uncommon)
val tfidfUncommon = idf.transform(tfUncommon)
val uncommonVector = tfidfUncommon.first.asInstanceOf[SV]
println(uncommonVector.values.toSeq)

}


}

全文 >>

Spark学习笔记——spark listener

spark可以使用SparkListener API在spark运行的过程中监控spark任务当前的运行状态,参考:SparkListener监听使用方式及自定义的事件处理动作

编写 MySparkAppListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.bigdata.spark

import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}

class MySparkAppListener extends SparkListener with Logging {

// 启动事件
override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
val appId = applicationStart.appId
logInfo("spark job start => " + appId.get)
}

// 结束事件
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
logInfo("spark job end => " + applicationEnd.time)
}
}

添加 spark.extraListeners 参数

1
2
3
4
5
6
val sparkSession = SparkSession.builder()
.master("local")
.config("spark.extraListeners", "com.bigdata.spark.MySparkAppListener")
.appName("spark session example")
.getOrCreate()

运行任务后就可以在日志当中看到对应的日志

1
2
3
4
21/12/27 23:13:46 INFO MySparkAppListener: spark job start => local-1640618026361

21/12/27 23:13:48 INFO MySparkAppListener: spark job end => 1640618028287

还有其他的事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
abstract class SparkListener extends SparkListenerInterface {
//阶段完成时触发的事件
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { }

//阶段提交时触发的事件
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { }

//任务启动时触发的事件
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }

//下载任务结果的事件
override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = { }

//任务结束的事件
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { }

//job启动的事件
override def onJobStart(jobStart: SparkListenerJobStart): Unit = { }

//job结束的事件
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { }

//环境变量被更新的事件
override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = { }

//块管理被添加的事件
override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { }

override def onBlockManagerRemoved(
blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = { }

//取消rdd缓存的事件
override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = { }

//app启动的事件
override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { }

//app结束的事件 [以下各事件也如同函数名所表达各个阶段被触发的事件不在一一标注]
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { }

override def onExecutorMetricsUpdate(
executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = { }

override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { }

override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }

override def onExecutorBlacklisted(
executorBlacklisted: SparkListenerExecutorBlacklisted): Unit = { }

override def onExecutorUnblacklisted(
executorUnblacklisted: SparkListenerExecutorUnblacklisted): Unit = { }

override def onNodeBlacklisted(
nodeBlacklisted: SparkListenerNodeBlacklisted): Unit = { }

override def onNodeUnblacklisted(
nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { }

override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }

override def onOtherEvent(event: SparkListenerEvent): Unit = { }
}

全文 >>

Ubuntu安装redis缓存数据库

参考:http://blog.csdn.net/xiangwanpeng/article/details/54586087

1.在下载目录下

1
2
sudo wget http://download.redis.io/releases/redis-3.2.6.tar.gz

2.解压,并复制到/usr/local目录下

1
2
3
tar -zxvf redis-3.2.6.tar.gz
mv redis-3.2.6 /usr/local/

3.编译和安装

1
2
3
4
cd /redis
sudo make
sudo make install

4.在redis安装文件夹中修改文件redis.conf,使得redis在后台运行

1
2
3
vim redis.conf
#修改daemonize yes

5.启动redis

1
2
3
redis-server redis.conf
redis-cli -p 6379

全文 >>

Spark学习笔记——构建分类模型

Spark中常见的三种分类模型:线性模型、决策树和朴素贝叶斯模型。

线性模型,简单而且相对容易扩展到非常大的数据集;线性模型又可以分成:1.逻辑回归;2.线性支持向量机

决策树是一个强大的非线性技术,训练过程计算量大并且较难扩展(幸运的是,MLlib会替我们考虑扩展性的问题),但是在很多情况下性能很好;

朴素贝叶斯模型简单、易训练,并且具有高效和并行的优点(实际中,模型训练只需要遍历所有数据集一次)。当采用合适的特征工程,这些模型在很多应用中都能达到不错的性能。而且,朴素贝叶斯模型可以作为一个很好的模型测试基准,用于比较其他模型的性能。

全文 >>

Python爬虫——使用request请求网页

1.安装request

1
2
pip install requests

2.请求网页

下载地址:http://phantomjs.org/download.html

1
2
3
4
>>> import requests
>>> r = requests.get('https://wwww.baidu.com')
>>> print(r.text)

3.请求失败重试

如果请求失败的话,可以使用urllib3的Retry来进行重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import requests
from requests.adapters import HTTPAdapter
from urllib3.util import Retry

try:
retry = Retry(
total=5,
backoff_factor=2,
status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(max_retries=retry)
session = requests.Session()
session.mount('https://', adapter)
r = session.get('https://httpbin.org/status/502', timeout=180)
print(r.status_code)
except Exception as e:
print(e)

参考:https://oxylabs.io/blog/python-requests-retry

4.使用lxml解析网页

可以安装xpath helper插件来测试xpath,https://chromewebstore.google.com/detail/xpath-helper/hgimnogjllphhhkhlmebbmlgjoejdpjl,使用如下

1
2
3
4
5
6
from lxml import etree

tree = etree.HTML(r.content)
text = tree.xpath('/html/body/div/header/div/nav/ul/li')
print(text)

全文 >>

Spark学习笔记——Spark上数据的获取、处理和准备

数据获得的方式多种多样,常用的公开数据集包括:

1.UCL机器学习知识库:包括近300个不同大小和类型的数据集,可用于分类、回归、聚类和推荐系统任务。数据集列表位于:http://archive.ics.uci.edu/ml/

2.Amazon AWS公开数据集:包含的通常是大型数据集,可通过Amazon S3访问。这些数据集包括人类基因组项目、Common Crawl网页语料库、维基百科数据和Google Books Ngrams。相关信息可参见:http://aws.amazon.com/publicdatasets/

3.Kaggle:这里集合了Kaggle举行的各种机器学习竞赛所用的数据集。它们覆盖分类、回归、排名、推荐系统以及图像分析领域,可从Competitions区域下载: http://www.kaggle.com/competitions

4.KDnuggets:这里包含一个详细的公开数据集列表,其中一些上面提到过的。该列表位于:http://www.kdnuggets.com/datasets/index.html

全文 >>

Spark学习笔记——构建基于Spark的推荐引擎

推荐模型

推荐模型的种类分为:

1.基于内容的过滤:基于内容的过滤利用物品的内容或是属性信息以及某些相似度定义,来求出与该物品类似的物品。

2.协同过滤:协同过滤是一种借助众包智慧的途径。它利用大量已有的用户偏好来估计用户对其未接触过的物品的喜好程度。其内在思想是相似度的定义。

在基于用户的方法的中,如果两个用户表现出相似的偏好(即对相同物品的偏好大体相同),那就认为他们的兴趣类似。

同样也可以借助基于物品的方法来做推荐。这种方法通常根据现有用户对物品的偏好或是评级情况,来计算物品之间的某种相似度。

3.矩阵分解:

3.1. 显式矩阵分解

例如我们可以得到多个用户对多部电影的评级的数据,这样我们就可以得到一个用户—电影评级的矩阵

我们所得到的这个矩阵是稀疏的,假设得到的**“用户—物品”矩阵的维度为U×I,我们需要对其进行降维,然后得到一个表示用户的U×k维矩阵和一个表示物品的k×I维矩阵**。

**要计算给定用户对某个物品的预计评级 **:

只需要从用户因子矩阵和物品因子矩阵分别选取相应的行(用户因子向量)与列(物品因子向量),然后计算两者的点积即可

对于物品之间相似度的计算,可以用最近邻模型中用到的相似度衡量方法。不同的是,这里可以直接利用物品因子向量,将相似度计算转换为对两物品因子向量之间相似度的计算

全文 >>

Spark学习笔记——基于MLlib的机器学习

使用MLlib库中的机器学习算法对垃圾邮件进行分类

分类的垃圾邮件的如图中分成4个文件夹,两个文件夹是训练集合,两个文件夹是测试集合

build.sbt文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
name := "spark-first"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.11" % "2.1.0",
"org.apache.hadoop" % "hadoop-common" % "2.7.2",
"mysql" % "mysql-connector-java" % "5.1.31",
"org.apache.spark" %% "spark-sql" % "2.1.0",
"org.apache.spark" %% "spark-streaming" % "2.1.0",
"org.apache.spark" % "spark-mllib_2.11" % "2.1.0"
)

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import org.apache.hadoop.io.{IntWritable, LongWritable, MapWritable, Text}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.sql.SQLContext
import java.util.Properties

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

/**
* Created by common on 17-4-6.
*/
object SparkRDD {

def main(args: Array[String]) {
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)

val spam = sc.textFile("input/email/spam")
val normal = sc.textFile("input/email/ham")

// 创建一个HashingTF实例来把邮件文本映射为包含10000个特征的向量
val tf = new HashingTF(numFeatures = 10000)
// 各邮件都被切分为单词,每个单词被映射为一个特征
val spamFeatures = spam.map(email => tf.transform(email.split(" ")))
val normalFeatures = normal.map(email => tf.transform(email.split(" ")))
// 创建LabeledPoint数据集分别存放阳性(垃圾邮件)和阴性(正常邮件)的例子
val positiveExamples = spamFeatures.map(features => LabeledPoint(1, features))
val negativeExamples = normalFeatures.map(features => LabeledPoint(0, features))
val trainingData = positiveExamples.union(negativeExamples)
trainingData.cache() // 因为逻辑回归是迭代算法,所以缓存训练数据RDD
// 使用SGD算法运行逻辑回归
val model = new LogisticRegressionWithSGD().run(trainingData)
// 以阳性(垃圾邮件)和阴性(正常邮件)的例子分别进行测试
val posTest = tf.transform(
"Experience with BiggerPenis Today! Grow 3-inches more ...".split(" "))
val negTest = tf.transform(
"That is cold. Is there going to be a retirement party? ...".split(" "))
println("Prediction for positive test example: " + model.predict(posTest))
println("Prediction for negative test example: " + model.predict(negTest))

}
}

全文 >>