Apache Spark is a general purpose framework for processing large data sets in parallel on a computing cluster. It's used to efficiently crunch big data by distributing the computation across many nodes using an abstraction called a Resilient Distributed Dataset (RDD). It was originally developed at UC Berkeley AMPLab.
Spark solves the same class of problems as the Hadoop MapReduce framework. It has overtaken and is effectively replacing MapReduce due to its better performance, generality and simplicity. Spark can run within the Hadoop ecosystem (YARN, HDFS), in a non-Hadoop cluster management system (Mesos) or even as its own standalone cluster. Hortonworks, Cloudera and MapR have embraced Spark as a component of Hadoop while Databricks offers Spark as a standalone service.
In addition to general-purpose data analytics with RDDs, Spark comes with some powerful, more domain-specific libraries such as:
- Spark SQL for querying structured data, data warehousing and BI (Apache Hive compatible)
- Spark Streaming for real-time stream processing (similar to Apache Storm)
- MLlib for machine learning (similar to Apache Mahout)
- GraphX for iterative graph computation and ETL (similar to Apache Giraph)
Why has Spark gained so much momentum in the big data space and overtaken the tried and true Hadoop MapReduce framework?
Spark addresses some well known shortcomings of MapReduce. While MapReduce has been a dependable foundation for big data applications through the evolution of data science, Spark has had the advantage of learning from that existing work and has been developed to align with the state of the art and solve a wider class of problems.
Spark can achieve 10x to 100x better performance than Hadoop MapReduce. It performs especially well for iterative applications in which multiple operations are performed on the same data set.
Spark uses a directed acyclic graph (DAG) computing model, which was pioneered by Microsoft's Dryad project. MapReduce treats each Map-Reduce step as an independent job. For iterative applications that consist of multiple computation steps, Spark's DAG scheduler has a holistic view of the entire computation and can make global optimizations using this additional information that MapReduce cannot.
Spark uses in-memory computation when possible by memory caching RDDs and only spills results to disk when necessary. MapReduce reads the input for each Map-Reduce job from the file system and writes the results back to the file system to be used as the input for the next job.
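For example, an RDD that several actions reuse can be kept in executor memory instead of being re-read for every pass. The sketch below uses the Ruby-Spark syntax introduced later in this talk and assumes the gem exposes a cache method mirroring the Scala/Python RDD API; if it doesn't, the same chain still runs, it just re-reads the file for each action.
# Assumes the Spark context $sc from the boilerplate/shell shown later,
# and assumes an RDD cache method like the Scala/Python APIs provide.
words = $sc.text_file('data/atlas.txt').flat_map(lambda { |line| line.split })
cached = words.cache                                        # keep partitions in executor memory (assumption)
puts '---- Total words: ' + cached.count.to_s               # first action computes and caches the RDD
puts '---- Long words: ' + cached.filter(lambda { |w| w.length > 8 }).count.to_s  # reuses the cached data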
MapReduce depends on its distributed file system (HDFS) for fault tolerance. When a node fails, MapReduce already has the results of that node's computations safely stored in HDFS. Spark is able to achieve fault tolerance with its memory-cached RDDs using a restricted form of shared memory, based on coarse-grained transformations rather than fine-grained updates to shared state. In other words, the data in an RDD is immutable (read-only), and data changes resulting from computations are shared with other nodes as a log of the high-level transformations applied to partitions of data rather than as all the individual changed values. When a node fails, another node can backtrack in the DAG and redo any lost computations.
While the DAG model and memory caching are largely responsible for Spark's better performance, Spark also makes practical improvements over MapReduce. For example, MapReduce starts up a new JVM for each Map-Reduce step while Spark keeps an executor JVM running on each node so that dispatched tasks can begin doing real work sooner without the overhead of starting a new JVM.
Spark's RDD programming model is more general-purpose than MapReduce. In fact, map and reduce are just two of many operations that can be applied to an RDD. In Spark the data set itself is a first-class object with a rich set of useful built-in operations such as map, reduce, filter, etc. Programmers are already familiar with this way of thinking from collections in object-oriented programming languages.
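As a rough sketch of that parallel (using the Ruby-Spark syntax covered later and assuming the Spark context $sc from the boilerplate below), the same filter-and-reduce reads almost identically on a local Ruby collection and on a distributed RDD:
# Local Ruby collection
local_sum = (1..100).select { |x| x.even? }.reduce { |a, b| a + b }
# Same logic as a distributed computation on an RDD
rdd_sum = $sc.parallelize(1..100)
             .filter(lambda { |x| x.even? })
             .reduce(lambda { |a, b| a + b })
puts local_sum == rdd_sum   # => true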
Hadoop MapReduce forces every application into a series of independent jobs that consist of a map operation followed by a reduce operation. The MapReduce programming model was designed at Google to handle web search workloads and is well suited for that. It was not intended to be a general-purpose parallel programming framework, but as Hadoop grew to meet the exploding demand for an open source big data platform it came to be used as one. As the variety of big data problems has increased, applications that aren't a natural fit for the MapReduce model have been shoehorned into it so they can run in a Hadoop cluster.
Spark is a generalization of MapReduce. Problems that are natural for the MapReduce model can still be implemented as MapReduce-style applications, but other parallel operations are supported just as well. The more domain-specific add-ons such as Spark Streaming and MLlib are implemented as libraries built on the same general-purpose RDD model, so they can easily be combined in the same application.
The Resilient Distributed Dataset (RDD) is the main concept in Spark. Parallel computations are done by performing operations on an RDD. RDD operations are categorized as either transformations or actions.
A Spark application consists of a driver program that runs on a single node in the cluster - the driver node. The driver program:
- Creates an RDD by reading it from a distributed file system or parallelizing an existing collection
- Builds a DAG that represents the entire computation
- Dispatches tasks to worker nodes that execute parallel operations on RDD partitions
- Collects the results of those parallel computations and returns them to the user
Transformations are operations which create a new dataset from an existing one. Example transformations are map, filter, union, intersection and sort. Transformations are performed lazily. They are only executed when they are needed as an input to an action.
Actions are operations which return a value to the driver program after running a computation on a dataset. Example actions are reduce, collect and count. Every Spark program has at least one action. Specifying transformations in the driver program simply builds up the DAG for later execution, but specifying an action causes Spark to execute the necessary operations in the DAG (the action and the transformations it depends on) and produce a result.
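Putting those ideas together, here is a small sketch (again assuming the Spark context $sc that the boilerplate below provides): the transformations only describe the DAG, and nothing runs until the action on the last line.
rdd = $sc.parallelize(1..1000)                 # driver creates the RDD
evens = rdd.filter(lambda { |x| x.even? })     # transformation - nothing executes yet
doubled = evens.map(lambda { |x| x * 2 })      # transformation - still just building the DAG
puts doubled.count                             # action - Spark now runs the whole chain => 500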
To perform useful computations we often need to pass custom code from the driver program to parallel RDD operations. We pass this custom code using lambda expressions, anonymous functions, function objects, static or global functions depending on the programming language used.
Input variables are often passed to lambda expressions using closures. Closures allow a lambda expression to access and modify variables defined outside of its scope. In Spark we have to be careful about passing lambda expressions that modify variables outside of their scope. On a single node, modifying a variable in a closure is a legitimate way to share the variable between invocations of the lambda. In Spark, since the lambda runs in parallel on different nodes, each with its own memory space, sharing variables like this will not work. Doing so will appear to work correctly in your single-node dev environment but will fail when running in a multi-node cluster.
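Here is a sketch of the pitfall, assuming the $sc context from later in the talk. Whether the captured counter is even serialized to the workers depends on the Ruby-Spark serializer, but either way mutations made inside the lambda never make it back to the driver, so an action should be used instead.
counter = 0
rdd = $sc.parallelize(1..100)
# Don't do this: each worker operates on its own copy of counter,
# so the driver's counter is not reliably updated.
rdd.map(lambda { |x| counter += 1; x }).collect
puts counter     # not 100 on a real cluster
# Do this instead: let an action return the result to the driver.
puts rdd.count   # => 100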
Spark itself is written in Scala and runs in a Java Virtual Machine (JVM). The Spark distribution has APIs for writing Spark applications in Scala, Java, Python and R. It even provides interactive command line shells for Scala, Python and R. These languages have long been widely established as big data tools and were natural choices for the market Spark is intended to serve.
Spark does not support writing applications in Ruby. The code examples in this presentation are written in Ruby because this is the Columbus Ruby Brigade. If you are writing a real Spark application you would probably use one of the languages officially supported by Spark.
However, Ondřej Moravčík has done some excellent work in writing a Ruby wrapper for Spark allowing Spark applications to be written in Ruby. It's a gem called Ruby-Spark with a nice getting started tutorial. The code mostly works but the project is not production ready and is more of a proof of concept at this point. If you're a Ruby developer and want to contribute to the future of big data this might be a great project for you to join. Today (3/21/16) Ruby-Spark appears to be somewhat stagnant and could really use additional help.
Follow the Ruby-Spark installation instructions in the tutorial and/or in the README.
Verify that you can run the ruby-spark interactive shell.
ruby-spark shell
Clone the Introduction to Apache Spark in Ruby project and cd into that directory.
git clone https://github.com/ryan-boder/spark-intro-ruby.git
cd spark-intro-ruby
Ruby-Spark applications need a little boilerplate code. An example application template looks like this.
require 'ruby-spark'
Spark.config do
  set_app_name 'My Application Name'
  set_master 'local[*]'
  set 'spark.ruby.serializer', 'marshal'
  set 'spark.ruby.serializer.batch_size', 2048
end
Spark.start
$sc = Spark.sc
# Your driver application code goes here
Spark.stop
If you use the interactive ruby-spark shell then this boilerplate code is already done for you and you can just start using the Spark context $sc directly.
ruby-spark shell
...
[1] pry(main)> $sc.parallelize([1, 2, 3]).sum
...
=> 6
To do anything in Spark you'll need to load data into an RDD. You can convert an existing collection in your driver program into an RDD...
rdd = $sc.parallelize(0..1000)
or you can read an RDD from the file system. Spark can create an RDD from any Hadoop InputFormat. For simplicity we will just use a text file in the local file system as our data source.
rdd = $sc.text_file('data/atlas.txt')
A really simple example is to create an RDD and use it to count the number of items in the data set. In this case items are lines in a text file.
rdd = $sc.text_file('data/fruit.txt')
puts '---- Number of Lines: ' + rdd.count.to_s
ruby bin/example1.rb
Spark can easily handle Map-Reduce applications with an RDD. This example takes an input range of 0 to 1000, doubles all the values making it 0 to 2000, sums the doubled values and divides by the total count to calculate the average.
input = $sc.parallelize(0..1000)
doubled = input.map(lambda { |x| x * 2 })
summed = doubled.reduce(lambda { |sum, x| sum + x })
average = summed / input.count
puts '---- Average: ' + average.to_s
ruby bin/example2.rb
The obligatory Hello World of data analytics is to count the number of occurrences of each word in a text file. We can accomplish this by splitting the file on whitespace, mapping the words to pairs (2-element arrays) containing the word (key) and a count of 1 (value), then reducing the pairs by key (the word), summing up the counts.
text_file = $sc.text_file('data/fruit.txt')
words = text_file.flat_map(lambda { |x| x.split() })
pairs = words.map(lambda { |word| [word, 1] })
counts = pairs.reduce_by_key(lambda { |a, b| a + b })
puts '---- Word Counts: ' + counts.collect.to_s
ruby bin/example3.rb
The previous word count was not quite as robust as we would like it to be. Let's take it a little further. This time we'll
- Split with a regular expression to properly handle newlines and special characters
- Convert words to lower case so Hello and hello are counted as the same word
- Sort the results by word count from highest to lowest
- Write the results to a file so we can word count files with many words
text_file = $sc.text_file('data/atlas.txt')
words = text_file.flat_map(lambda { |x| x.split(/[\s.,!?"']+/) })
pairs = words.map(lambda { |word| [word.downcase, 1] })
counts = pairs.reduce_by_key(lambda { |a, b| a + b })
results = counts.sort_by(lambda { |x| x[1] }, false).collect
# Ruby-Spark seems to be missing the RDD.save_as_text_file method
File.open('example4-output.txt', 'w') do |file|
  results.each { |x| file.puts(x[0] + ': ' + x[1].to_s) unless x[0].empty? }
end
ruby bin/example4.rb
less example4-output.txt
This example processes Alexa rank data from a CSV file and prints the average global rank, the best (most improved) global rank delta and the average US rank.
# RDD the CSV and filter out the header
csv = $sc.text_file('data/alexa.csv').filter(lambda { |x| !x.start_with?('Date') })
# Split into columns and convert to integers
data = csv.map(lambda do |x|
  x.split(',').map { |y| y.strip.to_i }
end)
# Sum the ranks and find the most improved delta
results = data.reduce(lambda do |x, y|
  [nil, x[1] + y[1], [x[2], y[2]].min, x[3] + y[3]]
end).collect.to_a
avgGlobalRank = results[1] / csv.count
minGlobalDelta = results[2]
avgUsRank = results[3] / csv.count
puts "---- Average Global Rank: #{avgGlobalRank}, Best Global Delta: #{minGlobalDelta}, Average US Rank: #{avgUsRank}"
ruby bin/example5.rb
This example estimates Pi using the Monte Carlo method.
n = 10_000_000
# Pick n random points in the 2x2 square centered at the origin and count
# how many land inside the unit circle.
hits = $sc.parallelize(1..n).map(lambda do |_|
  x = rand * 2 - 1
  y = rand * 2 - 1
  x**2 + y**2 < 1 ? 1 : 0
end).sum
# The fraction of hits approximates pi / 4.
pi = 4.0 * hits / n
puts "---- Pi ~ #{pi} in #{n} simulations"
ruby bin/example6.rb
We've covered what Apache Spark is and why it's having such an impact on big data. We've compared Spark to the Hadoop MapReduce framework and shown Spark's advantages. We've covered the programming languages officially supported by Spark and shown a fledgling third-party open source project that allows Spark programs to be written in Ruby. Finally, we've written a few basic Spark examples in Ruby to demonstrate how the Spark programming model works.