Create Junit test case for caffe
diff --git a/contrib/docker/Dockerfile b/contrib/docker/Dockerfile
index 42b3718..754ff21 100644
--- a/contrib/docker/Dockerfile
+++ b/contrib/docker/Dockerfile
@@ -64,6 +64,9 @@
 RUN git clone https://github.com/yahoo/CaffeOnSpark.git --recursive
 RUN bash /tmp/config-caffe.sh
 
+RUN chmod 755 /caffe-test/train/train.sh
+RUN chmod 755 /caffe-test/tera/tera.sh
+
 RUN wget https://www.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
 RUN wget https://www.apache.org/dist/hadoop/common/hadoop-2.7.2/hadoop-2.7.2.tar.gz 
 RUN wget https://www.apache.org/dist/hbase/1.2.5/hbase-1.2.5-bin.tar.gz
diff --git a/contrib/docker/makeImage.sh b/contrib/docker/makeImage.sh
index 0d31f15..1839b26 100644
--- a/contrib/docker/makeImage.sh
+++ b/contrib/docker/makeImage.sh
@@ -1,5 +1,5 @@
 GLOG_logtostderr=1 /CaffeOnSpark/caffe-public/.build_release/tools/convert_imageset \
-   --resize_height=200 --resize_width=200 --shuffle --encoded \
+   --resize_height=200 --resize_width=1000 --shuffle --encoded \
    /caffe-test/train/data/ \
    /caffe-test/train/data/labels.txt \
    /caffe-test/train/lmdb
diff --git a/contrib/docker/train_test.prototxt b/contrib/docker/train_test.prototxt
index f35b02c..34df5d9 100644
--- a/contrib/docker/train_test.prototxt
+++ b/contrib/docker/train_test.prototxt
@@ -13,7 +13,7 @@
     batch_size: 1
     channels: 1
     height: 200
