`
Kevin12
  • 浏览: 229875 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Spark内核架构

阅读更多
1.在将spark内核架构前,先了解一下Hadoop的MR,Hadoop的MR分为Map和Reduce阶段,在Map阶段产生的中间结果要写回到磁盘,它和Reduce之间还有Shuffle操作,这个操作需要从网络节点进行数据拷贝,大量时间耗费在网络传输上,网络传输导致Hadoop的MR慢的原因之一,所以在很多情况下只适合离线计算。
2.Spark的RDD(Resilient Distributed Dataset 弹性分布式数据集)
RDD是Spark对数据和计算的抽象,它表示已被分片的,不可变的并能够被并行操作的数据集合。对RDD操作分为两种:
Transformation:通过转换从一个或多个RDD生成新的RDD。
Action:从RDD生成最后的计算结果。Action的操作会导致调用RunJob。

虽然Spark是基于内存的计算,但是它也可以非内存的其他地方,下面列出了根据useDisk、useMemory、useOffHeap, deserialized、replication五个参数组合的12种存储级别。设置成OFF_HEAP时,RDD实际被保存到一个基于内存分布式文件系统Tachyon中。
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)

/**
 * :: DeveloperApi ::
 * Return the StorageLevel object with the specified name.
 */
@DeveloperApi
def fromString(s: String): StorageLevel = s match {
  case "NONE" => NONE
  case "DISK_ONLY" => DISK_ONLY
  case "DISK_ONLY_2" => DISK_ONLY_2
  case "MEMORY_ONLY" => MEMORY_ONLY
  case "MEMORY_ONLY_2" => MEMORY_ONLY_2
  case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER
  case "MEMORY_ONLY_SER_2" => MEMORY_ONLY_SER_2
  case "MEMORY_AND_DISK" => MEMORY_AND_DISK
  case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2
  case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER
  case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2
  case "OFF_HEAP" => OFF_HEAP
  case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $s")
}

3.Spark Cluster中运行核心说明



Application:基于Spark的用户程序,包含了Driver程序和集群上的Executor;一般一个Application里面可以有多个Jobs,一般一个action就是一个job。Spark中的Application程序的运行不依赖Cluster Manager的,因为对于粗粒度程序运行初始化时Master已经为程序分配好了资源。
Driver Program:运行main函数并且创建SparkContext程序(SparkConf + SprarkContext);
Cluster Manager:在集群上获取资源的外部服务器(例如standalone,Messos,Yarn);
Worker Node : 集群中存放任何可以运行的代码节点;Worker上是不会运行程序代码的,它是管理该节点的内存CPU使用情况,会接受Master分配的资源(就是分配Executor这样的指令),并通过ExecutorRunner启动一个新的进程,进程里面有executor。即:Worker管理当前Node的资源资源,并接受Master的指令来分配具体的计算资源Executor(在新的进程中分配),Worker发心跳的时候,不会像master汇报资源,而是在master分配资源给worker时就知道worker的资源了。
Executor:是运行在Worker节点中的为当前应用程序开启的进程里面的一个对象,该对象负责了具体Task的运行,它是通过线程池并发执行和线程复用的方式。每个线程处理完分给自己的Task后将资源归还到线程池中。而 spark默认情况下为我们的程序开启一个jvm进程,jvm里面是线程池方式,通过线程处理具体Task任务,executor是进程中的对象;
Task:被送到某个Executor上的工作单元;
Job:包含很多任务并行计算,可以看做和Spark的action对应,Job一般是由Action触发的,在触发前会有一系列的RDD ,但是Action不会产生RDD.
Stage:一个Job会被拆分很多组任务,每组任务被称作Stage(就像MapReduce分map和reduce任务一样),不同Stage计算逻辑完全一样 只是计算的数据不同,同一个Stage内部计算逻辑是一样的;

Worker需要的资源是从下面的参数中获取:
3.1.spark-env.sh和spark-defaults.sh;
3.2.spark-submit提供的参数;
3.3.程序中SparkConf配置的参数;
参数使用的优先级是3>2>1;但是一般不在3中配置,而在2中进行配置,因为3中是编程式配置。
Spark的程序的运行有两种模式:Client Cluster;一个block是128MB,最后一条记录一般跨2个BLOCK;
4.Spark运行快绝对不是基于内存,有很多原因,最基本的是因为它的调度和容错;
下面进行解释:
4.1.Spark内核在需要计算触发job之前,会绘制一张DAG图(即计算路径的有向无环图),RDD是不可以变的lazy级别的,即在触发job之前都只是逻辑上的运算,并没有进行实际运算,这样可以最大化优化DAG;
4.2.Spark的RDD是基于工作集的,适合大量迭代计算;
4.3.RDD的写操作是粗粒度的,但是RDD的读操作既可以是粗粒度也可以是细粒度的;
4.4.Spark进行partition后再进行下一步Stage时会确定其位置,是更精致化的位置感知,RDD高效容错和弹性;


默认情况下有一台电脑专门用来提交spark程序,一般一定与spark cluster在同一个网络环境中(Driver频繁和Executors通信),且其配置和普通worker一致;
为什么不能够在IDE集成开发环境中直接发布Spark程序到Spark集群中
第一点:内存和Cores的限制,默认情况下Spark程序的Driver会在提交Spark程序的机器上,所以如果在IDE中提交程序的话,那IDE机器就必须非常强大
第二点:Driver要指挥Workers的运行并频繁的发生通信,如果开发环境IDE和Spark集群不在同样一个网络下,就会出现任务丢失,运行缓慢等多种不必要的问题;
第三点:这是不安全的!
TaskScheduler和SchedulerBackend负责具体Task的运行,使用的数据遵循本地性,是DAGSecheduler把job中RDD够成的DAG划分不同的Stage时确定的。
Task分为两种:最后一个Stage的Task称为ResultTask,产生Job的结果,其他前面的Stage的Task都是ShuffleMapTask,为下一阶段的Stage做数据准备,相当于MapReduce中的Mapper。

Spark内核架构图:


  • 大小: 376.7 KB
  • 大小: 154.9 KB
  • 大小: 73.9 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics