IT虾米网

【spark】HashShuffleManager解析详解

developer 2018年07月12日 大数据 1217 0

HashShuffleManager

HashShuffleManager在spark早期版本中为默认shuffle管理器(spark1.2以前)。单此版本存在明显的弊端,此shuffleManager在作业运行阶段会产生大量的文件,任务在此环节会产生大量IO操作。接下来我们会一起探讨HashShuffleManager的具体执行逻辑。

未经优化的HashShuffleManager

此shuffle过程每个task都会产生numReducers个文件,如果一个executor有M个task,经过shuffle阶段后R个reducers。那么当前executor会产生M*R个文件。如果一个节点运行E个executor,此节点上将会产生E*M*R个文件。

优化后的HashShuffleManager

为解决HashShuffleManager产生大量文件的问题,spark对HashShuffleManager进行了优化,在executor中为每一个reducer创建一个文件,将在创建的文件放在一个文件连接池中,当每个task运行时,从这个pool中取得文件引用。这样就可以将每个executor shuffle阶段产生文件减少到R个(reducer数),示意图如下:

代码解析:

  • ShuffleMapTask中runTask(context: TaskContext)方法执行逻辑

     var writer: ShuffleWriter[Any, Any] = null 
    try { 
      //获取HashShuffleManager对象,HashShuffleManager初始化时, 
      //同时创建FileShuffleBlockResolver对象 
      val manager = SparkEnv.get.shuffleManager 
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) 
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) 
      writer.stop(success = true).get 
    } catch { 
        .... 
    }   
  • HashShuffleManager初始化时,同时创建FileShuffleBlockResolver对象

    private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager with Logging { 
     
    if (!conf.getBoolean("spark.shuffle.spill", true)) { 
    logWarning( 
      "spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+." + 
        " Shuffle will continue to spill to disk when necessary.") 
    } 
     
    private val fileShuffleBlockResolver = new FileShuffleBlockResolver(conf) 
    } 
    
  • 获取ShuffleWriter

    private[spark] class HashShuffleWriter[K, V]( 
    shuffleBlockResolver: FileShuffleBlockResolver, 
    handle: BaseShuffleHandle[K, V, _], 
    mapId: Int, 
    context: TaskContext) 
    extends ShuffleWriter[K, V] with Logging { 
    //初始化ShuffleWriterGroup 
    private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser, 
    writeMetrics) 
     
    }

    调用shuffleBlockResolver.forMapTask初始化文件

    def forMapTask(shuffleId: Int, mapId: Int, numReducers: Int, serializer: Serializer, 
      writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = { 
    new ShuffleWriterGroup { 
      shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReducers)) 
      private val shuffleState = shuffleStates(shuffleId) 
     
      val openStartTime = System.nanoTime 
      val serializerInstance = serializer.newInstance() 
      val writers: Array[DiskBlockObjectWriter] = { 
        Array.tabulate[DiskBlockObjectWriter](numReducers) { bucketId => 
          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) 
          val blockFile = blockManager.diskBlockManager.getFile(blockId) 
          val tmp = Utils.tempFileWith(blockFile) 
          blockManager.getDiskWriter(blockId, tmp, serializerInstance, bufferSize, writeMetrics) 
        } 
      } 
      // Creating the file to write to and creating a disk writer both involve interacting with 
      // the disk, so should be included in the shuffle write time. 
      writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime) 
     
      override def releaseWriters(success: Boolean) { 
        shuffleState.completedMapTasks.add(mapId) 
      } 
    } 
    } 
    

    -调用 ShuffleWriter.write向文件中写内容

 override def write(records: Iterator[Product2[K, V]]): Unit = { 
    val iter = if (dep.aggregator.isDefined) { 
      if (dep.mapSideCombine) { 
        dep.aggregator.get.combineValuesByKey(records, context) 
      } else { 
        records 
      } 
    } else { 
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!") 
      records 
    } 
 
    for (elem <- iter) { 
      val bucketId = dep.partitioner.getPartition(elem._1) 
      shuffle.writers(bucketId).write(elem._1, elem._2) 
    } 
  } 

参考资料

https://0x0fff.com/spark-architecture-shuffle/

发布评论
IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!

apache_sentry详解
你是第一个吃螃蟹的人
发表评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。