IT虾米网

【spark】Shuffle过程解析详解

wyy 2018年07月12日 大数据 1324 0

Shuffle过程解析

ShuffleMapTask

ShuffleMapTask时shuffle过程的入口,runTask方法实现了shuffle的主要逻辑,runTask依赖ShuffleManager和ShuffleWriter实现具体的操作,其中ShuffleManager和ShuffleWriter在目前spark版本中都有多种实现,可以通过spark.shuffle.manager参数配置。

spark中task分为两类

  • Task

    • ResultTask
      一般一个Job的作业一个task为ResultTask,对应执行spark中action操作
    • ShuffleMapTask
      RDD遇到shuffle依赖,RDD将被分为多个不同的buckets,此时会执行ShuffleMapTask
  • ShuffleMapTask具体执行逻辑如下:

     override def runTask(context: TaskContext): MapStatus = { 
    // Deserialize the RDD using the broadcast variable. 
    val deserializeStartTime = System.currentTimeMillis() 
    val ser = SparkEnv.get.closureSerializer.newInstance() 
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( 
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) 
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime 
     
    metrics = Some(context.taskMetrics) 
    var writer: ShuffleWriter[Any, Any] = null 
    try { 
        //获取shuffleManager对象,目前有两种实现方式:hash[HashShuffleManager]和sort[SortShuffleManager] 
        //由spark.shuffle.manager参数空中,目前默认sort 
      val manager = SparkEnv.get.shuffleManager 
      //获取ShuffleWriter对象,目前有多种实现方式,ShuffleWriter主要负责写出shuffle文件 
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) 
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) 
      writer.stop(success = true).get 
    } catch { 
      case e: Exception => 
        try { 
          if (writer != null) { 
            writer.stop(success = false) 
          } 
        } catch { 
          case e: Exception => 
            log.debug("Could not stop writer", e) 
        } 
        throw e 
    } 
    } 
    
  • ShuffleManager初始化
    ShuffleManager中SparkEnv中初始化,SparkEnv是全局变量,在所有现场中可以得到相同的SuffleManager对象,具体初始化路基如下:

     // Let the user specify short names for shuffle managers 
    val shortShuffleMgrNames = Map( 
      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager", 
      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager", 
      "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager") 
    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") 
    val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) 
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

shuffle类图

这里写图片描述

发布评论
IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!

【spark】HashShuffleManager解析详解
你是第一个吃螃蟹的人
发表评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。