`
Kevin12
  • 浏览: 229874 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Spark SQL操作Hive数据库

阅读更多
本次例子通过scala编程实现Spark SQL操作Hive数据库!
Hadoop集群搭建:http://kevin12.iteye.com/blog/2273532
Spark集群搭建:http://kevin12.iteye.com/blog/2273532

数据准备
在/usr/local/sparkApps/SparkSQL2Hive/resources/目录下创建people.txt内容如下,name和age之间是"\t"分割
Michael    20
Andy    17
Justin    19
创建份数peopleScores.txt,内容如下,name和score之间用“\t”分割
Michael    98
Andy    95
Justin    68

代码实现
package com.imf.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
/**
 * 通过spark sql操作hive数据源
 */
object SparkSQL2Hive {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf();
    conf.setAppName("SparkSQL2Hive for scala")
    conf.setMaster("spark://master1:7077")

    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)
    //用户年龄
    hiveContext.sql("use testdb")
    hiveContext.sql("DROP TABLE IF EXISTS people")
    hiveContext.sql("CREATE TABLE IF NOT EXISTS people(name STRING, age INT)ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n'")
    //把本地数据加载到hive中(实际上发生了数据拷贝),也可以直接使用HDFS中的数据
    hiveContext.sql("LOAD DATA LOCAL INPATH '/usr/local/sparkApps/SparkSQL2Hive/resources/people.txt' INTO TABLE people")
    //用户份数
    hiveContext.sql("use testdb")
    hiveContext.sql("DROP TABLE IF EXISTS peopleScores")
    hiveContext.sql("CREATE TABLE IF NOT EXISTS peopleScores(name STRING, score INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n'")
    hiveContext.sql("LOAD DATA LOCAL INPATH '/usr/local/sparkApps/SparkSQL2Hive/resources/peopleScore.txt' INTO TABLE peopleScores")

    /**
     * 通过HiveContext使用join直接基于hive中的两种表进行操作
     */
   val resultDF = hiveContext.sql("select pi.name,pi.age,ps.score "
                      +" from people pi join peopleScores ps on pi.name=ps.name"
                      +" where ps.score>90");
    /**
     * 通过saveAsTable创建一张hive managed table,数据的元数据和数据即将放的具体位置都是由
     * hive数据仓库进行管理的,当删除该表的时候,数据也会一起被删除(磁盘的数据不再存在)
     */
    hiveContext.sql("drop table if exists peopleResult")
    resultDF.saveAsTable("peopleResult")

    /**
     * 使用HiveContext的table方法可以直接读取hive数据仓库的Table并生成DataFrame,
     * 接下来机器学习、图计算、各种复杂的ETL等操作
     */
    val dataframeHive = hiveContext.table("peopleResult")
    dataframeHive.show()


  }
}


调度脚本
并将上面的程序打包成SparkSQL2Hive.jar,将SparkSQL2Hive.jar拷贝到/usr/local/sparkApps/SparkSQL2Hive/目录下面,并创建调度脚本run.sh,内容如下:
/usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit \
--class com.imf.spark.sql.SparkSQL2Hive \
--files /usr/local/hive/apache-hive-1.2.1-bin/conf/hive-site.xml \
--master spark://master1:7077 \
/usr/local/sparkApps/SparkSQL2Hive/SparkSQL2Hive.jar


#如果已经将msyql的驱动放到了spark的lib目录下面,则不用在添加下面的mysql的驱动了
#--driver-class-path /usr/local/hive/apache-hive-1.2.1-bin/lib/mysql-connector-java-5.1.35-bin.jar \


执行结果



详细执行的日志见附件 run.log


用hive来查看表内容和执行结果
root@master1:/usr/local/tools# hive
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/hadoop/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/spark/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/hadoop/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/spark/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

Logging initialized using configuration in jar:file:/usr/local/hive/apache-hive-1.2.1-bin/lib/hive-common-1.2.1.jar!/hive-log4j.properties
hive> show databases;
OK
default
testdb
Time taken: 1.013 seconds, Fetched: 2 row(s)
hive> use testdb;
OK
Time taken: 0.103 seconds
hive> show tables;
OK
people
peopleresult
peoplescores
student
student2
student3
student4
tbsogou
tmp_pre_hour_seach_info
Time taken: 0.082 seconds, Fetched: 9 row(s)
hive> select * from people;
OK
Michael    20
Andy    17
Justin    19
Time taken: 1.252 seconds, Fetched: 3 row(s)
hive> select * from peoplescores;
OK
Michael    98
Andy    95
Justin    68
Time taken: 0.142 seconds, Fetched: 3 row(s)
hive> select * from peopleresult;
OK
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Michael    20    98
Andy    17    95
Time taken: 0.298 seconds, Fetched: 2 row(s)
hive> 


至此,通过SparkSQL操作hive数据库成功!



  • 大小: 6.3 KB
分享到:
评论
2 楼 lwb314 2018-05-21  
你的这个是创建的临时的hive表,数据也是通过文件录入进去的,如想读取hive数据库的数据应该怎么写?
1 楼 yixiaoqi2010 2017-04-13  
你好  我的提交上去  总是报错,找不到hive表,可能是哪里的原因呢,--files也加上了,
但是我在./spark-shell --master 上就能访问hive表,不知道是什么问题 

相关推荐

Global site tag (gtag.js) - Google Analytics