import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import java.util.Properties
import com.google.common.collect.Lists
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Get, Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf

/**
 * Created by mi on 17-4-11.
 */
case class resultset(name: String, info: String, summary: String)
case class IntroItem(name: String, value: String)
case class BaikeLocation(name: String, url: String = "", info: Seq[IntroItem] = Seq(), summary: Option[String] = None)
case class MewBaikeLocation(name: String, url: String = "", info: Option[String] = None, summary: Option[String] = None)
object MysqlOpt {
def main(args: Array[String]): Unit = {
  // Run in local mode for easy testing
  val conf = new SparkConf().setAppName("WordCount").setMaster("local")
  // Create the Spark context and SQL context
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  // Define the MySQL database and table information
  val url = "jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&characterEncoding=UTF-8"
  val table = "baike_pages"

  // HBase data directory, as configured in /usr/local/hbase/conf/hbase-site.xml
  val hbasePath = "file:///usr/local/hbase/hbase-tmp"

  // Create the HBase configuration
  val hBaseConf = HBaseConfiguration.create()
  hBaseConf.set(TableInputFormat.INPUT_TABLE, "student")

  // Initialize the JobConf; TableOutputFormat must be the one from the org.apache.hadoop.hbase.mapred package!
  val jobConf = new JobConf(hBaseConf)
  jobConf.setOutputFormat(classOf[TableOutputFormat])
  jobConf.set(TableOutputFormat.OUTPUT_TABLE, "student")

  val indataRDD = sc.makeRDD(Array("1,99,98", "2,97,96", "3,95,94"))

  val rdd = indataRDD.map(_.split(',')).map { arr => {
    /* Each Put object is one row; the row key is passed to the constructor.
     * Every inserted value must be converted with org.apache.hadoop.hbase.util.Bytes.toBytes.
     * Put.add takes three arguments: column family, column qualifier, value.
     */
    val put = new Put(Bytes.toBytes(arr(0)))
    put.add(Bytes.toBytes("course"), Bytes.toBytes("math"), Bytes.toBytes(arr(1)))
    put.add(Bytes.toBytes("course"), Bytes.toBytes("english"), Bytes.toBytes(arr(2)))
    // Convert to RDD[(ImmutableBytesWritable, Put)] so that saveAsHadoopDataset can be called
    (new ImmutableBytesWritable, put)
  }}

  rdd.saveAsHadoopDataset(jobConf)
  sc.stop()
}
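
  /* A minimal sketch (not part of the original job) of reading the "student" table back.
   * main() sets TableInputFormat.INPUT_TABLE on hBaseConf but never reads from it; this
   * hypothetical helper shows how that setting could be used with newAPIHadoopRDD.
   * The method name readFromHBase is an assumption, not an original identifier. */
  def readFromHBase(sc: SparkContext): Unit = {
    val readConf = HBaseConfiguration.create()
    readConf.set(TableInputFormat.INPUT_TABLE, "student")
    // newAPIHadoopRDD pairs each row key (ImmutableBytesWritable) with its Result
    val hBaseRDD = sc.newAPIHadoopRDD(readConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    hBaseRDD.foreach { case (_, result) =>
      val key = Bytes.toString(result.getRow)
      val math = Bytes.toString(result.getValue(Bytes.toBytes("course"), Bytes.toBytes("math")))
      val english = Bytes.toString(result.getValue(Bytes.toBytes("course"), Bytes.toBytes("english")))
      println(s"row=$key math=$math english=$english")
    }
  }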
}
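
/* A minimal sketch of how the MySQL connection info defined in main (url, table)
 * could be used; the original job defines them but never touches MySQL.
 * The object name, method name, and the user/password values are assumptions. */
object MysqlReadSketch {
  def readBaikePages(sqlContext: SQLContext): DataFrame = {
    val url = "jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&characterEncoding=UTF-8"
    val table = "baike_pages"
    val props = new Properties()
    props.setProperty("user", "root")     // placeholder credentials
    props.setProperty("password", "root") // placeholder credentials
    // DataFrameReader.jdbc loads the MySQL table as a DataFrame over JDBC
    sqlContext.read.jdbc(url, table, props)
  }
}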