1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql._ import java.util.Properties
import com.google.common.collect.Lists import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType} import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.{Result, Scan} import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableInputFormat
/**
 * Created by mi on 17-4-11.
 */
/**
 * Flat record whose `info` and `summary` are plain strings.
 * NOTE(review): not referenced by the visible code; presumably an older output schema — confirm before removing.
 */
case class resultset(
  name: String,
  info: String,
  summary: String
)

/** One `name`/`value` entry in a [[BaikeLocation]]'s `info` list; MysqlOpt renders it as "name:value". */
case class IntroItem(
  name: String,
  value: String
)

/**
 * Record shape of the parquet input read by MysqlOpt.
 *
 * @param name    entry name
 * @param url     source URL, empty by default
 * @param info    structured intro items, empty by default
 * @param summary optional summary text
 */
case class BaikeLocation(
  name: String,
  url: String = "",
  info: Seq[IntroItem] = Seq.empty,
  summary: Option[String] = Option.empty
)

/**
 * Flattened counterpart of [[BaikeLocation]]: `info` is collapsed into a single string
 * so the record can be written out by MysqlOpt as plain columns.
 */
case class MewBaikeLocation(
  name: String,
  url: String = "",
  info: Option[String] = Option.empty,
  summary: Option[String] = Option.empty
)
/**
 * Batch job that reads Baidu Baike location records from parquet, flattens each record's
 * `info` list into a single string, prints a sample, and writes the result back to parquet.
 * An export of the same data to MySQL is sketched below but disabled.
 */
object MysqlOpt {

  /** Parquet directory holding the input [[BaikeLocation]] records. */
  private val InputPath = "/home/mi/coding/coding/baikeshow_data/baikeshow"

  /** Parquet directory the flattened [[MewBaikeLocation]] records are written to. */
  private val OutputPath = "/home/mi/coding/coding/baikeshow_data/baikeshow1"

  /**
   * Entry point; runs the job against a local Spark master.
   *
   * @param args ignored
   */
  def main(args: Array[String]): Unit = {
    // A single SparkSession owns the underlying SparkContext; no separate SQLContext is needed.
    val spark = SparkSession.builder()
      .master("local")
      .appName("WordCount")
      .getOrCreate()
    import spark.implicits._

    try {
      // Decode the parquet rows as BaikeLocation, then flatten `info` into a single String.
      val ds = spark.read.parquet(InputPath)
        .as[BaikeLocation]
        .map { loc =>
          // Build a case class (not a tuple) so the output keeps its column names.
          MewBaikeLocation(
            name = loc.name,
            url = loc.url,
            info = Some(flattenInfo(loc.info)),
            summary = loc.summary)
        }
        .cache() // reused by both show() and the parquet write below

      ds.show()

      // MySQL export (disabled). To enable, supply real credentials and uncomment:
      //   val prop = new Properties()
      //   prop.setProperty("user", "<user>")
      //   prop.setProperty("password", "<password>")
      //   ds.write.mode(SaveMode.Append).jdbc(
      //     "jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&characterEncoding=UTF-8",
      //     "baike_location", prop)
      // NOTE(review): a table named "baike_pages" was also configured for this database;
      // confirm which table is the intended target.

      // Default save mode is ErrorIfExists: this fails if OutputPath already exists.
      ds.repartition(10).write.parquet(OutputPath)
    } finally {
      // Release the SparkContext even when reading or writing fails.
      spark.stop()
    }
  }

  /**
   * Renders intro items as "name:value" pairs joined by commas.
   *
   * Spark decodes a null parquet array into a null Seq, and null array elements into null
   * items. Both are treated as absent here, instead of throwing a NullPointerException.
   *
   * @param items the record's intro items, possibly null
   * @return the comma-separated "name:value" string ("" when there are no items)
   */
  private def flattenInfo(items: Seq[IntroItem]): String =
    Option(items)
      .getOrElse(Seq.empty)
      .collect { case IntroItem(n, v) => s"$n:$v" }
      .mkString(",")
}
|