Create Junit test case for caffe
diff --git a/contrib/docker/Dockerfile b/contrib/docker/Dockerfile
index 42b3718..754ff21 100644
--- a/contrib/docker/Dockerfile
+++ b/contrib/docker/Dockerfile
@@ -64,6 +64,9 @@
RUN git clone https://github.com/yahoo/CaffeOnSpark.git --recursive
RUN bash /tmp/config-caffe.sh
+RUN chmod 755 /caffe-test/train/train.sh
+RUN chmod 755 /caffe-test/tera/tera.sh
+
RUN wget https://www.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
RUN wget https://www.apache.org/dist/hadoop/common/hadoop-2.7.2/hadoop-2.7.2.tar.gz
RUN wget https://www.apache.org/dist/hbase/1.2.5/hbase-1.2.5-bin.tar.gz
diff --git a/contrib/docker/makeImage.sh b/contrib/docker/makeImage.sh
index 0d31f15..1839b26 100644
--- a/contrib/docker/makeImage.sh
+++ b/contrib/docker/makeImage.sh
@@ -1,5 +1,5 @@
GLOG_logtostderr=1 /CaffeOnSpark/caffe-public/.build_release/tools/convert_imageset \
- --resize_height=200 --resize_width=200 --shuffle --encoded \
+ --resize_height=200 --resize_width=1000 --shuffle --encoded \
/caffe-test/train/data/ \
/caffe-test/train/data/labels.txt \
/caffe-test/train/lmdb
diff --git a/contrib/docker/train_test.prototxt b/contrib/docker/train_test.prototxt
index f35b02c..34df5d9 100644
--- a/contrib/docker/train_test.prototxt
+++ b/contrib/docker/train_test.prototxt
@@ -13,7 +13,7 @@
batch_size: 1
channels: 1
height: 200
- width: 200
+ width: 1000
share_in_parallel: false
}
transform_param {
@@ -35,7 +35,7 @@
batch_size: 1
channels: 1
height: 200
- width: 200
+ width: 1000
share_in_parallel: false
}
transform_param {
diff --git a/src/test/java/org/apache/hadoop/chukwa/caffe/ImageCreator.java b/src/test/java/org/apache/hadoop/chukwa/caffe/ImageCreator.java
new file mode 100644
index 0000000..d025823
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/chukwa/caffe/ImageCreator.java
@@ -0,0 +1,182 @@
+package org.apache.hadoop.chukwa.caffe;
+
+import java.awt.BasicStroke;
+import java.awt.Color;
+import java.awt.Graphics2D;
+import java.awt.image.BufferedImage;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import javax.imageio.ImageIO;
+
+
+/**
+ * Read csv files to create image files of dimension 1000 * 200
+ *
+ */
+public class ImageCreator
+{
+ private static final int X_SIZE = 1000;
+ private static final int Y_SIZE = 200;
+ private String dirName = null;
+
+ public ImageCreator (String dirName) {
+ this.dirName = dirName;
+ }
+
+ public void drawImages () throws Exception
+ {
+ String outputFileName = dirName + "/labels.txt";
+ BufferedWriter bufferedWriter = null;
+ try {
+ FileWriter fileWriter = new FileWriter(outputFileName);
+ bufferedWriter = new BufferedWriter(fileWriter);
+ } catch (IOException e) {
+ e.printStackTrace ();
+ }
+
+ //int start = 1;
+ File dir = new File (dirName);
+ File [] files = dir.listFiles ();
+ Arrays.sort(files);
+
+ // find min and max memory usage
+ double minMem = 0;
+ double maxMem = 0;
+ long minTime = 0L;
+ long maxTime = 0L;
+
+ // image size: 1000 *200
+ int lineNum = 0;
+ for (int i = 0; i < files.length; i++) {
+ String fileName = files [i].getName ();
+ if (!fileName.endsWith ("csv")) {
+ continue;
+ }
+ //System.out.println (">>>>> " + fileName);
+ BufferedReader bufferedReader = new BufferedReader(new FileReader(files [i]));
+ String line = null;
+
+ while ((line = bufferedReader.readLine()) != null)
+ {
+ lineNum ++;
+ String [] point = line.split (",");
+ long time = Long.parseLong (point[0]);
+ double mem = Double.parseDouble (point[1]);
+ point [1] = String.valueOf (mem);
+ if (maxMem == 0 || maxMem < mem){
+ maxMem = mem;
+ }
+ if (minMem == 0 || minMem > mem) {
+ minMem = mem;
+ }
+ if (maxTime == 0 || maxTime < time){
+ maxTime = time;
+ }
+ if (minTime == 0 || minTime > time) {
+ minTime = time;
+ }
+ }
+ bufferedReader.close ();
+ }
+ //System.out.println ("minMem:" + minMem + ", maxMem:" + maxMem + ", total line number: " + lineNum);
+ //System.out.println ("minTime:" + minTime + ", maxTime:" + maxTime + ", total elapseTime: " + (maxTime - minTime));
+
+ List <String []> dataList = new ArrayList<String []> ();
+ lineNum = 0;
+ long startTime = 0;
+ long endTime = 0;
+ int imageId = 1;
+ int totalPoint = 0;
+ for (int i = 0; i < files.length; i++) {
+ String fileName = files [i].getName ();
+ if (!fileName.endsWith ("csv")) {
+ continue;
+ }
+ System.out.println (">>>>> " + fileName);
+ BufferedReader bufferedReader = new BufferedReader(new FileReader(files [i]));
+ String line = null;
+
+ while ((line = bufferedReader.readLine()) != null)
+ {
+ lineNum ++;
+ String [] point = line.split (",");
+ long time = Long.parseLong (point[0]);
+ double mem = Double.parseDouble (point[1]);
+ point [1] = String.valueOf (mem);
+
+ if (startTime == 0) {
+ startTime = time;
+ }
+ dataList.add (point);
+ endTime = time;
+ long elapseTime = endTime - startTime;
+ if (elapseTime > X_SIZE) {
+ totalPoint = totalPoint + dataList.size ();
+ String imageFileName = dirName + "\\image" + imageId + ".png";
+ System.out.println ("elapseTime: " + elapseTime + ", data size: " + dataList.size () + ", imageFileName: " + imageFileName);
+ drawImage (dataList, imageFileName, X_SIZE, Y_SIZE);
+ bufferedWriter.write (imageFileName + " 0\n");
+ bufferedWriter.flush ();
+ dataList.clear ();
+ startTime = 0;
+ imageId ++;
+ }
+ }
+ bufferedReader.close ();
+ bufferedWriter.close ();
+ }
+ //System.out.println ("Total points: " + totalPoint + ", lineNum: " + lineNum);
+ }
+
+ private static void drawImage (List <String []> dataList, String imageFileName, int x_size, int y_size) throws Exception
+ {
+ int size = dataList.size ();
+ String [] startPt = dataList.get (0);
+ //String [] endPt = dataList.get (size - 1);
+ long startTimeX = Long.parseLong (startPt [0]);
+ //long endTimeX = Long.parseLong (endPt [0]);
+ //System.out.println ("x_size: " + x_size + ", y_size: " + y_size + ", startTimeX: " + startTimeX + ", endTimeX: " + endTimeX);
+ BufferedImage img = new BufferedImage(x_size, y_size, BufferedImage.TYPE_INT_ARGB);
+
+ Graphics2D ig2 = img.createGraphics();
+ ig2.setBackground(Color.WHITE);
+
+ ig2.setColor (Color.BLACK);
+ ig2.setStroke(new BasicStroke(3));
+
+ MyPoint prevPoint = null;
+ for (int i = 0; i < size; i++) {
+ String [] point = (String []) dataList.get (i);
+ long time = Long.parseLong (point[0]);
+ double mem = Double.parseDouble (point[1]);
+ MyPoint currPoint = new MyPoint (time, mem);
+ //System.out.println ("time:" + time + ", mem:" + mem);
+
+ if (prevPoint != null) {
+ ig2.drawLine ((int) (prevPoint.time - startTimeX), (int) (y_size - prevPoint.data), (int) (currPoint.time - startTimeX), (int) (y_size - currPoint.data));
+ }
+ prevPoint = currPoint;
+ }
+ File f = new File(imageFileName);
+ ImageIO.write(img, "PNG", f);
+ }
+}
+
+class MyPoint
+{
+ public long time;
+ public double data;
+
+ public MyPoint (long time, double data) {
+ this.time = time;
+ this.data = data;
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/hadoop/chukwa/caffe/MetricsCollector.java b/src/test/java/org/apache/hadoop/chukwa/caffe/MetricsCollector.java
new file mode 100644
index 0000000..f264980
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/chukwa/caffe/MetricsCollector.java
@@ -0,0 +1,117 @@
+package org.apache.hadoop.chukwa.caffe;
+
+import java.io.BufferedWriter;
+import java.io.PrintWriter;
+import java.util.Calendar;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.hadoop.chukwa.datastore.ChukwaHBaseStore;
+import org.apache.hadoop.chukwa.hicc.bean.Series;
+import org.json.simple.JSONObject;
+
+//export CLASSPATH=/opt/apache/hadoop/etc/hadoop:/opt/apache/hbase/conf:/opt/apache/chukwa-0.8.0/share/chukwa/*:/opt/apache/chukwa-0.8.0/share/chukwa/lib/*:$CLASSPATH
+
+public class MetricsCollector
+{
+ private Timer getMetricSnapshotTimer = null;
+ private long intervalInMin;
+ private String hostname;
+
+ public MetricsCollector (long intervalInMin, String hostname) {
+ this.intervalInMin = intervalInMin;
+ this.hostname = hostname;
+ getMetricSnapshotTimer = new Timer ("GetMetricSnapshot", true);
+ }
+
+ public void start () {
+ if (getMetricSnapshotTimer != null)
+ getMetricSnapshotTimer.schedule (new GetMetricSnapshotTimerTask (hostname, intervalInMin), 0, intervalInMin);
+ }
+
+ public void cancel ()
+ {
+ if (getMetricSnapshotTimer != null)
+ getMetricSnapshotTimer.cancel ();
+ }
+
+ class GetMetricSnapshotTimerTask extends TimerTask
+ {
+ private String hostname = null;
+ private BufferedWriter bufferedWriter = null;
+ private long intervalInMilli;
+
+ /**
+ * Normalize the timestamp in time series data to use seconds
+ */
+ private final static int XSCALE = 1000;
+
+ GetMetricSnapshotTimerTask (String hostname, long intervalInMin)
+ {
+ this.hostname = hostname;
+ this.intervalInMilli = intervalInMin * 60 * 1000;
+ }
+
+ public void run ()
+ {
+ TimeZone tz = TimeZone.getTimeZone("UTC");
+ Calendar now = Calendar.getInstance(tz);
+ long currTime=now.getTimeInMillis();
+
+ System.out.println ("currTime in UTC: " + currTime);
+ System.out.println ("currTime in current time zone" + System.currentTimeMillis ());
+
+ long startTime = currTime - intervalInMilli;
+ long endTime = currTime;
+ try {
+ System.out.println ("About to run");
+ getHadoopMetrics (startTime, endTime);
+ System.out.println ("Done run");
+ } catch (Exception e) {
+ e.printStackTrace ();
+ }
+ }
+
+ private void getHadoopMetrics(long startTime, long endTime) throws Exception
+ {
+ String source = hostname + ":NodeManager";
+ System.out.println ("source: " + source);
+ System.out.println ("startTime: " + startTime);
+ System.out.println ("endTime: " + endTime);
+ Series series = ChukwaHBaseStore.getSeries ("HadoopMetrics.jvm.JvmMetrics.MemHeapUsedM", source, startTime, endTime);
+ String value = series.toString ();
+ System.out.println ("value: " + value);
+
+ JSONObject jsonObj = (JSONObject) series.toJSONObject ();
+ Set set = jsonObj.keySet ();
+ Iterator iter = set.iterator ();
+ List list = (List) jsonObj.get ("data");
+ if (list != null) {
+ int size = list.size ();
+ System.out.println ("size: " + size);
+ if (size > 0 ) {
+ String name = "NodeManager" + "_" + "HadoopMetrics.jvm.JvmMetrics.MemHeapUsedM" + "_" + hostname;
+ generateCsv (list, name, startTime, bufferedWriter);
+ }
+ }
+ }
+
+ private void generateCsv (List list, String name, long startTime, BufferedWriter bufferedWriter) throws Exception
+ {
+ String fileName = name + "_" + startTime;
+ PrintWriter writer = new PrintWriter(fileName + ".csv", "UTF-8");
+ int size = list.size ();
+ for (int i = 0; i < size; i++) {
+ List point = (List) list.get (i);
+ long time = (Long) point.get (0) / XSCALE;
+ double val = (Double) point.get (1);
+ writer.println(time + "," + val);
+ }
+ writer.close();
+ }
+ }
+}
diff --git a/src/test/java/org/apache/hadoop/chukwa/caffe/TestMemoryUsageDetection.java b/src/test/java/org/apache/hadoop/chukwa/caffe/TestMemoryUsageDetection.java
new file mode 100644
index 0000000..ac4ae59
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/chukwa/caffe/TestMemoryUsageDetection.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.caffe;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+
+/**
+ * (1) Run non-stop terasort and teragen
+ * (2) Collect memory usage metrics from hbase every 5 minutes for 10 hours and write to csv files in /caffe-test/train/data
+ * (3) Create images of dimension 1000 * 200 from /caffe-test/train/data/*.csv.
+ * The files are saved in /caffe-test/train/data/*png
+ * (4) Train the image using caffe
+ *
+ */
+
+public class TestMemoryUsageDetection extends TestCase {
+
+ /**
+ * Run non-stop terasort and teragen to force memory leak
+ */
+ public void setUp() {
+ new Thread(new Runnable() {
+ public void run(){
+ try {
+ String target = new String("/caffe-test/tera/tera.sh");
+ Runtime rt = Runtime.getRuntime();
+ Process proc = rt.exec(target);
+ proc.waitFor();
+ BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+ String line = "";
+ while ((line = reader.readLine())!= null) {
+ System.out.println(line + "\n");
+ }
+ } catch (Exception e) {
+ fail(ExceptionUtil.getStackTrace(e));
+ }
+ }
+ }).start();
+ }
+
+ public void tearDown() {
+ }
+
+ /**
+ * Collect memory usage data every 15 min.
+ * Stop the timer after 10 hours
+ */
+ public void testCollectNodeManagerMetrics() {
+ int intervalInMin = 15;
+ long timerElapseTime = 10 * 60 * 60 * 1000;
+ String hostname = "";
+ try {
+ hostname = InetAddress.getLocalHost().getHostName();
+ System.out.println (hostname);
+ } catch (IOException e) {
+ fail(ExceptionUtil.getStackTrace(e));
+ }
+ MetricsCollector collector = new MetricsCollector (intervalInMin, hostname);
+ collector.start ();
+ try {
+ Thread.sleep (timerElapseTime);
+ } catch (InterruptedException e) {
+ }
+ collector.cancel ();
+
+ // draw images of size 1000 * 200 from the collected csv files
+ try {
+ ImageCreator generator = new ImageCreator ("/caffe-test/train/data");
+ generator.drawImages ();
+ } catch (Exception e) {
+ fail(ExceptionUtil.getStackTrace(e));
+ }
+ }
+
+ /**
+ * Train the images
+ */
+ public void testCaffeTrain () {
+ try {
+ String target = new String("/caffe-test/train/train.sh");
+ Runtime rt = Runtime.getRuntime();
+ Process proc = rt.exec(target);
+ proc.waitFor();
+ BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+ String line = "";
+ while ((line = reader.readLine())!= null) {
+ System.out.println(line + "\n");
+ }
+ } catch (Exception e) {
+ fail(ExceptionUtil.getStackTrace(e));
+ }
+ }
+}