This post walks through the write path of the sort-based shuffle manager.
The earlier walkthrough of stage submission covered how ShuffleMapTask gets submitted. The key lines in ShuffleMapTask are:
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get
The shuffle manager is fetched from SparkEnv; the default is SortShuffleManager, though it can be changed via configuration. Here we look at SortShuffleManager's writer, obtained by calling getWriter. If that method detects that the conditions for Unsafe Shuffle are met, it automatically takes the Unsafe Shuffle path; otherwise it falls back to Sort Shuffle. Unsafe Shuffle carries several restrictions: the shuffle stage must not perform aggregation, and the number of partitions cannot exceed a limit (the maximum encodable partition id is 2^24 − 1), so operators with map-side aggregation such as reduceByKey cannot use Unsafe Shuffle.
writer.write iterates over the data of the given partition via the RDD's iterator method, and the writer's write method is invoked on top of that iterator to write the data out.
Let's look at the write method:
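The eligibility check described above can be sketched as follows (a Python sketch with illustrative names, not Spark's API; the real check additionally requires the serializer to support relocation of serialized objects):

```python
# Hypothetical sketch of the serialized ("Unsafe") shuffle eligibility check,
# mirroring the two conditions described above; names are illustrative.
MAX_PARTITION_ID = (1 << 24) - 1  # largest partition id that fits in the packed pointer

def can_use_unsafe_shuffle(has_map_side_aggregation: bool, num_partitions: int) -> bool:
    if has_map_side_aggregation:               # e.g. reduceByKey combines on the map side
        return False
    if num_partitions > MAX_PARTITION_ID + 1:  # partition ids must fit in 24 bits
        return False
    return True

print(can_use_unsafe_shuffle(False, 200))  # True  -> unsafe shuffle
print(can_use_unsafe_shuffle(True, 200))   # False -> sort shuffle
```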
override def write(records: Iterator[Product2[K, V]]): Unit = {
sorter = if (dep.mapSideCombine) {
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
sorter.insertAll(records)
// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val tmp = Utils.tempFileWith(output)
try {
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
} finally {
if (tmp.exists() && !tmp.delete()) {
logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
}
}
}
1) Check whether a map-side combine is required; if so, pass the aggregator, keyOrdering, and serializer into the ExternalSorter.
2) Call sorter.insertAll(records): records are first written into memory and spilled to disk once a limit is exceeded.
3) Merge all the files and write the result into the .data file.
4) Compute the index (offset) of each partition and write it into the index file.
First, let's look at insertAll:
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
val shouldCombine = aggregator.isDefined
if (shouldCombine) {
// function that merges a new value into an existing aggregate
val mergeValue = aggregator.get.mergeValue
// function that creates the initial aggregate from a value
val createCombiner = aggregator.get.createCombiner
var kv: Product2[K, V] = null
// update: merge the new value into an existing aggregate via mergeValue, or initialize the aggregate via createCombiner when none exists yet
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
// iterate over records
while (records.hasNext) {
addElementsRead()
kv = records.next()
// aggregate the value via the update function
map.changeValue((getPartition(kv._1), kv._1), update)
// spill to a disk file if needed
maybeSpillCollection(usingMap = true)
}
} else {
// Stick values into our buffer
while (records.hasNext) {
addElementsRead()
val kv = records.next()
buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
maybeSpillCollection(usingMap = false)
}
}
}
1) If no combine is needed, records are inserted straight into the buffer, and then we check whether a spill to disk is required.
2) If a combine is needed: mergeValue is the merge function and createCombiner initializes the aggregate.
3) changeValue takes the update function and returns the new value.
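Before stepping into changeValue, the semantics of the update closure can be sketched with a toy word-count-style aggregator (createCombiner and mergeValue below are stand-ins for illustration, not dep.aggregator):

```python
# Minimal sketch of the update closure's semantics for a sum-style aggregation.
create_combiner = lambda v: v        # initial combiner from the first value
merge_value = lambda c, v: c + v     # fold a new value into the existing combiner

def update(had_value, old_value, new_value):
    # mirrors: if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    return merge_value(old_value, new_value) if had_value else create_combiner(new_value)

print(update(False, None, 3))  # 3 -- first value seen for this key
print(update(True, 3, 4))      # 7 -- merged into the existing combiner
```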
override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
val newValue = super.changeValue(key, updateFunc)
super.afterUpdate()
newValue
}
def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
assert(!destroyed, destructionMessage)
val k = key.asInstanceOf[AnyRef]
if (k.eq(null)) { // null key
if (!haveNullValue) {
incrementSize()
}
nullValue = updateFunc(haveNullValue, nullValue)
haveNullValue = true
return nullValue
}
var pos = rehash(k.hashCode) & mask // rehash the hash code and AND it with the mask to get the slot
var i = 1
while (true) {
val curKey = data(2 * pos) // 2*pos is the key's slot
if (curKey.eq(null)) {
val newValue = updateFunc(false, null.asInstanceOf[V])
data(2 * pos) = k
data(2 * pos + 1) = newValue.asInstanceOf[AnyRef] // the value's slot
incrementSize() // bump the size
return newValue
} else if (k.eq(curKey) || k.equals(curKey)) {
val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
return newValue
} else {
val delta = i
pos = (pos + delta) & mask
i += 1
}
}
null.asInstanceOf[V] // Never reached but needed to keep compiler happy
}
Rehash the key's hashCode and AND it with the mask to obtain pos; data(2*pos) is where the key k should live and data(2*pos + 1) holds its value. Look at the key currently stored in that slot:
If the existing key equals the current k, merge the two values through the update function and store the result back in the value slot.
If the slot holds a different key, we have a hash collision: advance pos and keep probing.
If the slot is empty, store the current k as the key, initialize its aggregate through the update function, and then call incrementSize(). The whole scheme is reminiscent of HashMap's implementation, done with open addressing.
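The probing scheme can be illustrated with a toy flat hash map in Python (a sketch of the idea, not Spark's AppendOnlyMap; it omits table growth, null-key handling, and size tracking):

```python
# Toy open-addressing map with the same layout as the Scala code above:
# keys and values interleaved in one flat array, probe offsets 1, 2, 3, ...
class FlatHashMap:
    def __init__(self, capacity=8):          # capacity must be a power of two
        self.data = [None] * (2 * capacity)  # data[2*pos]=key, data[2*pos+1]=value
        self.mask = capacity - 1

    def change_value(self, key, update_func):
        pos = hash(key) & self.mask
        i = 1
        while True:
            cur_key = self.data[2 * pos]
            if cur_key is None:                          # empty slot: insert
                new_value = update_func(False, None)
                self.data[2 * pos] = key
                self.data[2 * pos + 1] = new_value
                return new_value
            elif cur_key == key:                         # same key: merge
                new_value = update_func(True, self.data[2 * pos + 1])
                self.data[2 * pos + 1] = new_value
                return new_value
            else:                                        # collision: keep probing
                pos = (pos + i) & self.mask
                i += 1

m = FlatHashMap()
count = lambda had, old: old + 1 if had else 1
print(m.change_value("a", count))  # 1
print(m.change_value("a", count))  # 2
```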
Now back to maybeSpillCollection(usingMap = true):
private def maybeSpillCollection(usingMap: Boolean): Unit = {
var estimatedSize = 0L
if (usingMap) {
estimatedSize = map.estimateSize()
if (maybeSpill(map, estimatedSize)) {
map = new PartitionedAppendOnlyMap[K, C]
}
} else {
estimatedSize = buffer.estimateSize()
if (maybeSpill(buffer, estimatedSize)) {
buffer = new PartitionedPairBuffer[K, C]
}
}
if (estimatedSize > _peakMemoryUsedBytes) {
_peakMemoryUsedBytes = estimatedSize
}
}
- First, estimate the collection's size
- Then decide whether a spill is needed
estimateSize()
def estimateSize(): Long = {
assert(samples.nonEmpty)
val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
(samples.last.size + extrapolatedDelta).toLong
}
Using bytesPerUpdate computed at the last sample as the recent average size per update, the current memory usage is estimated as: (current update count − update count at the last sample) × size per update + the size recorded at the last sample.
The maybeSpill method:
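That extrapolation can be written out directly (the numbers below are made up for illustration):

```python
# Sketch of SizeTracker's extrapolation: estimated size = last sampled size
# plus (updates since the last sample) * average bytes per update.
def estimate_size(last_sample_size, last_sample_updates, bytes_per_update, num_updates):
    extrapolated_delta = bytes_per_update * (num_updates - last_sample_updates)
    return int(last_sample_size + extrapolated_delta)

# last sample: 1000 bytes at update #100; ~8 bytes/update since; now at update #150
print(estimate_size(1000, 100, 8.0, 150))  # 1400
```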
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
var shouldSpill = false
if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
// Claim up to double our current memory from the shuffle memory pool
val amountToRequest = 2 * currentMemory - myMemoryThreshold
val granted = acquireMemory(amountToRequest)
myMemoryThreshold += granted
// If we were granted too little memory to grow further (either tryToAcquire returned 0,
// or we already had more memory than myMemoryThreshold), spill the current collection
shouldSpill = currentMemory >= myMemoryThreshold
}
shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
// Actually spill
if (shouldSpill) {
_spillCount += 1
logSpillage(currentMemory)
spill(collection)
_elementsRead = 0
_memoryBytesSpilled += currentMemory
releaseMemory()
}
shouldSpill
}
Either of two conditions can trigger a spill:
1) The number of records in the collection is a multiple of 32 and the collection's size exceeds the memory already claimed, myMemoryThreshold (initially 5 * 1024 * 1024 by default, configurable via spark.shuffle.spill.initialMemoryThreshold). Even then the spill is not immediate: the sorter first tries to claim more memory to avoid spilling, requesting twice the collection's size minus the memory already claimed (the amount actually obtained is granted). If the new total is still smaller than the collection's size, a spill is required.
2) The number of records in the collection exceeds numElementsForceSpillThreshold (Long.MaxValue by default, configurable via spark.shuffle.spill.numElementsForceSpillThreshold).
If a spill is needed, the spill count is updated, spill(collection) writes the data to disk, and the memory is released.
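The decision logic can be sketched like this (acquire_memory stands in for the task memory manager and is an assumption; the real method also updates spill metrics and actually performs the spill):

```python
# Sketch of maybeSpill's decision: try to double our memory first; spill only
# if the pool cannot grant enough, or the forced element threshold is hit.
def maybe_spill(elements_read, current_memory, my_memory_threshold,
                force_threshold, acquire_memory):
    should_spill = False
    if elements_read % 32 == 0 and current_memory >= my_memory_threshold:
        # claim up to double our current memory from the pool
        amount_to_request = 2 * current_memory - my_memory_threshold
        granted = acquire_memory(amount_to_request)
        my_memory_threshold += granted
        should_spill = current_memory >= my_memory_threshold
    should_spill = should_spill or elements_read > force_threshold
    return should_spill, my_memory_threshold

# pool grants nothing -> must spill once the collection exceeds the threshold
spill, _ = maybe_spill(64, 6_000_000, 5 * 1024 * 1024, float("inf"), lambda n: 0)
print(spill)  # True
# pool grants the full request -> no spill needed
spill, _ = maybe_spill(64, 6_000_000, 5 * 1024 * 1024, float("inf"), lambda n: n)
print(spill)  # False
```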
Step into the spill method to see its implementation:
override protected[this] def spill(collection: SizeTracker): Unit = {
val inMemoryIterator = currentMap.destructiveSortedIterator(keyComparator) // sort by key
val diskMapIterator = spillMemoryIteratorToDisk(inMemoryIterator) // write to a disk file and return a SpilledFile describing it
spilledMaps += diskMapIterator // add it to spilledMaps
}
private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
: SpilledFile = {
// create a temp file and a blockId
val (blockId, file) = diskBlockManager.createTempShuffleBlock()
// these values are reset after each flush
var objectsWritten: Long = 0
var spillMetrics: ShuffleWriteMetrics = null
var writer: DiskBlockObjectWriter = null
def openWriter(): Unit = {
assert (writer == null && spillMetrics == null)
spillMetrics = new ShuffleWriteMetrics
writer = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)
}
openWriter()
// sizes of the batches, in the order they were written to disk
val batchSizes = new ArrayBuffer[Long]
// number of elements in each partition
val elementsPerPartition = new Array[Long](numPartitions)
// flush the writer's contents to disk and update the bookkeeping above
def flush(): Unit = {
val w = writer
writer = null
w.commitAndClose()
_diskBytesSpilled += spillMetrics.bytesWritten
batchSizes.append(spillMetrics.bytesWritten)
spillMetrics = null
objectsWritten = 0
}
var success = false
try {
// iterate over the records
while (inMemoryIterator.hasNext) {
val partitionId = inMemoryIterator.nextPartition()
require(partitionId >= 0 && partitionId < numPartitions,
s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
inMemoryIterator.writeNext(writer)
elementsPerPartition(partitionId) += 1
objectsWritten += 1
// once the batch reaches the serializer batch size, flush it to disk
if (objectsWritten == serializerBatchSize) {
flush()
openWriter()
}
}
// flush whatever is left
if (objectsWritten > 0) {
flush()
} else if (writer != null) {
val w = writer
writer = null
w.revertPartialWritesAndClose()
}
success = true
} finally {
...
}
// return the SpilledFile
SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
}
A temp file and blockId are created through diskBlockManager; the temp file name has the form "temp_shuffle_" + id. The in-memory data iterator is traversed, calling write on the DiskBlockObjectWriter for each record; once the number of writes reaches the serializer batch size, the batch is flushed to the disk file, the writer is reopened, and batchSizes and related bookkeeping are updated.
Finally a SpilledFile object is returned, containing the spilled temp File, the blockId, the byte size of each flush to disk, and the number of records in each partition.
With the spill done and insertAll complete, we return to SortShuffleWriter's write method from the beginning:
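The batching behavior can be sketched with file I/O replaced by byte counters (record sizes are an assumed constant here; in Spark the actual sizes come from the serializer):

```python
# Sketch of spillMemoryIteratorToDisk's batching: stream records out, flushing
# every serializer_batch_size objects, recording each batch's byte size and
# the per-partition element counts.
def spill_to_disk(records, num_partitions, serializer_batch_size, record_size=10):
    batch_sizes, objects_written, pending_bytes = [], 0, 0
    elements_per_partition = [0] * num_partitions
    for partition_id, _key, _value in records:
        pending_bytes += record_size           # stand-in for writer.write(...)
        elements_per_partition[partition_id] += 1
        objects_written += 1
        if objects_written == serializer_batch_size:
            batch_sizes.append(pending_bytes)  # stand-in for flush()
            objects_written, pending_bytes = 0, 0
    if objects_written > 0:                    # flush the tail
        batch_sizes.append(pending_bytes)
    return batch_sizes, elements_per_partition

recs = [(0, "a", 1), (0, "b", 1), (1, "c", 1), (1, "d", 1), (1, "e", 1)]
print(spill_to_disk(recs, 2, 2))  # ([20, 20, 10], [2, 3])
```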
override def write(records: Iterator[Product2[K, V]]): Unit = {
...
// fill the in-memory buffer, spilling to disk files past the threshold
sorter.insertAll(records)
// get this task's final output file
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val tmp = Utils.tempFileWith(output)
try {
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
// merge and write to the data file
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
// write the index file
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
} finally {
if (tmp.exists() && !tmp.delete()) {
logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
}
}
}
The final output file name and blockId are obtained; the file name format is "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data".
Next, the file is written via sorter.writePartitionedFile, which includes merging the in-memory data with all the spill files. Its implementation:
def writePartitionedFile(
blockId: BlockId,
outputFile: File): Array[Long] = {
val writeMetrics = context.taskMetrics().shuffleWriteMetrics
// track the range each partition occupies in the file
val lengths = new Array[Long](numPartitions)
// the data lives only in memory
if (spills.isEmpty) {
val collection = if (aggregator.isDefined) map else buffer
// sort the in-memory data by partitionId, then by key, returning an iterator
val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
// iterate over the data and write it to disk
while (it.hasNext) {
val writer = blockManager.getDiskWriter(
blockId, outputFile, serInstance, fileBufferSize, writeMetrics)
val partitionId = it.nextPartition()
// flush to the disk file once a whole partition has been written
while (it.hasNext && it.nextPartition() == partitionId) {
it.writeNext(writer)
}
writer.commitAndClose()
val segment = writer.fileSegment()
// record each partition's data length
lengths(partitionId) = segment.length
}
} else {
// some data was spilled to disk, so merge first
for ((id, elements) <- this.partitionedIterator) {
if (elements.hasNext) {
val writer = blockManager.getDiskWriter(
blockId, outputFile, serInstance, fileBufferSize, writeMetrics)
for (elem <- elements) {
writer.write(elem._1, elem._2)
}
writer.commitAndClose()
val segment = writer.fileSegment()
lengths(id) = segment.length
}
}
}
context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
lengths
}
- If the data lives only in memory with no spill files, the comparator passed in sorts the collection first by partition and then by key, returning an iterator. Traversing it yields all records; each partition gets its own writer, and once a partition's data is fully written it is flushed to the disk file and that partition's length is recorded.
- If there are spill files, partitionedIterator merge-sorts the in-memory data with the spill files' data and returns an iterator of (partitionId, iterator over that partition's data). Again, one writer per partition: after each partition is written it is flushed to disk and its length recorded.
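The first branch can be sketched as follows, with serialization replaced by an assumed fixed record size:

```python
# Sketch of the in-memory-only branch of writePartitionedFile: records arrive
# already sorted by partition id; one segment per partition, and lengths[p]
# records that segment's byte size.
def write_partitioned(sorted_records, num_partitions, record_size=10):
    lengths = [0] * num_partitions
    i = 0
    while i < len(sorted_records):
        partition_id = sorted_records[i][0]
        segment = 0
        while i < len(sorted_records) and sorted_records[i][0] == partition_id:
            segment += record_size        # stand-in for it.writeNext(writer)
            i += 1
        lengths[partition_id] = segment   # segment length after commitAndClose
    return lengths

recs = [(0, "a"), (0, "b"), (2, "c")]     # partition 1 is empty
print(write_partitioned(recs, 3))         # [20, 0, 10]
```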
Next, let's see how this.partitionedIterator merge-sorts the in-memory data with the spill files' data:
def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
val usingMap = aggregator.isDefined
val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
if (spills.isEmpty) {
if (!ordering.isDefined) {
// sort only by partitionId; keys need no ordering
groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
} else {
// sort by both partitionId and key
groupByPartition(destructiveIterator(
collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
}
} else {
// Merge spilled and in-memory data
merge(spills, destructiveIterator(
collection.partitionedDestructiveSortedIterator(comparator)))
}
}
When spill files exist, the merge method below is executed, taking the array of spill files plus an iterator over the in-memory data sorted by partitionId and key. The merge method:
private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
: Iterator[(Int, Iterator[Product2[K, C]])] = {
// one SpillReader per file
val readers = spills.map(new SpillReader(_))
val inMemBuffered = inMemory.buffered
(0 until numPartitions).iterator.map { p =>
// get the iterator over the in-memory data for the current partition
val inMemIterator = new IteratorForPartition(p, inMemBuffered)
// combine the spill files' iterators for this partition with the in-memory one
val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
if (aggregator.isDefined) {
// aggregate by key, sorting if required
(p, mergeWithAggregation(
iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
} else if (ordering.isDefined) {
// just sort
(p, mergeSort(iterators, ordering.get))
} else {
(p, iterators.iterator.flatten)
}
}
}
The merge method combines, for each reduce-side partition, the in-memory data with the spill-file data, then aggregates and sorts where required, finally returning (the reduce-side partitionId, an iterator over that partition's data).
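For a single partition, that merge behaves like a k-way merge of sorted iterators, which heapq.merge illustrates (a sketch of the idea, not Spark's mergeSort):

```python
# Each spill file contributes one sorted iterator for the partition, plus the
# in-memory iterator; they are merged into a single key-ordered stream.
import heapq

spill_iters = [iter([("a", 1), ("c", 3)]), iter([("b", 2), ("d", 4)])]
in_mem_iter = iter([("a", 10)])

merged = heapq.merge(*spill_iters, in_mem_iter, key=lambda kv: kv[0])
print(list(merged))  # [('a', 1), ('a', 10), ('b', 2), ('c', 3), ('d', 4)]
```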
After the merge-sorted data has been written to the final file, each partition's offset must be persisted to a file so that every reducer can later fetch its own slice by offset. The offset-writing logic is simple: using the partition-length array obtained above, cumulative offsets are written into the index file:
def writeIndexFileAndCommit(
shuffleId: Int,
mapId: Int,
lengths: Array[Long],
dataTmp: File): Unit = {
val indexFile = getIndexFile(shuffleId, mapId)
val indexTmp = Utils.tempFileWith(indexFile)
try {
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
Utils.tryWithSafeFinally {
// We take in lengths of each block, need to convert it to offsets.
var offset = 0L
out.writeLong(offset)
for (length <- lengths) {
offset += length
out.writeLong(offset)
}
}
......
}
}
The index file is located via shuffleId and mapId, an output stream is created, and the offset of each reduce-side partition is written in order, e.g.:
0,
length(partition1),
length(partition1)+length(partition2),
length(partition1)+length(partition2)+length(partition3)
...
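Turning the per-partition lengths into these cumulative offsets is straightforward:

```python
# Sketch of the index-file layout: cumulative offsets from the per-partition
# lengths; reducer p reads bytes [offsets[p], offsets[p+1]) from the data file.
def index_offsets(lengths):
    offsets, offset = [0], 0
    for length in lengths:
        offset += length
        offsets.append(offset)
    return offsets

print(index_offsets([100, 0, 50]))  # [0, 100, 100, 150]
```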
Finally a MapStatus instance is created and returned, containing the offset of each reduce-side partition.
This object is sent back to the DAGScheduler on the driver and added to the corresponding stage's OutputLoc. When all tasks of the stage finish, these results are registered with MapOutputTrackerMaster so that the tasks of the next stage can use it to look up the metadata of the shuffle output.
And with that, Shuffle Write is complete!
Written by 妖言君, licensed under the Creative Commons Attribution 4.0 International license.
Last edited: Jan 10, 2021 at 02:57 pm