1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
| import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.mllib.linalg.{SparseVector => SV} import org.apache.spark.mllib.feature.HashingTF import org.apache.spark.mllib.feature.IDF
/**
 * Explores TF-IDF weighting of a plain-text corpus with Spark MLlib.
 *
 * Every file matched by the input glob is read whole and treated as one document. Documents
 * are tokenized (see `tokenize`), hashed into 2^18 term-frequency buckets with `HashingTF`,
 * re-weighted with `IDF`, and a handful of diagnostics are printed to stdout.
 *
 * Created by common on 17-5-6.
 */
object TFIDF {

  /** Input glob used when no path is given on the command line. */
  private val DefaultInputPath = "file:///media/common/工作/kaggle/test/*"
  // Alternative corpus used previously:
  // "hdfs://master:9000/user/common/20Newsgroups/20news-bydate-train/*"

  /** Matches tokens that contain no ASCII digit (the empty string also matches). */
  private val NoDigits = """[^0-9]*""".r

  /** Common English function words excluded from the vocabulary. */
  private val StopWords = Set(
    "the", "a", "an", "of", "or", "in", "for", "by", "on", "but", "is", "not", "with", "as",
    "was", "if", "they", "are", "this", "and", "it", "have", "from", "at", "my", "be", "that",
    "to"
  )

  /** Tokens occurring fewer than this many times in the whole corpus are treated as rare. */
  private val MinTokenCount = 2

  /** Feature-space dimension (number of hash buckets) for `HashingTF`: 2^18. */
  private val NumFeatures = 1 << 18

  /**
   * Entry point.
   *
   * @param args optional; `args(0)` overrides the input glob (default `DefaultInputPath`).
   */
  def main(args: Array[String]): Unit = {
    val path = args.headOption.getOrElse(DefaultInputPath)
    // Default to a local master, but let spark-submit's --master take precedence.
    val conf = new SparkConf().setAppName("TFIDF").setIfMissing("spark.master", "local")
    val sc = new SparkContext(conf)
    try run(sc, path)
    finally sc.stop() // always release the context, even if the job fails
  }

  /** Splits raw text on runs of non-word characters and lower-cases every piece. */
  private def splitWords(text: String): Array[String] =
    text.split("""\W+""").map(_.toLowerCase)

  /** True when `token` contains no ASCII digit. */
  private def isDigitFree(token: String): Boolean =
    NoDigits.pattern.matcher(token).matches

  /**
   * Per-document preprocessing: split and lower-case, then drop tokens that contain digits,
   * stop words, corpus-rare tokens, and tokens shorter than two characters.
   *
   * @param doc        raw document text
   * @param rareTokens tokens to drop because they are too infrequent in the corpus
   * @return the surviving tokens, in document order
   */
  private def tokenize(doc: String, rareTokens: Set[String]): Seq[String] =
    splitWords(doc)
      .filter(isDigitFree)
      .filterNot(StopWords.contains)
      .filterNot(rareTokens.contains)
      .filter(_.length >= 2) // single letters, and the "" produced by a leading separator
      .toSeq

  /** Runs the TF-IDF pipeline over the documents matched by `path`, printing diagnostics. */
  private def run(sc: SparkContext, path: String): Unit = {
    // One element per file's contents; the file name is not needed.
    val text = sc.wholeTextFiles(path).values

    // Rare tokens are determined from corpus-wide counts, so this needs a full pass first.
    val rareTokens = text
      .flatMap(doc => splitWords(doc))
      .filter(isDigitFree)
      .map(token => (token, 1))
      .reduceByKey(_ + _)
      .filter { case (_, count) => count < MinTokenCount }
      .keys
      .collect()
      .toSet
    // The rare-token set can be large; ship it to each executor once instead of in every task.
    val rareTokensBc = sc.broadcast(rareTokens)

    // Each document becomes its preprocessed token sequence.
    val tokens = text.map(doc => tokenize(doc, rareTokensBc.value)).cache()

    // Vocabulary size: number of distinct tokens across all documents.
    println(tokens.flatMap(doc => doc).distinct().count())
    // Tokens of the first document, and how many there are.
    val firstDoc = tokens.first()
    println(firstDoc)
    println(firstDoc.length)

    val hashingTF = new HashingTF(NumFeatures)
    // One term-frequency vector per document (rows of a document x feature matrix).
    val tf = hashingTF.transform(tokens).cache()

    // HashingTF produces sparse vectors, so the cast holds.
    val v = tf.first().asInstanceOf[SV]
    println(v.size)
    // `values` and `indices` are parallel arrays: values(i) is the term frequency at
    // hashed index indices(i); only non-zero entries are stored.
    println(v.values.size)
    println(v.indices.size)
    println(v.values.toSeq)
    println(v.indices.take(10).toSeq)

    // Fit inverse document frequencies over the corpus, then turn TF vectors into TF-IDF.
    val idf = new IDF().fit(tf)
    val tfidf = idf.transform(tf)
    // IDFModel.transform keeps sparse input sparse.
    val v2 = tfidf.first().asInstanceOf[SV]
    println(v2.values.size)
    println(v2.values.take(10).toSeq)
    println(v2.indices.take(10).toSeq)

    // Global minimum and maximum TF-IDF weight. Documents left with no tokens store no values,
    // and `Array.min` would throw on them, so they are skipped. A corpus where every document
    // is empty yields (Infinity,-Infinity) instead of failing.
    val globalMinMax = tfidf
      .map(_.asInstanceOf[SV].values)
      .filter(_.nonEmpty)
      .map(values => (values.min, values.max))
      .fold((Double.PositiveInfinity, Double.NegativeInfinity)) {
        case ((min1, max1), (min2, max2)) => (math.min(min1, min2), math.max(max1, max2))
      }
    println(globalMinMax)

    // Compare TF-IDF weights of frequent words against rare ones.
    def printWeights(words: Seq[String]): Unit = {
      val weighted = idf.transform(hashingTF.transform(words)).asInstanceOf[SV]
      println(weighted.values.toSeq)
    }
    printWeights(Seq("you", "do", "we"))
    printWeights(Seq("telescope", "legislation", "investment"))
  }
}
|