Saturday, October 7, 2017

Spark cluster


A Spark application is launched on a set of machines using an external service called a cluster manager. As noted, Spark is packaged with a built-in cluster manager called the Standalone cluster manager. Spark also works with Hadoop YARN and Apache Mesos, two popular open source cluster managers.
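A minimal sketch of how the cluster manager is typically selected through the master URL when building a SparkContext in a standalone application (not the spark-shell, which creates sc for us); the app name and host names are placeholders, not taken from this tutorial:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("MyApp")                     // hypothetical application name
  .setMaster("spark://master-host:7077")   // Standalone cluster manager (placeholder host)
  // .setMaster("yarn")                    // Hadoop YARN
  // .setMaster("mesos://mesos-host:5050") // Apache Mesos (placeholder host)
val sc = new SparkContext(conf)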

Spark Tutorial - Part 2

Some Scala utilities which can also be used on RDDs:-

First thing to note is that Scala also has its own collections, like --> Array, List, Seq, Set etc.

Reduce --> This reduces all the elements in a collection to a single value by repeatedly applying a binary operation such as multiplication or addition.

scala> var rdd4 = sc.parallelize(List(1,2,3,4,5,6))
rdd4: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> rdd4.reduce{(x,y)=>x*y}
res9: Int = 720

scala> var list = Array(1,2,3,4)
list: Array[Int] = Array(1, 2, 3, 4)

scala> list
res3: Array[Int] = Array(1, 2, 3, 4)

scala> list.reduce(_+_)
res4: Int = 10

scala> list.reduceLeft(_+_)
res5: Int = 10

scala> list.reduceRight(_+_)
res6: Int = 10

scala> list.reduceRight((a,b)=>{println(a+","+b);a+b})
3,4
2,7
1,9

res7: Int = 10

scala> list.foldLeft(0)(_+_)
res13: Int = 10

scala> list.foldLeft(2)(_+_)
res14: Int = 12
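One small point worth noting (a sketch, not part of the REPL session above): fold takes a start value, so it also works on an empty collection, while reduce fails on one.

Array.empty[Int].foldLeft(0)(_ + _)   // returns 0: fold falls back on the start value
// Array.empty[Int].reduce(_ + _)     // throws UnsupportedOperationException on an empty collection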


Persistence (caching):- Store an RDD in memory/disk so that we don't have to recompute it again and again.

import org.apache.spark.storage.StorageLevel

val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)

The storage level can be memory only (MEMORY_ONLY), both memory and disk (MEMORY_AND_DISK), etc.

One more question that arises from persistence: what happens if we persist/cache too much data? Will our Spark cluster run out of memory? The answer is no, as Spark automatically evicts old partitions using a Least Recently Used (LRU) cache policy. Still, we should not cache too much data, as it can lead to eviction of useful data as well.

We can also remove the data from the cache by calling unpersist().
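A minimal sketch of the full persist/unpersist cycle, assuming the sc provided by spark-shell (the numbers are only illustrative):

import org.apache.spark.storage.StorageLevel

val squares = sc.parallelize(1 to 1000).map(x => x * x)
squares.persist(StorageLevel.MEMORY_AND_DISK)   // keep in memory, spill to disk if it does not fit
squares.count()      // first action computes the RDD and caches it
squares.count()      // second action reads the cached partitions
squares.unpersist()  // remove the RDD from the cache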



Thursday, October 5, 2017

Spark Tutorial - Part 1


The ability to always recompute an RDD is actually why RDDs are called “resilient.” When a machine holding RDD data fails, Spark uses this ability to recompute the missing partitions, transparent to the user.

We can create a Spark RDD by first opening a spark-shell on EMR and then creating the RDD using the Spark context in the shell.

The spark-shell on EMR already creates a Spark context for us; typing sc in the shell refers to it.

On EMR, when we use sc.textFile("") with just a file name --> Spark looks for the file under the HDFS home directory /user/hadoop/. If the file is not there, we get an error like the one below:



org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://ip-10-162-101-19.ec2.internal:8020/user/hadoop/johri.logs

So we first need to put a file in HDFS at this location, and for this we run the hdfs -copyFromLocal command. Below is the command:-

hdfs dfs -copyFromLocal derby.log /user/hadoop/

After copying the file from the local file system into HDFS, we can check whether the file is there by running the hdfs command below:-

hdfs dfs -ls /user/hadoop/

Once the file is there, we can again open the spark-shell on EMR and run the commands below:-
Some common RDD operations:-


1) sc --> refers to the Spark context created by the shell

2) var lines = sc.textFile("derby.log") --> this will create an RDD from the file on HDFS

3) lines.count() --> will count the total number of lines in the RDD






In Spark, the main thing is the RDD, and we do the following with RDDs (see the sketch after this list):-

1) creating a new RDD

2) Transforming an existing RDD into a new RDD

3) Calling operations/performing actions on this new RDD
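Putting the three steps together, a minimal sketch assuming the sc provided by spark-shell:

val numbers = sc.parallelize(List(1, 2, 3, 4, 5))   // 1) create an RDD
val doubled = numbers.map(x => x * 2)               // 2) transform it into a new RDD
val total   = doubled.reduce(_ + _)                 // 3) run an action on the new RDD; total = 30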



Under the hood, Spark distributes the data in an RDD across the cluster and processes it with parallel operations.



RDD --> a Resilient Distributed Dataset is an immutable distributed collection of data that is split into multiple partitions, which may be computed on different nodes of the cluster. We can create an RDD in two ways:-

     (a) loading an external dataset
     (b) distributing a collection of objects (e.g., a List) from the driver program

Now, Spark has many functions we can use. Some examples are given below:-


sc
var inputRdd = sc.textFile("derby.log") 
inputRdd.count() --> this will give the total number of lines.
var osdata = inputRdd.filter(line=>line.contains("os")) --> this only defines a new RDD of lines containing "os"; nothing is computed yet because Spark evaluates transformations lazily.
inputRdd.filter(line => line.contains("os")).count() --> this will give the total count of lines having the word "os"
inputRdd.filter(line => line.contains("os")).first() --> this will return the first line which has "os"
inputRdd.filter(line => line.contains("os")).take(4) --> will give an array of 4 lines having the word "os". If there are fewer such lines, only that many are returned.


RDD.persist() --> If we want to store an RDD in cluster memory so that we can re-use it multiple times, it makes sense to persist it. We can also persist to disk instead of memory.

To create an RDD on the fly from the Spark context, we can use a function called parallelize(). An example is given below:-


var parRdd = sc.parallelize(List("Rahul","Johri","Johri123"))

and after this we can perform common RDD operations.

scala> var rdd1 = sc.parallelize(List("lion","tiger","tiger","peacock","horse"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> var rdd2 = sc.parallelize(List("lion","tiger"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> rdd1.distinct().collect()
res8: Array[String] = Array(peacock, lion, tiger, horse)

scala> rdd1.union(rdd2).collect()
res9: Array[String] = Array(lion, tiger, tiger, peacock, horse, lion, tiger)

scala> rdd1.intersection(rdd2).collect()
res10: Array[String] = Array(lion, tiger)

scala> rdd1.subtract(rdd2).collect()
res11: Array[String] = Array(peacock, horse)

scala> rdd1.cartesian(rdd2).collect()
res12: Array[(String, String)] = Array((lion,lion), (tiger,lion), (lion,tiger), (tiger,tiger), (tiger,lion), (peacock,lion), (horse,lion), (tiger,tiger), (peacock,tiger), (horse,tiger))

scala> rdd1.take(10).foreach(println)

Point to note: in the examples above we are using RDD operations like distinct(), union(), intersection(), subtract() and cartesian(). Apart from distinct(), these operations work on two RDDs, and to use union(), intersection() and subtract(), both RDDs must have the same element type, such as String or Int (see the sketch below).
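For example (a sketch, not from the session above), a union of an RDD[String] and an RDD[Int] is rejected at compile time:

val words = sc.parallelize(List("lion", "tiger"))
val nums  = sc.parallelize(List(1, 2))
// words.union(nums)                                  // does not compile: element types differ
words.union(sc.parallelize(List("horse"))).collect()  // fine: both are RDD[String]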

The collect() function is used to retrieve the entire RDD. Before calling collect() on an RDD, we must make sure the dataset is small, as collect() brings all the RDD data into the memory of a single machine (the driver). So it is not suitable for a big dataset.

Functions to save an RDD are ==> saveAsTextFile() and saveAsSequenceFile()
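A minimal sketch of saveAsTextFile(); the output path here is hypothetical, and the directory must not already exist:

val squares = sc.parallelize(1 to 10).map(x => x * x)
squares.saveAsTextFile("/user/hadoop/squares_output")   // writes one part-XXXXX file per partition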






Example of MAP:-
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
println(result.collect().mkString(","))

We have used the mkString() function in the above example. This is really a Scala function, used to convert a Scala array/list/sequence into a String. mkString takes a separator as a parameter, which can be a newline, comma, space etc., and can also take a prefix and a suffix. It works for integer arrays too. For more info refer -->
https://alvinalexander.com/scala/scala-convert-array-to-string-mkstring
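A quick mkString sketch on a plain Scala array:

val arr = Array(1, 2, 3)
arr.mkString(",")              // "1,2,3"
arr.mkString("[", ", ", "]")   // "[1, 2, 3]"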


Spark Map and Flat Map operation:-

Both are RDD transformations and operate on every single element of the RDD. The difference is that map takes every element of the RDD, applies its function, and produces exactly 1 output element. So mapping is one-to-one.

flatMap, on the other hand, does the same thing as map, but for every element it can produce 0, 1 or multiple output elements, which are then flattened into one RDD.

[Diagrams: Apache Spark map transformation operation; Apache Spark flatMap transformation operation]





scala> var rdd1 = sc.parallelize(List("Rahul","Johri","is","james","bond"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> var fltmp = rdd1.flatMap(line=>line.split(" "))
fltmp: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at flatMap at <console>:26

scala> fltmp.first()
res3: String = Rahul

scala> var rdd2 = sc.parallelize(List("Rahul Rajiv","Johri","is","james","bond"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> var fltmp2 = rdd2.flatMap(line=>line.split(" "))
fltmp2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at <console>:26

scala> fltmp2.first()
res4: String = Rahul

scala> var fltmp3 = rdd2.map(line=>line.split(" "))
fltmp3: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[4] at map at <console>:26

scala> fltmp3.first()
res5: Array[String] = Array(Rahul, Rajiv)

scala> fltmp3.take(2)
res6: Array[Array[String]] = Array(Array(Rahul, Rajiv), Array(Johri))

scala> fltmp2.take(10)
res7: Array[String] = Array(Rahul, Rajiv, Johri, is, james, bond)

scala> fltmp3.take(10)
res8: Array[Array[String]] = Array(Array(Rahul, Rajiv), Array(Johri), Array(is), Array(james), Array(bond))