`
Kevin12
  • 浏览: 230701 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Spark SQL内置函数应用

阅读更多
简单说明
    使用Spark SQL中的内置函数对数据进行分析,Spark SQL API不同的是,DataFrame中的内置函数操作的结果返回一个Column对象,而DataFrame天生就是“A distributed collection of data organized into named columns”,这就为数据的复杂分析建立了坚实的基础并提供了极大的方便性,例如说,我们在操作DataFrame的方法中可以随时调用内置函数进行业务需要的处理,这之于我们构建附件的业务逻辑而言是可以极大的减少不必须的时间消耗(基本上就是实际模型的映射),让我们聚焦在数据分析上,这对于提高工程师的生产力而言是非常有价值的;
Spark 1.5.x开始提供了大量的内置函数,例如agg:
def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
   groupBy().agg(aggExpr, aggExprs : _*)
}

还有max,mean,min,sum,ave,explode,size,sort_array,day,to_date,abs,acros,asin,atan等
总体而言,内置函数包含了五大基本类型:
1.聚合函数,例如countDistinct,sumDistinct等;
2.集合函数,例如sort_array,explode等;
3.日期,时间函数,例如hour,quarter,next_day等;
4.数学函数,例如asin,atan,sqrt,tan,round等;
5.开窗函数,例如rowNumber等;
6.字符串函数,例如concat,format_number,rexexp_extract(正则)等;
7.其他函数,isNaN,sha,randn,callUDF等;

package com.imf.spark.sql

import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types.{StructType, IntegerType, StringType, StructField}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._
/**
  * Description:使用Spark SQL中的内置函数对数据进行分析
  * Author:lujinyong168
  * Date:2016/4/14 21:09
  */
object SparkSQLAgg {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
    conf.setAppName("SparkSQLAgg for scala")
    conf.setMaster("local")

val sc = new SparkContext(conf)
//    val sqlContext  = new HiveContext(sc)//构建上下文
val sqlContext  = new SQLContext(sc)//构建上下文

    //要使用Spark SQL 的内置函数,就一定要导入SQLContext下的隐式转换
import sqlContext.implicits._
/**
      * 简单模拟电商访问的数据
      */
val userData = Array(
"2016-04-15,1001,http://spark.apache.org,1000",
"2016-04-15,1001,http://hadoop.apache.org,1001",
"2016-04-15,1002,http://fink.apache.org,1002",
"2016-04-16,1003,http://kafka.apache.org,1020",
"2016-04-16,1004,http://spark.apache.org,1010",
"2016-04-16,1002,http://hive.apache.org,1200",
"2016-04-16,1001,http://parquet.apache.org,1500",
"2016-04-16,1001,http://spark.apache.org,1800"
)

val userDataRDD = sc.parallelize(userData)//生成RDD分布式集合对象


/**
      * 根据业务需要对数据进行预处理生成DataFrame,要想把RDD转换成DataFrame,需要先把RDD中的元素类型变成Row类型
      * 与此同时要提供DataFrame中的Columns的元素信息描述
      */
val userDataRDDRow = userDataRDD.map(row=> {val splited = row.split(",");Row(splited(0),splited(1).toInt,splited(2),splited(3).toInt)})
val structTypes = StructType(Array(
StructField("date",StringType,true),
StructField("id",IntegerType,true),
StructField("url",StringType,true),
StructField("amount",IntegerType,true)
    ))
val userDataDF = sqlContext.createDataFrame(userDataRDDRow,structTypes)

/**
      * 使用Spark SQL提供的内置函数对DataFrame进行操作,特别注意:内置函数生成的Column对象且自动进行CG(code generation)
      * 所有的内置函数操作结果都会返回具体的列
      * DataFrame中的列可以动态增长
      */
//userDataDF.groupBy("date").agg('date,countDistinct('id)).show()
//    userDataDF.groupBy("date").agg('date,countDistinct('id)).map(row => Row(row(1),row(2))).collect.foreach(println)//数据量比较大时不能用collect
userDataDF.groupBy("date").agg('date,sum('amount)).show()//对销售额统计

}
}


执行结果

userDataDF.groupBy("date").agg('date,countDistinct('id)).show()
返回的结果:
+----------+----------+---------+
|      date|      date|count(id)|
+----------+----------+---------+
|2016-04-15|2016-04-15|        2|
|2016-04-16|2016-04-16|        4|
+----------+----------+---------+
userDataDF.groupBy("date").agg('date,countDistinct('id)).map(row => Row(row(1),row(2))).collect.foreach(println)//数据量比较大时不能用collect
返回的结果:
[2016-04-15,2]
[2016-04-16,4]
userDataDF.groupBy("date").agg('date,sum('amount)).show()//对销售额统计
返回的结果:
+----------+----------+-----------+
|      date|      date|sum(amount)|
+----------+----------+-----------+
|2016-04-15|2016-04-15|       3003|
|2016-04-16|2016-04-16|       6530|
+----------+----------+-----------+



分享到:
评论

相关推荐

    Spark SQL操作JSON字段的小技巧

    主要给大家介绍了关于Spark SQL操作JSON字段的小技巧,文中通过示例代码介绍的非常详细,对大家学习或者使用spark sql具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧。

    Learning Spark SQL epub

    Learning Spark SQL 英文epub 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请联系上传者或csdn删除

    Spark SQL常见4种数据源详解

    Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。 Spark SQL的默认数据源为Parquet...

    2015 Spark技术峰会-Spark SQL结构化数据分析-连城

    Databrciks工程师,Spark Committer,Spark SQL主要开发者之一的连城详细解读了“Spark SQL结构化数据分析”。他介绍了Spark1.3版本中的很多新特性。重点介绍了DataFrame。其从SchemaRDD演变而来,提供了更加高层...

    Spark SQL 表达式计算

    表达式计算在Spark SQL中随处可见,本演讲将简介表达式、UDF、UDAF、UDTF的概念,主要的API,以及如何扩展Spark SQL函数库。本演讲还将提及Catalyst在计划阶段和Project Tungsten在执行层做的优化,以及未来性能提升...

    Spark SQL 教学讲解PPT

    参考Spark官网以及一些文献,制作的Spark SQL教学幻灯片,适合进行Spark入门介绍与教学!所有的Spark教学系列都在我的资源内!

    Spark SQL 实验

    Spark SQL 详细介绍 实验介绍 有需要的尽快下载吧

    实训指导书_使用Spark SQL进行法律服务网站数据分析.zip

    实训指导书_使用Spark SQL进行法律服务网站数据分析.zip

    Learning Spark SQL - Aurobindo Sarkar

    Learning Spark SQL - Aurobindo Sarka

    spark读取hbase数据,并使用spark sql保存到mysql

    使用spark读取hbase中的数据,并插入到mysql中

    Spark SQL操作大全.zip

    Spark SQL操作大全.zip

    Spark SQL源码概览.zip

    Spark SQL源码概览.zip Spark SQL源码概览.zip Spark SQL源码概览.zip Spark SQL源码概览.zipSpark SQL源码概览.zip

    Spark SQL分批入库

    List<row> list= spark.sql(sql).collectAsList(),获或者其他方法将数据存在List里面,然后就list转为 Dataset分批入库

    spark sql解析-源码分析

    spark-sql解析-源码分析

    Spark SQL源码概览.pdf

    Spark SQL Core 封装 Catalyst,向应用程序提供 SparkSession、Dataset、 DataFrame 等 API(DataFrame 是 Dataset[Row]的别名);Spark SQL Hive 则提供操作 Hive 的 接口。本文主要关注查询执行过程,不涉及 ...

    基于成本的Spark SQL优化器框架

    我们把基于成本的优化器框架贡献给社区版本Spark 2.2。...这个基于成本的优化器框架对Spark SQL查询的性能有很好的提升 。在这次演讲中,我们将展示Spark SQL的新的基于成本的优化器框架及其对TPC-DS查询的性能影响。

    《Spark SQL编程指南(v1.1.0)

    spark官文文档翻译有关spark SQL编程指南,spark SQL是以测试组件,有关api的介绍

Global site tag (gtag.js) - Google Analytics