tonglin0325的个人主页

Solr学习笔记——导入JSON数据

1.导入JSON数据的方式有两种,一种是在web管理界面中导入,另一种是使用curl命令来导入

1
2
curl http://localhost:8983/solr/baikeperson/update/json?commit=true --data-binary @/home/XXX/下载/person/test1.json -H 'Content-type:text/json; charset=utf-8'

2.导入的时候注意格式

使用curl可以导入的格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
{
  "add": {
    "overwrite": true,
    "doc": {
      "id": 1,
      "name": "Some book",
      "author": ["John", "Marry"]
    }
  },
  "add": {
    "overwrite": true,
    "boost": 2.5,
    "doc": {
      "id": 2,
      "name": "Important Book",
      "author": ["Harry", "Jane"]
    }
  },
  "add": {
    "overwrite": true,
    "doc": {
      "id": 3,
      "name": "Some other book",
      "author": "Marry"
    }
  }
}

 

在web界面中可以导入的格式

1
2
{"title":"许宝江","url":"7254863","chineseName":"许宝江","sex":"男","occupation":" 滦县农业局局长","nationality":"中国"}

不可以导入的格式

1
2
3
{"title":"鲍志成","url":"2074015","chineseName":"鲍志成","occupation":"医师","nationality":"中国","birthDate":"1901年","deathDate":"1973年","graduatedFrom":"香港大学"}
{"title":"许宝江","url":"7254863","chineseName":"许宝江","sex":"男","occupation":" 滦县农业局局长","nationality":"中国"}

全文 >>

Solr学习笔记——查询

1.进入Solr管理界面http://localhost:8983/solr/

