Merge pull request apache#367 from ankurdave/graphx
GraphX: Unifying Graphs and Tables

GraphX extends Spark's distributed fault-tolerant collections API and interactive console with a new graph API that leverages recent advances in graph systems (e.g., [GraphLab](http://graphlab.org)) to let users easily and interactively build, transform, and reason about graph-structured data at scale. See http://amplab.github.io/graphx/.
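A minimal sketch of the API surface this merge introduces, against the `org.apache.spark.graphx` package referenced in the tasks below; the data values are illustrative:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx._

// Build a tiny property graph and print its edge triplets.
val sc = new SparkContext("local", "graphx-sketch")
val vertices = sc.parallelize(Seq((1L, "alice"), (2L, "bob")))
val edges = sc.parallelize(Seq(Edge(1L, 2L, "follows")))
val graph = Graph(vertices, edges)
graph.triplets.collect().foreach { t =>
  println(s"${t.srcAttr} ${t.attr} ${t.dstAttr}")
}
```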

Thanks to @jegonzal, @rxin, @ankurdave, @dcrankshaw, @jianpingjwang, @amatsukawa, @kellrott, and @adamnovak.

Tasks left:
- [x] Graph-level uncache
- [x] Uncache previous iterations in Pregel
- [x] ~~Uncache previous iterations in GraphLab~~ (postponed to post-release)
  - [x] Describe GC issue with GraphLab
- [ ] Write `docs/graphx-programming-guide.md`
  - [x] Mention future Bagel support in docs
  - [ ] Section on caching/uncaching in docs: as with Spark, cache anything that is used more than once. In an iterative algorithm, cache and force (i.e., materialize) a new RDD each iteration, then uncache the RDDs that depended on the newly materialized one but won't be referenced again (see the sketch after this list).
- [x] Undo modifications to core collections and instead copy them to org.apache.spark.graphx
- [x] Make Graph serializable to work around capture in Spark shell
- [x] Rename graph -> graphx in package name and subproject
- [x] Remove standalone PageRank
- [x] ~~Fix amplab/graphx#52 by checking `iter.hasNext`~~
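The caching/uncaching discipline from the docs task above, as a minimal Scala sketch; `step`, the types, and the iteration count are illustrative, not part of any API:

```scala
import org.apache.spark.rdd.RDD

// Cache and force (materialize) the new RDD each iteration, then
// unpersist the previous iteration's RDD once nothing references it.
def iterate(initial: RDD[(Long, Double)],
            step: RDD[(Long, Double)] => RDD[(Long, Double)],
            numIters: Int): RDD[(Long, Double)] = {
  var curr = initial.cache()
  curr.count()                        // force materialization
  for (_ <- 1 to numIters) {
    val prev = curr
    curr = step(prev).cache()
    curr.count()                      // materialize the new RDD first...
    prev.unpersist(blocking = false)  // ...then drop the one it depended on
  }
  curr
}
```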
pwendell committed Jan 14, 2014 · 2 parents 945fe7a + 80e73ed · commit 4a805af
Showing 76 changed files with 7,132 additions and 21 deletions.
2 changes: 2 additions & 0 deletions bin/compute-classpath.sh
@@ -39,6 +39,7 @@ if [ -f "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*-dep
CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SCALA_VERSION/classes"

DEPS_ASSEMBLY_JAR=`ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*-deps.jar`
@@ -59,6 +60,7 @@ if [[ $SPARK_TESTING == 1 ]]; then
CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/test-classes"
CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/test-classes"
CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/test-classes"
CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SCALA_VERSION/test-classes"
CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SCALA_VERSION/test-classes"
fi

@@ -288,7 +288,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
new ShuffledRDD[K, V, (K, V)](self, partitioner)
if (self.partitioner == Some(partitioner)) self else new ShuffledRDD[K, V, (K, V)](self, partitioner)
}

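The change above makes `partitionBy` a no-op when the RDD already has the requested partitioner. A hedged sketch of the effect, assuming an existing `SparkContext` named `sc`:

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._  // pair-RDD implicits

val part = new HashPartitioner(4)
val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))

val once  = pairs.partitionBy(part) // shuffles: no partitioner set yet
val twice = once.partitionBy(part)  // now a no-op: returns `once` itself
assert(twice eq once)               // reference equality, not just equal contents
```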
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -548,6 +548,11 @@ abstract class RDD[T: ClassTag](
* *same number of partitions*, but does *not* require them to have the same number
* of elements in each partition.
*/
def zipPartitions[B: ClassTag, V: ClassTag]
(rdd2: RDD[B], preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning)

def zipPartitions[B: ClassTag, V: ClassTag]
(rdd2: RDD[B])
(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
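A short usage sketch of the new `preservesPartitioning` overload: zipping two co-partitioned pair RDDs while keeping the first RDD's partitioner on the result, so a later `reduceByKey` or join can avoid a shuffle. The RDDs here are assumed to be partitioned identically:

```scala
import org.apache.spark.rdd.RDD

// Sum the values of two co-partitioned (key, value) RDDs partition-by-partition.
// preservesPartitioning = true asserts that f leaves keys in place, so the
// result keeps `a`'s partitioner instead of dropping it.
def zipValues(a: RDD[(Int, Double)], b: RDD[(Int, Double)]): RDD[(Int, Double)] =
  a.zipPartitions(b, preservesPartitioning = true) { (aIter, bIter) =>
    aIter.zip(bIter).map { case ((k, x), (_, y)) => (k, x + y) }
  }
```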
87 changes: 84 additions & 3 deletions core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
@@ -22,10 +22,72 @@ package org.apache.spark.util.collection
* A simple, fixed-size bit set implementation. This implementation is fast because it avoids
* safety/bound checking.
*/
class BitSet(numBits: Int) {
class BitSet(numBits: Int) extends Serializable {

private[this] val words = new Array[Long](bit2words(numBits))
private[this] val numWords = words.length
private val words = new Array[Long](bit2words(numBits))
private val numWords = words.length

/**
* Compute the capacity (number of bits) that can be represented
* by this bitset.
*/
def capacity: Int = numWords * 64

/**
* Set all the bits up to a given index
*/
def setUntil(bitIndex: Int) {
val wordIndex = bitIndex >> 6 // divide by 64
var i = 0
while(i < wordIndex) { words(i) = -1; i += 1 }
if(wordIndex < words.size) {
// Set the remaining bits (note that the mask could still be zero)
val mask = ~(-1L << (bitIndex & 0x3f))
words(wordIndex) |= mask
}
}

/**
* Compute the bit-wise AND of the two sets returning the
* result.
*/
def &(other: BitSet): BitSet = {
val newBS = new BitSet(math.max(capacity, other.capacity))
val smaller = math.min(numWords, other.numWords)
assert(newBS.numWords >= numWords)
assert(newBS.numWords >= other.numWords)
var ind = 0
while( ind < smaller ) {
newBS.words(ind) = words(ind) & other.words(ind)
ind += 1
}
newBS
}

/**
* Compute the bit-wise OR of the two sets returning the
* result.
*/
def |(other: BitSet): BitSet = {
val newBS = new BitSet(math.max(capacity, other.capacity))
assert(newBS.numWords >= numWords)
assert(newBS.numWords >= other.numWords)
val smaller = math.min(numWords, other.numWords)
var ind = 0
while( ind < smaller ) {
newBS.words(ind) = words(ind) | other.words(ind)
ind += 1
}
while( ind < numWords ) {
newBS.words(ind) = words(ind)
ind += 1
}
while( ind < other.numWords ) {
newBS.words(ind) = other.words(ind)
ind += 1
}
newBS
}

/**
* Sets the bit at the specified index to true.
@@ -36,6 +98,11 @@ class BitSet(numBits: Int) {
words(index >> 6) |= bitmask // div by 64 and mask
}

def unset(index: Int) {
val bitmask = 1L << (index & 0x3f) // mod 64 and shift
words(index >> 6) &= ~bitmask // div by 64 and mask
}

/**
* Return the value of the bit with the specified index. The value is true if the bit with
* the index is currently set in this BitSet; otherwise, the result is false.
@@ -48,6 +115,20 @@
(words(index >> 6) & bitmask) != 0 // div by 64 and mask
}

/**
* Get an iterator over the set bits.
*/
def iterator = new Iterator[Int] {
var ind = nextSetBit(0)
override def hasNext: Boolean = ind >= 0
override def next() = {
val tmp = ind
ind = nextSetBit(ind+1)
tmp
}
}


/** Return the number of bits set to true in this BitSet. */
def cardinality(): Int = {
var sum = 0
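A small example exercising the new `BitSet` operations above, assuming the class is visible from the caller (it lives in a Spark-internal package):

```scala
import org.apache.spark.util.collection.BitSet

val a = new BitSet(64)
a.setUntil(3)                // sets bits 0, 1, 2
val b = new BitSet(64)
b.set(2); b.set(10)

val and = a & b              // intersection: bit 2 only
val or  = a | b              // union: bits 0, 1, 2, 10
assert(and.get(2) && !and.get(1))
assert(or.iterator.toList == List(0, 1, 2, 10))
a.unset(0)                   // clear a single bit
assert(!a.get(0))
```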
@@ -84,6 +84,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](

protected var _bitset = new BitSet(_capacity)

def getBitSet = _bitset

// Init of the array in constructor (instead of in declaration) to work around a Scala compiler
// specialization bug that would generate two arrays (one for Object and one for specialized T).
protected var _data: Array[T] = _
@@ -161,7 +163,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
def getPos(k: T): Int = {
var pos = hashcode(hasher.hash(k)) & _mask
var i = 1
while (true) {
val maxProbe = _data.size
while (i < maxProbe) {
if (!_bitset.get(pos)) {
return INVALID_POS
} else if (k == _data(pos)) {
@@ -179,6 +182,22 @@
/** Return the value at the specified position. */
def getValue(pos: Int): T = _data(pos)

def iterator = new Iterator[T] {
var pos = nextPos(0)
override def hasNext: Boolean = pos != INVALID_POS
override def next(): T = {
val tmp = getValue(pos)
pos = nextPos(pos+1)
tmp
}
}

/** Return the value at the specified position, asserting that an element is stored there. */
def getValueSafe(pos: Int): T = {
assert(_bitset.get(pos))
_data(pos)
}

/**
* Return the next position with an element stored, starting from the given position inclusively.
*/
@@ -259,7 +278,7 @@ object OpenHashSet {
* A set of specialized hash function implementation to avoid boxing hash code computation
* in the specialized implementation of OpenHashSet.
*/
sealed class Hasher[@specialized(Long, Int) T] {
sealed class Hasher[@specialized(Long, Int) T] extends Serializable {
def hash(o: T): Int = o.hashCode()
}

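A sketch of the additions to `OpenHashSet`, under the same visibility assumption as above:

```scala
import org.apache.spark.util.collection.OpenHashSet

val set = new OpenHashSet[Long]()
set.add(10L)
set.add(50L)

// getPos returns a key's slot, or OpenHashSet.INVALID_POS if absent; the
// probe loop is now bounded by the array size instead of running forever.
assert(set.getPos(10L) != OpenHashSet.INVALID_POS)
assert(set.getPos(99L) == OpenHashSet.INVALID_POS)

// The new iterator walks the occupied slots via the underlying bitset.
assert(set.iterator.toSet == Set(10L, 50L))
```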
8 changes: 5 additions & 3 deletions docs/_layouts/global.html
@@ -21,7 +21,7 @@
<link rel="stylesheet" href="css/main.css">

<script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>

<link rel="stylesheet" href="css/pygments-default.css">

<!-- Google analytics script -->
@@ -68,9 +68,10 @@
<li><a href="streaming-programming-guide.html">Spark Streaming</a></li>
<li><a href="mllib-guide.html">MLlib (Machine Learning)</a></li>
<li><a href="bagel-programming-guide.html">Bagel (Pregel on Spark)</a></li>
<li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li>
</ul>
</li>

<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">API Docs<b class="caret"></b></a>
<ul class="dropdown-menu">
@@ -80,6 +81,7 @@
<li><a href="api/streaming/index.html#org.apache.spark.streaming.package">Spark Streaming</a></li>
<li><a href="api/mllib/index.html#org.apache.spark.mllib.package">MLlib (Machine Learning)</a></li>
<li><a href="api/bagel/index.html#org.apache.spark.bagel.package">Bagel (Pregel on Spark)</a></li>
<li><a href="api/graphx/index.html#org.apache.spark.graphx.package">GraphX (Graph Processing)</a></li>
</ul>
</li>

@@ -161,7 +163,7 @@ <h2>Heading</h2>
<script src="js/vendor/jquery-1.8.0.min.js"></script>
<script src="js/vendor/bootstrap.min.js"></script>
<script src="js/main.js"></script>

<!-- A script to fix internal hash links because we have an overlapping top bar.
Based on https://github.com/twitter/bootstrap/issues/193#issuecomment-2281510 -->
<script>
2 changes: 1 addition & 1 deletion docs/_plugins/copy_api_dirs.rb
@@ -20,7 +20,7 @@

if not (ENV['SKIP_API'] == '1' or ENV['SKIP_SCALADOC'] == '1')
# Build Scaladoc for Java/Scala
projects = ["core", "examples", "repl", "bagel", "streaming", "mllib"]
projects = ["core", "examples", "repl", "bagel", "graphx", "streaming", "mllib"]

puts "Moving to project root and building scaladoc."
curr_dir = pwd
1 change: 1 addition & 0 deletions docs/api.md
@@ -9,4 +9,5 @@ Here you can find links to the Scaladoc generated for the Spark sbt subprojects.
- [Spark Examples](api/examples/index.html)
- [Spark Streaming](api/streaming/index.html)
- [Bagel](api/bagel/index.html)
- [GraphX](api/graphx/index.html)
- [PySpark](api/pyspark/index.html)
10 changes: 6 additions & 4 deletions docs/bagel-programming-guide.md
@@ -3,6 +3,8 @@ layout: global
title: Bagel Programming Guide
---

**Bagel will soon be superseded by [GraphX](graphx-programming-guide.html); we recommend that new users try GraphX instead.**

Bagel is a Spark implementation of Google's [Pregel](http://portal.acm.org/citation.cfm?id=1807184) graph processing framework. Bagel currently supports basic graph computation, combiners, and aggregators.

In the Pregel programming model, jobs run as a sequence of iterations called _supersteps_. In each superstep, each vertex in the graph runs a user-specified function that can update state associated with the vertex and send messages to other vertices for use in the *next* iteration.
@@ -21,7 +23,7 @@ To use Bagel in your program, add the following SBT or Maven dependency:

Bagel operates on a graph represented as a [distributed dataset](scala-programming-guide.html) of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state. In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages.

For example, we can use Bagel to implement PageRank. Here, vertices represent pages, edges represent links between pages, and messages represent shares of PageRank sent to the pages that a particular page links to.

We first extend the default `Vertex` class to store a `Double`
representing the current PageRank of the vertex, and similarly extend
@@ -38,7 +40,7 @@ import org.apache.spark.bagel.Bagel._
val active: Boolean) extends Vertex

@serializable class PRMessage(
val targetId: String, val rankShare: Double) extends Message
{% endhighlight %}

Next, we load a sample graph from a text file as a distributed dataset and package it into `PRVertex` objects. We also cache the distributed dataset because Bagel will use it multiple times and we'd like to avoid recomputing it.
@@ -114,7 +116,7 @@ Here are the actions and types in the Bagel API. See [Bagel.scala](https://githu
/*** Full form ***/

Bagel.run(sc, vertices, messages, combiner, aggregator, partitioner, numSplits)(compute)
// where compute takes (vertex: V, combinedMessages: Option[C], aggregated: Option[A], superstep: Int)
// and returns (newVertex: V, outMessages: Array[M])

/*** Abbreviated forms ***/
Expand All @@ -124,7 +126,7 @@ Bagel.run(sc, vertices, messages, combiner, partitioner, numSplits)(compute)
// and returns (newVertex: V, outMessages: Array[M])

Bagel.run(sc, vertices, messages, combiner, numSplits)(compute)
// where compute takes (vertex: V, combinedMessages: Option[C], superstep: Int)
// and returns (newVertex: V, outMessages: Array[M])

Bagel.run(sc, vertices, messages, numSplits)(compute)
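For concreteness, a hedged sketch of a PageRank `compute` function in the shape the abbreviated form above expects. The `PRVertex`/`PRMessage` field names follow the classes sketched earlier in this guide, but the constructor shapes, the `Seq[String]` type of `outEdges`, the vertex count, and the iteration cutoff are assumptions:

```scala
val numVertices = 100.0  // illustrative; normally the graph's vertex count
val numIters = 10        // illustrative fixed iteration cutoff

// compute takes the current vertex, the messages from the previous
// superstep, and the superstep number; it returns the updated vertex
// plus the messages to send along the vertex's out-edges.
// outEdges is assumed to be a Seq[String] of target vertex IDs.
def compute(self: PRVertex, messages: Option[Array[PRMessage]], superstep: Int)
  : (PRVertex, Array[PRMessage]) = {
  val msgSum = messages.getOrElse(Array()).map(_.rankShare).sum
  val newRank =
    if (superstep > 0) 0.15 / numVertices + 0.85 * msgSum
    else self.rank
  val halt = superstep >= numIters
  val outbox: Array[PRMessage] =
    if (halt) Array()
    else self.outEdges.map(targetId =>
      new PRMessage(targetId, newRank / self.outEdges.size)).toArray
  (new PRVertex(self.id, newRank, self.outEdges, !halt), outbox)
}
```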
