Monday, 23 September 2013

Installing a Storm Cluster

From the previous post, you should have a clear idea of what Storm is meant for. The next step is to set up a Storm cluster on your machines.

The following are the prerequisites for setting up the cluster:
  •  Linux Operating system
  • Java 6 installed
  • Python installed

Installation Steps:
The following steps are needed to get a Storm cluster up and running.

  • Set up a Zookeeper cluster: Zookeeper is used as the coordinator in a Storm cluster. You can refer here to see the installation steps for Zookeeper; a minimal configuration sketch is also shown below for reference.
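For quick reference only (a minimal sketch; the data directory and hostnames below are placeholders, and the linked post covers the full setup), a basic conf/zoo.cfg for a three-node ensemble looks roughly like this:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/zookeeper
clientPort=2181
server.1=zk1.example.com:2888:3888
server.2=zk2.example.com:2888:3888
server.3=zk3.example.com:2888:3888

Each server also needs a myid file inside dataDir containing its own server number (1, 2 or 3).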
  • Install native dependencies on the Nimbus and worker machines: Storm requires some native dependencies, namely ZeroMQ and JZMQ. These are needed only on a Storm cluster; when using Storm in local mode, Storm uses a pure-Java messaging system, so you don't need to install the native dependencies there. In cluster mode, however, they are required.
            ZeroMQ 2.1.7 installation: Storm has been tested with ZeroMQ 2.1.7, and this is the recommended ZeroMQ release to install. You can download it from here. The installation steps are as follows.


tar -xzf zeromq-2.1.7.tar.gz

cd zeromq-2.1.7

./configure

make

sudo make install

JZMQ installation: JZMQ is the Java binding for ZeroMQ. Here are the steps:


git clone https://github.com/nathanmarz/jzmq.git   # first fetch the JZMQ sources (the nathanmarz/jzmq fork is commonly used with Storm)

cd jzmq

./autogen.sh

./configure

make

sudo make install

  • Download a Storm release from here and copy it to every machine in the cluster (the Nimbus and worker machines).
  • Configure storm.yaml: The Storm release contains a file at conf/storm.yaml with the default configuration for running the Storm daemons. Override at least the following settings:
storm.zookeeper.servers:
     - "IP_MACHINE_1"
     - "IP_MACHINE_2"
storm.local.dir: "/home/hadoop/STORM_DIR"
java.library.path: "/usr/local/lib"
nimbus.host: "IP_OF_NIMBUS_MACHINE"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

               Copy this storm.yaml into the ~/.storm/ directory.
  • Now launch all the Storm Daemons.
hduser@ubuntu:/usr/local/storm-0.8.1$ bin/storm nimbus &
hduser@ubuntu:/usr/local/storm-0.8.1$ bin/storm supervisor &
hduser@ubuntu:/usr/local/storm-0.8.1$ bin/storm ui &

Here is an example project to get started with Storm topologies:
                 https://github.com/nathanmarz/storm-starter

Saturday, 21 September 2013

Classification and Regression Trees (CART)

Classification & Regression Tree is a classification method, technically known as Binary Recursive Partitioning. It uses historical data to construct Decision trees. Decision trees are further used for classifying new data.

Now the question is: where should we use CART?

Sometimes we have problems where we want a "Yes/No" answer,
e.g. "Is the salary greater than 30,000?", "Is it going to rain today?", etc.

CART asks "Yes/No" questions. The algorithm searches all possible variables and all possible split values in order to find the best split, i.e. the question that divides the data into two parts with maximum homogeneity.
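As a bit of standard background (not from the original post), homogeneity is usually quantified with an impurity measure; rpart's default splitting criterion for method="class" is the Gini index. For a node t with class proportions p_k, the Gini impurity and the improvement from a candidate split s into left and right children are:

Gini(t) = 1 - \sum_{k} p_k^2

\Delta(s, t) = Gini(t) - \frac{n_L}{n_t}\,Gini(t_L) - \frac{n_R}{n_t}\,Gini(t_R)

At each node, CART picks the split s with the largest impurity decrease \Delta(s, t).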
Key elements of a CART analysis are:
·         Splitting each node in the tree.
·         Deciding when the tree is complete.
·         Assigning each leaf node to a class outcome.
The result is a decision tree, as in the example below.
CART Modeling via rpart
Classification & Regression Tree can be generated using rpart package in R.
Following are the steps to get the CART model :
·         Grow a tree: use the following function to grow a tree.
rpart(formula, data, weights, subset, na.action = na.rpart, method,
      model = FALSE, x = FALSE, y = TRUE, parms, control, cost, ...)

·         Examine the results: the following functions help to examine the fitted model.
printcp(fit)     # display the cp table
plotcp(fit)      # plot cross-validation results
rsq.rpart(fit)   # plot approximate R-squared and relative error for different splits (2 plots; labels are only appropriate for the "anova" method)
print(fit)       # print results
summary(fit)     # detailed results, including surrogate splits
plot(fit)        # plot the decision tree
text(fit)        # label the decision tree plot
post(fit, file=) # create a postscript plot of the decision tree
            Here fit is the model object returned by the rpart command.

·         Prune the tree - Pruning helps to avoid overfitting the data. Typically, you will want to select a tree size that minimizes the cross-validated error, i.e. the xerror column printed by printcp().
Prune the tree to the desired size using
prune(fit, cp=)

Here is an example of a classification tree:
library(rpart)
dataset <- read.table("C:\\Users\\Nishu\\Downloads\\bank\\bank.csv",header=T,sep=";")
# grow tree
fit <- rpart(y ~ ., method="class", data=dataset )
printcp(fit) # display the results
plotcp(fit) # visualize cross-validation results
summary(fit) # detailed summary of splits
# plot tree
plot(fit, uniform=TRUE,main="Classification Tree for Bank")
text(fit, use.n=TRUE, all=TRUE, cex=.8)
# create attractive postscript plot of tree
post(fit, file = "c:/tree.ps", title = "Classification Tree")
# prune the tree
pfit<- prune(fit, cp=   fit$cptable[which.min(fit$cptable[,"xerror"]),"CP"])
# plot the pruned tree
plot(pfit, uniform=TRUE,
   main="Pruned Classification Tree")
text(pfit, use.n=TRUE, all=TRUE, cex=.8)
post(pfit, file = "c:/ptree.ps",
   title = "Pruned Classification Tree")

Now we have the model. The next step is to predict new data based on the trained model.
First we'll split the dataset into training and test data with a fixed percentage.
library(rpart)
dataset <- read.table("C:\\Users\\Nishu\\Downloads\\bank\\bank.csv", header=T, sep=";")
sub <- sample(nrow(dataset), floor(nrow(dataset) * 0.9)) # take 90% of the rows as training data
training <- dataset[sub, ]
testing <- dataset[-sub, ]
fit <- rpart(y ~ ., method="class", data=training) # train the model on the training split only
predict(fit, testing, type="class")
# to get the confusion matrix
out <- table(predict(fit, testing, type="class"), testing$y)


Here the confusion matrix is:
              no      yes
  no      391       25
  yes     13        24

# To get the accuracy and other details, use the confusionMatrix function from the caret package

library(caret)
confusionMatrix(out)

Output would be :

no  yes
  no      391  25
  yes      13  24

               Accuracy : 0.9101
                 95% CI : (0.8936, 0.9248)
    No Information Rate : 0.8968
    P-Value [Acc > NIR] : 0.05704
    Kappa : 0.4005
Mcnemar's Test P-Value : 9.213e-08

            Sensitivity : 0.9745
            Specificity : 0.3500
         Pos Pred Value : 0.9287
         Neg Pred Value : 0.6125
             Prevalence : 0.8968
         Detection Rate : 0.8740
   Detection Prevalence : 0.9410


       'Positive' Class : no
So here we have the desired output: the predictions and the accuracy of the model, based on which we can predict future data.

Download this dataset or another dataset from here and test the algorithm.

Here you go..!!!!

Saturday, 14 September 2013

Twitter Storm : Real-time Hadoop

All of us might have run Hadoop batch jobs.
Now the next phase of the revolution is here: real-time data analysis.
So here comes Twitter Storm: a distributed, fault-tolerant, real-time computation system.

Storm is a free and open source distributed real-time computation system. Storm makes it easy to reliably process unbounded streams of data, doing for real-time processing what Hadoop did for batch processing. Storm is simple, can be used with any programming language, and is a lot of fun to use!

Storm has many use cases: realtime analytics, online machine learning, continuous computation, distributed RPC, ETL, and more. Storm is fast: a benchmark clocked it at over a million tuples processed per second per node. It is scalable, fault-tolerant, guarantees your data will be processed, and is easy to set up and operate.


Storm vs. Traditional Hadoop Batch Jobs:

Hadoop is fundamentally a batch processing system: data is introduced into HDFS, processed by the nodes, and once processing is complete the resulting data is written back to HDFS. The problem was how to perform real-time data processing, and Storm came into the picture to solve it.
Storm makes it easy to process unbounded streams of data in real time: it organizes processing into topologies and keeps processing data as it arrives.


Storm Components :

  • Topology : On Hadoop you run "MapReduce jobs"; on Storm you run "topologies". The key difference is that a MapReduce job eventually finishes, whereas a topology runs forever (until you kill it).
  • Nimbus : The master node runs a daemon called "Nimbus" that is similar to Hadoop's "JobTracker". Nimbus is responsible for distributing code around the cluster, assigning tasks to machines, and monitoring for failures.
  • Supervisor : Each worker node runs a daemon called the "Supervisor". The supervisor listens for work assigned to its machine and starts and stops worker processes as necessary based on what Nimbus has assigned to it. Each worker process executes a subset of a topology; a running topology consists of many worker processes spread across many machines.
  • Stream : A stream is an unbounded sequence of tuples. Storm provides primitives for transforming a stream into a new stream in a distributed and reliable way; for example, transforming a stream of tweets into a stream of trending topics.
  • Spout : A spout is a source of streams. It reads tuples from an external source and emits them as a stream into the topology.
  • Bolt : A bolt consumes input streams, does some processing, and possibly emits new streams. Complex stream transformations, like computing a stream of trending topics from a stream of tweets, require multiple steps and thus multiple bolts. Bolts can run functions, filter tuples, do streaming aggregations, do streaming joins, talk to databases, and more.


A Zookeeper cluster works as the coordinator between Nimbus and the supervisors. Nimbus and the supervisors are stateless and fail-fast; all state is kept in Zookeeper or on local disk.
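To make the spout, bolt, and topology terms concrete, here is a minimal word-count topology sketch in Java. It is only an illustration modeled on the storm-starter examples and the 0.8.x backtype.storm API; the class names, component ids, and field names (WordCountSketch, WordSpout, CountBolt, "word-spout", and so on) are made up for this sketch.

import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class WordCountSketch {

    // Spout: the source of the stream; emits one random word per call to nextTuple().
    public static class WordSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        private final String[] words = {"storm", "hadoop", "zookeeper"};

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        public void nextTuple() {
            Utils.sleep(100); // throttle the demo spout
            String word = words[(int) (Math.random() * words.length)];
            collector.emit(new Values(word));
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    // Bolt: consumes the word stream and emits a running count per word.
    public static class CountBolt extends BaseBasicBolt {
        private final Map<String, Integer> counts = new HashMap<String, Integer>();

        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            count = (count == null) ? 1 : count + 1;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws Exception {
        // Wire the spout and bolt into a topology graph.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-spout", new WordSpout(), 1);
        builder.setBolt("count-bolt", new CountBolt(), 2).shuffleGrouping("word-spout");

        Config conf = new Config();
        conf.setDebug(true);

        // Local mode runs the whole topology inside one JVM for development and testing.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count-sketch", conf, builder.createTopology());
        Utils.sleep(10000);
        cluster.shutdown();
    }
}

On a real cluster you would submit the same topology with StormSubmitter.submitTopology instead of LocalCluster, and the parallelism numbers passed to setSpout/setBolt control how many executors run each component.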






Why should anyone use Storm:

Now we have a clear idea of Storm's role in real-time processing. Just as Hadoop MapReduce eases batch processing, Storm eases parallel real-time computation.
The following are some key points that show the importance of Storm.
  •  Scalable: Storm scales to massive numbers of messages per second. To scale a topology, all you have to do is add machines and increase the parallelism settings of the topology (a short parallelism sketch follows this list). As an example, one of Storm's initial applications processed 1,000,000 messages per second on a 10-node cluster, including hundreds of database calls per second as part of the topology. Storm's usage of Zookeeper for cluster coordination makes it scale to much larger cluster sizes.
  • Guarantees no loss of data : Storm guarantees that every message will be processed.
  •  Robust: A goal of Storm is to make managing a Storm cluster painless for the user, unlike Hadoop clusters.
  •  Fault-tolerant: If any fault occurs during computation, Storm reassigns the affected tasks.
  •  Broad set of use cases: Storm can be used for stream processing, updating databases, running continuous queries on data streams and streaming the results to clients (continuous computation), and distributed RPC.
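As a concrete illustration of the parallelism settings mentioned in the "Scalable" point above (a hypothetical snippet reusing the builder and conf objects from the word-count sketch earlier in this post):

builder.setBolt("count-bolt", new CountBolt(), 16).shuffleGrouping("word-spout"); // run the bolt with 16 executors
conf.setNumWorkers(4); // spread the topology across 4 worker processes

Adding machines to the cluster adds more supervisor slots, so these numbers can be raised without changing the topology logic.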