可以看到Query中有若干的参数,其意义如下(参考:http://www.jianshu.com/p/3c4cae5dee8d

Solr的查询语法:

Solr默认有三种查询解析器(Query Parser):

  • Standard Query Parser
  • DisMax Query Parser
  • Extended DisMax Query Parser (eDisMax)

第一种是标准的Parser,最后一种是最强大的,也是Sunspot默认使用的Parser。

支持的参数:#

  • defType: 选择查询解析器类型,例如dismax, edismax
  • q:主查询参数(field_name:value)
  • sort:排序,例如score desc,price asc
  • start:起始的数据偏移offset,用于分页
  • raws:一次返回的数量,用于分页
  • fq:filter query 返回结果的过滤查询
  • fl:fields to list 返回的字段(*, score)
  • debug:返回调试信息,debug=timing,debug=results
  • timeAllowed:超时时间
  • wt:response writer返回的响应格式

下面是DisMax Parser可以使用的:

  • qf:query fields,指定查询的字段,指定solr从哪些field中搜索,没有值的时候使用df
  • mm:最小匹配比例
  • pf:phrase fields
  • ps:phrase slop
  • qs:query phrase slop

特殊符号意义:#

  • ?:te?t 单个字符匹配
  • :tes 多个字符匹配
  • :fuzzy searches(模糊匹配),roam,roams/foam/foams
  • count:{1 TO 10}:range search 范围检索
  • ^:Boosting a Term(升级权重),jakarta^4 apache, “酒店”^4 “宾馆”
  • ^=:Constant Score with(指定分数),(description:blue OR color:blue)^=1.0 text:shoes

逻辑操作#

  • AND 或者 &&
  • NOT 或者 !
  • OR 或者 !!
    • 必须满足
    • 剔除,比如 title: -安徽,返回的是title中不含有”安徽”的所有结果

 

Ubuntu下安装Solr

1.在清华开源软件镜像站或者http://www.us.apache.org/dist/

下载Solr的安装包,我下载的是solr-6.5.1.tgz

2.解压并移动到/usr/local目录下

3.安装Solr需要安装Java环境,假设Java环境是安装好的

4.解压solr-6.5.1.tgz目录中的install_solr_service.sh文件

1
2
tar zxvf solr-6.5.1.tgz solr-6.5.1/bin/install_solr_service.sh --strip-components=2

5.运行这个脚本,进行安装

1
2
sudo bash ./install_solr_service.sh solr-6.5.1.tgz

6.如果安装失败的话,使用下面的命令删除Solr,然后重新安装

1
2
3
4
5
6
7
8
sudo service solr stop
sudo rm -r /var/solr
sudo rm -r /opt/solr-6.5.1
sudo rm -r /opt/solr
sudo rm /etc/init.d/solr
sudo deluser --remove-home solr
sudo deluser --group solr

全文 >>

Spark学习笔记——文本处理技术

1.建立TF-IDF模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{SparseVector => SV}
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.feature.IDF

/**
* Created by common on 17-5-6.
*/
object TFIDF {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)

// val path = "hdfs://master:9000/user/common/20Newsgroups/20news-bydate-train/*"
val path = "file:///media/common/工作/kaggle/test/*"
val rdd = sc.wholeTextFiles(path)

// 提取文本信息
val text = rdd.map { case (file, text) => text }
// print(text.count())

val regex = """[^0-9]*""".r

// 排除停用词
val stopwords = Set(
"the", "a", "an", "of", "or", "in", "for", "by", "on", "but", "is", "not",
"with", "as", "was", "if",
"they", "are", "this", "and", "it", "have", "from", "at", "my",
"be", "that", "to"
)

// 以使用正则表达切分原始文档来移除这些非单词字符
val nonWordSplit = text.flatMap(t =>
t.split("""\W+""").map(_.toLowerCase))

// 过滤掉数字和包含数字的单词
val filterNumbers = nonWordSplit.filter(token =>
regex.pattern.matcher(token).matches)

// 基于出现的频率,排除很少出现的单词,需要先计算一遍整个测试集
val tokenCounts = filterNumbers.map(t => (t, 1)).reduceByKey(_ + _)
val rareTokens = tokenCounts.filter { case (k, v) => v < 2 }.map {
case (k, v) => k
}.collect.toSet

// 每一个文档的预处理函数
def tokenize(line: String): Seq[String] = {
line.split("""\W+""")
.map(_.toLowerCase)
.filter(token => regex.pattern.matcher(token).matches)
.filterNot(token => stopwords.contains(token))
.filterNot(token => rareTokens.contains(token))
.filter(token => token.size >= 2) //删除只有一个字母的单词
.toSeq
}

// 每一篇文档经过预处理之后,每一个文档成为一个Seq[String]
val tokens = text.map(doc => tokenize(doc)).cache()

println(tokens.distinct.count)
// 第一篇文档第一部分分词之后的结果
println(tokens.first())
println(tokens.first().length)

// 生成2^18维的特征
val dim = math.pow(2, 18).toInt
val hashingTF = new HashingTF(dim)

// HashingTF 的 transform 函数把每个输入文档(即词项的序列)映射到一个MLlib的Vector对象
val tf = hashingTF.transform(tokens)
// tf的长度是文档的个数,对应的是文档和维度的矩阵
tf.cache

// 取得第一个文档的向量
val v = tf.first.asInstanceOf[SV]
println(v.size)
// v.value和v.indices的长度相等,value是词频,indices是词频非零的下标
println(v.values.size)
println(v.indices.size)
println(v.values.toSeq)
println(v.indices.take(10).toSeq)

// 对每个单词计算逆向文本频率
val idf = new IDF().fit(tf)
// 转换词频向量为TF-IDF向量
val tfidf = idf.transform(tf)
val v2 = tfidf.first.asInstanceOf[SV]
println(v2.values.size)
println(v2.values.take(10).toSeq)
println(v2.indices.take(10).toSeq)

// 计算整个文档的TF-IDF最小和最大权值
val minMaxVals = tfidf.map { v =>
val sv = v.asInstanceOf[SV]
(sv.values.min, sv.values.max)
}
val globalMinMax = minMaxVals.reduce { case ((min1, max1),
(min2, max2)) =>
(math.min(min1, min2), math.max(max1, max2))
}
println(globalMinMax)

// 比较几个单词的TF-IDF权值
val common = sc.parallelize(Seq(Seq("you", "do", "we")))
val tfCommon = hashingTF.transform(common)
val tfidfCommon = idf.transform(tfCommon)
val commonVector = tfidfCommon.first.asInstanceOf[SV]
println(commonVector.values.toSeq)

val uncommon = sc.parallelize(Seq(Seq("telescope", "legislation","investment")))
val tfUncommon = hashingTF.transform(uncommon)
val tfidfUncommon = idf.transform(tfUncommon)
val uncommonVector = tfidfUncommon.first.asInstanceOf[SV]
println(uncommonVector.values.toSeq)

}


}

 

Spark学习笔记——spark listener

