/**
 * Spam-vs-ham email classification example built on Spark MLlib.
 *
 * Reads two directories of raw email text, hashes each message's words into
 * a fixed-size term-frequency vector, trains a logistic-regression model
 * with SGD, and prints predictions for one spam-like and one ham-like probe.
 *
 * Created by common on 17-4-6.
 */
object SparkRDD {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)

    // Raw email bodies, one directory per class.
    val spamMails = sc.textFile("input/email/spam")
    val hamMails = sc.textFile("input/email/ham")

    // HashingTF maps each email's tokens into a 10,000-dimension TF vector.
    val hashingTF = new HashingTF(numFeatures = 10000)
    // Tokenize on single spaces, then hash each email into feature space.
    val spamVectors = spamMails.map(mail => hashingTF.transform(mail.split(" ")))
    val hamVectors = hamMails.map(mail => hashingTF.transform(mail.split(" ")))

    // Attach labels: 1 = spam (positive class), 0 = ham (negative class).
    val positives = spamVectors.map(vec => LabeledPoint(1, vec))
    val negatives = hamVectors.map(vec => LabeledPoint(0, vec))
    val trainingData = positives.union(negatives)
    // Logistic regression is iterative, so cache the training RDD.
    trainingData.cache()

    // Fit a logistic-regression model using stochastic gradient descent.
    val model = new LogisticRegressionWithSGD().run(trainingData)

    // Hash one spam-like and one ham-like probe message the same way.
    val posTest = hashingTF.transform(
      "Experience with BiggerPenis Today! Grow 3-inches more ...".split(" "))
    val negTest = hashingTF.transform(
      "That is cold. Is there going to be a retirement party? ...".split(" "))
    println("Prediction for positive test example: " + model.predict(posTest))
    println("Prediction for negative test example: " + model.predict(negTest))