Friday, 4 July 2014

Update a running topology on Storm Cluster


Sometimes we want to update a running topology based on some condition or rule. As of now, Storm has no direct command or API call that updates a running topology in place, so there are two approaches.

First approach: kill the topology from the command line using:

storm kill <topology-name>

and then submit it again. But what if we don't want to kill it manually, and would rather have the code handle it automatically?

Second approach: use NimbusClient to kill the topology programmatically and then submit it again.

// Read the Storm configuration and connect to Nimbus.
Map storm_conf = Utils.readStormConfig();
Client client = NimbusClient.getConfiguredClient(storm_conf).getClient();
// topologyNameExists is a helper you define yourself; it should return true
// when the cluster summary lists a topology with the given name.
if (topologyNameExists(storm_conf, topologyName)) {
      client.killTopology(topologyName);
}

The code above kills the topology. However, the topology takes some time to disappear from the cluster's topology list, so if you resubmit the same topology immediately, the submission fails with an error saying that a topology with the same name already exists on the cluster. You therefore need to keep checking for a few seconds until the name is gone.
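The waiting step can be sketched as a small polling helper with a timeout, so the program does not spin forever if the topology never disappears. This is only a sketch under some assumptions: it needs Java 8 or later for lambdas, and the names TopologyPoller and waitUntil are made up for illustration and are not part of Storm. The demo condition in main is a stand-in for the real cluster check.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper (not part of Storm): polls a condition until it holds
// or a timeout elapses.
final class TopologyPoller {

    /**
     * Checks the condition every pollMillis milliseconds.
     * Returns true as soon as the condition holds, or false once
     * timeoutMillis has elapsed without it holding.
     */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(pollMillis);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the real check: pretend the topology disappears
        // from the cluster list about 300 ms from now.
        final long goneAt = System.currentTimeMillis() + 300;
        boolean removed = waitUntil(() -> System.currentTimeMillis() >= goneAt, 5000, 100);
        System.out.println(removed
                ? "Topology removed, safe to resubmit"
                : "Timed out waiting for the topology to be removed");
    }
}
```

In the topology code, the condition would be something like () -> !topologyNameExists(storm_conf, topologyName), provided the helper is adapted so that it does not throw checked exceptions. Sleeping between checks also avoids sending requests to Nimbus in a tight loop.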
Here is the complete example:
  
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.ClusterSummary;
import backtype.storm.generated.Nimbus.Client;
import backtype.storm.generated.TopologySummary;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

public class TestTopology {

    public static void main(String[] args) throws Exception {
        String topologyName = "testTopology";

        // TestSpout and TestBolt are your own spout and bolt implementations.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("testspout", new TestSpout(), 1);
        builder.setBolt("testbolt", new TestBolt(), 1)
                .shuffleGrouping("testspout");

        Config conf = new Config();
        conf.setDebug(false);

        // Connect to Nimbus using the local Storm configuration.
        Map storm_conf = Utils.readStormConfig();
        NimbusClient nimbus = NimbusClient.getConfiguredClient(storm_conf);
        try {
            Client client = nimbus.getClient();
            // Kill the topology if it is already running.
            if (topologyNameExists(storm_conf, topologyName)) {
                client.killTopology(topologyName);
            }
        } finally {
            nimbus.close();
        }

        // Wait until the topology has been removed from the cluster list,
        // checking once per second instead of spinning in a tight loop.
        while (topologyNameExists(storm_conf, topologyName)) {
            Thread.sleep(1000);
        }

        try {
            StormSubmitter.submitTopology(topologyName, conf,
                    builder.createTopology());
        } catch (AlreadyAliveException ae) {
            ae.printStackTrace();
        }
    }

    // One possible implementation of the helper: returns true if the
    // cluster summary lists a topology with the given name.
    private static boolean topologyNameExists(Map conf, String name) throws Exception {
        NimbusClient nimbus = NimbusClient.getConfiguredClient(conf);
        try {
            ClusterSummary summary = nimbus.getClient().getClusterInfo();
            for (TopologySummary s : summary.get_topologies()) {
                if (s.get_name().equals(name)) {
                    return true;
                }
            }
            return false;
        } finally {
            nimbus.close();
        }
    }
}

The code above submits the topology if it is not already running; otherwise it kills the running topology and submits it again. You can monitor this from the Storm UI.