spark可以使用SparkListener API在spark运行的过程中监控spark任务当前的运行状态,参考:SparkListener监听使用方式及自定义的事件处理动作

编写 MySparkAppListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.bigdata.spark

import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}

class MySparkAppListener extends SparkListener with Logging {

// 启动事件
override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
val appId = applicationStart.appId
logInfo("spark job start => " + appId.get)
}

// 结束事件
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
logInfo("spark job end => " + applicationEnd.time)
}
}

添加 spark.extraListeners 参数

1
2
3
4
5
6
val sparkSession = SparkSession.builder()
.master("local")
.config("spark.extraListeners", "com.bigdata.spark.MySparkAppListener")
.appName("spark session example")
.getOrCreate()

运行任务后就可以在日志当中看到对应的日志

1
2
3
4
21/12/27 23:13:46 INFO MySparkAppListener: spark job start => local-1640618026361

21/12/27 23:13:48 INFO MySparkAppListener: spark job end => 1640618028287

还有其他的事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
abstract class SparkListener extends SparkListenerInterface {
//阶段完成时触发的事件
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { }

//阶段提交时触发的事件
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { }

//任务启动时触发的事件
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }

//下载任务结果的事件
override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = { }

//任务结束的事件
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { }

//job启动的事件
override def onJobStart(jobStart: SparkListenerJobStart): Unit = { }

//job结束的事件
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { }

//环境变量被更新的事件
override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = { }

//块管理被添加的事件
override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { }

override def onBlockManagerRemoved(
blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = { }

//取消rdd缓存的事件
override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = { }

//app启动的事件
override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { }

//app结束的事件 [以下各事件也如同函数名所表达各个阶段被触发的事件不在一一标注]
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { }

override def onExecutorMetricsUpdate(
executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = { }

override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { }

override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }

override def onExecutorBlacklisted(
executorBlacklisted: SparkListenerExecutorBlacklisted): Unit = { }

override def onExecutorUnblacklisted(
executorUnblacklisted: SparkListenerExecutorUnblacklisted): Unit = { }

override def onNodeBlacklisted(
nodeBlacklisted: SparkListenerNodeBlacklisted): Unit = { }

override def onNodeUnblacklisted(
nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { }

override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }

override def onOtherEvent(event: SparkListenerEvent): Unit = { }
}

 

全文 >>

Ubuntu安装redis缓存数据库

参考:http://blog.csdn.net/xiangwanpeng/article/details/54586087

1.在下载目录下

1
2
sudo wget http://download.redis.io/releases/redis-3.2.6.tar.gz

2.解压,并复制到/usr/local目录下

1
2
3
tar -zxvf redis-3.2.6.tar.gz
mv redis-3.2.6 /usr/local/

3.编译和安装

1
2
3
4
cd /redis
sudo make
sudo make install

4.在redis安装文件夹中修改文件redis.conf,使得redis在后台运行

1
2
3
vim redis.conf
#修改daemonize yes

5.启动redis

全文 >>

Spark学习笔记——构建分类模型

Spark中常见的三种分类模型:线性模型、决策树和朴素贝叶斯模型。

线性模型,简单而且相对容易扩展到非常大的数据集;线性模型又可以分成:1.逻辑回归;2.线性支持向量机

决策树是一个强大的非线性技术,训练过程计算量大并且较难扩展(幸运的是,MLlib会替我们考虑扩展性的问题),但是在很多情况下性能很好;

朴素贝叶斯模型简单、易训练,并且具有高效和并行的优点(实际中,模型训练只需要遍历所有数据集一次)。当采用合适的特征工程,这些模型在很多应用中都能达到不错的性能。而且,朴素贝叶斯模型可以作为一个很好的模型测试基准,用于比较其他模型的性能。

 

现在我们采用的数据集是stumbleupon,这个数据集是主要是一些网页的分类数据

内容样例:String = “http://www.bloomberg.com/news/2010-12-23/ibm-predicts-holographic-calls-air-breathing-batteries-by-2015.html"    ”4042”    ”{“”title””:””IBM Sees Holographic Calls Air Breathing Batteries ibm sees holographic calls, air-breathing batteries””,””body””:””A sign stands outside the International Business Machines Corp IBM Almaden Research Center campus in San Jose California Photographer Tony Avelar Bloomberg Buildings stand at the International Business Machines Corp IBM Almaden Research Center campus in the Santa Teresa Hills of San Jose California Photographer Tony Avelar Bloomberg By 2015 your mobile phone will project a 3 D image of anyone who calls and your laptop will be powered by kinetic energy At least that s what International Business Machines Corp sees in its crystal …

