Sunday, November 23, 2014

Write and test a Trident non-transactional topology in Storm

Trident provides a high-level abstraction over Storm, built on concepts such as functions, filters, projections, aggregations and grouping. Although it adds some overhead to Storm, it makes topologies easier to implement and provides support for at-least-once and exactly-once processing.
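The exactly-once guarantee rests on an idea that Trident's documentation describes for transactional state: store the transaction id of the batch that last updated a value next to the value itself, so that a replayed batch can be recognised and skipped. The plain Java 8 sketch below illustrates only that idea, assuming every batch carries a unique transaction id, batches update the state in order, and a replayed batch contains exactly the same tuples. `TxCountState`, `apply` and `get` are hypothetical names, not Trident's API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of txid-based deduplication; this is NOT Trident's State API.
final class TxCountState {
    // Stored per key: the running count plus the txid of the batch that last updated it.
    private static final class Entry {
        long count;
        long lastTxId = -1;
    }

    private final Map<String, Entry> store = new HashMap<>();

    // Adds a batch's partial count for a key, unless this txid has already been applied.
    void apply(long txId, String key, long partialCount) {
        Entry e = store.computeIfAbsent(key, k -> new Entry());
        if (e.lastTxId == txId) {
            return; // replayed batch: already counted, so skip it to stay exactly-once
        }
        e.count += partialCount;
        e.lastTxId = txId;
    }

    // Returns the current count for a key, or 0 if the key was never updated.
    long get(String key) {
        Entry e = store.get(key);
        return e == null ? 0 : e.count;
    }

    public static void main(String[] args) {
        TxCountState state = new TxCountState();
        state.apply(1, "China", 3);
        state.apply(1, "China", 3); // replay of batch 1 is ignored
        state.apply(2, "China", 2);
        System.out.println(state.get("China")); // prints 5
    }
}
```

Without the stored txid, replaying batch 1 would raise the count to 8 instead of 5; that kind of double counting is what at-least-once processing alone allows.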

The Trident non-transactional topology implemented here is extremely simple. A dummy spout (implementing IBatchSpout) emits batches of 10 tuples of the form ["{CountryName}", {Rank}]. Some of the emitted tuples contain illegal country names, which need to be filtered out. Within each batch, tuples with the same country are grouped together, and the number of times each country appears in the batch is counted and printed.
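Before looking at the Storm code, it may help to see what the topology computes for a single batch. The plain Java 8 sketch below is a model, not Storm code: `BatchCountSketch` and `countByCountry` are hypothetical names, each tuple is represented as a `List<Object>` of [country, rank], and the sample batch is made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical plain-Java model of one batch flowing through filter -> groupBy -> count.
final class BatchCountSketch {
    // The names the topology treats as illegal countries.
    private static final List<String> ILLEGAL = Arrays.asList("Rubbish", "Garbage");

    // Drops tuples with illegal country names, then counts the remaining tuples per country.
    static Map<String, Long> countByCountry(List<List<Object>> batch) {
        return batch.stream()
                .map(tuple -> (String) tuple.get(0))           // field 0 is "Country"
                .filter(country -> !ILLEGAL.contains(country)) // the filter step
                .collect(Collectors.groupingBy(c -> c, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<List<Object>> batch = Arrays.asList(
                Arrays.<Object>asList("China", 1),
                Arrays.<Object>asList("Rubbish", 3),
                Arrays.<Object>asList("UK", 2),
                Arrays.<Object>asList("China", 5),
                Arrays.<Object>asList("Garbage", 4));
        System.out.println(countByCountry(batch)); // prints {China=2, UK=1}
    }
}
```

In the real topology the same three steps are carried out by CountryFilter, groupBy() and the Count aggregator, possibly spread across several partitions.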

The source code of the project can be downloaded from the link below:

https://dl.dropboxusercontent.com/u/113201788/storm/trident-test.tar.gz

To start, create a Maven project (in my case, with groupId="com.memeanalytics" and artifactId="trident-test") and modify the pom.xml file as shown below:



<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.memeanalytics</groupId>
  <artifactId>trident-test</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>trident-test</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  
  <repositories>
    <repository>
      <id>clojars</id>
      <url>https://clojars.org/repo</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
     <groupId>storm</groupId>
     <artifactId>storm</artifactId>
     <version>0.9.0.1</version>
     <scope>provided</scope>
    </dependency>
  </dependencies>
  
  <build>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.2.1</version>
        <executions>
          <execution>
            <goals>
              <goal>exec</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <includeProjectDependencies>true</includeProjectDependencies>
          <includePluginDependencies>false</includePluginDependencies>
          <executable>java</executable>
          <classpathScope>compile</classpathScope>
          <mainClass>${main.class}</mainClass>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.2.1</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass></mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

The pom declares the Clojars repository from which Storm is downloaded, together with the Maven plugins for executing (exec-maven-plugin) and packaging (maven-assembly-plugin) the Java project. Now let's create the spout, which emits tuples in batches:

package com.memeanalytics.trident_test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

public class RandomWordSpout implements IBatchSpout {
    private static final long serialVersionUID = 1L;

    // Candidate values for the "Country" field; "Rubbish" and "Garbage" are deliberately invalid.
    private static String[] countries = new String[] {
        "China",
        "USA",
        "Russia",
        "UK",
        "France",
        "Rubbish",
        "Garbage"
    };

    // Candidate values for the "Rank" field.
    private static Integer[] ranks = new Integer[] { 1, 2, 3, 4, 5 };

    // Batches emitted but not yet acknowledged, keyed by batchId, so a replayed batch is re-emitted unchanged.
    private Map<Long, List<List<Object>>> dataStore = new HashMap<Long, List<List<Object>>>();

    private int batchSize;

    public RandomWordSpout(int batchSize) {
        this.batchSize = batchSize;
    }

    public void open(Map conf, TopologyContext context) {
        // No resources to initialize.
    }

    public void emitBatch(long batchId, TridentCollector collector) {
        List<List<Object>> batch = dataStore.get(batchId);
        if (batch == null) {
            // First request for this batchId: generate a new random batch and cache it.
            final Random rand = new Random();
            batch = new ArrayList<List<Object>>();
            for (int i = 0; i < batchSize; ++i) {
                batch.add(new Values(
                        countries[rand.nextInt(countries.length)],
                        ranks[rand.nextInt(ranks.length)]));
            }
            dataStore.put(batchId, batch);
        }

        for (List<Object> tuple : batch) {
            collector.emit(tuple);
        }
    }

    public void ack(long batchId) {
        // The batch has been fully processed, so it no longer needs to be kept for replay.
        dataStore.remove(batchId);
    }

    public void close() {
        // Nothing to clean up.
    }

    public Map getComponentConfiguration() {
        return null;
    }

    public Fields getOutputFields() {
        return new Fields("Country", "Rank");
    }
}

The spout creates a batch for each new batchId, caches it, and emits the tuples in that batch; if the same batchId is requested again (a replay), the cached batch is re-emitted unchanged. When the batch is acknowledged, it is removed from the cache. Note that the spout will emit tuples containing non-country names such as "Rubbish" and "Garbage".
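The cache-and-replay behaviour of the spout can be exercised without Storm at all. The plain Java 8 sketch below is a hypothetical `ReplayableBatchSource` that mirrors the `dataStore` logic of `RandomWordSpout`, simplified to country names only and with `emitBatch` returning the batch instead of sending it to a `TridentCollector`. Like the original `HashMap`-based spout, it assumes a single thread calls it; the seeded `Random` is an added assumption to make runs repeatable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical Storm-free model of RandomWordSpout's batch cache.
final class ReplayableBatchSource {
    private static final String[] COUNTRIES =
            {"China", "USA", "Russia", "UK", "France", "Rubbish", "Garbage"};

    private final Map<Long, List<String>> dataStore = new HashMap<>();
    private final Random rand;
    private final int batchSize;

    ReplayableBatchSource(int batchSize, long seed) {
        this.batchSize = batchSize;
        this.rand = new Random(seed);
    }

    // Returns the cached batch for a replayed batchId, or generates and caches a new one.
    List<String> emitBatch(long batchId) {
        return dataStore.computeIfAbsent(batchId, id -> {
            List<String> batch = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                batch.add(COUNTRIES[rand.nextInt(COUNTRIES.length)]);
            }
            return batch;
        });
    }

    // Forgets an acknowledged batch, as RandomWordSpout.ack does.
    void ack(long batchId) {
        dataStore.remove(batchId);
    }

    int cachedBatches() {
        return dataStore.size();
    }

    public static void main(String[] args) {
        ReplayableBatchSource source = new ReplayableBatchSource(10, 42L);
        List<String> first = source.emitBatch(1);
        List<String> replay = source.emitBatch(1);
        System.out.println(first == replay); // prints true: the replay reuses the cached batch
        source.ack(1);
        System.out.println(source.cachedBatches()); // prints 0
    }
}
```

Because a replay returns exactly the same tuples, downstream operations see identical input the second time a batch is processed.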

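Now we will create a set of Trident operations: CountryFilter (which filters out tuples containing non-country names), CountrySplit (a function that splits a string field on whitespace and emits one tuple per token; it is not used in the topology below) and Print (which prints each tuple it receives, here the count of tuples for a particular country in a batch). The code is as shown below: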

package com.memeanalytics.trident_test;

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFilter;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class TridentComps {

    // Drops tuples whose first field is one of the known non-country names.
    public static class CountryFilter extends BaseFilter {
        private static final long serialVersionUID = 1L;

        public boolean isKeep(TridentTuple tuple) {
            String countryCandidate = tuple.getString(0);
            return !countryCandidate.equals("Garbage") && !countryCandidate.equals("Rubbish");
        }
    }

    // Splits the first field on each whitespace character and emits one tuple per token (unused below).
    public static class CountrySplit extends BaseFunction {
        private static final long serialVersionUID = 1L;

        public void execute(TridentTuple tuple, TridentCollector collector) {
            String countryComps = tuple.getString(0);
            for (String countryCandidate : countryComps.split("\\s")) {
                collector.emit(new Values(countryCandidate.trim()));
            }
        }
    }

    // Pass-through filter that prints every tuple it sees, used here as a debugging sink.
    public static class Print extends BaseFilter {
        private static final long serialVersionUID = 1L;

        public boolean isKeep(TridentTuple tuple) {
            System.out.println(tuple);
            return true;
        }
    }
}
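Because `isKeep` and `execute` only need a field value, one way to test them without a cluster is to move the logic into plain static methods that the Trident operations call. The class below is a hypothetical `CountryLogic` helper (plain Java 8), not part of the original project. It also splits on `\\s+` after trimming, which, unlike `split("\\s")`, does not produce empty tokens when words are separated by several spaces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper holding the logic of CountryFilter and CountrySplit so it can be unit-tested without Storm.
final class CountryLogic {
    private static final List<String> ILLEGAL = Arrays.asList("Rubbish", "Garbage");

    // Same rule as CountryFilter.isKeep: keep everything except the two junk names.
    static boolean isCountry(String candidate) {
        return !ILLEGAL.contains(candidate);
    }

    // Splits on runs of whitespace; blank input yields no tokens.
    static List<String> splitCountries(String text) {
        String trimmed = text.trim();
        List<String> tokens = new ArrayList<>();
        if (!trimmed.isEmpty()) {
            tokens.addAll(Arrays.asList(trimmed.split("\\s+")));
        }
        return tokens;
    }

    public static void main(String[] args) {
        System.out.println(isCountry("China"));                        // prints true
        System.out.println(isCountry("Garbage"));                      // prints false
        System.out.println(splitCountries(" China  UK "));             // prints [China, UK]
        System.out.println(Arrays.asList("China  UK".split("\\s")));   // prints [China, , UK]
    }
}
```

CountryFilter.isKeep could then simply return `CountryLogic.isCountry(tuple.getString(0))`, and the helper can be covered by ordinary JUnit tests.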

Now we are ready to implement the main class:

package com.memeanalytics.trident_test;

import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;

public class App
{
    public static void main(String[] args) throws Exception
    {
        Config config = new Config();
        // Limit the number of batches that may be in flight at the same time.
        config.setMaxSpoutPending(20);

        if (args.length == 0)
        {
            // No arguments: run the topology in an in-process local cluster for 10 seconds.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("TridentDemo", config, buildTopology());

            try {
                Thread.sleep(10000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }

            cluster.killTopology("TridentDemo");
            cluster.shutdown();
        }
        else
        {
            // args[0] is used as the topology name on the remote cluster.
            config.setNumWorkers(3);
            try {
                StormSubmitter.submitTopology(args[0], config, buildTopology());
            } catch (AlreadyAliveException ex) {
                ex.printStackTrace();
            } catch (InvalidTopologyException ex) {
                ex.printStackTrace();
            }
        }
    }

    private static StormTopology buildTopology()
    {
        RandomWordSpout spout = new RandomWordSpout(10);
        TridentTopology topology = new TridentTopology();

        topology.newStream("TridentTxId", spout)
                .shuffle()
                // Remove tuples carrying non-country names.
                .each(new Fields("Country"), new TridentComps.CountryFilter())
                // Group tuples of the same country within each batch and count them.
                .groupBy(new Fields("Country"))
                .aggregate(new Fields("Country"), new Count(), new Fields("Count"))
                // Print the "Count" field of each aggregated tuple.
                .each(new Fields("Count"), new TridentComps.Print())
                .parallelismHint(2);

        return topology.build();
    }
}

The static method buildTopology() creates a Trident non-transactional topology that uses the spout created above as its data source. The tuples are filtered by CountryFilter, grouped by the value of the "Country" field within each batch, and counted by the aggregate() call; finally, the counts are printed to the console. Note that the grouped aggregation emits tuples with the fields ["Country", "Count"] (the grouping field followed by the aggregator's output), but each(new Fields("Count"), new TridentComps.Print()) passes only the "Count" field to Print, so only the count is printed. To print the country as well, change the last call to each(new Fields("Country", "Count"), new TridentComps.Print()); the output fields of aggregate() should stay new Fields("Count"), because Count emits a single value.
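To see why the field selection in each() decides what Print receives, picture each tuple leaving the grouped aggregation as the field names ["Country", "Count"] paired with their values, and each() handing its operation only the requested fields. The plain Java 8 sketch below models that projection; `FieldProjectionSketch`, `project` and the sample values are hypothetical, and this is not how Trident implements projection internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of selecting named fields from a tuple, as each(new Fields(...), op) does conceptually.
final class FieldProjectionSketch {
    // Returns the values of the requested fields, in the requested order.
    static List<Object> project(List<String> fieldNames, List<Object> values, List<String> wanted) {
        List<Object> out = new ArrayList<>();
        for (String name : wanted) {
            int idx = fieldNames.indexOf(name);
            if (idx < 0) {
                throw new IllegalArgumentException("Unknown field: " + name);
            }
            out.add(values.get(idx));
        }
        return out;
    }

    public static void main(String[] args) {
        // A tuple after groupBy("Country") + aggregate(Count): grouping field first, then the count.
        List<String> fields = Arrays.asList("Country", "Count");
        List<Object> tuple = Arrays.<Object>asList("China", 2L);
        System.out.println(project(fields, tuple, Arrays.asList("Count")));            // prints [2]
        System.out.println(project(fields, tuple, Arrays.asList("Country", "Count"))); // prints [China, 2]
    }
}
```

Selecting both fields keeps the country next to its count, which is the effect of passing new Fields("Country", "Count") to the final each() call.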

The main() method is straightforward. If a command-line argument is given, it is used as the topology name and the topology is submitted to a Storm cluster, which requires the project to be packaged into a jar first. Otherwise, a local Storm cluster is started, the topology runs on it for 10 seconds, and then the topology is killed and the cluster is shut down. To run locally, navigate to the project root folder and run the following command:

> mvn compile exec:java -Dmain.class=com.memeanalytics.trident_test.App

To run in a Storm cluster, make sure the ZooKeeper and Storm clusters are running (see the setup instructions at http://czcodezone.blogspot.sg/2014/11/setup-storm-in-cluster.html), then run the following command:

> mvn clean install

After that, trident-test-0.0.1-SNAPSHOT-jar-with-dependencies.jar will be created in the "target" folder under the project root folder.

Now upload the jar by running the following command:

> $STORM_HOME/bin/storm jar [projectRootFolder]/target/trident-test-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.memeanalytics.trident_test.App TridentDemo
