Friday, 20 June 2014

Submitting a Topology to a Remote Storm Cluster

It is easy to write a topology and submit it from a machine that is part of the Storm cluster.
The problem arises when we need to submit a topology from a local machine to a remote Storm cluster.
What should we do in that case?

Here is one approach to submitting a topology to a remote cluster.

I have a local Windows machine and a Storm cluster (one Nimbus Linux machine and two supervisor Linux machines).
Let's say the following are the machines in the cluster:

Nimbus Machine : 192.168.1.5
Supervisor Machine 1: 192.168.1.6
Supervisor Machine 2: 192.168.1.7

The Storm cluster should be up and running on the above machines.
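Before submitting anything, it can help to confirm that the Nimbus Thrift port is reachable from the local machine, since a firewall between the two is a common cause of failed submissions. The following is a minimal sketch using only the JDK. The class name NimbusPortCheck and the method isPortOpen are hypothetical names chosen for illustration, and the host and port are the ones assumed in this post (192.168.1.5 and 6627).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Storm: checks plain TCP reachability only.
public class NimbusPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    public static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeouts and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are the Nimbus address and port assumed in this post.
        String nimbusHost = args.length > 0 ? args[0] : "192.168.1.5";
        int nimbusPort = args.length > 1 ? Integer.parseInt(args[1]) : 6627;

        boolean open = isPortOpen(nimbusHost, nimbusPort, 3000);
        System.out.println(nimbusHost + ":" + nimbusPort
                + (open ? " is reachable" : " is NOT reachable"));
    }
}
```

A successful connection only shows that something is listening on that port. It does not prove that the listener is Nimbus or that the Storm versions on both sides match.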

Now, from the local machine, upload the jar with StormSubmitter and then use NimbusClient to submit the topology to the cluster.

NimbusClient nimbus = new NimbusClient(storm_conf, "<nimbus machine ip>", <nimbus port>);
nimbus.getClient().submitTopology(topologyName, uploadedJarLocation, jsonConf, builder.createTopology());

Here is a running example:

import java.util.Map;

import org.json.simple.JSONValue;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

public class RunningClusterTopology {
    public static void main(String[] args) throws Exception {
        // Define your spouts and bolts on this builder before submitting.
        TopologyBuilder builder = new TopologyBuilder();

        // Read the local Storm configuration and point it at the remote Nimbus.
        Map storm_conf = Utils.readStormConfig();
        storm_conf.put(Config.NIMBUS_HOST, "192.168.1.5");

        // Path of the topology jar on the local machine.
        String inputJar = "C:\\workspace\\TestStormRunner\\target\\TestStormRunner-0.0.1-SNAPSHOT-jar-with-dependencies.jar";

        // Thrift client connected to Nimbus on its host and port.
        NimbusClient nimbus = new NimbusClient(storm_conf, "192.168.1.5", 6627);

        // Upload the topology jar to the cluster using StormSubmitter.
        String uploadedJarLocation = StormSubmitter.submitJar(storm_conf, inputJar);

        try {
            // Nimbus expects the topology configuration as a JSON string.
            String jsonConf = JSONValue.toJSONString(storm_conf);
            nimbus.getClient().submitTopology("testtopology",
                    uploadedJarLocation, jsonConf, builder.createTopology());
        } catch (AlreadyAliveException ae) {
            // A topology with this name is already running on the cluster.
            ae.printStackTrace();
        }

        // Keep the client process alive for a minute after submitting.
        Thread.sleep(60000);
    }
}
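The inputJar path in the example is specific to my workspace, and a wrong path is an easy mistake to make. A small pre-check before calling StormSubmitter.submitJar can catch it early. This is only a sketch using the JDK. JarPathCheck and isReadableJar are hypothetical names, not part of Storm.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of Storm.
public class JarPathCheck {

    // Returns true if jarPath ends with ".jar" and points to a readable regular file.
    public static boolean isReadableJar(String jarPath) {
        Path path = Paths.get(jarPath);
        return jarPath.endsWith(".jar")
                && Files.isRegularFile(path)
                && Files.isReadable(path);
    }

    public static void main(String[] args) {
        // Default is the jar path used in this post; pass your own path as the first argument.
        String inputJar = args.length > 0 ? args[0]
                : "C:\\workspace\\TestStormRunner\\target\\TestStormRunner-0.0.1-SNAPSHOT-jar-with-dependencies.jar";

        if (isReadableJar(inputJar)) {
            System.out.println("Jar found: " + inputJar);
        } else {
            System.out.println("Jar missing or unreadable: " + inputJar);
        }
    }
}
```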

This submits the topology to the Nimbus machine, which schedules it to run on the two supervisor machines (192.168.1.6 and 192.168.1.7).
To verify, open the Storm UI in a browser: http://<nimbus-ip>:<port>