
Merge pull request apache#408 from pwendell/external-serializers
Improvements to external sorting

1. Adds the option of compressing spilled outputs.
2. Adds batching to serialization to prevent OOMs on the read side.
3. Slightly renames some config options.
4. Uses Spark's buffer size for reads in addition to writes.
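
For context, the settings touched by this commit could be enabled through SparkConf. This is a minimal sketch with illustrative values; the option names come from the diff below, while the app name and values are hypothetical:

import org.apache.spark.SparkConf

// Hypothetical configuration exercising the options touched by this commit.
val conf = new SparkConf()
  .setAppName("external-sort-demo")               // illustrative app name
  .set("spark.shuffle.spill", "true")             // renamed from spark.shuffle.externalSorting
  .set("spark.shuffle.spill.compress", "true")    // new: compress data spilled during shuffles
  .set("spark.shuffle.spill.batchSize", "10000")  // new: objects per serialization batch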
pwendell committed Jan 14, 2014
2 parents 68641bc + d4cd5de commit 945fe7a
Showing 4 changed files with 64 additions and 13 deletions.
core/src/main/scala/org/apache/spark/Aggregator.scala (2 changes: 1 addition & 1 deletion)
@@ -32,7 +32,7 @@ case class Aggregator[K, V, C] (
mergeCombiners: (C, C) => C) {

private val sparkConf = SparkEnv.get.conf
private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
private val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)

def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
context: TaskContext) : Iterator[(K, C)] = {
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -80,6 +80,8 @@ private[spark] class BlockManager(
val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)
// Whether to compress RDD partitions that are stored serialized
val compressRdds = conf.getBoolean("spark.rdd.compress", false)
// Whether to compress shuffle output temporarily spilled to disk
val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", false)

val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf)

@@ -790,6 +792,7 @@ private[spark] class BlockManager(
case ShuffleBlockId(_, _, _) => compressShuffle
case BroadcastBlockId(_) => compressBroadcast
case RDDBlockId(_, _) => compressRdds
case TempBlockId(_) => compressShuffleSpill
case _ => false
}

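The TempBlockId case above is what routes spilled shuffle data through the new spark.shuffle.spill.compress setting. The following is a rough standalone model of that decision; the trait and constructor parameters are simplified stand-ins, not Spark's real BlockId or BlockManager API:

import java.util.UUID

// Simplified model of the per-block-type compression choice in BlockManager.
trait BlockId
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId
case class BroadcastBlockId(broadcastId: Long) extends BlockId
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId
case class TempBlockId(id: UUID) extends BlockId

class CompressionPolicy(
    compressShuffle: Boolean,         // spark.shuffle.compress
    compressBroadcast: Boolean,       // spark.broadcast.compress
    compressRdds: Boolean,            // spark.rdd.compress
    compressShuffleSpill: Boolean) {  // spark.shuffle.spill.compress (new)

  def shouldCompress(id: BlockId): Boolean = id match {
    case ShuffleBlockId(_, _, _) => compressShuffle
    case BroadcastBlockId(_)     => compressBroadcast
    case RDDBlockId(_, _)        => compressRdds
    case TempBlockId(_)          => compressShuffleSpill  // previously always uncompressed
    case _                       => false
  }
}

With compressShuffleSpill left at its default of false, spill files are written uncompressed exactly as before this change.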
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -26,8 +26,8 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter}
import org.apache.spark.serializer.{KryoDeserializationStream, KryoSerializationStream, Serializer}
import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockManager, DiskBlockObjectWriter}

/**
* An append-only map that spills sorted content to disk when there is insufficient space for it
@@ -60,14 +60,15 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
serializer: Serializer = SparkEnv.get.serializerManager.default,
diskBlockManager: DiskBlockManager = SparkEnv.get.blockManager.diskBlockManager)
blockManager: BlockManager = SparkEnv.get.blockManager)
extends Iterable[(K, C)] with Serializable with Logging {

import ExternalAppendOnlyMap._

private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
private val spilledMaps = new ArrayBuffer[DiskMapIterator]
private val sparkConf = SparkEnv.get.conf
private val diskBlockManager = blockManager.diskBlockManager

// Collective memory threshold shared across all running tasks
private val maxMemoryThreshold = {
@@ -82,6 +83,14 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
// Number of in-memory pairs inserted before tracking the map's shuffle memory usage
private val trackMemoryThreshold = 1000

// Size of object batches when reading/writing from serializers. Objects are written in
// batches, with each batch using its own serialization stream. This cuts down on the size
// of reference-tracking maps constructed when deserializing a stream.
//
// NOTE: Setting this too low can cause excess copying when serializing, since some serializers
// grow internal data structures by growing + copying every time the number of objects doubles.
private val serializerBatchSize = sparkConf.getLong("spark.shuffle.spill.batchSize", 10000)

// How many times we have spilled so far
private var spillCount = 0
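
The batching comment above is the heart of change 2 in the commit message. The sketch below illustrates the same idea in isolation, using plain Java serialization and a length-prefixed layout rather than Spark's DiskBlockObjectWriter; the object and parameter names are made up for the example:

import java.io._

object BatchedSpillWriter {
  // Serialize objects in batches, each batch through its own ObjectOutputStream.
  // A fresh stream per batch keeps the deserializer's reference-tracking table
  // bounded by the batch size instead of by the total number of spilled records.
  def spill[T <: Serializable](items: Iterator[T], file: File, batchSize: Int = 10000): Unit = {
    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file)))
    try {
      while (items.hasNext) {
        val batchBytes = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(batchBytes)
        var written = 0
        while (items.hasNext && written < batchSize) {
          oos.writeObject(items.next())
          written += 1
        }
        oos.close()
        val bytes = batchBytes.toByteArray
        out.writeInt(written)       // objects in this batch
        out.writeInt(bytes.length)  // framing, so a reader never reads past a batch
        out.write(bytes)
      }
    } finally {
      out.close()
    }
  }
}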

@@ -143,22 +152,35 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
logWarning("Spilling in-memory map of %d MB to disk (%d time%s so far)"
.format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
val (blockId, file) = diskBlockManager.createTempBlock()
val writer =
new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize, identity, syncWrites)

val compressStream: OutputStream => OutputStream = blockManager.wrapForCompression(blockId, _)
def getNewWriter = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize,
compressStream, syncWrites)

var writer = getNewWriter
var objectsWritten = 0
try {
val it = currentMap.destructiveSortedIterator(comparator)
while (it.hasNext) {
val kv = it.next()
writer.write(kv)
objectsWritten += 1

if (objectsWritten == serializerBatchSize) {
writer.commit()
writer = getNewWriter
objectsWritten = 0
}
}
writer.commit()

if (objectsWritten > 0) writer.commit()
} finally {
// Partial failures cannot be tolerated; do not revert partial writes
_diskBytesSpilled += writer.bytesWritten
writer.close()
}
currentMap = new SizeTrackingAppendOnlyMap[K, C]
spilledMaps.append(new DiskMapIterator(file))
spilledMaps.append(new DiskMapIterator(file, blockId))

// Reset the amount of shuffle memory used by this map in the global pool
val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
@@ -306,16 +328,35 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
/**
* An iterator that returns (K, C) pairs in sorted order from an on-disk map
*/
private class DiskMapIterator(file: File) extends Iterator[(K, C)] {
private class DiskMapIterator(file: File, blockId: BlockId) extends Iterator[(K, C)] {
val fileStream = new FileInputStream(file)
val bufferedStream = new FastBufferedInputStream(fileStream)
val deserializeStream = ser.deserializeStream(bufferedStream)
val bufferedStream = new FastBufferedInputStream(fileStream, fileBufferSize)
val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
var deserializeStream = ser.deserializeStream(compressedStream)
var objectsRead = 0

var nextItem: (K, C) = null
var eof = false

def readNextItem(): (K, C) = {
if (!eof) {
try {
if (objectsRead == serializerBatchSize) {
val newInputStream = deserializeStream match {
case stream: KryoDeserializationStream =>
// Kryo's serializer stores an internal buffer that pre-fetches from the underlying
// stream. We need to capture this buffer and feed it to the new serialization
// stream so that the bytes are not lost.
val kryoInput = stream.input
val remainingBytes = kryoInput.limit() - kryoInput.position()
val extraBuf = kryoInput.readBytes(remainingBytes)
new SequenceInputStream(new ByteArrayInputStream(extraBuf), compressedStream)
case _ => compressedStream
}
deserializeStream = ser.deserializeStream(newInputStream)
objectsRead = 0
}
objectsRead += 1
return deserializeStream.readObject().asInstanceOf[(K, C)]
} catch {
case e: EOFException =>
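On the read side, the iterator above opens a fresh deserialization stream every serializerBatchSize objects; the Kryo-specific branch exists because Kryo's Input pre-fetches bytes from the underlying stream, and those bytes must be handed to the next stream or they would be lost. Below is a standalone counterpart to the earlier writer sketch, again using plain Java serialization over a length-prefixed layout, so explicit framing rather than buffer hand-off keeps batches separate:

import java.io._
import scala.collection.mutable.ArrayBuffer

object BatchedSpillReader {
  // Read back the length-prefixed batches written by BatchedSpillWriter.spill.
  // Each batch gets its own ObjectInputStream, so no reference-tracking state
  // accumulates across batches, and the explicit byte length prevents one
  // stream from consuming bytes that belong to the next.
  def readAll[T <: Serializable](file: File): Seq[T] = {
    val in = new DataInputStream(new BufferedInputStream(new FileInputStream(file)))
    val result = ArrayBuffer.empty[T]
    try {
      while (in.available() > 0) {
        val count = in.readInt()             // objects in this batch
        val length = in.readInt()            // serialized size of this batch
        val bytes = new Array[Byte](length)
        in.readFully(bytes)
        val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
        var i = 0
        while (i < count) {
          result += ois.readObject().asInstanceOf[T]
          i += 1
        }
        ois.close()
      }
    } finally {
      in.close()
    }
    result.toSeq
  }
}

A round trip is then BatchedSpillWriter.spill(records, file) followed by BatchedSpillReader.readAll(file).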
docs/configuration.md (11 changes: 9 additions & 2 deletions)
@@ -116,7 +116,7 @@ Apart from these, the following properties are also available, and may be useful
<td>0.3</td>
<td>
Fraction of Java heap to use for aggregation and cogroups during shuffles, if
<code>spark.shuffle.externalSorting</code> is enabled. At any given time, the collective size of
<code>spark.shuffle.spill</code> is true. At any given time, the collective size of
all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will
begin to spill to disk. If spills are often, consider increasing this value at the expense of
<code>spark.storage.memoryFraction</code>.
@@ -154,6 +154,13 @@ Apart from these, the following properties are also available, and may be useful
Whether to compress map output files. Generally a good idea.
</td>
</tr>
<tr>
<td>spark.shuffle.spill.compress</td>
<td>false</td>
<td>
Whether to compress data spilled during shuffles.
</td>
</tr>
<tr>
<td>spark.broadcast.compress</td>
<td>true</td>
@@ -388,7 +395,7 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
<td>spark.shuffle.externalSorting</td>
<td>spark.shuffle.spill</td>
<td>true</td>
<td>
If set to "true", limits the amount of memory used during reduces by spilling data out to disk. This spilling
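As a rough worked example of how the documented settings above interact (the heap size is illustrative, and the real threshold in ExternalAppendOnlyMap also applies a safety fraction on top of spark.shuffle.memoryFraction):

// Illustrative arithmetic only, not Spark code.
val heapBytes = 4L * 1024 * 1024 * 1024   // assume a 4 GB executor heap
val memoryFraction = 0.3                  // spark.shuffle.memoryFraction default
val approxSpillThreshold = (heapBytes * memoryFraction).toLong
// ~1.2 GB: once the in-memory shuffle maps collectively pass this point, their
// contents spill to disk, and each spill file is compressed only when
// spark.shuffle.spill.compress is set to true (it defaults to false).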