-    width: 200
+    width: 1000
     share_in_parallel: false
   }
   transform_param {
@@ -35,7 +35,7 @@
     batch_size: 1
     channels: 1
     height: 200
-    width: 200
+    width: 1000
     share_in_parallel: false
   }
   transform_param {
diff --git a/src/test/java/org/apache/hadoop/chukwa/caffe/ImageCreator.java b/src/test/java/org/apache/hadoop/chukwa/caffe/ImageCreator.java
new file mode 100644
index 0000000..d025823
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/chukwa/caffe/ImageCreator.java
@@ -0,0 +1,182 @@
+package org.apache.hadoop.chukwa.caffe;
+
+import java.awt.BasicStroke;
+import java.awt.Color;
+import java.awt.Graphics2D;
+import java.awt.image.BufferedImage;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import javax.imageio.ImageIO;
+
+
+/**
+ * Read csv files to create image files of dimension 1000 * 200
+ * 
+ */
+public class ImageCreator
+{
+  private static final int X_SIZE = 1000;
+  private static final int Y_SIZE = 200;
+  private String dirName = null;
+  
+  public ImageCreator (String dirName) {
+    this.dirName = dirName;
+  }
+    
+  public void drawImages () throws Exception
+  {
+    String outputFileName = dirName + "/labels.txt";
+    BufferedWriter bufferedWriter = null;
+    try {
+      FileWriter fileWriter = new FileWriter(outputFileName);
+      bufferedWriter = new BufferedWriter(fileWriter);
+    } catch (IOException e) {
+      e.printStackTrace ();
+    }
+       
+    //int start = 1;
+    File dir = new File (dirName);
+    File [] files = dir.listFiles ();
+    Arrays.sort(files);
+    
+    // find min and max memory usage
+    double minMem = 0;
+    double maxMem = 0;
+    long minTime = 0L;
+    long maxTime = 0L;
+    
+    // image size: 1000 *200
+    int lineNum = 0;
+    for (int i = 0; i < files.length; i++) {
+      String fileName = files [i].getName ();
+      if (!fileName.endsWith ("csv")) {
+        continue;
+      }
+      //System.out.println (">>>>> " + fileName);
+      BufferedReader bufferedReader = new BufferedReader(new FileReader(files [i]));
+      String line = null;
+
+      while ((line = bufferedReader.readLine()) != null)
+      {
+        lineNum ++;
+        String [] point = line.split (",");
+        long time = Long.parseLong (point[0]);
+        double mem = Double.parseDouble (point[1]);
+        point [1] = String.valueOf (mem);
+        if (maxMem == 0 || maxMem < mem){
+          maxMem = mem;
+        }
+        if (minMem == 0 || minMem > mem) {
+          minMem = mem;
+        }
+        if (maxTime == 0 || maxTime < time){
+          maxTime = time;
+        }
+        if (minTime == 0 || minTime > time) {
+          minTime = time;
+        }        
+      }
+      bufferedReader.close ();
+    }
+    //System.out.println ("minMem:" + minMem + ", maxMem:" + maxMem + ", total line number: " + lineNum);
+    //System.out.println ("minTime:" + minTime + ", maxTime:" + maxTime + ", total elapseTime: " + (maxTime - minTime));
+    
+    List <String []> dataList = new ArrayList<String []> ();
+    lineNum = 0;
+    long startTime = 0;
+    long endTime = 0;
+    int imageId = 1;
+    int totalPoint = 0;
+    for (int i = 0; i < files.length; i++) {
+      String fileName = files [i].getName ();
+      if (!fileName.endsWith ("csv")) {
+        continue;
+      }
+      System.out.println (">>>>> " + fileName);
+      BufferedReader bufferedReader = new BufferedReader(new FileReader(files [i]));
+      String line = null;
+
+      while ((line = bufferedReader.readLine()) != null)
+      {
+        lineNum ++;
+        String [] point = line.split (",");
+        long time = Long.parseLong (point[0]);
+        double mem = Double.parseDouble (point[1]);
+        point [1] = String.valueOf (mem);
+
+        if (startTime == 0) {
+          startTime = time;
+        } 
+        dataList.add (point);        
+        endTime = time;
+        long elapseTime = endTime - startTime;
+        if (elapseTime > X_SIZE) {
+          totalPoint = totalPoint + dataList.size ();
+          String imageFileName = dirName + "\\image" + imageId + ".png";
+          System.out.println ("elapseTime: " + elapseTime + ", data size: " + dataList.size () + ", imageFileName: " + imageFileName);
+          drawImage (dataList, imageFileName, X_SIZE, Y_SIZE);
+          bufferedWriter.write (imageFileName + " 0\n");
+          bufferedWriter.flush ();
+          dataList.clear ();
+          startTime = 0;
+          imageId ++;
+        }
+      }
+      bufferedReader.close ();
+      bufferedWriter.close ();
+    }
+    //System.out.println ("Total points: " + totalPoint + ", lineNum: " + lineNum);
+  }
+  
+  private static void drawImage (List <String []> dataList, String imageFileName, int x_size, int y_size) throws Exception
+  {
+    int size = dataList.size ();
+    String [] startPt = dataList.get (0);
+    //String [] endPt = dataList.get (size - 1);
+    long startTimeX = Long.parseLong (startPt [0]);
+    //long endTimeX = Long.parseLong (endPt [0]);
+    //System.out.println ("x_size: " + x_size + ", y_size: " + y_size + ", startTimeX: " + startTimeX + ", endTimeX: " + endTimeX);
+    BufferedImage img = new BufferedImage(x_size, y_size, BufferedImage.TYPE_INT_ARGB);
+
+    Graphics2D ig2 = img.createGraphics();
+    ig2.setBackground(Color.WHITE);
+
+    ig2.setColor (Color.BLACK);
+    ig2.setStroke(new BasicStroke(3));
+
+    MyPoint prevPoint = null;
+    for (int i = 0; i < size; i++) {
+      String [] point = (String []) dataList.get (i);
+      long time = Long.parseLong (point[0]);
+      double mem = Double.parseDouble (point[1]);
+      MyPoint currPoint = new MyPoint (time, mem);
+      //System.out.println ("time:" + time + ", mem:" + mem);
+      
+      if (prevPoint != null) {
+        ig2.drawLine ((int) (prevPoint.time - startTimeX), (int) (y_size - prevPoint.data), (int) (currPoint.time - startTimeX), (int) (y_size - currPoint.data));
+      }  
+      prevPoint = currPoint;
+    }
+    File f = new File(imageFileName);
+    ImageIO.write(img, "PNG", f);
+  }
+}
+
+class MyPoint 
+{
+  public long time;
+  public double data;
+
+  public MyPoint (long time, double data) {
+    this.time = time;
+    this.data = data;
+  }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/hadoop/chukwa/caffe/MetricsCollector.java b/src/test/java/org/apache/hadoop/chukwa/caffe/MetricsCollector.java
new file mode 100644
index 0000000..f264980
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/chukwa/caffe/MetricsCollector.java
@@ -0,0 +1,117 @@
+package org.apache.hadoop.chukwa.caffe;
+
+import java.io.BufferedWriter;
+import java.io.PrintWriter;
+import java.util.Calendar;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.hadoop.chukwa.datastore.ChukwaHBaseStore;
+import org.apache.hadoop.chukwa.hicc.bean.Series;
+import org.json.simple.JSONObject;
+
+//export CLASSPATH=/opt/apache/hadoop/etc/hadoop:/opt/apache/hbase/conf:/opt/apache/chukwa-0.8.0/share/chukwa/*:/opt/apache/chukwa-0.8.0/share/chukwa/lib/*:$CLASSPATH
+
+public class MetricsCollector
+{
+  private Timer getMetricSnapshotTimer = null;
+  private long intervalInMin;
+  private String hostname;
+  
+  public MetricsCollector (long intervalInMin, String hostname) {
+    this.intervalInMin = intervalInMin;
+    this.hostname = hostname;
+    getMetricSnapshotTimer = new Timer ("GetMetricSnapshot", true);
+  }
+  
+  public void start () {
+    if (getMetricSnapshotTimer != null)
+      getMetricSnapshotTimer.schedule (new GetMetricSnapshotTimerTask (hostname, intervalInMin), 0, intervalInMin);    
+  }
+  
+  public void cancel ()
+  {
+    if (getMetricSnapshotTimer != null)
+      getMetricSnapshotTimer.cancel ();
+  }
+  
+  class GetMetricSnapshotTimerTask extends TimerTask
+  {
+    private String hostname = null;
+    private BufferedWriter bufferedWriter = null;
+    private long intervalInMilli;
+
+    /**
+     * Normalize the timestamp in time series data to use seconds
+     */
+    private final static int XSCALE = 1000;
+
+    GetMetricSnapshotTimerTask (String hostname, long intervalInMin)
+    {
+      this.hostname = hostname;
+      this.intervalInMilli = intervalInMin * 60 * 1000;
+    }
+
+    public void run () 
+    {
+      TimeZone tz = TimeZone.getTimeZone("UTC");
+      Calendar now = Calendar.getInstance(tz);
+      long currTime=now.getTimeInMillis();
+
+      System.out.println ("currTime in UTC: " + currTime);
+      System.out.println ("currTime in current time zone" + System.currentTimeMillis ());
+
+      long startTime = currTime - intervalInMilli;
+      long endTime = currTime;
+      try {
+        System.out.println ("About to run");
+        getHadoopMetrics (startTime, endTime);
+        System.out.println ("Done run");
+      } catch (Exception e) {
+        e.printStackTrace ();
+      }
+    }
+
+    private void getHadoopMetrics(long startTime, long endTime) throws Exception 
+    {
+      String source = hostname + ":NodeManager";
+      System.out.println ("source: " + source);
+      System.out.println ("startTime: " + startTime);
+      System.out.println ("endTime: " + endTime); 
+      Series series = ChukwaHBaseStore.getSeries ("HadoopMetrics.jvm.JvmMetrics.MemHeapUsedM", source, startTime, endTime);
+      String value = series.toString ();
+      System.out.println ("value: " + value);
+
+      JSONObject jsonObj = (JSONObject) series.toJSONObject ();
+      Set set = jsonObj.keySet ();
+      Iterator iter = set.iterator ();
+      List list = (List) jsonObj.get ("data");
+      if (list != null) { 
+        int size = list.size ();
+        System.out.println ("size: " + size);
+        if (size > 0 ) {
+          String name = "NodeManager" + "_" + "HadoopMetrics.jvm.JvmMetrics.MemHeapUsedM" + "_" + hostname;
+          generateCsv (list, name, startTime, bufferedWriter);
+        }
+      }
+    }
+
+    private void generateCsv (List list, String name, long startTime, BufferedWriter bufferedWriter) throws Exception
+    {
+      String fileName = name + "_" + startTime;
+      PrintWriter writer = new PrintWriter(fileName + ".csv", "UTF-8");
+      int size = list.size ();
+      for (int i = 0; i < size; i++) {
+        List point = (List) list.get (i);
+        long time = (Long) point.get (0) / XSCALE;
+        double val = (Double) point.get (1);
+        writer.println(time + "," + val);
+      }
+      writer.close();
+    }
+  }
+}
diff --git a/src/test/java/org/apache/hadoop/chukwa/caffe/TestMemoryUsageDetection.java b/src/test/java/org/apache/hadoop/chukwa/caffe/TestMemoryUsageDetection.java
new file mode 100644
index 0000000..ac4ae59
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/chukwa/caffe/TestMemoryUsageDetection.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.caffe;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+
+/**
+ * (1) Run non-stop terasort and teragen  
+ * (2) Collect memory usage metrics from hbase every 5 minutes for 10 hours and write to csv files in /caffe-test/train/data
+ * (3) Create images of dimension 1000 * 200 from /caffe-test/train/data/*.csv. 
+ *     The files are saved in /caffe-test/train/data/*png
+ * (4) Train the image using caffe
+ *
+ */
+
+public class TestMemoryUsageDetection extends TestCase {
+
+  /**
+   * Run non-stop terasort and teragen to force memory leak
+   */
+  public void setUp() {
+    new Thread(new Runnable() {
+      public void run(){
+        try {
+          String target = new String("/caffe-test/tera/tera.sh");
+          Runtime rt = Runtime.getRuntime();
+          Process proc = rt.exec(target);
+          proc.waitFor();
+          BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+          String line = "";                       
+          while ((line = reader.readLine())!= null) {
+            System.out.println(line + "\n");
+          }
+        } catch (Exception e) {
+          fail(ExceptionUtil.getStackTrace(e));
+        }
+      }
+    }).start();
+  }
+
+  public void tearDown() {
+  }
+
+  /**
+   * Collect memory usage data every 15 min.
+   * Stop the timer after 10 hours
+   */
+  public void testCollectNodeManagerMetrics() {
+    int intervalInMin = 15;
+    long timerElapseTime = 10 * 60 * 60 * 1000;
+    String hostname = "";
+    try {
+      hostname = InetAddress.getLocalHost().getHostName();
+      System.out.println (hostname);
+    } catch (IOException e) {
+      fail(ExceptionUtil.getStackTrace(e));
+    }
+    MetricsCollector collector = new MetricsCollector (intervalInMin, hostname);
+    collector.start ();
+    try {
+      Thread.sleep (timerElapseTime);
+    } catch (InterruptedException e) {
+    }
+    collector.cancel ();
+      
+    // draw images of size 1000 * 200 from the collected csv files
+    try {
+      ImageCreator generator = new ImageCreator ("/caffe-test/train/data");
+      generator.drawImages ();
+    } catch (Exception e) {
+      fail(ExceptionUtil.getStackTrace(e));
+    }
+  }
+
+  /**
+   * Train the images
+   */
+  public void testCaffeTrain () {
+    try {
+      String target = new String("/caffe-test/train/train.sh");
+      Runtime rt = Runtime.getRuntime();
+      Process proc = rt.exec(target);
+      proc.waitFor();
+      BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+      String line = "";                       
+      while ((line = reader.readLine())!= null) {
+        System.out.println(line + "\n");
+      }
+    } catch (Exception e) {
+      fail(ExceptionUtil.getStackTrace(e));
+    }    
+  }
+}