开始四列分别包含 URL 、页面的 ID 、原始的文本内容和分配给页面的类别。接下来 22 列包含各种各样的数值或者类属特征。最后一列为目标值, -1 为长久, 0 为短暂。

 

1
2
3
4
val rawData = sc.textFile("/user/common/stumbleupon/train_noheader.tsv")
val records = rawData.map(line => line.split("\t"))
records.first()

由于数据格式的问题,我们做一些数据清理的工作,在处理过程中把额外的( “ )去掉。数据集中还有一些用 “?” 代替的缺失数据,本例中,我们直接用 0 替换那些缺失数据

在清理和处理缺失数据后,我们提取最后一列的标记变量以及第 5 列到第 25 列的特征矩阵。将标签变量转换为 Int 值,特征向量转换为 Double 数组。

最后,我们将标签和和特征向量转换为 LabeledPoint 实例,从而将特征向量存储到 MLlib 的 Vector 中。

全文 >>

Python爬虫——使用request请求网页

1.安装request

1
2
pip install requests

2.请求网页

下载地址:http://phantomjs.org/download.html

1
2
3
4
>>> import requests
>>> r = requests.get('https://wwww.baidu.com')
>>> print(r.text)

3.请求失败重试

如果请求失败的话,可以使用urllib3的Retry来进行重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import requests
from requests.adapters import HTTPAdapter
from urllib3.util import Retry

try:
retry = Retry(
total=5,
backoff_factor=2,
status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(max_retries=retry)
session = requests.Session()
session.mount('https://', adapter)
r = session.get('https://httpbin.org/status/502', timeout=180)
print(r.status_code)
except Exception as e:
print(e)

参考:https://oxylabs.io/blog/python-requests-retry

4.使用lxml解析网页

可以安装xpath helper插件来测试xpath,https://chromewebstore.google.com/detail/xpath-helper/hgimnogjllphhhkhlmebbmlgjoejdpjl,使用如下

1
2
3
4
5
6
from lxml import etree

tree = etree.HTML(r.content)
text = tree.xpath('/html/body/div/header/div/nav/ul/li')
print(text)

 

全文 >>

Spark学习笔记——Spark上数据的获取、处理和准备

数据获得的方式多种多样,常用的公开数据集包括:

1.UCL机器学习知识库:包括近300个不同大小和类型的数据集,可用于分类、回归、聚类和推荐系统任务。数据集列表位于:http://archive.ics.uci.edu/ml/

2.Amazon AWS公开数据集:包含的通常是大型数据集,可通过Amazon S3访问。这些数据集包括人类基因组项目、Common Crawl网页语料库、维基百科数据和Google Books Ngrams。相关信息可参见:http://aws.amazon.com/publicdatasets/

3.Kaggle:这里集合了Kaggle举行的各种机器学习竞赛所用的数据集。它们覆盖分类、回归、排名、推荐系统以及图像分析领域,可从Competitions区域下载: http://www.kaggle.com/competitions

4.KDnuggets:这里包含一个详细的公开数据集列表,其中一些上面提到过的。该列表位于:http://www.kdnuggets.com/datasets/index.html

 

下面采用的数据集是MovieLens 100k数据集,MovieLens 100k数据集包含表示多个用户对多部电影的10万次评级数据,也包含电影元数据和用户属性信息。

在目录下,可以查看文件中的前5行的数据

1
2
3
4
5
6
7
head -5 u.user
1|24|M|technician|85711
2|53|F|other|94043
3|23|M|writer|32067
4|24|M|technician|43537
5|33|F|other|15213

现在使用Spark交互式终端来对数据进行可视化的操作,以直观的了解数据的情况

1.安装ipython

IPython是针对Python的一个高级交互式壳程序,包含内置一系列实用功能的pylab,其中有NumPy和SciPy用于数值计算,以及matplotlib用于交互式绘图和可视化

1
sudo apt-get install ipython

2.安装anaconda,安装的文件是Anaconda2-4.3.1-Linux-x86_64.sh,可以在清华的开源软件镜像站下载

一个预编译的科学Python套件

全文 >>