Thursday, November 26, 2015

Using the Spark Connector for Couchbase - Part 1

Hi all!

This is the first post in a series on using Couchbase with Apache Spark.
In this post I will explain how to set up the environment and how to use Spark with Couchbase in its simplest form.
In the next posts we will move on to Spark SQL with N1QL, the exciting Spark Streaming with DCP, and more.

Our world, "Big Data", is divided.
As obscure as Big Data may sound (and is), every data analysis has two major parts.
The first is the operational side, which is what you need in order to get the work done in real time. These are databases such as Couchbase.
The second part is the heavy lifting of aggregating large amounts of data.
These are platforms such as Apache Hadoop or Spark.

Couchbase integrates with both in order to provide the full solution. In this post and in others to follow, I will walk you through your first steps and more, integrating your Couchbase server with Spark with ease.

I'm assuming that you already know at least a little bit about Spark.
If not, here it is in one sentence: Apache Spark is an open source cluster computing framework.
Its main data structure is called the RDD (Resilient Distributed Dataset), and I encourage you to read the Spark developer guide.

The demo we are going to build will be written in Spark's "native" language, which is Scala.
Don't panic, it is fairly simple!

Software needed:
Java: 1.8
Scala: 2.10.4 (important!)
Couchbase: 4.0 with travel-sample bucket installed
Spark: 1.5.x or greater

So, first things first.
If you already know Spark or Scala you can skip the set up phase.

Setting up the Project
Let's open the IDE and start a new project of type Scala with SBT

Hit Next and choose the name of the project, the Project SDK (at least 1.7), the SBT version (whatever you have is fine, here I've used 0.13.8) and the Scala version, 2.10.4 (any 2.10 will be fine).

Check the auto-import and click finish.

Next, set up the build.sbt file under the module root.
SBT (commonly read as Scala Build Tool) also functions as a dependency resolver, a bit like Maven, and it resolves against the same repositories as Maven.
After you define the properties in this file, the SBT plugin will download the required dependencies.

The basic structure of the SBT file is as follows:
1) the name of the project: name := "someProjectName"
2) the version of your project: version := "1.0"
3) the Scala version you'll be using: scalaVersion := "2.10.4"
4) the list of dependencies (from the Maven repository):
            libraryDependencies ++= Seq("groupId" % "artifactId" % "revision")

Our SBT file will look like this:
name := "SparkCouchbase"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq ("org.apache.spark" % "spark-core_2.10" % "1.5.1",
"org.apache.spark" % "spark-sql_2.10" % "1.5.1",
"org.apache.spark" % "spark-streaming_2.10" % "1.5.1",
"com.couchbase.client" %% "spark-connector" % "1.0.0")
Note: the double %% tells SBT to append the Scala binary version (here 2.10, derived from scalaVersion) to the artifact name.
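
To make the %% note concrete, here is a small sketch of what it expands to. With scalaVersion set to 2.10.4, the two lines below should resolve the same artifact; the _2.10 suffix is what SBT appends for you. This is a build.sbt fragment for illustration only, not something to paste in as-is.

```scala
// build.sbt fragment (sbt 0.13 syntax). Pick ONE of the two forms, not both.

// Cross-versioned form: sbt appends the Scala binary version for you.
libraryDependencies += "com.couchbase.client" %% "spark-connector" % "1.0.0"

// Explicit form: the same artifact with the suffix written by hand,
// in the same style as the spark-core_2.10 dependency.
libraryDependencies += "com.couchbase.client" % "spark-connector_2.10" % "1.0.0"
```

The Spark lines in this post use the explicit form, while the connector line uses %%. Either style works as long as the suffix matches your scalaVersion.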

The dependencies are as follows:
  1. The Spark core dependency, compiled for Scala 2.10, in version 1.5.1
  2. The Spark SQL dependency, compiled for Scala 2.10, in version 1.5.1
  3. The Spark Streaming dependency, compiled for Scala 2.10, in version 1.5.1
  4. The great Couchbase connector for Spark, compiled for Scala 2.10 (taken from scalaVersion), in version 1.0.0
for more about dependencies look here

Next, we need to take care of the directory structure.
If we don't set up the correct structure, the SBT plugin won't be able to compile our project.

So create a new directory under the module named src/main/scala,
and mark the scala folder as Sources (it turns bluish).
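
For the curious, the src/main/scala convention corresponds to an SBT setting. The sketch below spells out what SBT 0.13 already assumes by default, so you normally would not add it. It is shown only to make visible where SBT looks for Scala sources.

```scala
// build.sbt fragment (sbt 0.13 syntax), illustration only:
// this restates the default location of the main Scala sources.
scalaSource in Compile := baseDirectory.value / "src" / "main" / "scala"
```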

Your project structure should look more or less as follows:

Now that we have set everything up, it's time for some coding!

Finally, some coding.
Once the little SBT progress bar shows that the dependencies have finished downloading, you can start writing your program.

First make sure that you already have the Couchbase server installed with the travel-sample bucket.

Now, create a new Scala object in the Scala source folder with a name of your liking.
I chose SparkCouchbase.scala.

Next, create a main function so your code will look like this:

 object SparkCouchbase {
  def main(args: Array[String]): Unit = {
  }
 }

So far, what we've created here is an object (think of it as a singleton class) and a method main, which takes an array of String and returns Unit (which is basically nothing).
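
If objects and Unit are new to you, here is a minimal plain-Scala sketch of both ideas with no Spark involved. The names Counter and UnitDemo are made up for this illustration and are not part of the project we are building.

```scala
// A singleton: exactly one Counter exists, created on first use.
object Counter {
  private var count = 0
  // Increments the shared state and returns the new value.
  def next(): Int = { count += 1; count }
}

object UnitDemo {
  // Unit is the type of "no meaningful result", much like void in Java.
  def log(msg: String): Unit = println(msg)

  def main(args: Array[String]): Unit = {
    // Both calls hit the same single instance, so the value keeps growing.
    log("first call:  " + Counter.next())
    log("second call: " + Counter.next())
  }
}
```

There is no "new Counter" anywhere: you call methods directly on the object, which is how SparkCouchbase.main gets called when the program starts.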

Now we need to add the sparky flavor.
In order to do so, we need to add some imports from the dependencies that the SBT auto-import downloaded earlier.
We need the basic Spark package:
import org.apache.spark._

We need the basic Couchbase Spark connector package:
import com.couchbase.spark._

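And the JSON document and JSON object packages:
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
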
That is 4 imports in total. We also need the Spark initialization and configuration, which define the application name, the Spark cluster location, which buckets we want to connect to in Couchbase, and the node addresses.
Finally, we must have a SparkContext in order to use the Spark framework,
so our code will look like this.
Please note: if you don't specify a bucket, the connector will use the default bucket, and if you don't specify an address for the nodes, it will look for Couchbase on localhost.

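 import org.apache.spark._
 import com.couchbase.spark._
 import com.couchbase.client.java.document.JsonDocument
 import com.couchbase.client.java.document.json.JsonObject
 object SparkCouchbase {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("CouchbaseTricks")
      .setMaster("local[*]") // local test cluster, one worker thread per core
      .set("com.couchbase.bucket.travel-sample", "") // bucket name -> password (empty here)
      .set("com.couchbase.bucket.default", "")
      .set("com.couchbase.nodes", "127.0.0.1") // Couchbase node address(es)
    val sc = new SparkContext(sparkConf)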

So we set the app name (CouchbaseTricks), the cluster (a local test cluster with as many worker threads as there are cores), the buckets we want to connect to (travel-sample and default) and the nodes in the cluster.

Now we need to do something with it, like getting some documents.
We will do it using couchbaseGet from the context.

Let's get some major airports (Heathrow, SF International, Los Angeles International and others) and take their code and name with country.
Then we save them back to Couchbase, in the default bucket, as our major airports.

We will use two methods from the Couchbase connector: couchbaseGet and saveToCouchbase.
The first takes a Scala sequence of document IDs, fetches those documents from Couchbase and parallelizes them (making them an RDD). This method is available on your Spark context.
The latter saves an RDD to Couchbase.

Let’s look at the code and break it down a bit

   import org.apache.spark.rdd.RDD // needed for the RDD type annotation below
   val airportsSeq = Seq("airport_507", "airport_3469", "airport_3484", "airport_3797", "airport_3576", "airport_502", "airport_1382") // Heathrow, SFO, LAX, JFK, MIA, LGW, CDG
   val airports: RDD[JsonDocument] = sc.couchbaseGet[JsonDocument](airportsSeq, "travel-sample")
   val airportsByCountry = airports.map(airport => (airport.content().getString("country"), 1)) // map
   val majorAirportCount = airportsByCountry.reduce((a, b) => ("Total airport Number", a._2 + b._2)) // reduce
   airports.map(myDocument => {
    val id = "mymajorairports::2015::" + myDocument.id() // suffix lost in the original listing; the source document id is assumed
    val content = JsonObject.create().put("name", myDocument.content().getString("airportname"))
                     .put("country", myDocument.content().get("country"))
                     .put("code", myDocument.content().getString("faa"))
    JsonDocument.create(id, content)
   }).saveToCouchbase("default") // write the new documents to the default bucket
  }
 }

So now we've just written our first Spark application with Couchbase (or maybe our first ever!).
A simple Spark init and a simple reduce function, with get and set against the Couchbase cluster.
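
If you want to play with the map and reduce steps without a Spark cluster or a Couchbase server, the sketch below does the same thing on an ordinary Scala collection. Everything in it is an assumption made for illustration: the Airport case class is a made-up stand-in for the JSON documents, and the values are typed in by hand rather than read from travel-sample.

```scala
object CountSketch {
  // Hypothetical stand-in for the three fields the post reads from each document.
  case class Airport(name: String, country: String, code: String)

  // Hand-typed illustrative values, NOT fetched from the travel-sample bucket.
  val airports: Seq[Airport] = Seq(
    Airport("Heathrow", "United Kingdom", "LHR"),
    Airport("San Francisco Intl", "United States", "SFO"),
    Airport("Los Angeles Intl", "United States", "LAX"),
    Airport("John F Kennedy Intl", "United States", "JFK"),
    Airport("Miami Intl", "United States", "MIA"),
    Airport("Gatwick", "United Kingdom", "LGW"),
    Airport("Charles De Gaulle", "France", "CDG")
  )

  // Step 1 (map): turn every airport into a (country, 1) pair.
  val pairs: Seq[(String, Int)] = airports.map(a => (a.country, 1))

  // Step 2 (reduce): fold all pairs into one labelled total, as in the post.
  val total: (String, Int) =
    pairs.reduce((a, b) => ("Total airport Number", a._2 + b._2))

  // Variation: keep the country as the key and sum the ones per country.
  val perCountry: Map[String, Int] =
    pairs.groupBy(_._1).map { case (country, ps) => (country, ps.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    println(total)
    perCountry.toSeq.sortBy(_._1).foreach(println)
  }
}
```

Note that the reduce in the post throws away the country and keeps only a grand total. If you want one count per country on the real RDD of pairs, Spark's reduceByKey(_ + _) plays the role of the groupBy step shown here.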

Next time, on these foundations, we will build another, slightly more complicated solution.

