Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
set RestartPolicy=Never for executor (apache#367)
* set RestartPolicy=Never for executor As for current implementation the RestartPolicy of executor pod is not set, so the default value "OnFailure" is in effect. But this causes problem. If an executor is terminated unexpectedly, for example, exit by java.lang.OutOfMemoryError, it'll be restarted by k8s with the same executor ID. When the new executor tries to fetch a block hold by the last executor, ShuffleBlockFetcherIterator.splitLocalRemoteBlocks() think it's a **local** block and tries to read it from it's local dir. But the executor's local dir is changed because random generated ID is part of local dir. FetchFailedException will raise and the stage will fail. The rolling Error message: 17/06/29 01:54:56 WARN KubernetesTaskSetManager: Lost task 0.1 in stage 2.0 (TID 7, 172.16.75.92, executor 1): FetchFailed(BlockManagerId(1, 172.16.75.92, 40539, None), shuffleId=2, mapId=0, reduceId=0, message= org.apache.spark.shuffle.FetchFailedException: /data2/spark/blockmgr-0e228d3c-8727-422e-aa97-2841a877c42a/32/shuffle_2_0_0.index (No such file or directory) at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) * Update KubernetesClusterSchedulerBackend.scala
- Loading branch information