`
Kevin12
  • 浏览: 230554 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Spark RDD弹性表现和来源

阅读更多
hadoop 的MapReduce是基于数据集的,位置感知,容错 负载均衡
基于数据集的处理:从物理存储上加载数据,然后操作数据,然后写入物理存储设备;
基于数据集的操作不适应的场景:
1,不适合于大量的迭代
2,交互式查询
重点是:基于数据流的方式 不能够复用曾经的结果或者中间计算结果;

spark RDD是基于工作集的
工作流和工作集的共同特点:位置感知,自动容错,负载均衡等。
spark的位置感知比hadoop的好很多,具体如下:
hadoop位置感知:hadoop进行partition之后就不管Reducer在哪里了。
spark的位置感知:spark进行partition后再进行下一步Stage时会确定其位置,是更精致化的。
RDD:Resillient Distributed Dataset
RDD的弹性表现:
1、弹性之一:自动的进行内存和磁盘数据存储的切换;
2、弹性之二:基于Lineage的高效容错(第n个节点出错,会从第n-1个节点恢复,血统容错);
3、弹性之三:Task如果失败会自动进行特定次数的重试(默认4次);
4、弹性之四:Stage如果失败会自动进行特定次数的重试(可以只运行计算失败的阶段);只计算失败的数据分片;
5、checkpoint和persist
6、数据调度弹性:DAG TASK 和资源 管理无关
7、数据分片的高度弹性(人工自由设置分片函数),repartition


Spark RDD来源:
1,使用程序中的集合创建RDD(用于小量测试);
2,使用本地文件系统创建RDD(测试大量数据);
3,使用HDFS创建RDD(生产环境最常用的RDD创建方式);
4,基于DB创建RDD;
5,基于NoSQL,例如HBase;
6,基于S3创建RDD;
7,基于数据流创建RDD;


前三种是比较基本的,后面4种是基于数据库的,要注意数据本地性(getPreferedLocations);
1.集合创建RDD方式
package com.imf.spark.rdd

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by lujinyong168 on 2016/2/2.
  * DT大数据梦工厂-IMF
  * 使用程序中的集合创建RDD(用于小量测试)
  */
object RDDCreateByCollections {
def main(args: Array[String]) {
val conf = new SparkConf()//创建SparkConf对象
conf.setAppName("RDDCreateByCollections")//设置应用名称
conf.setMaster("local")
val sc = new SparkContext(conf)//创建SparkContext对象
    //创建一个Scala集合
val numbers = 1 to 100
val rdd = sc.parallelize(numbers)
//    val rdd = sc.parallelize(numbers,10)//设置并行度为10
val sum = rdd.reduce(_+_)
println("1+2+3+...+99+100="+sum)
  }
}


2.local模式创建RDD
package com.imf.spark.rdd

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by lujinyong168 on 2016/2/2.
  * DT大数据梦工厂-IMF
  * 使用本地文件系统创建RDD(测试大量数据)
  * 统计文本中的字符个数
  */
object RDDCreateByLocal {
def main(args: Array[String]) {
val conf = new SparkConf()//创建SparkConf对象
conf.setAppName("RDDCreateByLocal")//设置应用名称
conf.setMaster("local")
val sc = new SparkContext(conf)//创建SparkContext对象
val rdd = sc.textFile("D://testspark//WordCount.txt")
val linesLen = rdd.map(line=>line.length)
val sum = linesLen.reduce(_+_)
println("The total characters of the file is : "+sum)
  }
}


3.HDFS模式创建RDD
package com.imf.spark.rdd

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by lujinyong168 on 2016/2/2.
  * DT大数据梦工厂-IMF
  * 使用HDFS创建RDD(生产环境最常用的RDD创建方式)
  */
object RDDCreateByHDFS {
def main(args: Array[String]) {
val conf = new SparkConf()//创建SparkConf对象
conf.setAppName("RDDCreateByHDFS")//设置应用名称
conf.setMaster("local")
val sc = new SparkContext(conf)//创建SparkContext对象
val rdd = sc.textFile("/library/")
val linesLen = rdd.map(line=>line.length)
val sum = linesLen.reduce(_+_)
println("The total characters of the file is : "+sum)
  }
}


分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics