SAMOA-72: Squashed commits for pull request SAMOA-72 (#70). Fixed errors after rebase with master due to changes in intermediate commits.

[SAMOA-72] Initial topology and logic for BoostVHT algorithm.

[SAMOA-72] Optimize BoostVHT code with buffering and by introducing the AttributeSliceEvent.

We bunch up all the attributes one local stats processor is responsible for in an AttributeSliceEvent and send one message instead of creating one message per single attribute.

[SAMOA-72] Rebase on master branch and refactor BoostVHT code for pull request.

Fix #70
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/BoostMAProcessor.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/BoostMAProcessor.java
new file mode 100644
index 0000000..1a9d80f
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/BoostMAProcessor.java
@@ -0,0 +1,294 @@
+package org.apache.samoa.learners.classifiers.ensemble;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.learners.classifiers.trees.*;
+import org.apache.samoa.learners.classifiers.trees.BoostVHTActiveLearningNode.SplittingOption;
+import org.apache.samoa.moa.classifiers.core.AttributeSplitSuggestion;
+import org.apache.samoa.moa.classifiers.core.splitcriteria.InfoGainSplitCriterion;
+import org.apache.samoa.moa.classifiers.core.splitcriteria.SplitCriterion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+
+/**
+ * Boost Model Aggegator Processor consists of the decision tree model. It connects to local-statistic PI via attribute stream
+ * and control stream. Model-aggregator PI sends the split instances via attribute stream and it sends control messages
+ * to ask local-statistic PI to perform computation via control stream.
+ * <p>
+ * The calculation results from local statistic arrive to the model-aggregator PI via computation-result
+ * stream.
+ *
+ * @author Arinto Murdopo
+ */
+public final class BoostMAProcessor extends ModelAggregatorProcessor {
+
+  private static final Logger logger = LoggerFactory.getLogger(BoostMAProcessor.class);
+
+  private final SplittingOption splittingOption;
+  private final int maxBufferSize;
+
+  // private constructor based on Builder pattern
+  private BoostMAProcessor(BoostMABuilder builder) {
+    super(builder);
+    this.splittingOption = builder.splittingOption;
+    this.maxBufferSize = builder.maxBufferSize;
+
+    // These used to happen in onCreate which no longer gets called.
+    activeLeafNodeCount = 0;
+    inactiveLeafNodeCount = 0;
+    decisionNodeCount = 0;
+    growthAllowed = true;
+
+    splittingNodes = new ConcurrentHashMap<>();
+    splitId = 0;
+
+    // Executor for scheduling time-out threads
+    executor = Executors.newScheduledThreadPool(8);
+  }
+
+  public void updateModel(LocalResultContentEvent lrce) {
+
+    Long lrceSplitId = lrce.getSplitId();
+    SplittingNodeInfo splittingNodeInfo = splittingNodes.get(lrceSplitId);
+
+    if (splittingNodeInfo != null) {
+      BoostVHTActiveLearningNode bVHTactiveLearningNode = (BoostVHTActiveLearningNode)splittingNodeInfo.getActiveLearningNode();
+
+      bVHTactiveLearningNode.addDistributedSuggestions(lrce.getBestSuggestion(), lrce.getSecondBestSuggestion());
+
+      if (bVHTactiveLearningNode.isAllSuggestionsCollected()) {
+        this.splittingNodes.remove(lrceSplitId);
+        this.continueAttemptToSplit(bVHTactiveLearningNode, splittingNodeInfo.getFoundNode());
+      }
+    }
+  }
+
+  @Override
+  public boolean process(ContentEvent event) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void trainOnInstanceImpl(FoundNode foundNode, Instance inst) {
+
+    Node leafNode = foundNode.getNode();
+
+    if (leafNode == null) {
+      leafNode = newLearningNode(parallelismHint);
+      foundNode.getParent().setChild(foundNode.getParentBranch(), leafNode);
+      activeLeafNodeCount++;
+    }
+
+    if (leafNode instanceof LearningNode) {
+      LearningNode learningNode = (LearningNode) leafNode;
+      learningNode.learnFromInstance(inst, this);
+    }
+
+    if (leafNode instanceof BoostVHTActiveLearningNode) {
+      BoostVHTActiveLearningNode activeLearningNode = (BoostVHTActiveLearningNode) leafNode;
+      // See if we can ask for splits
+      if (!activeLearningNode.isSplitting()) {
+        double weightSeen = activeLearningNode.getWeightSeen();
+        // check whether it is the time for splitting
+        if (weightSeen - activeLearningNode.getWeightSeenAtLastSplitEvaluation() >= gracePeriod) {
+          attemptToSplit(activeLearningNode, foundNode);
+        }
+      }
+    }
+
+  }
+
+  @Override
+  public void onCreate(int id) {
+    this.resetLearning();
+  }
+
+  @Override
+  public Processor newProcessor(Processor p) {
+    BoostMAProcessor oldProcessor = (BoostMAProcessor) p;
+    BoostMAProcessor newProcessor = new BoostMAProcessor.BoostMABuilder(oldProcessor).build();
+
+    newProcessor.setAttributeStream(oldProcessor.getAttributeStream());
+    newProcessor.setControlStream(oldProcessor.getControlStream());
+
+    return newProcessor;
+  }
+
+  /**
+   * Helper method to represent a split attempt
+   *
+   * @param bVHTactiveLearningNode The corresponding boostVHT active learning node which will be split
+   * @param foundNode          The data structure to represents the filtering of the instance using the tree model.
+   */
+  @Override
+  public void attemptToSplit(ActiveLearningNode bVHTactiveLearningNode, FoundNode foundNode) {
+    if (!bVHTactiveLearningNode.observedClassDistributionIsPure()) {
+      // Increment the split ID
+      this.splitId++;
+
+      this.splittingNodes.put(this.splitId, new SplittingNodeInfo((BoostVHTActiveLearningNode)bVHTactiveLearningNode, foundNode));
+
+      // Inform Local Statistic PI to perform local statistic calculation
+      bVHTactiveLearningNode.requestDistributedSuggestions(this.splitId, this);
+    }
+  }
+
+  /**
+   * Helper method to continue the attempt to split once all local calculation results are received.
+   *
+   * @param bvhtActiveLearningNode The corresponding active learning node which will be split
+   * @param foundNode          The data structure to represents the filtering of the instance using the tree model.
+   */
+  @Override
+  public void continueAttemptToSplit(ActiveLearningNode bvhtActiveLearningNode, FoundNode foundNode) {
+    BoostVHTActiveLearningNode bVHTActiveLearningNode = (BoostVHTActiveLearningNode)bvhtActiveLearningNode;
+    AttributeSplitSuggestion bestSuggestion = bVHTActiveLearningNode.getDistributedBestSuggestion();
+    AttributeSplitSuggestion secondBestSuggestion = bVHTActiveLearningNode.getDistributedSecondBestSuggestion();
+
+    // compare with null split
+    double[] preSplitDist = bVHTActiveLearningNode.getObservedClassDistribution();
+    AttributeSplitSuggestion nullSplit = new AttributeSplitSuggestion(null, new double[0][],
+        this.splitCriterion.getMeritOfSplit(preSplitDist, new double[][] { preSplitDist }));
+
+    if ((bestSuggestion == null) || (nullSplit.compareTo(bestSuggestion) > 0)) {
+      secondBestSuggestion = bestSuggestion;
+      bestSuggestion = nullSplit;
+    } else {
+      if ((secondBestSuggestion == null) || (nullSplit.compareTo(secondBestSuggestion) > 0)) {
+        secondBestSuggestion = nullSplit;
+      }
+    }
+
+    boolean shouldSplit = false;
+
+    if (secondBestSuggestion == null) {
+      shouldSplit = true;
+    } else {
+      double hoeffdingBound = computeHoeffdingBound(
+          this.splitCriterion.getRangeOfMerit(bVHTActiveLearningNode.getObservedClassDistribution()), this.splitConfidence,
+              bVHTActiveLearningNode.getWeightSeen());
+
+      if ((bestSuggestion.merit - secondBestSuggestion.merit > hoeffdingBound) || (hoeffdingBound < tieThreshold)) {
+        shouldSplit = true;
+      }
+      // TODO: add poor attributes removal
+    }
+
+    SplitNode parent = foundNode.getParent();
+    int parentBranch = foundNode.getParentBranch();
+
+    // split if the Hoeffding bound condition is satisfied
+    if (shouldSplit) {
+
+      if (bestSuggestion.splitTest != null) { // TODO: What happens when bestSuggestion is null? -> Deactivate node?
+        SplitNode newSplit = new SplitNode(bestSuggestion.splitTest, bVHTActiveLearningNode.getObservedClassDistribution());
+
+        for (int i = 0; i < bestSuggestion.numSplits(); i++) {
+          Node newChild = newLearningNode(bestSuggestion.resultingClassDistributionFromSplit(i), this.parallelismHint);
+          newSplit.setChild(i, newChild);
+        }
+
+        this.activeLeafNodeCount--;
+        this.decisionNodeCount++;
+        this.activeLeafNodeCount += bestSuggestion.numSplits();
+
+        if (parent == null) {
+          this.treeRoot = newSplit;
+        } else {
+          parent.setChild(parentBranch, newSplit);
+        }
+        //if keep w buffer
+        if (splittingOption == SplittingOption.KEEP && this.maxBufferSize > 0) {
+          Queue<Instance> buffer = bVHTActiveLearningNode.getBuffer();
+//          logger.debug("node: {}. split is happening, there are {} items in buffer", activeLearningNode.getId(), buffer.size());
+          while (!buffer.isEmpty()) {
+            this.trainOnInstanceImpl(buffer.poll());
+          }
+        }
+      }
+      // TODO: add check on the model's memory size
+    }
+
+    // housekeeping
+    bVHTActiveLearningNode.endSplitting();
+    bVHTActiveLearningNode.setWeightSeenAtLastSplitEvaluation(bVHTActiveLearningNode.getWeightSeen());
+  }
+
+  @Override
+  protected LearningNode newLearningNode(double[] initialClassObservations, int parallelismHint) {
+    BoostVHTActiveLearningNode newNode = new BoostVHTActiveLearningNode(initialClassObservations, parallelismHint,
+            this.splittingOption, this.maxBufferSize);
+    newNode.setEnsembleId(this.processorId);
+    return newNode;
+  }
+
+  /**
+   * Builder class to replace constructors with many parameters
+   *
+   * @author Arinto Murdopo
+   */
+  public static class BoostMABuilder extends ModelAggregatorProcessor.Builder<BoostMABuilder> {
+
+    // default values
+    private SplitCriterion splitCriterion = new InfoGainSplitCriterion();
+    private double splitConfidence = 0.0000001;
+    private double tieThreshold = 0.05;
+    private int gracePeriod = 200;
+    private int parallelismHint = 1;
+    private long timeOut = Integer.MAX_VALUE;
+    private SplittingOption splittingOption;
+    private int maxBufferSize = 0;
+
+    public BoostMABuilder(Instances dataset) {
+      super(dataset);
+    }
+
+    public BoostMABuilder(BoostMAProcessor oldProcessor) {
+      super(oldProcessor);
+      this.splittingOption = oldProcessor.splittingOption;
+      this.maxBufferSize = oldProcessor.maxBufferSize;
+    }
+
+    public BoostMABuilder splittingOption(SplittingOption splittingOption) {
+      this.splittingOption = splittingOption;
+      return this;
+    }
+
+    public BoostMABuilder maxBufferSize(int maxBufferSize) {
+      this.maxBufferSize = maxBufferSize;
+      return this;
+    }
+
+    public BoostMAProcessor build() {
+      return new BoostMAProcessor(this);
+    }
+
+  }
+}
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/BoostVHT.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/BoostVHT.java
new file mode 100644
index 0000000..7e85e0d
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/BoostVHT.java
@@ -0,0 +1,226 @@
+package org.apache.samoa.learners.classifiers.ensemble;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import com.github.javacliparser.ClassOption;
+import com.github.javacliparser.Configurable;
+import com.github.javacliparser.FlagOption;
+import com.github.javacliparser.FloatOption;
+import com.github.javacliparser.IntOption;
+import com.google.common.collect.ImmutableSet;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.learners.ClassificationLearner;
+import org.apache.samoa.learners.Learner;
+import org.apache.samoa.learners.classifiers.trees.BoostVHTActiveLearningNode.SplittingOption;
+import org.apache.samoa.learners.classifiers.trees.LocalStatisticsProcessor;
+import org.apache.samoa.learners.classifiers.trees.VerticalHoeffdingTree;
+import org.apache.samoa.moa.classifiers.core.attributeclassobservers.AttributeClassObserver;
+import org.apache.samoa.moa.classifiers.core.attributeclassobservers.DiscreteAttributeClassObserver;
+import org.apache.samoa.moa.classifiers.core.attributeclassobservers.NumericAttributeClassObserver;
+import org.apache.samoa.moa.classifiers.core.splitcriteria.SplitCriterion;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.TopologyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+
+/**
+ * The Bagging Classifier by Oza and Russell.
+ */
+public class BoostVHT implements ClassificationLearner, Configurable {
+
+  /** The Constant serialVersionUID. */
+  private static final long serialVersionUID = -7523211543185584536L;
+  
+  private static final Logger logger = LoggerFactory.getLogger(BoostVHT.class);
+  
+  public ClassOption numericEstimatorOption = new ClassOption("numericEstimator",
+          'n', "Numeric estimator to use.", NumericAttributeClassObserver.class,
+          "GaussianNumericAttributeClassObserver");
+
+  public ClassOption nominalEstimatorOption = new ClassOption("nominalEstimator",
+          'd', "Nominal estimator to use.", DiscreteAttributeClassObserver.class,
+          "NominalAttributeClassObserver");
+
+  public ClassOption splitCriterionOption = new ClassOption("splitCriterion",
+          'r', "Split criterion to use.", SplitCriterion.class,
+          "InfoGainSplitCriterion");
+
+  public FloatOption splitConfidenceOption = new FloatOption("splitConfidence", 'c',
+          "The allowable error in split decision, values closer to 0 will take longer to decide.",
+          0.0000001, 0.0, 1.0);
+
+  public FloatOption tieThresholdOption = new FloatOption("tieThreshold",
+          't', "Threshold below which a split will be forced to break ties.",
+          0.05, 0.0, 1.0);
+
+  public IntOption gracePeriodOption = new IntOption("gracePeriod", 'g',
+          "The number of instances a leaf should observe between split attempts.",
+          200, 0, Integer.MAX_VALUE);
+
+  public IntOption timeOutOption = new IntOption("timeOut", 'o',
+          "The duration to wait all distributed computation results from local statistics PI, in miliseconds",
+          Integer.MAX_VALUE, 1, Integer.MAX_VALUE);
+
+  public FlagOption binarySplitsOption = new FlagOption("binarySplits", 'b',
+          "Only allow binary splits.");
+
+  public FlagOption splittingOption = new FlagOption("keepInstanceWhileSplitting",'q',
+      "Keep instances in a buffer while splitting");
+
+  public IntOption maxBufferSizeOption = new IntOption("maxBufferSizeWhileSplitting",'z',
+      "Maximum buffer size while splitting, use in conjunction with 'q' option. Size 0 means we don't use buffer while splitting",
+      0, 0, Integer.MAX_VALUE);
+  
+    /** The ensemble size option. */
+  public IntOption ensembleSizeOption = new IntOption("ensembleSize", 's',
+      "The number of models in the bag.", 10, 1, Integer.MAX_VALUE);
+
+  public IntOption seedOption = new IntOption("seed", 'u',
+      "the seed for the rng.", (int) System.currentTimeMillis());
+
+  /** The Model Aggregator boosting processor. */
+  private BoostVHTProcessor boostVHTProcessor;
+
+  /** The result stream. */
+  protected Stream resultStream;
+  
+  /** The attribute stream. */
+  protected Stream attributeStream;
+  
+  /** The control stream. */
+  protected Stream controlStream;
+  
+  /** The compute stream. */
+  protected Stream computeStream;
+  
+  /** The dataset. */
+  private Instances dataset;
+
+  protected int parallelism;
+  
+  //for SAMMME
+  public IntOption numberOfClassesOption = new IntOption("numberOfClasses", 'k',
+          "The number of classes.", 2, 2, Integer.MAX_VALUE);
+
+  /**
+   * Sets the layout.
+   */
+  protected void setLayout() {
+
+    int ensembleSize = this.ensembleSizeOption.getValue();
+
+    // Set parameters for BoostVHT processor, and the BoostMA processors within.
+    try {
+      boostVHTProcessor = new BoostVHTProcessor.Builder(dataset)
+          .ensembleSize(this.ensembleSizeOption.getValue())
+          .numberOfClasses(this.numberOfClassesOption.getValue())
+          .splitCriterion(
+              (SplitCriterion) ClassOption.createObject(this.splitCriterionOption.getValueAsCLIString(),
+              this.splitCriterionOption.getRequiredType()))
+          .splitConfidence(this.splitConfidenceOption.getValue())
+          .tieThreshold(this.tieThresholdOption.getValue())
+          .gracePeriod(this.gracePeriodOption.getValue())
+          .parallelismHint(this.ensembleSizeOption.getValue())
+          .timeOut(this.timeOutOption.getValue())
+          .splittingOption(this.splittingOption.isSet() ? SplittingOption.KEEP: SplittingOption.THROW_AWAY)
+          .maxBufferSize(this.maxBufferSizeOption.getValue())
+          .seed(this.seedOption.getValue())
+          .build();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+    //add Boosting Model Aggregator Processor to the topology
+    this.topologyBuilder.addProcessor(boostVHTProcessor, 1);
+    
+
+    // Streams
+    attributeStream = this.topologyBuilder.createStream(boostVHTProcessor);
+    controlStream = this.topologyBuilder.createStream(boostVHTProcessor);
+    
+    //local statistics processor.
+    LocalStatisticsProcessor locStatProcessor = new LocalStatisticsProcessor.Builder()
+        .splitCriterion((SplitCriterion) this.splitCriterionOption.getValue())
+        .binarySplit(this.binarySplitsOption.isSet())
+        .nominalClassObserver((AttributeClassObserver) this.nominalEstimatorOption.getValue())
+        .numericClassObserver((AttributeClassObserver) this.numericEstimatorOption.getValue())
+        .build();
+    
+    this.topologyBuilder.addProcessor(locStatProcessor, ensembleSize);
+  
+    this.topologyBuilder.connectInputKeyStream(attributeStream, locStatProcessor);
+    this.topologyBuilder.connectInputAllStream(controlStream, locStatProcessor);
+    
+    
+    //local statistics result stream
+    computeStream = this.topologyBuilder.createStream(locStatProcessor);
+    locStatProcessor.setComputationResultStream(computeStream);
+    this.topologyBuilder.connectInputAllStream(computeStream, boostVHTProcessor);
+  
+    //prediction is computed in boostVHTProcessor
+    resultStream = this.topologyBuilder.createStream(boostVHTProcessor);
+    
+    //set the out streams of the BoostVHTProcessor
+    boostVHTProcessor.setResultStream(resultStream);
+    boostVHTProcessor.setAttributeStream(attributeStream);
+    boostVHTProcessor.setControlStream(controlStream);
+  }
+
+  /** The topologyBuilder. */
+  private TopologyBuilder topologyBuilder;
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see samoa.classifiers.Classifier#init(samoa.engines.Engine,
+   * samoa.core.Stream, weka.core.Instances)
+   */
+
+  @Override
+  public void init(TopologyBuilder builder, Instances dataset, int parallelism) {
+    this.topologyBuilder = builder;
+    this.dataset = dataset;
+    this.parallelism = parallelism;
+    this.setLayout();
+  }
+
+  @Override
+  public Processor getInputProcessor() {
+    return boostVHTProcessor;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see samoa.learners.Learner#getResultStreams()
+   */
+  @Override
+  public Set<Stream> getResultStreams() {
+    return ImmutableSet.of(this.resultStream);
+  }
+}
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/BoostVHTProcessor.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/BoostVHTProcessor.java
new file mode 100644
index 0000000..b71ec36
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/BoostVHTProcessor.java
@@ -0,0 +1,444 @@
+package org.apache.samoa.learners.classifiers.ensemble;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.learners.InstanceContentEvent;
+import org.apache.samoa.learners.ResultContentEvent;
+import org.apache.samoa.learners.classifiers.trees.BoostVHTActiveLearningNode.SplittingOption;
+import org.apache.samoa.learners.classifiers.trees.LocalResultContentEvent;
+import org.apache.samoa.moa.classifiers.core.splitcriteria.InfoGainSplitCriterion;
+import org.apache.samoa.moa.classifiers.core.splitcriteria.SplitCriterion;
+import org.apache.samoa.moa.core.DoubleVector;
+import org.apache.samoa.moa.core.MiscUtils;
+import org.apache.samoa.topology.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+/**
+ * The Class BoostVHTProcessor.
+ */
+public class BoostVHTProcessor implements Processor {
+
+  private static final long serialVersionUID = -1550901409625192730L;
+  private static final Logger logger = LoggerFactory.getLogger(BoostVHTProcessor.class);
+  
+  //The following are configured from the user in BoostVHT
+  private SplitCriterion splitCriterion;
+  
+  private Double splitConfidence;
+  
+  private Double tieThreshold;
+  
+  private int gracePeriod;
+  
+  private int parallelismHint;
+  
+  private int timeOut;
+
+  private SplittingOption splittingOption;
+
+  /** The input dataset to BoostVHT. */
+  private Instances dataset;
+  
+  /** The ensemble size. */
+  private int ensembleSize;
+  
+  /** The result stream. */
+  private Stream resultStream;
+  
+  /** The control stream. */
+  private Stream controlStream;
+  
+  /** The attribute stream. */
+  private Stream attributeStream;
+  
+  protected BoostMAProcessor[] mAPEnsemble;
+
+  /** Ramdom number generator. */
+  protected Random random;
+
+  private int seed;
+
+  // lambda_m correct
+  protected double[] scms;
+  // lambda_m wrong
+  protected double[] swms;
+  private double[] e_m;
+
+  private double trainingWeightSeenByModel;
+
+  private int numberOfClasses;
+
+  private int maxBufferSize;
+
+  private BoostVHTProcessor(Builder builder) {
+    this.dataset = builder.dataset;
+    this.ensembleSize = builder.ensembleSize;
+    this.seed = builder.seed;
+    this.numberOfClasses = builder.numberOfClasses;
+    this.splitCriterion = builder.splitCriterion;
+    this.splitConfidence = builder.splitConfidence;
+    this.tieThreshold = builder.tieThreshold;
+    this.gracePeriod = builder.gracePeriod;
+    this.parallelismHint = builder.parallelismHint;
+    this.timeOut = builder.timeOut;
+    this.splittingOption = builder.splittingOption;
+    this.maxBufferSize = builder.maxBufferSize;
+  }
+
+  /**
+   * On event.
+   * 
+   * @param event the event
+   * @return true, if successful
+   */
+  public boolean process(ContentEvent event) {
+
+    if (event instanceof InstanceContentEvent) {
+      InstanceContentEvent inEvent = (InstanceContentEvent) event;
+      //todo:: check if any precondition is needed
+
+      if (inEvent.isTesting()) {
+        double[] combinedPrediction = computeBoosting(inEvent);
+        this.resultStream.put(newResultContentEvent(combinedPrediction, inEvent));
+      }
+
+      // estimate model parameters using the training data
+      if (inEvent.isTraining()) {
+        train(inEvent);
+      }
+    } else if (event instanceof LocalResultContentEvent) {
+      LocalResultContentEvent lrce = (LocalResultContentEvent) event;
+      mAPEnsemble[lrce.getEnsembleId()].updateModel(lrce);
+    }
+
+
+    return true;
+  }
+
+  @Override
+  public void onCreate(int id) {
+    
+    mAPEnsemble = new BoostMAProcessor[ensembleSize];
+
+    random = new Random(seed);
+
+    this.scms = new double[ensembleSize];
+    this.swms = new double[ensembleSize];
+    this.e_m = new double[ensembleSize];
+
+    //instantiate the MAs
+    for (int i = 0; i < ensembleSize; i++) {
+      BoostMAProcessor newProc = new BoostMAProcessor.BoostMABuilder(dataset)
+          .splitCriterion(splitCriterion)
+          .splitConfidence(splitConfidence)
+          .tieThreshold(tieThreshold)
+          .gracePeriod(gracePeriod)
+          .parallelismHint(parallelismHint)
+          .timeOut(timeOut)
+          .processorID(i) // The BoostMA processors get incremental ids
+          .maxBufferSize(maxBufferSize)
+          .splittingOption(splittingOption)
+          .build();
+      newProc.setAttributeStream(this.attributeStream);
+      newProc.setControlStream(this.controlStream);
+      mAPEnsemble[i] = newProc;
+    }
+  }
+  
+  private double[] computeBoosting(InstanceContentEvent inEvent) {
+    
+    Instance testInstance = inEvent.getInstance();
+    DoubleVector combinedPredictions = new DoubleVector();
+
+    for (int i = 0; i < ensembleSize; i++) {
+      double memberWeight = getEnsembleMemberWeight(i);
+      if (memberWeight > 0.0) {
+        DoubleVector vote = new DoubleVector(mAPEnsemble[i].getVotesForInstance(testInstance));
+        if (vote.sumOfValues() > 0.0) {
+          vote.normalize();
+          vote.scaleValues(memberWeight);
+          combinedPredictions.addValues(vote);
+        }
+      } else {
+        break;
+      }
+    }
+    return combinedPredictions.getArrayRef();
+  }
+  
+  /**
+   * Train.
+   *
+   * @param inEvent
+   *          the in event
+   */
+  protected void train(InstanceContentEvent inEvent) {
+    Instance trainInstance = inEvent.getInstance();
+
+    this.trainingWeightSeenByModel += trainInstance.weight();
+    double lambda_d = 1.0;
+    
+    for (int i = 0; i < ensembleSize; i++) { //for each base model
+      int k = MiscUtils.poisson(lambda_d, this.random); //set k according to poisson
+
+      if (k > 0) {
+        Instance weightedInstance = trainInstance.copy();
+        weightedInstance.setWeight(trainInstance.weight() * k);
+        mAPEnsemble[i].trainOnInstanceImpl(weightedInstance);
+      }
+      //get prediction for the instance from the specific learner of the ensemble
+      double[] prediction = mAPEnsemble[i].getVotesForInstance(trainInstance);
+
+      if (mAPEnsemble[i].correctlyClassifies(trainInstance,prediction)) {
+        this.scms[i] += lambda_d;
+        lambda_d *= this.trainingWeightSeenByModel / (2 * this.scms[i]);
+      } else {
+        this.swms[i] += lambda_d;
+        lambda_d *= this.trainingWeightSeenByModel / (2 * this.swms[i]);
+      }
+    }
+  }
+  
+  private double getEnsembleMemberWeight(int i) {
+    double em = this.swms[i] / (this.scms[i] + this.swms[i]);
+//    if ((em == 0.0) || (em > 0.5)) {
+    if ((em == 0.0) || (em > (1.0 - 1.0/this.numberOfClasses))) { //for SAMME
+      return 0.0;
+    }
+    double Bm = em / (1.0 - em);
+//    return Math.log(1.0 / Bm);
+    return Math.log(1.0 / Bm ) + Math.log(this.numberOfClasses - 1); //for SAMME
+  }
+  
+  /**
+   * Helper method to generate new ResultContentEvent based on an instance and its prediction result.
+   *
+   * @param combinedPrediction
+   *          The predicted class label from the Boost-VHT decision tree model.
+   * @param inEvent
+   *          The associated instance content event
+   * @return ResultContentEvent to be sent into Evaluator PI or other destination PI.
+   */
+  private ResultContentEvent newResultContentEvent(double[] combinedPrediction, InstanceContentEvent inEvent) {
+    ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(),
+            inEvent.getClassId(), combinedPrediction, inEvent.isLastEvent(), inEvent.getArrivalTimestamp());
+    rce.setEvaluationIndex(inEvent.getEvaluationIndex());
+    return rce;
+  }
+
+  public static class Builder {
+    // BoostVHT processor parameters
+    private final Instances dataset;
+    private int ensembleSize;
+    private int numberOfClasses;
+
+    // BoostMAProcessor parameters
+    private SplitCriterion splitCriterion = new InfoGainSplitCriterion();
+    private double splitConfidence;
+    private double tieThreshold;
+    private int gracePeriod;
+    private int parallelismHint;
+    private int timeOut = Integer.MAX_VALUE;
+    private SplittingOption splittingOption;
+    private int maxBufferSize;
+    private int seed;
+
+    public Builder(Instances dataset) {
+      this.dataset = dataset;
+    }
+
+    public Builder(BoostVHTProcessor oldProcessor) {
+      this.dataset = oldProcessor.getDataset();
+      this.ensembleSize = oldProcessor.getEnsembleSize();
+      this.numberOfClasses = oldProcessor.getNumberOfClasses();
+      this.splitCriterion = oldProcessor.getSplitCriterion();
+      this.splitConfidence = oldProcessor.getSplitConfidence();
+      this.tieThreshold = oldProcessor.getTieThreshold();
+      this.gracePeriod = oldProcessor.getGracePeriod();
+      this.parallelismHint = oldProcessor.getParallelismHint();
+      this.timeOut = oldProcessor.getTimeOut();
+      this.splittingOption = oldProcessor.splittingOption;
+      this.seed = oldProcessor.getSeed();
+    }
+
+    public Builder ensembleSize(int ensembleSize) {
+      this.ensembleSize = ensembleSize;
+      return this;
+    }
+
+    public Builder numberOfClasses(int numberOfClasses) {
+      this.numberOfClasses = numberOfClasses;
+      return this;
+    }
+
+    public Builder splitCriterion(SplitCriterion splitCriterion) {
+      this.splitCriterion = splitCriterion;
+      return this;
+    }
+
+    public Builder splitConfidence(double splitConfidence) {
+      this.splitConfidence = splitConfidence;
+      return this;
+    }
+
+    public Builder tieThreshold(double tieThreshold) {
+      this.tieThreshold = tieThreshold;
+      return this;
+    }
+
+    public Builder gracePeriod(int gracePeriod) {
+      this.gracePeriod = gracePeriod;
+      return this;
+    }
+
+    public Builder parallelismHint(int parallelismHint) {
+      this.parallelismHint = parallelismHint;
+      return this;
+    }
+
+    public Builder timeOut(int timeOut) {
+      this.timeOut = timeOut;
+      return this;
+    }
+
+    public Builder splittingOption(SplittingOption splittingOption) {
+      this.splittingOption = splittingOption;
+      return this;
+    }
+
+    public Builder maxBufferSize(int maxBufferSize) {
+      this.maxBufferSize= maxBufferSize;
+      return this;
+    }
+
+    public Builder seed(int seed) {
+      this.seed = seed;
+      return this;
+    }
+
+    public BoostVHTProcessor build() {
+      return new BoostVHTProcessor(this);
+    }
+  }
+  
+  public Instances getInputInstances() {
+    return dataset;
+  }
+  
+  public void setInputInstances(Instances dataset) {
+    this.dataset = dataset;
+  }
+  
+  public Stream getResultStream() {
+    return this.resultStream;
+  }
+  
+  public void setResultStream(Stream resultStream) {
+    this.resultStream = resultStream;
+  }
+
+  public int getEnsembleSize() {
+    return ensembleSize;
+  }
+
+  public Stream getControlStream() {
+    return controlStream;
+  }
+  
+  public void setControlStream(Stream controlStream) {
+    this.controlStream = controlStream;
+  }
+
+  public Stream getAttributeStream() {
+    return attributeStream;
+  }
+
+  public void setAttributeStream(Stream attributeStream) {
+    this.attributeStream = attributeStream;
+  }
+
+  public SplitCriterion getSplitCriterion() {
+    return splitCriterion;
+  }
+  
+
+  public Double getSplitConfidence() {
+    return splitConfidence;
+  }
+  
+
+  public Double getTieThreshold() {
+    return tieThreshold;
+  }
+
+  public int getSeed() {
+    return seed;
+  }
+
+  public int getGracePeriod() {
+    return gracePeriod;
+  }
+  
+
+  public int getParallelismHint() {
+    return parallelismHint;
+  }
+
+  public int getTimeOut() {
+    return timeOut;
+  }
+  
+  public void setTimeOut(int timeOut) {
+    this.timeOut = timeOut;
+  }
+  
+  public int getNumberOfClasses() {
+    return numberOfClasses;
+  }
+  
+  public void setNumberOfClasses(int numberOfClasses) {
+    this.numberOfClasses = numberOfClasses;
+  }
+  
+  public Instances getDataset() {
+    return dataset;
+  }
+  
+  @Override
+  public Processor newProcessor(Processor sourceProcessor) {
+    BoostVHTProcessor originProcessor = (BoostVHTProcessor) sourceProcessor;
+    BoostVHTProcessor newProcessor = new BoostVHTProcessor.Builder(originProcessor).build();
+
+    if (originProcessor.getResultStream() != null) {
+      newProcessor.setResultStream(originProcessor.getResultStream());
+      newProcessor.setControlStream(originProcessor.getControlStream());
+      newProcessor.setAttributeStream(originProcessor.getAttributeStream());
+    }
+    return newProcessor;
+  }
+}
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ActiveLearningNode.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ActiveLearningNode.java
index a437719..1e27acc 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ActiveLearningNode.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ActiveLearningNode.java
@@ -28,26 +28,26 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class ActiveLearningNode extends LearningNode {
+public class ActiveLearningNode extends LearningNode {
   /**
-	 * 
+	 *
 	 */
   private static final long serialVersionUID = -2892102872646338908L;
   private static final Logger logger = LoggerFactory.getLogger(ActiveLearningNode.class);
 
-  private double weightSeenAtLastSplitEvaluation;
+  protected double weightSeenAtLastSplitEvaluation;
 
-  private final Map<Integer, String> attributeContentEventKeys;
+  protected Map<Integer, String> attributeContentEventKeys;
 
-  private AttributeSplitSuggestion bestSuggestion;
-  private AttributeSplitSuggestion secondBestSuggestion;
+  protected AttributeSplitSuggestion bestSuggestion;
+  protected AttributeSplitSuggestion secondBestSuggestion;
 
-  private final long id;
-  private final int parallelismHint;
-  private int suggestionCtr;
-  private int thrownAwayInstance;
+  protected long id;
+  protected int parallelismHint;
+  protected int suggestionCtr;
+  protected int thrownAwayInstance;
 
-  private boolean isSplitting;
+  protected boolean isSplitting;
 
   ActiveLearningNode(double[] classObservation, int parallelismHint) {
     super(classObservation);
@@ -58,7 +58,7 @@
     this.parallelismHint = parallelismHint;
   }
 
-  long getId() {
+  protected long getId() {
     return id;
   }
 
@@ -73,7 +73,7 @@
   }
 
   @Override
-  void learnFromInstance(Instance inst, ModelAggregatorProcessor proc) {
+  public void learnFromInstance(Instance inst, ModelAggregatorProcessor proc) {
     // TODO: what statistics should we keep for unused instance?
     if (isSplitting) { // currently throw all instance will splitting
       this.thrownAwayInstance++;
@@ -120,23 +120,23 @@
   }
 
   @Override
-  double[] getClassVotes(Instance inst, ModelAggregatorProcessor map) {
+  public double[] getClassVotes(Instance inst, ModelAggregatorProcessor map) {
     return this.observedClassDistribution.getArrayCopy();
   }
 
-  double getWeightSeen() {
+  public double getWeightSeen() {
     return this.observedClassDistribution.sumOfValues();
   }
 
-  void setWeightSeenAtLastSplitEvaluation(double weight) {
+  public void setWeightSeenAtLastSplitEvaluation(double weight) {
     this.weightSeenAtLastSplitEvaluation = weight;
   }
 
-  double getWeightSeenAtLastSplitEvaluation() {
+  public double getWeightSeenAtLastSplitEvaluation() {
     return this.weightSeenAtLastSplitEvaluation;
   }
 
-  void requestDistributedSuggestions(long splitId, ModelAggregatorProcessor modelAggrProc) {
+  public void requestDistributedSuggestions(long splitId, ModelAggregatorProcessor modelAggrProc) {
     this.isSplitting = true;
     this.suggestionCtr = 0;
     this.thrownAwayInstance = 0;
@@ -146,7 +146,7 @@
     modelAggrProc.sendToControlStream(cce);
   }
 
-  void addDistributedSuggestions(AttributeSplitSuggestion bestSuggestion, AttributeSplitSuggestion secondBestSuggestion) {
+  public void addDistributedSuggestions(AttributeSplitSuggestion bestSuggestion, AttributeSplitSuggestion secondBestSuggestion) {
     // starts comparing from the best suggestion
     if (bestSuggestion != null) {
       if ((this.bestSuggestion == null) || (bestSuggestion.compareTo(this.bestSuggestion) > 0)) {
@@ -170,7 +170,7 @@
     this.suggestionCtr++;
   }
 
-  boolean isSplitting() {
+  public boolean isSplitting() {
     return this.isSplitting;
   }
 
@@ -182,15 +182,15 @@
     this.secondBestSuggestion = null;
   }
 
-  AttributeSplitSuggestion getDistributedBestSuggestion() {
+  public AttributeSplitSuggestion getDistributedBestSuggestion() {
     return this.bestSuggestion;
   }
 
-  AttributeSplitSuggestion getDistributedSecondBestSuggestion() {
+  public AttributeSplitSuggestion getDistributedSecondBestSuggestion() {
     return this.secondBestSuggestion;
   }
 
-  boolean isAllSuggestionsCollected() {
+  public boolean isAllSuggestionsCollected() {
     return (this.suggestionCtr == this.parallelismHint);
   }
 
@@ -198,7 +198,7 @@
     return inst.classIndex() > index ? index : index + 1;
   }
 
-  private String generateKey(int obsIndex) {
+  protected String generateKey(int obsIndex) {
     final int prime = 31;
     int result = 1;
     result = prime * result + (int) (this.id ^ (this.id >>> 32));
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/AttributeSliceEvent.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/AttributeSliceEvent.java
new file mode 100644
index 0000000..9e3f245
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/AttributeSliceEvent.java
@@ -0,0 +1,90 @@
+package org.apache.samoa.learners.classifiers.trees;
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+
+/**
+ * Attribute Slice Event represents the instances that split into parallelismHint (no. of local stats processors - LSP)
+ * and send only one message per LSP for BoostVHT algorithm which contains that slice of the attributes along with required information
+ * to update the class observers.
+ *
+ */
+
+public class AttributeSliceEvent implements ContentEvent{
+  private static final long serialVersionUID = 6752449086753238767L;
+  private final long learningNodeId;
+  private final int attributeStartingIndex;
+  private final transient String key;
+  private final boolean[] isNominalSlice;
+  private final double[] attributeSlice;
+  private final int classValue;
+  private final double weight;
+
+  public AttributeSliceEvent(
+      long learningNodeId, int attributeStartingIndex, String key, boolean[] isNominalSlice, double[] attributeSlice,
+      int classValue, double weight) {
+    this.learningNodeId = learningNodeId;
+    this.attributeStartingIndex = attributeStartingIndex;
+    this.key = key;
+    this.isNominalSlice = isNominalSlice;
+    this.attributeSlice = attributeSlice;
+    this.classValue = classValue;
+    this.weight = weight;
+  }
+
+  public int getClassValue() {
+    return classValue;
+  }
+
+  public double getWeight() {
+    return weight;
+  }
+
+  public long getLearningNodeId() {
+    return learningNodeId;
+  }
+
+  public int getAttributeStartingIndex() {
+    return attributeStartingIndex;
+  }
+
+  public boolean[] getIsNominalSlice() {
+    return isNominalSlice;
+  }
+
+  public double[] getAttributeSlice() {
+    return attributeSlice;
+  }
+
+  @Override
+  public String getKey() {
+    return key;
+  }
+
+  @Override
+  public void setKey(String key) {
+  }
+
+  @Override
+  public boolean isLastEvent() { // TODO
+    return false;
+  }
+}
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/BoostVHTActiveLearningNode.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/BoostVHTActiveLearningNode.java
new file mode 100644
index 0000000..65d89d3
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/BoostVHTActiveLearningNode.java
@@ -0,0 +1,145 @@
+package org.apache.samoa.learners.classifiers.trees;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.google.common.collect.EvictingQueue;
+import org.apache.samoa.instances.Attribute;
+import org.apache.samoa.instances.Instance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Queue;
+
+public final class BoostVHTActiveLearningNode extends ActiveLearningNode {
+
+  public enum SplittingOption {
+    THROW_AWAY, KEEP
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(BoostVHTActiveLearningNode.class);
+
+  private final SplittingOption splittingOption;
+  private final int maxBufferSize;
+  private final Queue<Instance> buffer;
+  private int ensembleId;
+
+  public BoostVHTActiveLearningNode(double[] classObservation, int parallelism_hint, SplittingOption splitOption, int maxBufferSize) {
+    super(classObservation, parallelism_hint);
+    weightSeenAtLastSplitEvaluation = this.getWeightSeen();
+    id = VerticalHoeffdingTree.LearningNodeIdGenerator.generate();
+    attributeContentEventKeys = new HashMap<>();
+    isSplitting = false;
+    parallelismHint = parallelism_hint;
+    this.splittingOption = splitOption;
+    this.maxBufferSize = maxBufferSize;
+    this.buffer = EvictingQueue.create(maxBufferSize);
+  }
+
+  @Override
+  public void learnFromInstance(Instance inst, ModelAggregatorProcessor proc) {
+    if (isSplitting) {
+      switch (this.splittingOption) {
+        case THROW_AWAY:
+          //logger.trace("node {}: splitting is happening, throw away the instance", this.id); // throw all instance will splitting
+          thrownAwayInstance++;
+          return;
+        case KEEP:
+          //logger.trace("node {}: keep instance with max buffer size: {}, continue sending to local stats", this.id, this.maxBufferSize);
+          //logger.trace("node {}: add to buffer", this.id);
+          buffer.add(inst);
+          break;
+        default:
+          logger.error("node {}: invalid splittingOption option: {}", id, this.splittingOption);
+          break;
+      }
+    }
+
+    // What we do is slice up the attributes array into parallelismHint (no. of local stats processors - LSP)
+    // and send only one message per LSP which contains that slice of the attributes along with required information
+    // to update the class observers.
+    // Given that we are sending slices, there's probably some optimizations that can be made at the LSP level,
+    // like being smarter about how we update the observers.
+    this.observedClassDistribution.addToValue((int) inst.classValue(),
+            inst.weight());
+    double[] attributeArray = inst.toDoubleArray();
+    int sliceSize = (attributeArray.length - 1) / parallelismHint;
+    boolean[] isNominalAll = new boolean[inst.numAttributes() - 1];
+    for (int i = 0; i < inst.numAttributes() - 1; i++) {
+      Attribute att = inst.attribute(i);
+      if (att.isNominal()) {
+        isNominalAll[i] = true;
+      }
+    }
+    int startingIndex = 0;
+    for (int localStatsIndex = 0; localStatsIndex < parallelismHint; localStatsIndex++) {
+      // The endpoint for the slice is either the end of the previous slice, or the end of the array
+      // TODO: Note that we assume class is at the end of the instance attribute array, hence the length-1 here
+      // We can do proper handling later
+      int endpoint = localStatsIndex == (parallelismHint - 1) ? (attributeArray.length - 1) : (localStatsIndex + 1) * sliceSize;
+      double[] attributeSlice = Arrays.copyOfRange(
+              attributeArray, localStatsIndex * sliceSize, endpoint);
+      boolean[] isNominalSlice = Arrays.copyOfRange(
+              isNominalAll, localStatsIndex * sliceSize, endpoint);
+      AttributeSliceEvent attributeSliceEvent = new AttributeSliceEvent(
+              this.id, startingIndex, Integer.toString(localStatsIndex), isNominalSlice, attributeSlice,
+              (int) inst.classValue(), inst.weight());
+      proc.sendToAttributeStream(attributeSliceEvent);
+      startingIndex = endpoint;
+    }
+  }
+
+  @Override
+  public void requestDistributedSuggestions(long splitId, ModelAggregatorProcessor modelAggrProc) {
+    this.isSplitting = true;
+    this.suggestionCtr = 0;
+    this.thrownAwayInstance = 0;
+
+    ComputeContentEvent cce = new ComputeContentEvent(splitId, this.id,
+            this.getObservedClassDistribution());
+    cce.setEnsembleId(this.ensembleId);
+    modelAggrProc.sendToControlStream(cce);
+  }
+
+  @Override
+  public void endSplitting() {
+    super.endSplitting();
+    this.buffer.clear();
+  }
+
+  @Override
+  protected String generateKey(int obsIndex) {
+    return Integer.toString(obsIndex % parallelismHint);
+  }
+
+  public Queue<Instance> getBuffer() {
+    return buffer;
+  }
+
+  public int getEnsembleId() {
+    return ensembleId;
+  }
+
+  public void setEnsembleId(int ensembleId) {
+    this.ensembleId = ensembleId;
+  }
+}
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ComputeContentEvent.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ComputeContentEvent.java
index fe56cc1..fe5ece8 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ComputeContentEvent.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ComputeContentEvent.java
@@ -38,6 +38,8 @@
 
   private final double[] preSplitDist;
   private final long splitId;
+  
+  private int ensembleId; //the id of the ensemble that send the event
 
   public ComputeContentEvent() {
     super(-1);
@@ -142,4 +144,11 @@
 
   }
 
+  public int getEnsembleId() {
+    return ensembleId;
+  }
+  
+  public void setEnsembleId(int ensembleId) {
+    this.ensembleId = ensembleId;
+  }
 }
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FoundNode.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FoundNode.java
index 61d9b19..b3607cc 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FoundNode.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FoundNode.java
@@ -50,7 +50,7 @@
    * 
    * @return The node where the instance is routed/filtered
    */
-  Node getNode() {
+  public Node getNode() {
     return this.node;
   }
 
@@ -71,7 +71,7 @@
    * 
    * @return The parent of the node
    */
-  SplitNode getParent() {
+  public SplitNode getParent() {
     return this.parent;
   }
 
@@ -81,7 +81,7 @@
    * 
    * @return The index of the node in its parent node.
    */
-  int getParentBranch() {
+  public int getParentBranch() {
     return this.parentBranch;
   }
 
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/InactiveLearningNode.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/InactiveLearningNode.java
index 1e38377..beb8aa6 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/InactiveLearningNode.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/InactiveLearningNode.java
@@ -36,12 +36,12 @@
 	 */
   private static final long serialVersionUID = -814552382883472302L;
 
-  InactiveLearningNode(double[] initialClassObservation) {
+  public InactiveLearningNode(double[] initialClassObservation) {
     super(initialClassObservation);
   }
 
   @Override
-  void learnFromInstance(Instance inst, ModelAggregatorProcessor proc) {
+  public void learnFromInstance(Instance inst, ModelAggregatorProcessor proc) {
     this.observedClassDistribution.addToValue(
         (int) inst.classValue(), inst.weight());
   }
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LearningNode.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LearningNode.java
index f7f7826..ad41ee7 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LearningNode.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LearningNode.java
@@ -38,13 +38,13 @@
 
   /**
    * Method to process the instance for learning
-   * 
+   *
    * @param inst
    *          The processed instance
    * @param proc
    *          The model aggregator processor where this learning node exists
    */
-  abstract void learnFromInstance(Instance inst, ModelAggregatorProcessor proc);
+  public abstract void learnFromInstance(Instance inst, ModelAggregatorProcessor proc);
 
   @Override
   protected boolean isLeaf() {
@@ -52,7 +52,7 @@
   }
 
   @Override
-  protected FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent,
+  public FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent,
       int parentBranch) {
     return new FoundNode(this, parent, parentBranch);
   }
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalResultContentEvent.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalResultContentEvent.java
index 485ac75..ddc3121 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalResultContentEvent.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalResultContentEvent.java
@@ -37,6 +37,7 @@
   private final AttributeSplitSuggestion bestSuggestion;
   private final AttributeSplitSuggestion secondBestSuggestion;
   private final long splitId;
+  private int ensembleId; //the id of the ensemble that asked for the local statistics
 
   public LocalResultContentEvent() {
     bestSuggestion = null;
@@ -60,7 +61,7 @@
    * 
    * @return The best attribute split suggestion.
    */
-  AttributeSplitSuggestion getBestSuggestion() {
+  public AttributeSplitSuggestion getBestSuggestion() {
     return this.bestSuggestion;
   }
 
@@ -69,7 +70,7 @@
    * 
    * @return The second best attribute split suggestion.
    */
-  AttributeSplitSuggestion getSecondBestSuggestion() {
+  public AttributeSplitSuggestion getSecondBestSuggestion() {
     return this.secondBestSuggestion;
   }
 
@@ -78,7 +79,7 @@
    * 
    * @return The split id of this local calculation result
    */
-  long getSplitId() {
+  public long getSplitId() {
     return this.splitId;
   }
 
@@ -92,4 +93,12 @@
   public boolean isLastEvent() {
     return false;
   }
+
+  public int getEnsembleId() {
+    return ensembleId;
+  }
+  
+  public void setEnsembleId(int ensembleId) {
+    this.ensembleId = ensembleId;
+  }
 }
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java
index 6e7c174..fde2300 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java
@@ -24,7 +24,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Vector;
 
 import org.apache.samoa.core.ContentEvent;
 import org.apache.samoa.core.Processor;
@@ -65,6 +64,7 @@
   private final boolean binarySplit;
   private final AttributeClassObserver nominalClassObserver;
   private final AttributeClassObserver numericClassObserver;
+  private int id;
 
   // the two observer classes below are also needed to be setup from the Tree
   private LocalStatisticsProcessor(Builder builder) {
@@ -102,66 +102,94 @@
        * = (AttributeContentEvent) event; Long learningNodeId =
        * Long.valueOf(ace.getLearningNodeId()); Integer obsIndex =
        * Integer.valueOf(ace.getObsIndex());
-       * 
+       *
        * AttributeClassObserver obs = localStats.get( learningNodeId, obsIndex);
-       * 
+       *
        * if (obs == null) { obs = ace.isNominal() ? newNominalClassObserver() :
        * newNumericClassObserver(); localStats.put(ace.getLearningNodeId(),
        * obsIndex, obs); } obs.observeAttributeClass(ace.getAttrVal(),
        * ace.getClassVal(), ace.getWeight());
        */
-    } else if (event instanceof ComputeContentEvent) {
-      // process ComputeContentEvent by calculating the local statistic
-      // and send back the calculation results via computation result stream.
+    } else if (event instanceof AttributeSliceEvent) {
+      AttributeSliceEvent ase = (AttributeSliceEvent) event;
+      processAttributeSlice(ase);
+
+    } else if (event instanceof ComputeContentEvent){
       ComputeContentEvent cce = (ComputeContentEvent) event;
-      Long learningNodeId = cce.getLearningNodeId();
-      double[] preSplitDist = cce.getPreSplitDist();
+      processComputeEvent(cce);
 
-      Map<Integer, AttributeClassObserver> learningNodeRowMap = localStats
-          .row(learningNodeId);
-      List<AttributeSplitSuggestion> suggestions = new Vector<>();
-
-      for (Entry<Integer, AttributeClassObserver> entry : learningNodeRowMap.entrySet()) {
-        AttributeClassObserver obs = entry.getValue();
-        AttributeSplitSuggestion suggestion = obs
-            .getBestEvaluatedSplitSuggestion(splitCriterion,
-                preSplitDist, entry.getKey(), binarySplit);
-        if (suggestion != null) {
-          suggestions.add(suggestion);
-        }
-      }
-
-      AttributeSplitSuggestion[] bestSuggestions = suggestions
-          .toArray(new AttributeSplitSuggestion[suggestions.size()]);
-
-      Arrays.sort(bestSuggestions);
-
-      AttributeSplitSuggestion bestSuggestion = null;
-      AttributeSplitSuggestion secondBestSuggestion = null;
-
-      if (bestSuggestions.length >= 1) {
-        bestSuggestion = bestSuggestions[bestSuggestions.length - 1];
-
-        if (bestSuggestions.length >= 2) {
-          secondBestSuggestion = bestSuggestions[bestSuggestions.length - 2];
-        }
-      }
-
-      // create the local result content event
-      LocalResultContentEvent lcre =
-          new LocalResultContentEvent(cce.getSplitId(), bestSuggestion, secondBestSuggestion);
-      computationResultStream.put(lcre);
-      logger.debug("Finish compute event");
     } else if (event instanceof DeleteContentEvent) {
       DeleteContentEvent dce = (DeleteContentEvent) event;
       Long learningNodeId = dce.getLearningNodeId();
       localStats.rowMap().remove(learningNodeId);
     }
-    return false;
+    return true;
+  }
+
+  private void processComputeEvent(ComputeContentEvent cce) {
+    Long learningNodeId = cce.getLearningNodeId();
+    double[] preSplitDist = cce.getPreSplitDist();
+
+    Map<Integer, AttributeClassObserver> learningNodeRowMap = localStats.row(learningNodeId);
+    AttributeSplitSuggestion[] suggestions = new AttributeSplitSuggestion[learningNodeRowMap.size()];
+
+    int curIndex = 0;
+    for (Entry<Integer, AttributeClassObserver> entry : learningNodeRowMap.entrySet()) {
+      AttributeClassObserver obs = entry.getValue();
+      AttributeSplitSuggestion suggestion = obs
+          .getBestEvaluatedSplitSuggestion(splitCriterion,
+              preSplitDist, entry.getKey(), binarySplit);
+      if (suggestion == null) {
+        suggestion = new AttributeSplitSuggestion();
+      }
+      suggestions[curIndex] = suggestion;
+      curIndex++;
+    }
+
+    // Doing this sort instead of keeping the max and second max seems faster for some reason
+    Arrays.sort(suggestions);
+
+    AttributeSplitSuggestion bestSuggestion = null;
+    AttributeSplitSuggestion secondBestSuggestion = null;
+
+    if (suggestions.length >= 1) {
+    bestSuggestion = suggestions[suggestions.length - 1];
+
+      if (suggestions.length >= 2) {
+        secondBestSuggestion = suggestions[suggestions.length - 2];
+      }
+    }
+
+    // create the local result content event
+    LocalResultContentEvent lcre =
+        new LocalResultContentEvent(cce.getSplitId(), bestSuggestion, secondBestSuggestion);
+    lcre.setEnsembleId(cce.getEnsembleId());
+    computationResultStream.put(lcre);
+  }
+
+  private void processAttributeSlice(AttributeSliceEvent ase) {
+    //      System.out.printf("Event with key: %s processed by LSP: %d%n", ase.getKey(), id);
+    double[] attributeSlice = ase.getAttributeSlice();
+    boolean[] isNominal = ase.getIsNominalSlice();
+    int startingIndex = ase.getAttributeStartingIndex();
+    Long learningNodeId = ase.getLearningNodeId();
+    int classValue = ase.getClassValue();
+    double weight = ase.getWeight();
+
+    for (int i = 0; i < attributeSlice.length; i++) {
+      Integer obsIndex = i + startingIndex;
+      AttributeClassObserver obs = localStats.get(learningNodeId, obsIndex);
+      if (obs == null) {
+        obs = isNominal[i] ? newNominalClassObserver() : newNumericClassObserver();
+        localStats.put(learningNodeId, obsIndex, obs);
+      }
+      obs.observeAttributeClass(attributeSlice[i], classValue, weight);
+    }
   }
 
   @Override
   public void onCreate(int id) {
+    this.id = id;
     this.localStats = HashBasedTable.create();
   }
 
@@ -170,7 +198,7 @@
     LocalStatisticsProcessor oldProcessor = (LocalStatisticsProcessor) p;
     LocalStatisticsProcessor newProcessor = new LocalStatisticsProcessor.Builder(oldProcessor).build();
 
-    newProcessor.setComputationResultStream(oldProcessor.computationResultStream);
+    newProcessor.setComputationResultStream(oldProcessor.getComputationResultStream());
 
     return newProcessor;
   }
@@ -180,16 +208,16 @@
    * 
    * @param computeStream
    */
-  void setComputationResultStream(Stream computeStream) {
+  public void setComputationResultStream(Stream computeStream) {
     this.computationResultStream = computeStream;
   }
 
   private AttributeClassObserver newNominalClassObserver() {
-    return (AttributeClassObserver) this.nominalClassObserver.copy();
+    return new NominalAttributeClassObserver(); //further investigate this change
   }
 
   private AttributeClassObserver newNumericClassObserver() {
-    return (AttributeClassObserver) this.numericClassObserver.copy();
+    return new GaussianNumericAttributeClassObserver();//further investigate this change
   }
 
   /**
@@ -198,45 +226,66 @@
    * @author Arinto Murdopo
    * 
    */
-  static class Builder {
+  public static class Builder {
 
     private SplitCriterion splitCriterion = new InfoGainSplitCriterion();
     private boolean binarySplit = false;
     private AttributeClassObserver nominalClassObserver = new NominalAttributeClassObserver();
     private AttributeClassObserver numericClassObserver = new GaussianNumericAttributeClassObserver();
 
-    Builder() {
+    public Builder() {
 
     }
 
-    Builder(LocalStatisticsProcessor oldProcessor) {
-      this.splitCriterion = oldProcessor.splitCriterion;
-      this.binarySplit = oldProcessor.binarySplit;
+    public Builder(LocalStatisticsProcessor oldProcessor) {
+      this.splitCriterion = oldProcessor.getSplitCriterion();
+      this.binarySplit = oldProcessor.isBinarySplit();
+      this.nominalClassObserver = oldProcessor.getNominalClassObserver();
+      this.numericClassObserver = oldProcessor.getNumericClassObserver();
     }
 
-    Builder splitCriterion(SplitCriterion splitCriterion) {
+    public Builder splitCriterion(SplitCriterion splitCriterion) {
       this.splitCriterion = splitCriterion;
       return this;
     }
 
-    Builder binarySplit(boolean binarySplit) {
+    public Builder binarySplit(boolean binarySplit) {
       this.binarySplit = binarySplit;
       return this;
     }
 
-    Builder nominalClassObserver(AttributeClassObserver nominalClassObserver) {
+    public Builder nominalClassObserver(AttributeClassObserver nominalClassObserver) {
       this.nominalClassObserver = nominalClassObserver;
       return this;
     }
 
-    Builder numericClassObserver(AttributeClassObserver numericClassObserver) {
+    public Builder numericClassObserver(AttributeClassObserver numericClassObserver) {
       this.numericClassObserver = numericClassObserver;
       return this;
     }
 
-    LocalStatisticsProcessor build() {
+    public LocalStatisticsProcessor build() {
       return new LocalStatisticsProcessor(this);
     }
   }
-
+  
+  public SplitCriterion getSplitCriterion() {
+    return splitCriterion;
+  }
+  
+  public boolean isBinarySplit() {
+    return binarySplit;
+  }
+  
+  public AttributeClassObserver getNominalClassObserver() {
+    return nominalClassObserver;
+  }
+  
+  public AttributeClassObserver getNumericClassObserver() {
+    return numericClassObserver;
+  }
+  
+  public Stream getComputationResultStream() {
+    return computationResultStream;
+  }
 }
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
index a4f6fe1..d389102 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
@@ -64,44 +64,45 @@
  * @author Arinto Murdopo
  * 
  */
-public final class ModelAggregatorProcessor implements Processor {
+public class ModelAggregatorProcessor implements Processor {
 
   private static final long serialVersionUID = -1685875718300564886L;
   private static final Logger logger = LoggerFactory.getLogger(ModelAggregatorProcessor.class);
 
-  private int processorId;
+  protected int processorId;
 
-  private Node treeRoot;
+  protected Node treeRoot;
 
-  private int activeLeafNodeCount;
-  private int inactiveLeafNodeCount;
-  private int decisionNodeCount;
-  private boolean growthAllowed;
+  protected int activeLeafNodeCount;
+  protected int inactiveLeafNodeCount;
+  protected int decisionNodeCount;
+  protected boolean growthAllowed;
 
-  private final Instances dataset;
+  protected final Instances dataset;
 
   // to support concurrent split
-  private long splitId;
-  private ConcurrentMap<Long, SplittingNodeInfo> splittingNodes;
-  private BlockingQueue<Long> timedOutSplittingNodes;
+  protected long splitId;
+  protected ConcurrentMap<Long, SplittingNodeInfo> splittingNodes;
+  public BlockingQueue<Long> timedOutSplittingNodes;
 
   // available streams
-  private Stream resultStream;
-  private Stream attributeStream;
-  private Stream controlStream;
+  public Stream resultStream;
+  public Stream attributeStream;
+  public Stream controlStream;
 
-  private transient ScheduledExecutorService executor;
+  protected transient ScheduledExecutorService executor;
 
-  private final SplitCriterion splitCriterion;
-  private final double splitConfidence;
-  private final double tieThreshold;
-  private final int gracePeriod;
-  private final int parallelismHint;
-  private final long timeOut;
+  public final SplitCriterion splitCriterion;
+  public final double splitConfidence;
+  public final double tieThreshold;
+  public final int gracePeriod;
+  public final int parallelismHint;
+  public final long timeOut;
 
   // private constructor based on Builder pattern
-  private ModelAggregatorProcessor(Builder builder) {
+  protected ModelAggregatorProcessor(Builder<?> builder) {
     this.dataset = builder.dataset;
+    this.processorId = builder.processorID;
     this.splitCriterion = builder.splitCriterion;
     this.splitConfidence = builder.splitConfidence;
     this.tieThreshold = builder.tieThreshold;
@@ -226,19 +227,19 @@
     this.resultStream = resultStream;
   }
 
-  void setAttributeStream(Stream attributeStream) {
+  public void setAttributeStream(Stream attributeStream) {
     this.attributeStream = attributeStream;
   }
 
-  void setControlStream(Stream controlStream) {
+  public void setControlStream(Stream controlStream) {
     this.controlStream = controlStream;
   }
 
-  void sendToAttributeStream(ContentEvent event) {
+  public void sendToAttributeStream(ContentEvent event) {
     this.attributeStream.put(event);
   }
 
-  void sendToControlStream(ContentEvent event) {
+  public void sendToControlStream(ContentEvent event) {
     this.controlStream.put(event);
   }
 
@@ -318,11 +319,11 @@
     }
   }
 
-  private boolean correctlyClassifies(Instance inst, double[] prediction) {
+  public boolean correctlyClassifies(Instance inst, double[] prediction) {
     return maxIndex(prediction) == (int) inst.classValue();
   }
 
-  private void resetLearning() {
+  public void resetLearning() {
     this.treeRoot = null;
     // Remove nodes
     FoundNode[] learningNodes = findNodes();
@@ -338,13 +339,13 @@
     }
   }
 
-  protected FoundNode[] findNodes() {
+  public FoundNode[] findNodes() {
     List<FoundNode> foundList = new LinkedList<>();
     findNodes(this.treeRoot, null, -1, foundList);
     return foundList.toArray(new FoundNode[foundList.size()]);
   }
 
-  protected void findNodes(Node node, SplitNode parent, int parentBranch, List<FoundNode> found) {
+  public void findNodes(Node node, SplitNode parent, int parentBranch, List<FoundNode> found) {
     if (node != null) {
       found.add(new FoundNode(node, parent, parentBranch));
       if (node instanceof SplitNode) {
@@ -362,7 +363,7 @@
    * @param inst
    * @return
    */
-  private double[] getVotesForInstance(Instance inst) {
+  public double[] getVotesForInstance(Instance inst) {
     return getVotesForInstance(inst, false);
   }
 
@@ -383,7 +384,7 @@
 
     }
 
-    // Training after testing to speed up the process
+    // Training after testing to speed up the process. Never called for BoostMAProcessor
     if (isTraining) {
       if (this.treeRoot == null) {
         this.treeRoot = newLearningNode(this.parallelismHint);
@@ -401,7 +402,7 @@
    * 
    * @param inst
    */
-  private void trainOnInstanceImpl(Instance inst) {
+  public void trainOnInstanceImpl(Instance inst) {
     if (this.treeRoot == null) {
       this.treeRoot = newLearningNode(this.parallelismHint);
       this.activeLeafNodeCount = 1;
@@ -411,7 +412,7 @@
     trainOnInstanceImpl(foundNode, inst);
   }
 
-  private void trainOnInstanceImpl(FoundNode foundNode, Instance inst) {
+  public void trainOnInstanceImpl(FoundNode foundNode, Instance inst) {
 
     Node leafNode = foundNode.getNode();
 
@@ -440,7 +441,7 @@
    * @param foundNode
    *          The data structure to represents the filtering of the instance using the tree model.
    */
-  private void attemptToSplit(ActiveLearningNode activeLearningNode, FoundNode foundNode) {
+  public void attemptToSplit(ActiveLearningNode activeLearningNode, FoundNode foundNode) {
     if (!activeLearningNode.observedClassDistributionIsPure()) {
       // Increment the split ID
       this.splitId++;
@@ -469,7 +470,7 @@
    * @param foundNode
    *          The data structure to represents the filtering of the instance using the tree model.
    */
-  private void continueAttemptToSplit(ActiveLearningNode activeLearningNode, FoundNode foundNode) {
+  protected void continueAttemptToSplit(ActiveLearningNode activeLearningNode, FoundNode foundNode) {
     AttributeSplitSuggestion bestSuggestion = activeLearningNode.getDistributedBestSuggestion();
     AttributeSplitSuggestion secondBestSuggestion = activeLearningNode.getDistributedSecondBestSuggestion();
 
@@ -490,7 +491,7 @@
     boolean shouldSplit = false;
 
     if (secondBestSuggestion == null) {
-      shouldSplit = (bestSuggestion != null);
+      shouldSplit = true;
     } else {
       double hoeffdingBound = computeHoeffdingBound(
           this.splitCriterion.getRangeOfMerit(activeLearningNode.getObservedClassDistribution()), this.splitConfidence,
@@ -544,7 +545,7 @@
    * @param parentBranch
    *          the branch index of the node in the parent node
    */
-  private void deactivateLearningNode(ActiveLearningNode toDeactivate, SplitNode parent, int parentBranch) {
+  public void deactivateLearningNode(ActiveLearningNode toDeactivate, SplitNode parent, int parentBranch) {
     Node newLeaf = new InactiveLearningNode(toDeactivate.getObservedClassDistribution());
     if (parent == null) {
       this.treeRoot = newLeaf;
@@ -556,11 +557,11 @@
     this.inactiveLeafNodeCount++;
   }
 
-  private LearningNode newLearningNode(int parallelismHint) {
+  protected LearningNode newLearningNode(int parallelismHint) {
     return newLearningNode(new double[0], parallelismHint);
   }
 
-  private LearningNode newLearningNode(double[] initialClassObservations, int parallelismHint) {
+  protected LearningNode newLearningNode(double[] initialClassObservations, int parallelismHint) {
     // for VHT optimization, we need to dynamically instantiate the appropriate
     // ActiveLearningNode
     return new ActiveLearningNode(initialClassObservations, parallelismHint);
@@ -571,7 +572,7 @@
    * 
    * @param ih
    */
-  private void setModelContext(InstancesHeader ih) {
+  public void setModelContext(InstancesHeader ih) {
     // TODO possibly refactored
     if ((ih != null) && (ih.classIndex() < 0)) {
       throw new IllegalArgumentException("Context for a classifier must include a class to learn");
@@ -582,7 +583,7 @@
     logger.trace("Model context: {}", ih.toString());
   }
 
-  private static double computeHoeffdingBound(double range, double confidence, double n) {
+  public static double computeHoeffdingBound(double range, double confidence, double n) {
     return Math.sqrt((Math.pow(range, 2.0) * Math.log(1.0 / confidence)) / (2.0 * n));
   }
 
@@ -593,7 +594,7 @@
    * @author Arinto Murdopo
    * 
    */
-  static class AggregationTimeOutHandler implements Runnable {
+  protected static class AggregationTimeOutHandler implements Runnable {
 
     private static final Logger logger = LoggerFactory.getLogger(AggregationTimeOutHandler.class);
     private final Long splitId;
@@ -622,17 +623,30 @@
    * @author Arinto Murdopo
    * 
    */
-  static class SplittingNodeInfo implements Serializable {
+  protected static class SplittingNodeInfo implements Serializable {
 
     private final ActiveLearningNode activeLearningNode;
     private final FoundNode foundNode;
-    private final transient ScheduledFuture<?> scheduledFuture;
+    private transient ScheduledFuture<?> scheduledFuture;
 
     SplittingNodeInfo(ActiveLearningNode activeLearningNode, FoundNode foundNode, ScheduledFuture<?> scheduledFuture) {
       this.activeLearningNode = activeLearningNode;
       this.foundNode = foundNode;
       this.scheduledFuture = scheduledFuture;
     }
+
+    public SplittingNodeInfo(BoostVHTActiveLearningNode activeLearningNode, FoundNode foundNode) {
+      this.activeLearningNode = activeLearningNode;
+      this.foundNode = foundNode;
+    }
+
+    public ActiveLearningNode getActiveLearningNode() {
+      return activeLearningNode;
+    }
+
+    public FoundNode getFoundNode() {
+      return foundNode;
+    }
   }
 
   protected ChangeDetector changeDetector;
@@ -651,10 +665,11 @@
    * @author Arinto Murdopo
    * 
    */
-  static class Builder {
+  public static class Builder<T extends Builder<T>>{
 
     // required parameters
     private final Instances dataset;
+    private int processorID;
 
     // default values
     private SplitCriterion splitCriterion = new InfoGainSplitCriterion();
@@ -665,11 +680,11 @@
     private long timeOut = 30;
     private ChangeDetector changeDetector = null;
 
-    Builder(Instances dataset) {
+    public Builder(Instances dataset) {
       this.dataset = dataset;
     }
 
-    Builder(ModelAggregatorProcessor oldProcessor) {
+    public Builder(ModelAggregatorProcessor oldProcessor) {
       this.dataset = oldProcessor.dataset;
       this.splitCriterion = oldProcessor.splitCriterion;
       this.splitConfidence = oldProcessor.splitConfidence;
@@ -677,46 +692,96 @@
       this.gracePeriod = oldProcessor.gracePeriod;
       this.parallelismHint = oldProcessor.parallelismHint;
       this.timeOut = oldProcessor.timeOut;
+      this.processorID = oldProcessor.getProcessorId();
     }
 
-    Builder splitCriterion(SplitCriterion splitCriterion) {
+    public T splitCriterion(SplitCriterion splitCriterion) {
       this.splitCriterion = splitCriterion;
-      return this;
+      return getThis();
     }
 
-    Builder splitConfidence(double splitConfidence) {
+    public T splitConfidence(double splitConfidence) {
       this.splitConfidence = splitConfidence;
-      return this;
+      return getThis();
     }
 
-    Builder tieThreshold(double tieThreshold) {
+    public T tieThreshold(double tieThreshold) {
       this.tieThreshold = tieThreshold;
-      return this;
+      return getThis();
     }
 
-    Builder gracePeriod(int gracePeriod) {
+    public T gracePeriod(int gracePeriod) {
       this.gracePeriod = gracePeriod;
-      return this;
+      return getThis();
     }
 
-    Builder parallelismHint(int parallelismHint) {
+    public T parallelismHint(int parallelismHint) {
       this.parallelismHint = parallelismHint;
-      return this;
+      return getThis();
     }
 
-    Builder timeOut(long timeOut) {
+    public T timeOut(long timeOut) {
       this.timeOut = timeOut;
-      return this;
+      return getThis();
     }
 
-    Builder changeDetector(ChangeDetector changeDetector) {
+    public T processorID(int processorID) {
+      this.processorID = processorID;
+      return getThis();
+    }
+
+    T changeDetector(ChangeDetector changeDetector) {
       this.changeDetector = changeDetector;
-      return this;
+      return getThis();
     }
 
-    ModelAggregatorProcessor build() {
+    public T getThis(){
+      return (T) this;
+    }
+
+    public ModelAggregatorProcessor build() {
       return new ModelAggregatorProcessor(this);
     }
+
   }
 
+  public Instances getDataset() {
+    return dataset;
+  }
+
+  public Stream getAttributeStream() {
+    return attributeStream;
+  }
+
+  public Stream getControlStream() {
+    return controlStream;
+  }
+
+  public SplitCriterion getSplitCriterion() {
+    return splitCriterion;
+  }
+
+  public double getSplitConfidence() {
+    return splitConfidence;
+  }
+
+  public double getTieThreshold() {
+    return tieThreshold;
+  }
+
+  public int getGracePeriod() {
+    return gracePeriod;
+  }
+
+  public int getParallelismHint() {
+    return parallelismHint;
+  }
+
+  public long getTimeOut() {
+    return timeOut;
+  }
+
+  public int getProcessorId() {
+    return processorId;
+  }
 }
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/Node.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/Node.java
index 898a433..8ac6150 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/Node.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/Node.java
@@ -29,7 +29,7 @@
  * @author Arinto Murdopo
  * 
  */
-abstract class Node implements java.io.Serializable {
+public abstract class Node implements java.io.Serializable {
 
   private static final long serialVersionUID = 4008521239214180548L;
 
@@ -46,7 +46,7 @@
    *          The index of the current node in the parent
    * @return FoundNode which is the data structure to represent the resulting leaf.
    */
-  abstract FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent, int parentBranch);
+  public abstract FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent, int parentBranch);
 
   /**
    * Method to return the predicted class of the instance based on the statistic inside the node.
@@ -81,7 +81,7 @@
    * 
    * @return Observed class distribution
    */
-  protected double[] getObservedClassDistribution() {
+  public double[] getObservedClassDistribution() {
     return this.observedClassDistribution.getArrayCopy();
   }
 
@@ -90,7 +90,7 @@
    * 
    * @return Flag whether class distribution is pure or not.
    */
-  protected boolean observedClassDistributionIsPure() {
+  public boolean observedClassDistributionIsPure() {
     return (observedClassDistribution.numNonZeroEntries() < 2);
   }
 
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/SplitNode.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/SplitNode.java
index c2b1a47..d05caa0 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/SplitNode.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/SplitNode.java
@@ -46,7 +46,7 @@
   }
 
   @Override
-  FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent, int parentBranch) {
+  public FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent, int parentBranch) {
     int childIndex = instanceChildIndex(inst);
     if (childIndex >= 0) {
       Node child = getChild(childIndex);
@@ -85,7 +85,7 @@
    * @param child
    *          The child node
    */
-  void setChild(int index, Node child) {
+  public void setChild(int index, Node child) {
     if ((this.splitTest.maxBranches() >= 0)
         && (index >= this.splitTest.maxBranches())) {
       throw new IndexOutOfBoundsException();
diff --git a/samoa-api/src/main/java/org/apache/samoa/moa/classifiers/core/AttributeSplitSuggestion.java b/samoa-api/src/main/java/org/apache/samoa/moa/classifiers/core/AttributeSplitSuggestion.java
index 2eeade5..e75622e 100644
--- a/samoa-api/src/main/java/org/apache/samoa/moa/classifiers/core/AttributeSplitSuggestion.java
+++ b/samoa-api/src/main/java/org/apache/samoa/moa/classifiers/core/AttributeSplitSuggestion.java
@@ -40,6 +40,7 @@
   public double merit;
 
   public AttributeSplitSuggestion() {
+    merit = Double.NEGATIVE_INFINITY;
   }
 
   public AttributeSplitSuggestion(InstanceConditionalTest splitTest,
diff --git a/samoa-api/src/main/java/org/apache/samoa/moa/classifiers/functions/DecisionStump.java b/samoa-api/src/main/java/org/apache/samoa/moa/classifiers/functions/DecisionStump.java
new file mode 100644
index 0000000..730b69a
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/moa/classifiers/functions/DecisionStump.java
@@ -0,0 +1,157 @@
+package org.apache.samoa.moa.classifiers.functions;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.github.javacliparser.FlagOption;
+import com.github.javacliparser.IntOption;
+import org.apache.samoa.moa.classifiers.AbstractClassifier;
+import org.apache.samoa.moa.classifiers.core.AttributeSplitSuggestion;
+import org.apache.samoa.moa.classifiers.core.attributeclassobservers.AttributeClassObserver;
+import org.apache.samoa.moa.classifiers.core.attributeclassobservers.GaussianNumericAttributeClassObserver;
+import org.apache.samoa.moa.classifiers.core.attributeclassobservers.NominalAttributeClassObserver;
+import org.apache.samoa.moa.classifiers.core.splitcriteria.SplitCriterion;
+import org.apache.samoa.moa.core.AutoExpandVector;
+import org.apache.samoa.moa.core.DoubleVector;
+import org.apache.samoa.moa.core.Measurement;
+import org.apache.samoa.moa.options.ClassOption;
+import org.apache.samoa.instances.Instance;
+
+/**
+ * Decision trees of one level.<br />
+ *
+ * Parameters:</p>
+ * <ul>
+ * <li>-g : The number of instances to observe between model changes</li>
+ * <li>-b : Only allow binary splits</li>
+ * <li>-c : Split criterion to use. Example : InfoGainSplitCriterion</li>
+ * <li>-r : Seed for random behaviour of the classifier</li>
+ * </ul>
+ *
+ * @author Richard Kirkby (rkirkby@cs.waikato.ac.nz)
+ * @version $Revision: 7 $
+ */
+public class DecisionStump extends AbstractClassifier {
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public String getPurposeString() {
+    return "Decision trees of one level.";
+  }
+
+  public IntOption gracePeriodOption = new IntOption("gracePeriod", 'g',
+      "The number of instances to observe between model changes.", 1000,
+      0, Integer.MAX_VALUE);
+
+  public FlagOption binarySplitsOption = new FlagOption("binarySplits", 'b',
+      "Only allow binary splits.");
+
+  public ClassOption splitCriterionOption = new ClassOption("splitCriterion",
+      'c', "Split criterion to use.", SplitCriterion.class,
+      "InfoGainSplitCriterion");
+
+  protected AttributeSplitSuggestion bestSplit;
+
+  protected DoubleVector observedClassDistribution;
+
+  protected AutoExpandVector<AttributeClassObserver> attributeObservers;
+
+  protected double weightSeenAtLastSplit;
+
+  @Override
+  public void resetLearningImpl() {
+    this.bestSplit = null;
+    this.observedClassDistribution = new DoubleVector();
+    this.attributeObservers = new AutoExpandVector<AttributeClassObserver>();
+    this.weightSeenAtLastSplit = 0.0;
+  }
+
+  @Override
+  protected Measurement[] getModelMeasurementsImpl() {
+    return null;
+  }
+
+  @Override
+  public void getModelDescription(StringBuilder out, int indent) {
+    // TODO Auto-generated method stub
+  }
+
+  @Override
+  public void trainOnInstanceImpl(Instance inst) {
+    this.observedClassDistribution.addToValue((int) inst.classValue(), inst.weight());
+    for (int i = 0; i < inst.numAttributes() - 1; i++) {
+      int instAttIndex = modelAttIndexToInstanceAttIndex(i);
+      AttributeClassObserver obs = this.attributeObservers.get(i);
+      if (obs == null) {
+        obs = inst.attribute(instAttIndex).isNominal() ? newNominalClassObserver()
+            : newNumericClassObserver();
+        this.attributeObservers.set(i, obs);
+      }
+      obs.observeAttributeClass(inst.value(instAttIndex), (int) inst.classValue(), inst.weight());
+    }
+    if (this.trainingWeightSeenByModel - this.weightSeenAtLastSplit >= this.gracePeriodOption.getValue()) {
+      this.bestSplit = findBestSplit((SplitCriterion) getPreparedClassOption(this.splitCriterionOption));
+      this.weightSeenAtLastSplit = this.trainingWeightSeenByModel;
+    }
+  }
+
+  @Override
+  public double[] getVotesForInstance(Instance inst) {
+    if (this.bestSplit != null) {
+      int branch = this.bestSplit.splitTest.branchForInstance(inst);
+      if (branch >= 0) {
+        return this.bestSplit.resultingClassDistributionFromSplit(branch);
+      }
+    }
+    return this.observedClassDistribution.getArrayCopy();
+  }
+
+  @Override
+  public boolean isRandomizable() {
+    return false;
+  }
+
+  protected AttributeClassObserver newNominalClassObserver() {
+    return new NominalAttributeClassObserver();
+  }
+
+  protected AttributeClassObserver newNumericClassObserver() {
+    return new GaussianNumericAttributeClassObserver();
+  }
+
+  protected AttributeSplitSuggestion findBestSplit(SplitCriterion criterion) {
+    AttributeSplitSuggestion bestFound = null;
+    double bestMerit = Double.NEGATIVE_INFINITY;
+    double[] preSplitDist = this.observedClassDistribution.getArrayCopy();
+    for (int i = 0; i < this.attributeObservers.size(); i++) {
+      AttributeClassObserver obs = this.attributeObservers.get(i);
+      if (obs != null) {
+        AttributeSplitSuggestion suggestion = obs.getBestEvaluatedSplitSuggestion(criterion,
+            preSplitDist, i, this.binarySplitsOption.isSet());
+        if (suggestion.merit > bestMerit) {
+          bestMerit = suggestion.merit;
+          bestFound = suggestion;
+        }
+      }
+    }
+    return bestFound;
+  }
+}
diff --git a/samoa-api/src/main/java/org/apache/samoa/moa/core/Vote.java b/samoa-api/src/main/java/org/apache/samoa/moa/core/Vote.java
index 24ea3f3..2fda158 100644
--- a/samoa-api/src/main/java/org/apache/samoa/moa/core/Vote.java
+++ b/samoa-api/src/main/java/org/apache/samoa/moa/core/Vote.java
@@ -1,5 +1,25 @@
 package org.apache.samoa.moa.core;
 
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
 import java.io.Serializable;
 
 /*