Merged the trunk changes to branch HAMA-505-branch

git-svn-id: https://svn.apache.org/repos/asf/hama/branches/HAMA-505-branch@1369566 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index f0b4622..529b2b1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,15 @@
 Hama Change Log
 
+Release 0.6 (unreleased changes)
+
+  NEW FEATURES
+
+  BUG FIXES
+
+   HAMA-608: LocalRunner should honor the configured queues (tjungblut)
+
+  IMPROVEMENTS
+
 Release 0.5 - April 10, 2012 
 
   NEW FEATURES
diff --git a/conf/hama-default.xml b/conf/hama-default.xml
index 400323c..ce36cf5 100644
--- a/conf/hama-default.xml
+++ b/conf/hama-default.xml
@@ -135,7 +135,11 @@
     <value>1</value>
     <description>If bsp.checkpoint.enabled is set to true, the checkpointing is initiated on the valueth synchronization process of BSP tasks.</description>
   </property>
-
+  <property>
+    <name>bsp.groomserver.pingperiod</name>
+    <value>10000</value>
+    <description>The default timeout period for checking groom server health.</description>
+  </property>
   
   <!--
   Beginning of properties that are directly mapped from ZooKeeper's zoo.cfg.
diff --git a/core/pom.xml b/core/pom.xml
index 7643481..3488c5a 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -15,20 +15,19 @@
    limitations under the License.
 -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
   <parent>
     <groupId>org.apache.hama</groupId>
     <artifactId>hama-parent</artifactId>
-    <version>0.5.0-SNAPSHOT</version>
+    <version>0.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hama</groupId>
   <artifactId>hama-core</artifactId>
   <name>core</name>
-  <version>0.5.0-SNAPSHOT</version>
+  <version>0.6.0-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <dependencies>
@@ -197,11 +196,9 @@
             </goals>
             <configuration>
               <target>
-                <mkdir
-                  dir="${project.build.directory}/generated-sources/java" />
+                <mkdir dir="${project.build.directory}/generated-sources/java" />
                 <exec executable="sh">
-                  <arg
-                    line="${basedir}/dev-support/saveVersion.sh ${project.version} ${project.build.directory}/generated-sources/annotations" />
+                  <arg line="${basedir}/dev-support/saveVersion.sh ${project.version} ${project.build.directory}/generated-sources/annotations" />
                 </exec>
               </target>
             </configuration>
diff --git a/core/src/main/java/org/apache/hama/bsp/BSPTask.java b/core/src/main/java/org/apache/hama/bsp/BSPTask.java
index 5bf618f..679d053 100644
--- a/core/src/main/java/org/apache/hama/bsp/BSPTask.java
+++ b/core/src/main/java/org/apache/hama/bsp/BSPTask.java
@@ -121,7 +121,6 @@
       if (pingPeriod > 0) {
         pingService.scheduleWithFixedDelay(new PingGroomServer(umbilical,
             taskId), 0, pingPeriod, TimeUnit.MILLISECONDS);
-        LOG.error("Started pinging to groom");
       }
     } catch (Exception e) {
       LOG.error("Error scheduling ping service", e);
diff --git a/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java b/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
index e60c16e..97113ec 100644
--- a/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
+++ b/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
@@ -20,17 +20,13 @@
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
-import java.util.Map.Entry;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.commons.logging.Log;
@@ -45,11 +41,9 @@
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJobClient.RawSplit;
 import org.apache.hama.bsp.BSPMaster.State;
-import org.apache.hama.bsp.message.MemoryQueue;
-import org.apache.hama.bsp.message.MessageEventListener;
+import org.apache.hama.bsp.message.AbstractMessageManager;
 import org.apache.hama.bsp.message.MessageManager;
 import org.apache.hama.bsp.message.MessageManagerFactory;
-import org.apache.hama.bsp.message.MessageQueue;
 import org.apache.hama.bsp.sync.BSPPeerSyncClient;
 import org.apache.hama.bsp.sync.SyncClient;
 import org.apache.hama.bsp.sync.SyncEvent;
@@ -58,7 +52,6 @@
 import org.apache.hama.bsp.sync.SyncServiceFactory;
 import org.apache.hama.ipc.BSPPeerProtocol;
 import org.apache.hama.ipc.JobSubmissionProtocol;
-import org.apache.hama.util.BSPNetUtils;
 
 /**
  * A multithreaded local BSP runner that can be used for debugging and local
@@ -72,7 +65,7 @@
   private volatile ThreadPoolExecutor threadPool;
 
   @SuppressWarnings("rawtypes")
-  private static final LinkedList<Future<BSPPeerImpl>> futureList = new LinkedList<Future<BSPPeerImpl>>();
+  private static final LinkedList<Future<BSPPeerImpl>> FUTURE_LIST = new LinkedList<Future<BSPPeerImpl>>();
 
   private String jobFile;
   private String jobName;
@@ -156,7 +149,7 @@
     peerNames = new String[numBspTask];
     for (int i = 0; i < numBspTask; i++) {
       peerNames[i] = "local:" + i;
-      futureList.add(threadPool.submit(new BSPRunner(new Configuration(conf),
+      FUTURE_LIST.add(threadPool.submit(new BSPRunner(new Configuration(conf),
           job, i, splits)));
       globalCounters.incrCounter(JobInProgress.JobCounter.LAUNCHED_TASKS, 1L);
     }
@@ -306,7 +299,7 @@
     @Override
     public void run() {
       boolean success = true;
-      for (Future<BSPPeerImpl> future : futureList) {
+      for (Future<BSPPeerImpl> future : FUTURE_LIST) {
         try {
           BSPPeerImpl bspPeerImpl = future.get();
           currentJobStatus.getCounter().incrAllCounters(
@@ -331,53 +324,17 @@
 
   }
 
-  public static class LocalMessageManager<M extends Writable> implements
-      MessageManager<M> {
+  public static class LocalMessageManager<M extends Writable> extends
+      AbstractMessageManager<M> {
 
     @SuppressWarnings("rawtypes")
-    private static final ConcurrentHashMap<InetSocketAddress, LocalMessageManager> managerMap = new ConcurrentHashMap<InetSocketAddress, LocalBSPRunner.LocalMessageManager>();
-
-    private final HashMap<InetSocketAddress, MessageQueue<M>> localOutgoingMessages = new HashMap<InetSocketAddress, MessageQueue<M>>();
-    private static final ConcurrentHashMap<String, InetSocketAddress> socketCache = new ConcurrentHashMap<String, InetSocketAddress>();
-    private final LinkedBlockingDeque<M> localIncomingMessages = new LinkedBlockingDeque<M>();
-
-    private BSPPeer<?, ?, ?, ?, M> peer;
+    private static final ConcurrentHashMap<InetSocketAddress, LocalMessageManager> MANAGER_MAP = new ConcurrentHashMap<InetSocketAddress, LocalBSPRunner.LocalMessageManager>();
 
     @Override
     public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
         Configuration conf, InetSocketAddress peerAddress) {
-      this.peer = peer;
-      managerMap.put(peerAddress, this);
-    }
-
-    @Override
-    public void close() {
-
-    }
-
-    @Override
-    public M getCurrentMessage() throws IOException {
-      if (localIncomingMessages.isEmpty()) {
-        return null;
-      } else {
-        return localIncomingMessages.pop();
-      }
-    }
-
-    @Override
-    public void send(String peerName, M msg) throws IOException {
-      InetSocketAddress inetSocketAddress = socketCache.get(peerName);
-      if (inetSocketAddress == null) {
-        inetSocketAddress = BSPNetUtils.getAddress(peerName);
-        socketCache.put(peerName, inetSocketAddress);
-      }
-      MessageQueue<M> msgs = localOutgoingMessages.get(inetSocketAddress);
-      if (msgs == null) {
-        msgs = new MemoryQueue<M>();
-      }
-      msgs.add(msg);
-      peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
-      localOutgoingMessages.put(inetSocketAddress, msgs);
+      super.init(attemptId, peer, conf, peerAddress);
+      MANAGER_MAP.put(peerAddress, this);
     }
 
     @SuppressWarnings("unchecked")
@@ -385,54 +342,11 @@
     public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
         throws IOException {
       for (M value : bundle.getMessages()) {
-        managerMap.get(addr).localIncomingMessages.add(value);
+        MANAGER_MAP.get(addr).localQueueForNextIteration.add(value);
         peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
             1L);
       }
     }
-
-    @Override
-    public Iterator<Entry<InetSocketAddress, MessageQueue<M>>> getMessageIterator() {
-      return localOutgoingMessages.entrySet().iterator();
-    }
-
-    @Override
-    public void clearOutgoingQueues() {
-      localOutgoingMessages.clear();
-    }
-
-    @Override
-    public int getNumCurrentMessages() {
-      return localIncomingMessages.size();
-    }
-
-    @Override
-    public void finishSendPhase() throws IOException {
-    }
-
-    @Override
-    public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) {
-      for (Writable value : bundle.getMessages()) {
-        loopBackMessage(value);
-      }
-      
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void loopBackMessage(Writable message) {
-      localIncomingMessages.add((M)message);
-      peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
-          1L);
-    }
-
-    @Override
-    public void registerListener(MessageEventListener<M> listener)
-        throws IOException {
-    }
-    
-    
-
   }
 
   public static class LocalUmbilical implements BSPPeerProtocol {
@@ -550,13 +464,9 @@
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() {
       barrier = null;
     }
-    @Override
-    public boolean getInformation(String key, Writable writableVal) {
-      return false;
-    }
 
     @Override
     public String constructKey(BSPJobID jobId, String... args) {
@@ -570,6 +480,11 @@
     }
 
     @Override
+    public boolean getInformation(String key, Writable valueHolder) {
+      return false;
+    }
+
+    @Override
     public boolean addKey(String key, boolean permanent,
         SyncEventListener listener) {
       return false;
@@ -581,23 +496,20 @@
     }
 
     @Override
-    public boolean registerListener(String key,
-        SyncEvent event,
-        SyncEventListener listener) {
-      return false;
+    public String[] getChildKeySet(String key, SyncEventListener listener) {
+      return null;
     }
 
     @Override
-    public String[] getChildKeySet(String key, SyncEventListener listener) {
-      return null;
+    public boolean registerListener(String key, SyncEvent event,
+        SyncEventListener listener) {
+      return false;
     }
 
     @Override
     public boolean remove(String key, SyncEventListener listener) {
       return false;
     }
-
-	
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/hama/util/BSPNetUtils.java b/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
index 4576f09..1e3017c 100644
--- a/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
+++ b/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
@@ -17,17 +17,13 @@
  */
 package org.apache.hama.util;
 
-import java.io.IOException;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.UnknownHostException;
-import java.util.NoSuchElementException;
-
 import org.apache.hama.Constants;
 import org.apache.mina.util.AvailablePortFinder;
 
+import java.io.IOException;
+import java.net.*;
+import java.util.NoSuchElementException;
+
 /**
  * NetUtils for our needs.
  */
@@ -37,7 +33,7 @@
   /**
    * Gets the canonical hostname of this machine.
    * 
-   * @return
+   * @return String representing host canonical host name
    * @throws UnknownHostException
    */
   public static String getCanonicalHostname() throws UnknownHostException {
@@ -71,8 +67,8 @@
    * Gets a new InetSocketAddress from the given peerName. peerName must contain
    * a colon to distinct between host and port.
    * 
-   * @param peerName
-   * @return
+   * @param peerName the name as a String of the BSP peer to get the address from
+   * @return the InetSocketAddress of the given BSP peer
    */
   public static InetSocketAddress getAddress(String peerName) {
     String[] peerAddrParts = peerName.split(":");
diff --git a/core/src/main/java/org/apache/hama/util/BSPServletUtil.java b/core/src/main/java/org/apache/hama/util/BSPServletUtil.java
index d2be184..7b14b54 100644
--- a/core/src/main/java/org/apache/hama/util/BSPServletUtil.java
+++ b/core/src/main/java/org/apache/hama/util/BSPServletUtil.java
@@ -17,17 +17,17 @@
  */
 package org.apache.hama.util;
 
-import java.io.IOException;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.Map.Entry;
-
 import org.apache.hadoop.util.ServletUtil;
 import org.apache.hama.bsp.BSPMaster;
 import org.apache.hama.bsp.ClusterStatus;
 import org.apache.hama.bsp.GroomServerStatus;
 import org.apache.hama.bsp.JobStatus;
 
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Map.Entry;
+
 public class BSPServletUtil extends ServletUtil {
 
   public static final String HTML_TAIL = "<hr />\n"
@@ -103,7 +103,7 @@
     for (Entry<String, GroomServerStatus> entry : status
         .getActiveGroomServerStatus().entrySet()) {
       sb.append("<tr><td>");
-      sb.append("<a href='http://" + entry.getKey() + "'>");
+      sb.append("<a href='http://").append(entry.getKey()).append("'>");
       sb.append(entry.getKey()).append("</a></td><td>");
       sb.append(entry.getValue().getGroomHostName()).append("</td>").append(
           "<td>").append(entry.getValue().getMaxTasks()).append("</td><td>");
diff --git a/core/src/main/java/org/apache/hama/util/Bytes.java b/core/src/main/java/org/apache/hama/util/Bytes.java
index 164d877..848748e 100644
--- a/core/src/main/java/org/apache/hama/util/Bytes.java
+++ b/core/src/main/java/org/apache/hama/util/Bytes.java
@@ -17,6 +17,13 @@
  */
 package org.apache.hama.util;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.Constants;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -25,13 +32,6 @@
 import java.nio.ByteBuffer;
 import java.util.Comparator;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hama.Constants;
-
 /**
  * Utility class that handles byte arrays, conversions to/from other types,
  * comparisons, hash code generation, manufacturing keys for HashMaps or
@@ -892,10 +892,7 @@
   public static boolean equals(final byte[] left, final byte[] right) {
     // Could use Arrays.equals?
     // noinspection SimplifiableConditionalExpression
-    if (left == null && right == null) {
-      return true;
-    }
-    return (!(left == null || right == null || (left.length != right.length)) && compareTo(left, right) == 0);
+    return left == null && right == null || (!(left == null || right == null || (left.length != right.length)) && compareTo(left, right) == 0);
   }
 
   /**
diff --git a/core/src/main/java/org/apache/hama/util/ClusterUtil.java b/core/src/main/java/org/apache/hama/util/ClusterUtil.java
index f476110..6e8ec91 100644
--- a/core/src/main/java/org/apache/hama/util/ClusterUtil.java
+++ b/core/src/main/java/org/apache/hama/util/ClusterUtil.java
@@ -17,9 +17,6 @@
  */
 package org.apache.hama.util;
 
-import java.io.IOException;
-import java.util.List;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -27,6 +24,9 @@
 import org.apache.hama.bsp.BSPMaster;
 import org.apache.hama.bsp.GroomServer;
 
+import java.io.IOException;
+import java.util.List;
+
 public class ClusterUtil {
   private static final Log LOG = LogFactory.getLog(ClusterUtil.class);
   
@@ -86,9 +86,9 @@
 
   /**
    * Start the cluster.
-   * @param m
-   * @param conf 
-   * @param groomservers
+   * @param m the BSP master
+   * @param conf cluster configuration to be used
+   * @param groomservers list of threads holding groom servers
    * @return Address to use contacting master.
    * @throws InterruptedException 
    * @throws IOException 
diff --git a/core/src/main/java/org/apache/hama/util/RandomVariable.java b/core/src/main/java/org/apache/hama/util/RandomVariable.java
index 05c95d0..b3445a2 100644
--- a/core/src/main/java/org/apache/hama/util/RandomVariable.java
+++ b/core/src/main/java/org/apache/hama/util/RandomVariable.java
@@ -31,8 +31,7 @@
    * @return a double between 0 and 1.
    */
   public static double rand() {
-    double x = Math.random();
-    return x;
+    return Math.random();
   }
 
   /**
@@ -44,8 +43,7 @@
    */
   public static int randInt(int i0, int i1) {
     double x = rand();
-    int i = i0 + (int) Math.floor((i1 - i0 + 1) * x);
-    return i;
+    return i0 + (int) Math.floor((i1 - i0 + 1) * x);
   }
   
   /**
@@ -74,8 +72,7 @@
    * @return a double.
    */
   public static double uniform(double min, double max) {
-    double x = min + (max - min) * rand();
-    return x;
+    return min + (max - min) * rand();
   }
 
   /**
@@ -109,9 +106,8 @@
    * @return a double.
    */
   public static double normal(double mu, double sigma) {
-    double x = mu + sigma * Math.cos(2 * Math.PI * rand())
+    return mu + sigma * Math.cos(2 * Math.PI * rand())
         * Math.sqrt(-2 * Math.log(rand()));
-    return x;
   }
 
   /**
@@ -137,9 +133,8 @@
    * @return a double.
    */
   public static double logNormal(double mu, double sigma) {
-    double x = mu + sigma * Math.cos(2 * Math.PI * rand())
+    return mu + sigma * Math.cos(2 * Math.PI * rand())
         * Math.sqrt(-2 * Math.log(rand()));
-    return x;
   }
 
   /**
@@ -150,8 +145,7 @@
    * @return a double.
    */
   public static double exponential(double lambda) {
-    double x = -1 / lambda * Math.log(rand());
-    return x;
+    return -1 / lambda * Math.log(rand());
   }
 
   /**
@@ -162,9 +156,8 @@
    * @return a double.
    */
   public static double triangular(double min, double max) {
-    double x = min / 2 + (max - min) * rand() / 2 + min / 2 + (max - min)
+    return min / 2 + (max - min) * rand() / 2 + min / 2 + (max - min)
         * rand() / 2;
-    return x;
   }
 
   /**
@@ -177,10 +170,9 @@
    */
   public static double triangular(double min, double med, double max) {
     double y = rand();
-    double x = (y < ((med - min) / (max - min))) ? (min + Math.sqrt(y
+    return (y < ((med - min) / (max - min))) ? (min + Math.sqrt(y
         * (max - min) * (med - min))) : (max - Math.sqrt((1 - y) * (max - min)
         * (max - med)));
-    return x;
   }
 
   /**
@@ -209,8 +201,7 @@
    * @return a double.
    */
   public static double cauchy(double mu, double sigma) {
-    double x = sigma * Math.tan(Math.PI * (rand() - 0.5)) + mu;
-    return x;
+    return sigma * Math.tan(Math.PI * (rand() - 0.5)) + mu;
   }
 
   /**
@@ -221,7 +212,6 @@
    * @return a double.
    */
   public static double weibull(double lambda, double c) {
-    double x = Math.pow(-Math.log(1 - rand()), 1 / c) / lambda;
-    return x;
+    return Math.pow(-Math.log(1 - rand()), 1 / c) / lambda;
   }
 }
diff --git a/core/src/main/java/org/apache/hama/util/VersionInfo.java b/core/src/main/java/org/apache/hama/util/VersionInfo.java
index 847197a..78667e1 100644
--- a/core/src/main/java/org/apache/hama/util/VersionInfo.java
+++ b/core/src/main/java/org/apache/hama/util/VersionInfo.java
@@ -40,7 +40,7 @@
 
   /**
    * Get the meta-data for the Hama package.
-   * @return
+   * @return the current package
    */
   static Package getPackage() {
     return myPackage;
diff --git a/dist/pom.xml b/dist/pom.xml
index 0fae82d..7716b2c 100644
--- a/dist/pom.xml
+++ b/dist/pom.xml
@@ -14,15 +14,13 @@
    See the License for the specific language governing permissions and
    limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   
   <parent>
     <groupId>org.apache.hama</groupId>
     <artifactId>hama-parent</artifactId>
-    <version>0.5.0-SNAPSHOT</version>
+    <version>0.6.0-SNAPSHOT</version>
   </parent>
   
   <artifactId>hama-dist</artifactId>
diff --git a/examples/pom.xml b/examples/pom.xml
index 88b3bbe..e5b1bc5 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -15,20 +15,19 @@
    limitations under the License.
 -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
   <parent>
     <groupId>org.apache.hama</groupId>
     <artifactId>hama-parent</artifactId>
-    <version>0.5.0-SNAPSHOT</version>
+    <version>0.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hama</groupId>
   <artifactId>hama-examples</artifactId>
   <name>examples</name>
-  <version>0.5.0-SNAPSHOT</version>
+  <version>0.6.0-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <dependencies>
diff --git a/graph/pom.xml b/graph/pom.xml
index c76ff6d..dc0fa41 100644
--- a/graph/pom.xml
+++ b/graph/pom.xml
@@ -15,20 +15,19 @@
    limitations under the License.
 -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
   <parent>
     <groupId>org.apache.hama</groupId>
     <artifactId>hama-parent</artifactId>
-    <version>0.5.0-SNAPSHOT</version>
+    <version>0.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hama</groupId>
   <artifactId>hama-graph</artifactId>
   <name>graph</name>
-  <version>0.5.0-SNAPSHOT</version>
+  <version>0.6.0-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <dependencies>
@@ -40,9 +39,9 @@
     <dependency>
       <groupId>org.apache.hama</groupId>
       <artifactId>hama-core</artifactId>
+      <version>${project.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
-      <version>${project.version}</version>
     </dependency>
   </dependencies>
   <build>
diff --git a/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java b/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
index 2cbe9ca..a5759d6 100644
--- a/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
+++ b/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
@@ -391,16 +391,16 @@
     Vertex<V, E, M> vertex = newVertexInstance(vertexClass, conf);
     vertex.setPeer(peer);
     vertex.runner = this;
-    while (true) {
-      KeyValuePair<Writable, Writable> next = peer.readNext();
-      if (next == null) {
-        break;
-      }
+    
+    KeyValuePair<Writable, Writable> next = null;
+    int lines = 0;
+    while ((next = peer.readNext()) != null) {
       boolean vertexFinished = reader.parseVertex(next.getKey(),
           next.getValue(), vertex);
       if (!vertexFinished) {
         continue;
       }
+      
       if (vertex.getEdges() == null) {
         vertex.setEdges(new ArrayList<Edge<V, E>>(0));
       }
@@ -420,12 +420,26 @@
         }
         peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
       } else {
+        // FIXME need to set destination names
         vertex.setup(conf);
         vertices.put(vertex.getVertexID(), vertex);
       }
       vertex = newVertexInstance(vertexClass, conf);
       vertex.setPeer(peer);
       vertex.runner = this;
+      
+      lines++;
+      if((lines % 100000) == 0) {
+        peer.sync();
+        GraphJobMessage msg = null;
+        while ((msg = peer.getCurrentMessage()) != null) {
+          Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
+          messagedVertex.setPeer(peer);
+          messagedVertex.runner = this;
+          messagedVertex.setup(conf);
+          vertices.put(messagedVertex.getVertexID(), messagedVertex);
+        }
+      }
     }
 
     if (runtimePartitioning) {
@@ -440,6 +454,8 @@
       }
     }
 
+    LOG.info("Loading finished at " + peer.getSuperstepCount() + " steps.");
+    
     /*
      * If the user want to repair the graph, it should traverse through that
      * local chunk of adjancency list and message the corresponding peer to
diff --git a/ml/pom.xml b/ml/pom.xml
index 8363b54..4811632 100644
--- a/ml/pom.xml
+++ b/ml/pom.xml
@@ -15,20 +15,19 @@
    limitations under the License.
 -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
   <parent>
     <groupId>org.apache.hama</groupId>
     <artifactId>hama-parent</artifactId>
-    <version>0.5.0-SNAPSHOT</version>
+    <version>0.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hama</groupId>
   <artifactId>hama-ml</artifactId>
   <name>machine learning</name>
-  <version>0.5.0-SNAPSHOT</version>
+  <version>0.6.0-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <dependencies>
diff --git a/ml/src/main/java/org/apache/hama/ml/math/DenseDoubleMatrix.java b/ml/src/main/java/org/apache/hama/ml/math/DenseDoubleMatrix.java
new file mode 100644
index 0000000..8e02272
--- /dev/null
+++ b/ml/src/main/java/org/apache/hama/ml/math/DenseDoubleMatrix.java
@@ -0,0 +1,781 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.math;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Random;
+
+/**
+ * Dense double matrix implementation, internally uses two dimensional double
+ * arrays.
+ */
+public final class DenseDoubleMatrix implements DoubleMatrix {
+
+  protected final double[][] matrix;
+  protected final int numRows;
+  protected final int numColumns;
+
+  /**
+   * Creates a new empty matrix from the rows and columns.
+   * 
+   * @param rows the num of rows.
+   * @param columns the num of columns.
+   */
+  public DenseDoubleMatrix(int rows, int columns) {
+    this.numRows = rows;
+    this.numColumns = columns;
+    this.matrix = new double[rows][columns];
+  }
+
+  /**
+   * Creates a new empty matrix from the rows and columns filled with the given
+   * default value.
+   * 
+   * @param rows the num of rows.
+   * @param columns the num of columns.
+   * @param defaultValue the default value.
+   */
+  public DenseDoubleMatrix(int rows, int columns, double defaultValue) {
+    this.numRows = rows;
+    this.numColumns = columns;
+    this.matrix = new double[rows][columns];
+
+    for (int i = 0; i < numRows; i++) {
+      Arrays.fill(matrix[i], defaultValue);
+    }
+  }
+
+  /**
+   * Creates a new empty matrix from the rows and columns filled with the given
+   * random values.
+   * 
+   * @param rows the num of rows.
+   * @param columns the num of columns.
+   * @param rand the random instance to use.
+   */
+  public DenseDoubleMatrix(int rows, int columns, Random rand) {
+    this.numRows = rows;
+    this.numColumns = columns;
+    this.matrix = new double[rows][columns];
+
+    for (int i = 0; i < numRows; i++) {
+      for (int j = 0; j < numColumns; j++) {
+        matrix[i][j] = rand.nextDouble();
+      }
+    }
+  }
+
+  /**
+   * Simple copy constructor, but does only bend the reference to this instance.
+   * 
+   * @param otherMatrix the other matrix.
+   */
+  public DenseDoubleMatrix(double[][] otherMatrix) {
+    this.matrix = otherMatrix;
+    this.numRows = otherMatrix.length;
+    if (matrix.length > 0)
+      this.numColumns = matrix[0].length;
+    else
+      this.numColumns = numRows;
+  }
+
+  /**
+   * Generates a matrix out of an vector array. it treats the array entries as
+   * rows and the vector itself contains the values of the columns.
+   * 
+   * @param vectorArray the array of vectors.
+   */
+  public DenseDoubleMatrix(DoubleVector[] vectorArray) {
+    this.matrix = new double[vectorArray.length][];
+    this.numRows = vectorArray.length;
+
+    for (int i = 0; i < vectorArray.length; i++) {
+      this.setRowVector(i, vectorArray[i]);
+    }
+
+    if (matrix.length > 0)
+      this.numColumns = matrix[0].length;
+    else
+      this.numColumns = numRows;
+  }
+
+  /**
+   * Sets the first column of this matrix to the given vector.
+   * 
+   * @param first the new first column of the given vector
+   */
+  public DenseDoubleMatrix(DenseDoubleVector first) {
+    this(first.getLength(), 1);
+    setColumn(0, first.toArray());
+  }
+
+  /**
+   * Copies the given double array v into the first row of this matrix, and
+   * creates this with the number of given rows and columns.
+   * 
+   * @param v the values to put into the first row.
+   * @param rows the number of rows.
+   * @param columns the number of columns.
+   */
+  public DenseDoubleMatrix(double[] v, int rows, int columns) {
+    this.matrix = new double[rows][columns];
+
+    for (int i = 0; i < rows; i++) {
+      System.arraycopy(v, i * columns, this.matrix[i], 0, columns);
+    }
+
+    int index = 0;
+    for (int col = 0; col < columns; col++) {
+      for (int row = 0; row < rows; row++) {
+        matrix[row][col] = v[index++];
+      }
+    }
+
+    this.numRows = rows;
+    this.numColumns = columns;
+  }
+
+  /**
+   * Creates a new matrix with the given vector into the first column and the
+   * other matrix to the other columns.
+   * 
+   * @param first the new first column.
+   * @param otherMatrix the other matrix to set on from the second column.
+   */
+  public DenseDoubleMatrix(DenseDoubleVector first, DoubleMatrix otherMatrix) {
+    this(otherMatrix.getRowCount(), otherMatrix.getColumnCount() + 1);
+    setColumn(0, first.toArray());
+    for (int col = 1; col < otherMatrix.getColumnCount() + 1; col++)
+      setColumnVector(col, otherMatrix.getColumnVector(col - 1));
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#get(int, int)
+   */
+  @Override
+  public final double get(int row, int col) {
+    return this.matrix[row][col];
+  }
+
+  /**
+   * Gets a whole column of the matrix as a double array.
+   */
+  public final double[] getColumn(int col) {
+    final double[] column = new double[numRows];
+    for (int r = 0; r < numRows; r++) {
+      column[r] = matrix[r][col];
+    }
+    return column;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#getColumnCount()
+   */
+  @Override
+  public final int getColumnCount() {
+    return numColumns;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#getColumnVector(int)
+   */
+  @Override
+  public final DoubleVector getColumnVector(int col) {
+    return new DenseDoubleVector(getColumn(col));
+  }
+
+  /**
+   * Get the matrix as 2-dimensional double array (first dimension is the row,
+   * second the column) to faster access the values.
+   */
+  public final double[][] getValues() {
+    return matrix;
+  }
+
+  /**
+   * Get a single row of the matrix as a double array.
+   */
+  public final double[] getRow(int row) {
+    return matrix[row];
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#getRowCount()
+   */
+  @Override
+  public final int getRowCount() {
+    return numRows;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#getRowVector(int)
+   */
+  @Override
+  public final DoubleVector getRowVector(int row) {
+    return new DenseDoubleVector(getRow(row));
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#set(int, int, double)
+   */
+  @Override
+  public final void set(int row, int col, double value) {
+    this.matrix[row][col] = value;
+  }
+
+  /**
+   * Sets the row to a given double array. This does not copy, rather than just
+   * bends the references.
+   */
+  public final void setRow(int row, double[] value) {
+    this.matrix[row] = value;
+  }
+
+  /**
+   * Sets the column to a given double array. This does not copy, rather than
+   * just bends the references.
+   */
+  public final void setColumn(int col, double[] values) {
+    for (int i = 0; i < getRowCount(); i++) {
+      this.matrix[i][col] = values[i];
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#setColumnVector(int,
+   * de.jungblut.math.DoubleVector)
+   */
+  @Override
+  public void setColumnVector(int col, DoubleVector column) {
+    this.setColumn(col, column.toArray());
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#setRowVector(int,
+   * de.jungblut.math.DoubleVector)
+   */
+  @Override
+  public void setRowVector(int rowIndex, DoubleVector row) {
+    this.setRow(rowIndex, row.toArray());
+  }
+
+  /**
+   * Returns the size of the matrix as string (ROWSxCOLUMNS).
+   */
+  public String sizeToString() {
+    return numRows + "x" + numColumns;
+  }
+
+  /**
+   * Splits the last column from this matrix. Usually used to get a prediction
+   * column from some machine learning problem.
+   * 
+   * @return a tuple of a new sliced matrix and a vector which was the last
+   *         column.
+   */
+  public final Tuple<DenseDoubleMatrix, DenseDoubleVector> splitLastColumn() {
+    DenseDoubleMatrix m = new DenseDoubleMatrix(getRowCount(),
+        getColumnCount() - 1);
+    for (int i = 0; i < getRowCount(); i++) {
+      for (int j = 0; j < getColumnCount() - 1; j++) {
+        m.set(i, j, get(i, j));
+      }
+    }
+    DenseDoubleVector v = new DenseDoubleVector(getColumn(getColumnCount() - 1));
+    return new Tuple<DenseDoubleMatrix, DenseDoubleVector>(m, v);
+  }
+
+  /**
+   * Creates two matrices out of this by the given percentage. It uses a random
+   * function to determine which rows should belong to the matrix including the
+   * given percentage amount of rows.
+   * 
+   * @param percentage A float value between 0.0f and 1.0f
+   * @return A tuple which includes two matrices, the first contains the
+   *         percentage of the rows from the original matrix (rows are chosen
+   *         randomly) and the second one contains all other rows.
+   */
+  public final Tuple<DenseDoubleMatrix, DenseDoubleMatrix> splitRandomMatrices(
+      float percentage) {
+    if (percentage < 0.0f || percentage > 1.0f) {
+      throw new IllegalArgumentException(
+          "Percentage must be between 0.0 and 1.0! Given " + percentage);
+    }
+
+    if (percentage == 1.0f) {
+      return new Tuple<DenseDoubleMatrix, DenseDoubleMatrix>(this, null);
+    } else if (percentage == 0.0f) {
+      return new Tuple<DenseDoubleMatrix, DenseDoubleMatrix>(null, this);
+    }
+
+    final Random rand = new Random(System.nanoTime());
+    int firstMatrixRowsCount = Math.round(percentage * numRows);
+
+    // we first choose needed rows number of items to pick
+    final HashSet<Integer> lowerMatrixRowIndices = new HashSet<Integer>();
+    int missingRows = firstMatrixRowsCount;
+    while (missingRows > 0) {
+      final int nextIndex = rand.nextInt(numRows);
+      if (lowerMatrixRowIndices.add(nextIndex)) {
+        missingRows--;
+      }
+    }
+
+    // make to new matrixes
+    final double[][] firstMatrix = new double[firstMatrixRowsCount][numColumns];
+    int firstMatrixIndex = 0;
+    final double[][] secondMatrix = new double[numRows - firstMatrixRowsCount][numColumns];
+    int secondMatrixIndex = 0;
+
+    // then we loop over all items and put split the matrix
+    for (int r = 0; r < numRows; r++) {
+      if (lowerMatrixRowIndices.contains(r)) {
+        firstMatrix[firstMatrixIndex++] = matrix[r];
+      } else {
+        secondMatrix[secondMatrixIndex++] = matrix[r];
+      }
+    }
+
+    return new Tuple<DenseDoubleMatrix, DenseDoubleMatrix>(
+        new DenseDoubleMatrix(firstMatrix), new DenseDoubleMatrix(secondMatrix));
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#multiply(double)
+   */
+  @Override
+  public final DenseDoubleMatrix multiply(double scalar) {
+    DenseDoubleMatrix m = new DenseDoubleMatrix(this.numRows, this.numColumns);
+    for (int i = 0; i < numRows; i++) {
+      for (int j = 0; j < numColumns; j++) {
+        m.set(i, j, this.matrix[i][j] * scalar);
+      }
+    }
+    return m;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#multiply(de.jungblut.math.DoubleMatrix)
+   */
+  @Override
+  public final DoubleMatrix multiply(DoubleMatrix other) {
+    DenseDoubleMatrix matrix = new DenseDoubleMatrix(this.getRowCount(),
+        other.getColumnCount());
+
+    final int m = this.numRows;
+    final int n = this.numColumns;
+    final int p = other.getColumnCount();
+
+    for (int j = p; --j >= 0;) {
+      for (int i = m; --i >= 0;) {
+        double s = 0;
+        for (int k = n; --k >= 0;) {
+          s += get(i, k) * other.get(k, j);
+        }
+        matrix.set(i, j, s + matrix.get(i, j));
+      }
+    }
+
+    return matrix;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see
+   * de.jungblut.math.DoubleMatrix#multiplyElementWise(de.jungblut.math.DoubleMatrix
+   * )
+   */
+  @Override
+  public final DoubleMatrix multiplyElementWise(DoubleMatrix other) {
+    DenseDoubleMatrix matrix = new DenseDoubleMatrix(this.numRows,
+        this.numColumns);
+
+    for (int i = 0; i < numRows; i++) {
+      for (int j = 0; j < numColumns; j++) {
+        matrix.set(i, j, this.get(i, j) * (other.get(i, j)));
+      }
+    }
+
+    return matrix;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see
+   * de.jungblut.math.DoubleMatrix#multiplyVector(de.jungblut.math.DoubleVector)
+   */
+  @Override
+  public final DoubleVector multiplyVector(DoubleVector v) {
+    DoubleVector vector = new DenseDoubleVector(this.getRowCount());
+    for (int row = 0; row < numRows; row++) {
+      double sum = 0.0d;
+      for (int col = 0; col < numColumns; col++) {
+        sum += (matrix[row][col] * v.get(col));
+      }
+      vector.set(row, sum);
+    }
+
+    return vector;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#transpose()
+   */
+  @Override
+  public DenseDoubleMatrix transpose() {
+    DenseDoubleMatrix m = new DenseDoubleMatrix(this.numColumns, this.numRows);
+    for (int i = 0; i < numRows; i++) {
+      for (int j = 0; j < numColumns; j++) {
+        m.set(j, i, this.matrix[i][j]);
+      }
+    }
+    return m;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#subtractBy(double)
+   */
+  @Override
+  public DenseDoubleMatrix subtractBy(double amount) {
+    DenseDoubleMatrix m = new DenseDoubleMatrix(this.numRows, this.numColumns);
+    for (int i = 0; i < numRows; i++) {
+      for (int j = 0; j < numColumns; j++) {
+        m.set(i, j, amount - this.matrix[i][j]);
+      }
+    }
+    return m;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#subtract(double)
+   */
+  @Override
+  public DenseDoubleMatrix subtract(double amount) {
+    DenseDoubleMatrix m = new DenseDoubleMatrix(this.numRows, this.numColumns);
+    for (int i = 0; i < numRows; i++) {
+      for (int j = 0; j < numColumns; j++) {
+        m.set(i, j, this.matrix[i][j] - amount);
+      }
+    }
+    return m;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#subtract(de.jungblut.math.DoubleMatrix)
+   */
+  @Override
+  public DoubleMatrix subtract(DoubleMatrix other) {
+    DoubleMatrix m = new DenseDoubleMatrix(this.numRows, this.numColumns);
+    for (int i = 0; i < numRows; i++) {
+      for (int j = 0; j < numColumns; j++) {
+        m.set(i, j, this.matrix[i][j] - other.get(i, j));
+      }
+    }
+    return m;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#subtract(de.jungblut.math.DoubleVector)
+   */
+  @Override
+  public DenseDoubleMatrix subtract(DoubleVector vec) {
+    DenseDoubleMatrix cop = new DenseDoubleMatrix(this.getRowCount(),
+        this.getColumnCount());
+    for (int i = 0; i < this.getColumnCount(); i++) {
+      cop.setColumn(i, getColumnVector(i).subtract(vec.get(i)).toArray());
+    }
+    return cop;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#divide(de.jungblut.math.DoubleVector)
+   */
+  @Override
+  public DoubleMatrix divide(DoubleVector vec) {
+    DoubleMatrix cop = new DenseDoubleMatrix(this.getRowCount(),
+        this.getColumnCount());
+    for (int i = 0; i < this.getColumnCount(); i++) {
+      cop.setColumnVector(i, getColumnVector(i).divide(vec.get(i)));
+    }
+    return cop;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#divide(de.jungblut.math.DoubleMatrix)
+   */
+  @Override
+  public DoubleMatrix divide(DoubleMatrix other) {
+    DoubleMatrix m = new DenseDoubleMatrix(this.numRows, this.numColumns);
+    for (int i = 0; i < numRows; i++) {
+      for (int j = 0; j < numColumns; j++) {
+        m.set(i, j, this.matrix[i][j] / other.get(i, j));
+      }
+    }
+    return m;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#divide(double)
+   */
+  @Override
+  public DoubleMatrix divide(double scalar) {
+    DoubleMatrix m = new DenseDoubleMatrix(this.numRows, this.numColumns);
+    for (int i = 0; i < numRows; i++) {
+      for (int j = 0; j < numColumns; j++) {
+        m.set(i, j, this.matrix[i][j] / scalar);
+      }
+    }
+    return m;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#add(de.jungblut.math.DoubleMatrix)
+   */
+  @Override
+  public DoubleMatrix add(DoubleMatrix other) {
+    DoubleMatrix m = new DenseDoubleMatrix(this.numRows, this.numColumns);
+    for (int i = 0; i < numRows; i++) {
+      for (int j = 0; j < numColumns; j++) {
+        m.set(i, j, this.matrix[i][j] + other.get(i, j));
+      }
+    }
+    return m;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#pow(int)
+   */
+  @Override
+  public DoubleMatrix pow(int x) {
+    DoubleMatrix m = new DenseDoubleMatrix(this.numRows, this.numColumns);
+    for (int i = 0; i < numRows; i++) {
+      for (int j = 0; j < numColumns; j++) {
+        m.set(i, j, Math.pow(matrix[i][j], x));
+      }
+    }
+    return m;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#max(int)
+   */
+  @Override
+  public double max(int column) {
+    double max = Double.MIN_VALUE;
+    for (int i = 0; i < getRowCount(); i++) {
+      double d = matrix[i][column];
+      if (d > max) {
+        max = d;
+      }
+    }
+    return max;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#min(int)
+   */
+  @Override
+  public double min(int column) {
+    double min = Double.MAX_VALUE;
+    for (int i = 0; i < getRowCount(); i++) {
+      double d = matrix[i][column];
+      if (d < min) {
+        min = d;
+      }
+    }
+    return min;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#slice(int, int)
+   */
+  @Override
+  public DoubleMatrix slice(int rows, int cols) {
+    return slice(0, rows, 0, cols);
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#slice(int, int, int, int)
+   */
+  @Override
+  public DoubleMatrix slice(int rowOffset, int rowMax, int colOffset, int colMax) {
+    DenseDoubleMatrix m = new DenseDoubleMatrix(rowMax - rowOffset, colMax
+        - colOffset);
+    for (int row = rowOffset; row < rowMax; row++) {
+      for (int col = colOffset; col < colMax; col++) {
+        m.set(row - rowOffset, col - colOffset, this.get(row, col));
+      }
+    }
+    return m;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#isSparse()
+   */
+  @Override
+  public boolean isSparse() {
+    return false;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#sum()
+   */
+  @Override
+  public double sum() {
+    double x = 0.0d;
+    for (int i = 0; i < numRows; i++) {
+      for (int j = 0; j < numColumns; j++) {
+        x += Math.abs(matrix[i][j]);
+      }
+    }
+    return x;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleMatrix#columnIndices()
+   */
+  @Override
+  public int[] columnIndices() {
+    int[] x = new int[getColumnCount()];
+    for (int i = 0; i < getColumnCount(); i++)
+      x[i] = i;
+    return x;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + Arrays.hashCode(matrix);
+    result = prime * result + numColumns;
+    result = prime * result + numRows;
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    DenseDoubleMatrix other = (DenseDoubleMatrix) obj;
+    if (!Arrays.deepEquals(matrix, other.matrix))
+      return false;
+    if (numColumns != other.numColumns)
+      return false;
+    return numRows == other.numRows;
+  }
+
+  @Override
+  public String toString() {
+    if (numRows < 10) {
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < numRows; i++) {
+        sb.append(Arrays.toString(matrix[i]));
+        sb.append('\n');
+      }
+      return sb.toString();
+    } else {
+      return numRows + "x" + numColumns;
+    }
+  }
+
+  /**
+   * Gets the eye matrix (ones on the main diagonal) with a given dimension.
+   */
+  public static DenseDoubleMatrix eye(int dimension) {
+    DenseDoubleMatrix m = new DenseDoubleMatrix(dimension, dimension);
+
+    for (int i = 0; i < dimension; i++) {
+      m.set(i, i, 1);
+    }
+
+    return m;
+  }
+
+  /**
+   * Deep copies the given matrix into a new returned one.
+   */
+  public static DenseDoubleMatrix copy(DenseDoubleMatrix matrix) {
+    final double[][] src = matrix.getValues();
+    final double[][] dest = new double[matrix.getRowCount()][matrix
+        .getColumnCount()];
+
+    for (int i = 0; i < dest.length; i++)
+      System.arraycopy(src[i], 0, dest[i], 0, src[i].length);
+
+    return new DenseDoubleMatrix(dest);
+  }
+
+  /**
+   * Some strange function I found in octave but I don't know what it was named.
+   * It does however multiply the elements from the transposed vector and the
+   * normal vector and sets it into the according indices of a new constructed
+   * matrix.
+   */
+  public static DenseDoubleMatrix multiplyTransposedVectors(
+      DoubleVector transposed, DoubleVector normal) {
+    DenseDoubleMatrix m = new DenseDoubleMatrix(transposed.getLength(),
+        normal.getLength());
+    for (int row = 0; row < transposed.getLength(); row++) {
+      for (int col = 0; col < normal.getLength(); col++) {
+        m.set(row, col, transposed.get(row) * normal.get(col));
+      }
+    }
+
+    return m;
+  }
+
+  /**
+   * Just a absolute error function.
+   */
+  public static double error(DenseDoubleMatrix a, DenseDoubleMatrix b) {
+    return a.subtract(b).sum();
+  }
+
+}
diff --git a/ml/src/main/java/org/apache/hama/ml/math/DenseDoubleVector.java b/ml/src/main/java/org/apache/hama/ml/math/DenseDoubleVector.java
new file mode 100644
index 0000000..dfccf5b
--- /dev/null
+++ b/ml/src/main/java/org/apache/hama/ml/math/DenseDoubleVector.java
@@ -0,0 +1,631 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.math;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.AbstractIterator;
+
+/**
+ * Dense double vector implementation.
+ */
+public final class DenseDoubleVector implements DoubleVector {
+
+  private final double[] vector;
+
+  /**
+   * Creates a new vector with the given length.
+   */
+  public DenseDoubleVector(int length) {
+    this.vector = new double[length];
+  }
+
+  /**
+   * Creates a new vector with the given length and default value.
+   */
+  public DenseDoubleVector(int length, double val) {
+    this(length);
+    Arrays.fill(vector, val);
+  }
+
+  /**
+   * Creates a new vector with the given array.
+   */
+  public DenseDoubleVector(double[] arr) {
+    this.vector = arr;
+  }
+
+  /**
+   * Creates a new vector with the given array and the last value f1.
+   */
+  public DenseDoubleVector(double[] array, double f1) {
+    this.vector = new double[array.length + 1];
+    System.arraycopy(array, 0, this.vector, 0, array.length);
+    this.vector[array.length] = f1;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#get(int)
+   */
+  @Override
+  public final double get(int index) {
+    return vector[index];
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#getLength()
+   */
+  @Override
+  public final int getLength() {
+    return vector.length;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#getDimension()
+   */
+  @Override
+  public int getDimension() {
+    return getLength();
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#set(int, double)
+   */
+  @Override
+  public final void set(int index, double value) {
+    vector[index] = value;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#apply(de.jungblut.math.function.
+   * DoubleVectorFunction)
+   */
+  @Override
+  public DoubleVector apply(DoubleVectorFunction func) {
+    DenseDoubleVector newV = new DenseDoubleVector(this.vector);
+    for (int i = 0; i < vector.length; i++) {
+      newV.vector[i] = func.calculate(i, vector[i]);
+    }
+    return newV;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#apply(de.jungblut.math.DoubleVector,
+   * de.jungblut.math.function.DoubleDoubleVectorFunction)
+   */
+  @Override
+  public DoubleVector apply(DoubleVector other, DoubleDoubleVectorFunction func) {
+    DenseDoubleVector newV = (DenseDoubleVector) deepCopy();
+    for (int i = 0; i < vector.length; i++) {
+      newV.vector[i] = func.calculate(i, vector[i], other.get(i));
+    }
+    return newV;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#add(de.jungblut.math.DoubleVector)
+   */
+  @Override
+  public final DoubleVector add(DoubleVector v) {
+    DenseDoubleVector newv = new DenseDoubleVector(v.getLength());
+    for (int i = 0; i < v.getLength(); i++) {
+      newv.set(i, this.get(i) + v.get(i));
+    }
+    return newv;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#add(double)
+   */
+  @Override
+  public final DoubleVector add(double scalar) {
+    DoubleVector newv = new DenseDoubleVector(this.getLength());
+    for (int i = 0; i < this.getLength(); i++) {
+      newv.set(i, this.get(i) + scalar);
+    }
+    return newv;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#subtract(de.jungblut.math.DoubleVector)
+   */
+  @Override
+  public final DoubleVector subtract(DoubleVector v) {
+    DoubleVector newv = new DenseDoubleVector(v.getLength());
+    for (int i = 0; i < v.getLength(); i++) {
+      newv.set(i, this.get(i) - v.get(i));
+    }
+    return newv;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#subtract(double)
+   */
+  @Override
+  public final DoubleVector subtract(double v) {
+    DenseDoubleVector newv = new DenseDoubleVector(vector.length);
+    for (int i = 0; i < vector.length; i++) {
+      newv.set(i, vector[i] - v);
+    }
+    return newv;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#subtractFrom(double)
+   */
+  @Override
+  public final DoubleVector subtractFrom(double v) {
+    DenseDoubleVector newv = new DenseDoubleVector(vector.length);
+    for (int i = 0; i < vector.length; i++) {
+      newv.set(i, v - vector[i]);
+    }
+    return newv;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#multiply(double)
+   */
+  @Override
+  public DoubleVector multiply(double scalar) {
+    DoubleVector v = new DenseDoubleVector(this.getLength());
+    for (int i = 0; i < v.getLength(); i++) {
+      v.set(i, this.get(i) * scalar);
+    }
+    return v;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#multiply(de.jungblut.math.DoubleVector)
+   */
+  @Override
+  public DoubleVector multiply(DoubleVector vector) {
+    DoubleVector v = new DenseDoubleVector(this.getLength());
+    for (int i = 0; i < v.getLength(); i++) {
+      v.set(i, this.get(i) * vector.get(i));
+    }
+    return v;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#divide(double)
+   */
+  @Override
+  public DoubleVector divide(double scalar) {
+    DenseDoubleVector v = new DenseDoubleVector(this.getLength());
+    for (int i = 0; i < v.getLength(); i++) {
+      v.set(i, this.get(i) / scalar);
+    }
+    return v;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#pow(int)
+   */
+  @Override
+  public DoubleVector pow(int x) {
+    DenseDoubleVector v = new DenseDoubleVector(getLength());
+    for (int i = 0; i < v.getLength(); i++) {
+      double value = 0.0d;
+      // it is faster to multiply when we having ^2
+      if (x == 2) {
+        value = vector[i] * vector[i];
+      } else {
+        value = Math.pow(vector[i], x);
+      }
+      v.set(i, value);
+    }
+    return v;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#sqrt()
+   */
+  @Override
+  public DoubleVector sqrt() {
+    DoubleVector v = new DenseDoubleVector(getLength());
+    for (int i = 0; i < v.getLength(); i++) {
+      v.set(i, Math.sqrt(vector[i]));
+    }
+    return v;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#sum()
+   */
+  @Override
+  public double sum() {
+    double sum = 0.0d;
+    for (double aVector : vector) {
+      sum += aVector;
+    }
+    return sum;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#abs()
+   */
+  @Override
+  public DoubleVector abs() {
+    DoubleVector v = new DenseDoubleVector(getLength());
+    for (int i = 0; i < v.getLength(); i++) {
+      v.set(i, Math.abs(vector[i]));
+    }
+    return v;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#divideFrom(double)
+   */
+  @Override
+  public DoubleVector divideFrom(double scalar) {
+    DoubleVector v = new DenseDoubleVector(this.getLength());
+    for (int i = 0; i < v.getLength(); i++) {
+      if (this.get(i) != 0.0d) {
+        double result = scalar / this.get(i);
+        v.set(i, result);
+      } else {
+        v.set(i, 0.0d);
+      }
+    }
+    return v;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#dot(de.jungblut.math.DoubleVector)
+   */
+  @Override
+  public double dot(DoubleVector s) {
+    double dotProduct = 0.0d;
+    for (int i = 0; i < getLength(); i++) {
+      dotProduct += this.get(i) * s.get(i);
+    }
+    return dotProduct;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#slice(int)
+   */
+  @Override
+  public DoubleVector slice(int length) {
+    return slice(0, length);
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#slice(int, int)
+   */
+  @Override
+  public DoubleVector slice(int offset, int length) {
+    DoubleVector nv = new DenseDoubleVector(length - offset);
+    int index = 0;
+    for (int i = offset; i < length; i++) {
+      nv.set(index++, vector[i]);
+    }
+
+    return nv;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#max()
+   */
+  @Override
+  public double max() {
+    double max = -Double.MAX_VALUE;
+    for (int i = 0; i < getLength(); i++) {
+      double d = vector[i];
+      if (d > max) {
+        max = d;
+      }
+    }
+    return max;
+  }
+
+  /**
+   * @return the index where the maximum resides.
+   */
+  public int maxIndex() {
+    double max = -Double.MAX_VALUE;
+    int maxIndex = 0;
+    for (int i = 0; i < getLength(); i++) {
+      double d = vector[i];
+      if (d > max) {
+        max = d;
+        maxIndex = i;
+      }
+    }
+    return maxIndex;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#min()
+   */
+  @Override
+  public double min() {
+    double min = Double.MAX_VALUE;
+    for (int i = 0; i < getLength(); i++) {
+      double d = vector[i];
+      if (d < min) {
+        min = d;
+      }
+    }
+    return min;
+  }
+
+  /**
+   * @return the index where the minimum resides.
+   */
+  public int minIndex() {
+    double min = Double.MAX_VALUE;
+    int minIndex = 0;
+    for (int i = 0; i < getLength(); i++) {
+      double d = vector[i];
+      if (d < min) {
+        min = d;
+        minIndex = i;
+      }
+    }
+    return minIndex;
+  }
+
+  /**
+   * @return a new vector which has rinted each element.
+   */
+  public DenseDoubleVector rint() {
+    DenseDoubleVector v = new DenseDoubleVector(getLength());
+    for (int i = 0; i < getLength(); i++) {
+      double d = vector[i];
+      v.set(i, Math.rint(d));
+    }
+    return v;
+  }
+
+  /**
+   * @return a new vector which has rounded each element.
+   */
+  public DenseDoubleVector round() {
+    DenseDoubleVector v = new DenseDoubleVector(getLength());
+    for (int i = 0; i < getLength(); i++) {
+      double d = vector[i];
+      v.set(i, Math.round(d));
+    }
+    return v;
+  }
+
+  /**
+   * @return a new vector which has ceiled each element.
+   */
+  public DenseDoubleVector ceil() {
+    DenseDoubleVector v = new DenseDoubleVector(getLength());
+    for (int i = 0; i < getLength(); i++) {
+      double d = vector[i];
+      v.set(i, Math.ceil(d));
+    }
+    return v;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#toArray()
+   */
+  @Override
+  public final double[] toArray() {
+    return vector;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#isSparse()
+   */
+  @Override
+  public boolean isSparse() {
+    return false;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#deepCopy()
+   */
+  @Override
+  public DoubleVector deepCopy() {
+    final double[] src = vector;
+    final double[] dest = new double[vector.length];
+    System.arraycopy(src, 0, dest, 0, vector.length);
+    return new DenseDoubleVector(dest);
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#iterateNonZero()
+   */
+  @Override
+  public Iterator<DoubleVectorElement> iterateNonZero() {
+    return new NonZeroIterator();
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see de.jungblut.math.DoubleVector#iterate()
+   */
+  @Override
+  public Iterator<DoubleVectorElement> iterate() {
+    return new DefaultIterator();
+  }
+
+  @Override
+  public final String toString() {
+    if (getLength() < 20) {
+      return Arrays.toString(vector);
+    } else {
+      return getLength() + "x1";
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + Arrays.hashCode(vector);
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    DenseDoubleVector other = (DenseDoubleVector) obj;
+    return Arrays.equals(vector, other.vector);
+  }
+
+  /**
+   * Non-zero iterator for vector elements.
+   */
+  private final class NonZeroIterator extends
+      AbstractIterator<DoubleVectorElement> {
+
+    private final DoubleVectorElement element = new DoubleVectorElement();
+    private final double[] array;
+    private int currentIndex = 0;
+
+    private NonZeroIterator() {
+      this.array = vector;
+    }
+
+    @Override
+    protected final DoubleVectorElement computeNext() {
+      while (array[currentIndex] == 0.0d) {
+        currentIndex++;
+        if (currentIndex >= array.length)
+          return endOfData();
+      }
+      element.setIndex(currentIndex);
+      element.setValue(array[currentIndex]);
+      return element;
+    }
+  }
+
+  /**
+   * Iterator for all elements.
+   */
+  private final class DefaultIterator extends
+      AbstractIterator<DoubleVectorElement> {
+
+    private final DoubleVectorElement element = new DoubleVectorElement();
+    private final double[] array;
+    private int currentIndex = 0;
+
+    private DefaultIterator() {
+      this.array = vector;
+    }
+
+    @Override
+    protected final DoubleVectorElement computeNext() {
+      if (currentIndex < array.length) {
+        element.setIndex(currentIndex);
+        element.setValue(array[currentIndex]);
+        currentIndex++;
+        return element;
+      } else {
+        return endOfData();
+      }
+    }
+
+  }
+
+  /**
+   * @return a new vector with dimension num and a default value of 1.
+   */
+  public static DenseDoubleVector ones(int num) {
+    return new DenseDoubleVector(num, 1.0d);
+  }
+
+  /**
+   * @return a new vector filled from index, to index, with a given stepsize.
+   */
+  public static DenseDoubleVector fromUpTo(double from, double to,
+      double stepsize) {
+    DenseDoubleVector v = new DenseDoubleVector(
+        (int) (Math.round(((to - from) / stepsize) + 0.5)));
+
+    for (int i = 0; i < v.getLength(); i++) {
+      v.set(i, from + i * stepsize);
+    }
+    return v;
+  }
+
+  /**
+   * Some crazy sort function.
+   */
+  public static List<Tuple<Double, Integer>> sort(DoubleVector vector,
+      final Comparator<Double> scoreComparator) {
+    List<Tuple<Double, Integer>> list = new ArrayList<Tuple<Double, Integer>>(
+        vector.getLength());
+    for (int i = 0; i < vector.getLength(); i++) {
+      list.add(new Tuple<Double, Integer>(vector.get(i), i));
+    }
+    Collections.sort(list, new Comparator<Tuple<Double, Integer>>() {
+      @Override
+      public int compare(Tuple<Double, Integer> o1, Tuple<Double, Integer> o2) {
+        return scoreComparator.compare(o1.getFirst(), o2.getFirst());
+      }
+    });
+    return list;
+  }
+
+  @Override
+  public boolean isNamed() {
+    return false;
+  }
+
+  @Override
+  public String getName() {
+    return null;
+  }
+
+}
diff --git a/ml/src/main/java/org/apache/hama/ml/math/DoubleDoubleVectorFunction.java b/ml/src/main/java/org/apache/hama/ml/math/DoubleDoubleVectorFunction.java
new file mode 100644
index 0000000..4cf9bf4
--- /dev/null
+++ b/ml/src/main/java/org/apache/hama/ml/math/DoubleDoubleVectorFunction.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.math;
+
+/**
+ * A function that can be applied to two double vectors via {@link DoubleVector}
+ * #apply({@link DoubleVector} v, {@link DoubleDoubleVectorFunction} f);
+ */
+public interface DoubleDoubleVectorFunction {
+
+  /**
+   * Calculates the result of the left and right value of two vectors at a given
+   * index.
+   */
+  public double calculate(int index, double left, double right);
+
+}
diff --git a/ml/src/main/java/org/apache/hama/ml/math/DoubleMatrix.java b/ml/src/main/java/org/apache/hama/ml/math/DoubleMatrix.java
new file mode 100644
index 0000000..65f3df4
--- /dev/null
+++ b/ml/src/main/java/org/apache/hama/ml/math/DoubleMatrix.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.math;
+
+/**
+ * Standard matrix interface for double elements. Every implementation should
+ * return a fresh new Matrix when operating with other elements.
+ */
+public interface DoubleMatrix {
+
+  /**
+   * Not flagged value for sparse matrices, it is default to 0.0d.
+   */
+  public static final double NOT_FLAGGED = 0.0d;
+
+  /**
+   * Get a specific value of the matrix.
+   * 
+   * @return Returns the integer value at in the column at the row.
+   */
+  public double get(int row, int col);
+
+  /**
+   * Returns the number of columns in the matrix. Always a constant time
+   * operation.
+   */
+  public int getColumnCount();
+
+  /**
+   * Get a whole column of the matrix as vector.
+   */
+  public DoubleVector getColumnVector(int col);
+
+  /**
+   * Returns the number of rows in this matrix. Always a constant time
+   * operation.
+   */
+  public int getRowCount();
+
+  /**
+   * Get a single row of the matrix as a vector.
+   */
+  public DoubleVector getRowVector(int row);
+
+  /**
+   * Sets the value at the given row and column index.
+   */
+  public void set(int row, int col, double value);
+
+  /**
+   * Sets a whole column at index col with the given vector.
+   */
+  public void setColumnVector(int col, DoubleVector column);
+
+  /**
+   * Sets the whole row at index rowIndex with the given vector.
+   */
+  public void setRowVector(int rowIndex, DoubleVector row);
+
+  /**
+   * Multiplies this matrix (each element) with the given scalar and returns a
+   * new matrix.
+   */
+  public DoubleMatrix multiply(double scalar);
+
+  /**
+   * Multiplies this matrix with the given other matrix.
+   */
+  public DoubleMatrix multiply(DoubleMatrix other);
+
+  /**
+   * Multiplies this matrix per element with a given matrix.
+   */
+  public DoubleMatrix multiplyElementWise(DoubleMatrix other);
+
+  /**
+   * Multiplies this matrix with a given vector v. The returning vector contains
+   * the sum of the rows.
+   */
+  public DoubleVector multiplyVector(DoubleVector v);
+
+  /**
+   * Transposes this matrix.
+   */
+  public DoubleMatrix transpose();
+
+  /**
+   * Substracts the given amount by each element in this matrix. <br/>
+   * = (amount - matrix value)
+   */
+  public DoubleMatrix subtractBy(double amount);
+
+  /**
+   * Subtracts each element in this matrix by the given amount.<br/>
+   * = (matrix value - amount)
+   */
+  public DoubleMatrix subtract(double amount);
+
+  /**
+   * Subtracts this matrix by the given other matrix.
+   */
+  public DoubleMatrix subtract(DoubleMatrix other);
+
+  /**
+   * Subtracts each element in a column by the related element in the given
+   * vector.
+   */
+  public DoubleMatrix subtract(DoubleVector vec);
+
+  /**
+   * Divides each element in a column by the related element in the given
+   * vector.
+   */
+  public DoubleMatrix divide(DoubleVector vec);
+
+  /**
+   * Divides this matrix by the given other matrix. (Per element division).
+   */
+  public DoubleMatrix divide(DoubleMatrix other);
+
+  /**
+   * Divides each element in this matrix by the given scalar.
+   */
+  public DoubleMatrix divide(double scalar);
+
+  /**
+   * Adds the elements in the given matrix to the elements in this matrix.
+   */
+  public DoubleMatrix add(DoubleMatrix other);
+
+  /**
+   * Pows each element by the given argument. <br/>
+   * = (matrix element^x)
+   */
+  public DoubleMatrix pow(int x);
+
+  /**
+   * Returns the maximum value of the given column.
+   */
+  public double max(int column);
+
+  /**
+   * Returns the minimum value of the given column.
+   */
+  public double min(int column);
+
+  /**
+   * Sums all elements.
+   */
+  public double sum();
+
+  /**
+   * Returns an array of column indices existing in this matrix.
+   */
+  public int[] columnIndices();
+
+  /**
+   * Returns true if the underlying implementation is sparse.
+   */
+  public boolean isSparse();
+
+  /**
+   * Slices the given matrix from 0-rows and from 0-columns.
+   */
+  public DoubleMatrix slice(int rows, int cols);
+
+  /**
+   * Slices the given matrix from rowOffset-rowMax and from colOffset-colMax.
+   */
+  public DoubleMatrix slice(int rowOffset, int rowMax, int colOffset, int colMax);
+
+}
diff --git a/ml/src/main/java/org/apache/hama/ml/math/DoubleVector.java b/ml/src/main/java/org/apache/hama/ml/math/DoubleVector.java
new file mode 100644
index 0000000..ebcd18e
--- /dev/null
+++ b/ml/src/main/java/org/apache/hama/ml/math/DoubleVector.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.math;
+
+import java.util.Iterator;
+
+/**
+ * Vector with doubles. Some of the operations are mutable, unlike the apply and
+ * math functions, they return a fresh instance every time.
+ * 
+ */
+public interface DoubleVector {
+
+  /**
+   * Retrieves the value at given index.
+   * 
+   * @param index the index.
+   * @return a double value at the index.
+   */
+  public double get(int index);
+
+  /**
+   * Get the length of a vector, for sparse instance it is the actual length.
+   * (not the dimension!) Always a constant time operation.
+   * 
+   * @return the length of the vector.
+   */
+  public int getLength();
+
+  /**
+   * Get the dimension of a vector, for dense instance it is the same like the
+   * length, for sparse instances it is usually not the same. Always a constant
+   * time operation.
+   * 
+   * @return the dimension of the vector.
+   */
+  public int getDimension();
+
+  /**
+   * Set a value at the given index.
+   * 
+   * @param index the index of the vector to set.
+   * @param value the value at the index of the vector to set.
+   */
+  public void set(int index, double value);
+
+  /**
+   * Apply a given {@link DoubleVectorFunction} to this vector and return a new
+   * one.
+   * 
+   * @param func the function to apply.
+   * @return a new vector with the applied function.
+   */
+  public DoubleVector apply(DoubleVectorFunction func);
+
+  /**
+   * Apply a given {@link DoubleDoubleVectorFunction} to this vector and the
+   * other given vector.
+   * 
+   * @param other the other vector.
+   * @param func the function to apply on this and the other vector.
+   * @return a new vector with the result of the function of the two vectors.
+   */
+  public DoubleVector apply(DoubleVector other, DoubleDoubleVectorFunction func);
+
+  /**
+   * Adds the given {@link DoubleVector} to this vector.
+   * 
+   * @param v the other vector.
+   * @return a new vector with the sum of both vectors at each element index.
+   */
+  public DoubleVector add(DoubleVector v);
+
+  /**
+   * Adds the given scalar to this vector.
+   * 
+   * @param scalar the scalar.
+   * @return a new vector with the result at each element index.
+   */
+  public DoubleVector add(double scalar);
+
+  /**
+   * Subtracts this vector by the given {@link DoubleVector}.
+   * 
+   * @param v the other vector.
+   * @return a new vector with the difference of both vectors.
+   */
+  public DoubleVector subtract(DoubleVector v);
+
+  /**
+   * Subtracts the given scalar to this vector. (vector - scalar).
+   * 
+   * @param scalar the scalar.
+   * @return a new vector with the result at each element index.
+   */
+  public DoubleVector subtract(double scalar);
+
+  /**
+   * Subtracts the given scalar from this vector. (scalar - vector).
+   * 
+   * @param scalar the scalar.
+   * @return a new vector with the result at each element index.
+   */
+  public DoubleVector subtractFrom(double scalar);
+
+  /**
+   * Multiplies the given scalar to this vector.
+   * 
+   * @param scalar the scalar.
+   * @return a new vector with the result of the operation.
+   */
+  public DoubleVector multiply(double scalar);
+
+  /**
+   * Multiplies the given {@link DoubleVector} with this vector.
+   * 
+   * @param vector the other vector.
+   * @return a new vector with the result of the operation.
+   */
+  public DoubleVector multiply(DoubleVector vector);
+
+  /**
+   * Divides this vector by the given scalar. (= vector/scalar).
+   * 
+   * @param scalar the given scalar.
+   * @return a new vector with the result of the operation.
+   */
+  public DoubleVector divide(double scalar);
+
+  /**
+   * Divides the given scalar by this vector. (= scalar/vector).
+   * 
+   * @param scalar the given scalar.
+   * @return a new vector with the result of the operation.
+   */
+  public DoubleVector divideFrom(double scalar);
+
+  /**
+   * Powers this vector by the given amount. (=vector^x).
+   * 
+   * @param x the given exponent.
+   * @return a new vector with the result of the operation.
+   */
+  public DoubleVector pow(int x);
+
+  /**
+   * Absolutes the vector at each element.
+   * 
+   * @return a new vector that does not contain negative values anymore.
+   */
+  public DoubleVector abs();
+
+  /**
+   * Square-roots each element.
+   * 
+   * @return a new vector.
+   */
+  public DoubleVector sqrt();
+
+  /**
+   * @return the sum of all elements in this vector.
+   */
+  public double sum();
+
+  /**
+   * Calculates the dot product between this vector and the given vector.
+   * 
+   * @param s the given vector s.
+   * @return the dot product as a double.
+   */
+  public double dot(DoubleVector s);
+
+  /**
+   * Slices this vector from index 0 to the given length.
+   * 
+   * @param length must be > 0 and smaller than the dimension of the vector.
+   * @return a new vector that is only length long.
+   */
+  public DoubleVector slice(int length);
+
+  /**
+   * Slices this vector from index offset with the given length. So you end at
+   * the upper bound of (offset+length).
+   * 
+   * @param offset must be > 0 and smaller than the dimension of the vector
+   * @param length must be > 0 and smaller than the dimension of the vector.
+   *          This must be greater than the offset.
+   * @return a new vector that is only (length) long.
+   */
+  public DoubleVector slice(int offset, int length);
+
+  /**
+   * @return the maximum element value in this vector.
+   */
+  public double max();
+
+  /**
+   * @return the minimum element value in this vector.
+   */
+  public double min();
+
+  /**
+   * @return an array representation of this vector.
+   */
+  public double[] toArray();
+
+  /**
+   * @return a fresh new copy of this vector, copies all elements to a new
+   *         vector. (Does not reuse references or stuff).
+   */
+  public DoubleVector deepCopy();
+
+  /**
+   * @return an iterator that only iterates over non zero elements.
+   */
+  public Iterator<DoubleVectorElement> iterateNonZero();
+
+  /**
+   * @return an iterator that iterates over all elements.
+   */
+  public Iterator<DoubleVectorElement> iterate();
+
+  /**
+   * @return true if this instance is a sparse vector. Smarter and faster than
+   *         instanceof.
+   */
+  public boolean isSparse();
+
+  /**
+   * @return true if this instance is a named vector.Smarter and faster than
+   *         instanceof.
+   */
+  public boolean isNamed();
+
+  /**
+   * @return If this vector is a named instance, this will return its name. Or
+   *         null if this is not a named instance.
+   * 
+   */
+  public String getName();
+
+  /**
+   * Class for iteration of elements, consists of an index and a value at this
+   * index. Can be reused for performance purposes.
+   */
+  public static final class DoubleVectorElement {
+
+    private int index;
+    private double value;
+
+    public DoubleVectorElement() {
+      super();
+    }
+
+    public DoubleVectorElement(int index, double value) {
+      super();
+      this.index = index;
+      this.value = value;
+    }
+
+    public final int getIndex() {
+      return index;
+    }
+
+    public final double getValue() {
+      return value;
+    }
+
+    public final void setIndex(int in) {
+      this.index = in;
+    }
+
+    public final void setValue(double in) {
+      this.value = in;
+    }
+  }
+
+}
diff --git a/ml/src/main/java/org/apache/hama/ml/math/DoubleVectorFunction.java b/ml/src/main/java/org/apache/hama/ml/math/DoubleVectorFunction.java
new file mode 100644
index 0000000..cd180d5
--- /dev/null
+++ b/ml/src/main/java/org/apache/hama/ml/math/DoubleVectorFunction.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.math;
+
+/**
+ * A function that can be applied to a double vector via {@link DoubleVector}
+ * #apply({@link DoubleVectorFunction} f);
+ */
+public interface DoubleVectorFunction {
+
+  /**
+   * Calculates the result with a given index and value of a vector.
+   */
+  public double calculate(int index, double value);
+
+}
diff --git a/ml/src/main/java/org/apache/hama/ml/math/Tuple.java b/ml/src/main/java/org/apache/hama/ml/math/Tuple.java
new file mode 100644
index 0000000..be67db6
--- /dev/null
+++ b/ml/src/main/java/org/apache/hama/ml/math/Tuple.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.math;
+
+/**
+ * Tuple class to hold two generic attributes. This class implements hashcode,
+ * equals and comparable via the first element.
+ */
+public final class Tuple<FIRST, SECOND> implements
+    Comparable<Tuple<FIRST, SECOND>> {
+
+  private final FIRST first;
+  private final SECOND second;
+
+  public Tuple(FIRST first, SECOND second) {
+    super();
+    this.first = first;
+    this.second = second;
+  }
+
+  public final FIRST getFirst() {
+    return first;
+  }
+
+  public final SECOND getSecond() {
+    return second;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((first == null) ? 0 : first.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    @SuppressWarnings("rawtypes")
+    Tuple other = (Tuple) obj;
+    if (first == null) {
+      if (other.first != null)
+        return false;
+    } else if (!first.equals(other.first))
+      return false;
+    return true;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public int compareTo(Tuple<FIRST, SECOND> o) {
+    if (o.getFirst() instanceof Comparable && getFirst() instanceof Comparable) {
+      return ((Comparable<FIRST>) getFirst()).compareTo(o.getFirst());
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "Tuple [first=" + first + ", second=" + second + "]";
+  }
+
+}
diff --git a/ml/src/main/java/org/apache/hama/ml/writable/MatrixWritable.java b/ml/src/main/java/org/apache/hama/ml/writable/MatrixWritable.java
new file mode 100644
index 0000000..ec946df
--- /dev/null
+++ b/ml/src/main/java/org/apache/hama/ml/writable/MatrixWritable.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.ml.math.DenseDoubleMatrix;
+import org.apache.hama.ml.math.DoubleMatrix;
+
+/**
+ * Majorly designed for dense matrices, can be extended for sparse ones as well.
+ */
+public final class MatrixWritable implements Writable {
+
+  private DoubleMatrix mat;
+
+  public MatrixWritable() {
+  }
+
+  public MatrixWritable(DoubleMatrix mat) {
+    this.mat = mat;
+
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    mat = read(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    write(mat, out);
+  }
+
+  public static void write(DoubleMatrix mat, DataOutput out) throws IOException {
+    out.writeInt(mat.getRowCount());
+    out.writeInt(mat.getColumnCount());
+    for (int row = 0; row < mat.getRowCount(); row++) {
+      for (int col = 0; col < mat.getColumnCount(); col++) {
+        out.writeDouble(mat.get(row, col));
+      }
+    }
+  }
+
+  public static DoubleMatrix read(DataInput in) throws IOException {
+    DoubleMatrix mat = new DenseDoubleMatrix(in.readInt(), in.readInt());
+    for (int row = 0; row < mat.getRowCount(); row++) {
+      for (int col = 0; col < mat.getColumnCount(); col++) {
+        mat.set(row, col, in.readDouble());
+      }
+    }
+    return mat;
+  }
+
+}
diff --git a/ml/src/main/java/org/apache/hama/ml/writable/VectorWritable.java b/ml/src/main/java/org/apache/hama/ml/writable/VectorWritable.java
new file mode 100644
index 0000000..cd2b340
--- /dev/null
+++ b/ml/src/main/java/org/apache/hama/ml/writable/VectorWritable.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.ml.math.DoubleVector;
+import org.apache.hama.ml.math.DenseDoubleVector;
+
+/**
+ * Writable for dense vectors.
+ */
+public final class VectorWritable implements WritableComparable<VectorWritable> {
+
+  private DoubleVector vector;
+
+  public VectorWritable() {
+    super();
+  }
+
+  public VectorWritable(VectorWritable v) {
+    this.vector = v.getVector();
+  }
+
+  public VectorWritable(DoubleVector v) {
+    this.vector = v;
+  }
+
+  @Override
+  public final void write(DataOutput out) throws IOException {
+    writeVector(this.vector, out);
+  }
+
+  @Override
+  public final void readFields(DataInput in) throws IOException {
+    this.vector = readVector(in);
+  }
+
+  @Override
+  public final int compareTo(VectorWritable o) {
+    return compareVector(this, o);
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((vector == null) ? 0 : vector.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    VectorWritable other = (VectorWritable) obj;
+    if (vector == null) {
+      if (other.vector != null)
+        return false;
+    } else if (!vector.equals(other.vector))
+      return false;
+    return true;
+  }
+
+  /**
+   * @return the embedded vector
+   */
+  public DoubleVector getVector() {
+    return vector;
+  }
+
+  @Override
+  public String toString() {
+    return vector.toString();
+  }
+
+  public static void writeVector(DoubleVector vector, DataOutput out)
+      throws IOException {
+    out.writeBoolean(vector.isSparse());
+    out.writeInt(vector.getLength());
+    if (vector.isSparse()) {
+      out.writeInt(vector.getDimension());
+      Iterator<DoubleVector.DoubleVectorElement> iterateNonZero = vector
+          .iterateNonZero();
+      while (iterateNonZero.hasNext()) {
+        DoubleVector.DoubleVectorElement next = iterateNonZero.next();
+        out.writeInt(next.getIndex());
+        out.writeDouble(next.getValue());
+      }
+    } else {
+      for (int i = 0; i < vector.getDimension(); i++) {
+        out.writeDouble(vector.get(i));
+      }
+    }
+    if (vector.isNamed() && vector.getName() != null) {
+      out.writeBoolean(true);
+      out.writeUTF(vector.getName());
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+
+  public static DoubleVector readVector(DataInput in) throws IOException {
+    int length = in.readInt();
+    DoubleVector vector;
+    vector = new DenseDoubleVector(length);
+    for (int i = 0; i < length; i++) {
+      vector.set(i, in.readDouble());
+    }
+    return vector;
+  }
+
+  public static int compareVector(VectorWritable a, VectorWritable o) {
+    return compareVector(a.getVector(), o.getVector());
+  }
+
+  public static int compareVector(DoubleVector a, DoubleVector o) {
+    DoubleVector subtract = a.subtract(o);
+    return (int) subtract.sum();
+  }
+
+  public static VectorWritable wrap(DoubleVector a) {
+    return new VectorWritable(a);
+  }
+}
diff --git a/pom.xml b/pom.xml
index 5b2b3a3..5d302f2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -15,8 +15,7 @@
    limitations under the License.
 -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
   <parent>
     <groupId>org.apache</groupId>
@@ -28,7 +27,7 @@
   <groupId>org.apache.hama</groupId>
   <artifactId>hama-parent</artifactId>
   <name>Apache Hama parent POM</name>
-  <version>0.5.0-SNAPSHOT</version>
+  <version>0.6.0-SNAPSHOT</version>
   <url>http://hama.apache.org</url>
   <packaging>pom</packaging>
   <prerequisites>
@@ -277,7 +276,7 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-site-plugin</artifactId>
-        <version>2.1.1</version>
+        <version>3.1</version>
       </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
diff --git a/src/site/resources/css/site.css b/src/site/resources/css/site.css
index 48f0c97..8327f99 100644
--- a/src/site/resources/css/site.css
+++ b/src/site/resources/css/site.css
@@ -36,7 +36,7 @@
 
 h5 {
   font-size: 15px;
-  margin-top: -0.8em;
+  margin-top: -0.1em;
 }
 
 table {
@@ -56,11 +56,17 @@
  background: #edf7f4;
  background-color: #edf7f4;
 }
+
+pre {
+ width: 95%;
+ margin: 20px 20px 20px 20px;
+}
+
 .green a:link, .green a:active, .green a:visited { color: #0a4d39; }
 .green a:hover { color: #888800; }
 .green h3 {
   margin-bottom:-5px;
-  margin-top:-2px;
+  margin-top:5px;
   padding: 0px 0px 0px 0px;
   border: 0px;
   color: #74240f;
@@ -90,3 +96,15 @@
   padding-left: 1em;
   list-style-position: inside;
 }
+
+#leftColumn li.none{
+  text-indent:-1em;
+  margin-left:-1em;
+}
+
+#leftColumn h3 {
+  font-size: 16px;
+  margin-left:-0.7em;
+}
+
+body.topBarDisabled{padding-top:0px;}
\ No newline at end of file
diff --git a/src/site/site.xml b/src/site/site.xml
index be5b7d2..4c7db42 100644
--- a/src/site/site.xml
+++ b/src/site/site.xml
@@ -27,16 +27,16 @@
 
   <skin>
     <groupId>org.apache.maven.skins</groupId>
-    <artifactId>maven-default-skin</artifactId>
-    <version>1.0</version>
+    <artifactId>maven-fluido-skin</artifactId>
+    <version>1.2.2</version>
   </skin>
 
   <body>
     <links position="left">
-      <item name="Wiki" href="https://wiki.apache.org/hama" />
+      <item name="Wiki" href="http://wiki.apache.org/hama" />
       <item name="Blog" href="http://blogs.apache.org/hama/" />
-      <item name="JIRA" href="https://issues.apache.org/jira/browse/HAMA" />
-      <item name="SVN" href="https://svn.apache.org/repos/asf/hama/" />
+      <item name="JIRA" href="http://issues.apache.org/jira/browse/HAMA" />
+      <item name="SVN" href="http://svn.apache.org/repos/asf/hama/" />
       <item name="Twitter" href="http://twitter.com/apachehama" />
     </links>
 
@@ -62,8 +62,8 @@
     <menu name="Documentation">
       <item name="Getting Started" href="getting_started_with_hama.html" />
       <item name="Hama on Clouds" href="hama_on_clouds.html" />
-      <item name="Hama BSP tutorial" href="hama_bsp_tutorial.html" />
-      <item name="Hama Graph tutorial" href="hama_graph_tutorial.html" />
+      <item name="BSP tutorial" href="hama_bsp_tutorial.html" />
+      <item name="Graph tutorial" href="hama_graph_tutorial.html" />
       <item name="Benchmarks" href="http://wiki.apache.org/hama/Benchmarks" />
       <item name="Presentations" href="http://wiki.apache.org/hama/Presentations" />
       <item name="Research Papers" href="http://wiki.apache.org/hama/Articles" />
@@ -72,13 +72,26 @@
 
     <menu name="Related Projects">
       <item name="Hadoop" href="http://hadoop.apache.org/" />
-      <item name="Whirr" href="https://whirr.apache.org/" />
-      <item name="Bigtop" href="https://incubator.apache.org/bigtop/" />
+      <item name="Whirr" href="http://whirr.apache.org/" />
+      <item name="Bigtop" href="http://incubator.apache.org/bigtop/" />
       <item name="Mahout" href="http://mahout.apache.org/" />
     </menu>
     
     <menu name="Art works">
       <item name="Artwork and Clothing" href="artwork.html" />
     </menu>
+
+    <menu name="ASF">
+      <item name="How Apache Works" href="http://www.apache.org/foundation/how-it-works.html"/>
+      <item name="Foundation" href="http://www.apache.org/foundation/"/>
+      <item name="Sponsoring Apache" href="http://www.apache.org/foundation/sponsorship.html"/>
+      <item name="Thanks" href="http://www.apache.org/foundation/thanks.html"/>
+    </menu>
+    <footer>
+      <div class="row span16">Apache Hama, Apache, the Apache feather logo, and
+          the Apache Hama project logos are trademarks of The Apache Software
+          Foundation. All other marks mentioned may be trademarks or registered
+          trademarks of their respective owners.</div>
+    </footer>
   </body>
 </project>
diff --git a/src/site/xdoc/downloads.xml b/src/site/xdoc/downloads.xml
index e44006a..ec18e2b 100644
--- a/src/site/xdoc/downloads.xml
+++ b/src/site/xdoc/downloads.xml
@@ -42,10 +42,10 @@
     which is located within each download directory.
     <br/><br/>
     Always use the signature files to verify the authenticity of the distribution, e.g.,
-    <pre class="green">  % pgpk -a KEYS
+    <pre>  % pgpk -a KEYS
   % pgpv hama-x.x.x.tar.gz.asc</pre>
     or,
-    <pre class="green">  % gpg --import KEYS
+    <pre>  % gpg --import KEYS
   % gpg --verify hama-x.x.x.tar.gz.asc</pre>
     We offer MD5 hashes as an alternative to validate the integrity of the downloaded files.
     A unix program called md5 or md5sum is included in many unix distributions.
diff --git a/src/site/xdoc/getting_started_with_hama.xml b/src/site/xdoc/getting_started_with_hama.xml
index 8c1b826..2c8f58d 100644
--- a/src/site/xdoc/getting_started_with_hama.xml
+++ b/src/site/xdoc/getting_started_with_hama.xml
@@ -62,7 +62,7 @@
     <ul><li>BSPMaster and Zookeeper settings - Figure out where to run your HDFS namenode and BSPMaster. Set the variable bsp.master.address to the BSPMaster's intended host:port. Set the variable fs.default.name to the HDFS Namenode's intended host:port.</li></ul>
     <p>Here's an example of a hama-site.xml file:</p>
 
-<pre class="green">
+<pre>
   &lt;?xml version="1.0"?&gt;
   &lt;?xml-stylesheet type="text/xsl" href="configuration.xsl"?&gt;
   &lt;configuration&gt;
@@ -99,7 +99,7 @@
 
 <p>If you are managing your own ZooKeeper, you have to specify the port number as below:</p>
 
-<pre class="green">
+<pre>
   &lt;property&gt;
     &lt;name&gt;hama.zookeeper.property.clientPort&lt;/name&gt;
     &lt;value&gt;2181&lt;/value&gt;
@@ -111,16 +111,16 @@
     <p>NOTE: Skip this step if you're in Local Mode.
     <br/>Run the command:</p>
     
-  <pre class="green">
+  <pre>
   % $HAMA_HOME/bin/start-bspd.sh</pre>
     <p>This will startup a BSPMaster, GroomServers and Zookeeper on your machine.</p>
     <p>Run the command:</p>
-  <pre class="green">
+  <pre>
   % $HAMA_HOME/bin/stop-bspd.sh</pre>
 
     <p>to stop all the daemons running on your cluster.</p>
     <subsection name="Execute Hama Examples"></subsection>
-  <pre class="green">
+  <pre>
   % $HAMA_HOME/bin/hama jar hama-examples-x.x.x.jar [args]</pre>
     </body>
 </document>
diff --git a/src/site/xdoc/hama_bsp_tutorial.xml b/src/site/xdoc/hama_bsp_tutorial.xml
index 5613c9e..398034d 100644
--- a/src/site/xdoc/hama_bsp_tutorial.xml
+++ b/src/site/xdoc/hama_bsp_tutorial.xml
@@ -40,7 +40,7 @@
     <br/>
     The extending class must override the bsp() method, which is declared like this:
     </p>
-    <pre class="green">
+    <pre>
   public abstract void bsp(BSPPeer&lt;K1, V1, K2, V2, M extends Writable&gt; peer) throws IOException, 
     SyncException, InterruptedException;</pre>
     
@@ -56,7 +56,7 @@
     <br/><br/>
     After your own BSP is created, you will need to configure a <b>BSPJob</b> and submit it to Hama cluster to execute a job.
     The BSP job configuration and submission interfaces is almost the same as the MapReduce job configuration:</p>
-    <pre class="green">
+    <pre>
   HamaConfiguration conf = new HamaConfiguration();
   BSPJob job = new BSPJob(conf, MyBSP.class);
   job.setJobName("My BSP program");
@@ -68,9 +68,9 @@
     <p>See the below section for more detailed description of BSP user interfaces.</p>
     
     <subsection name="User Interfaces"></subsection>
-    <h5>Inputs and Outputs</h5>
+    <h4>Inputs and Outputs</h4>
     <p>When setting up a BSPJob, you can provide a Input/OutputFormat and Paths like this:</p>
-    <pre class="green">
+    <pre>
   job.setInputPath(new Path("/tmp/sequence.dat");
   job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class);
   or,
@@ -87,7 +87,7 @@
     <p>Then, you can read the input and write the output from the methods in BSP class which has "BSPPeer" which contains an communication, counters, and IO interfaces as parameter.
     In this case we read a normal text file:</p>
 
-    <pre class="green">
+    <pre>
  @Override
   public final void bsp(
       BSPPeer&lt;LongWritable, Text, Text, LongWritable, Text&gt; peer)
@@ -108,7 +108,7 @@
     There is also a function which allows you to re-read the input from the beginning.
     This snippet reads the input five times:
     </p>
-    <pre class="green">
+    <pre>
   for(int i = 0; i &lt; 5; i++){
     LongWritable key = new LongWritable();
     Text value = new Text();
@@ -119,7 +119,7 @@
     peer.reopenInput()
   }</pre>
   
-    <h5>Communication</h5>
+    <h4>Communication</h4>
     <p>Hama BSP provides simple but powerful communication APIs for many purposes. 
     We tried to follow the standard library of BSP world as much as possible. 
     The following table describes all the methods you can use:</p>
@@ -137,7 +137,7 @@
 
     <p>The send() and all the other functions are very flexible. Here is an example that sends a message to all peers:</p>
     
-    <pre class="green">
+    <pre>
   @Override
   public void bsp(
       BSPPeer&lt;NullWritable, NullWritable, Text, DoubleWritable, Text&gt; peer)
@@ -150,7 +150,7 @@
     peer.sync();
   }</pre>
     
-    <h5>Synchronization</h5>
+    <h4>Synchronization</h4>
 
     <p>When all the processes have entered the barrier via the sync() method, 
     the Hama proceeds to the next superstep. 
@@ -162,7 +162,7 @@
     For example, the sync() method also can be called in a for loop 
     so that you can use to program the iterative methods sequentially:</p>
     
-    <pre class="green">
+    <pre>
   @Override
   public void bsp(
       BSPPeer&lt;NullWritable, NullWritable, Text, DoubleWritable, Text&gt; peer)
@@ -191,7 +191,7 @@
 
     <subsection name="Example: Pi Calculation"></subsection>
     <p>Here is an BSP-based Pi Calculation example and submit it to Hama cluster:</p>
-    <pre class="green">
+    <pre>
   private static Path TMP_OUTPUT = new Path("/tmp/pi-" + System.currentTimeMillis());
 
   public static class MyEstimator extends
diff --git a/src/site/xdoc/hama_graph_tutorial.xml b/src/site/xdoc/hama_graph_tutorial.xml
index b061b38..b065784 100644
--- a/src/site/xdoc/hama_graph_tutorial.xml
+++ b/src/site/xdoc/hama_graph_tutorial.xml
@@ -30,7 +30,7 @@
     <subsection name="Vertex API"></subsection>    
 
     <p>Writing a Hama graph application involves subclassing the predefined Vertex class. Its template arguments define three value types, associated with vertices, edges, and messages.</p>
-    <pre class="green">
+    <pre>
   public abstract class Vertex&lt;V extends Writable, E extends Writable, M extends Writable&gt;
       implements VertexInterface&lt;V, E, M&gt; {
 
@@ -48,7 +48,7 @@
 From Superstep 1 to 30, each vertex sums up the values arriving on all its messages and sets its tentative page rank to (1 - 0.85) / numOfVertices + (0.85 * sum).
    </p>
 
-    <pre class="green">
+    <pre>
   public static class PageRankVertex extends
       Vertex&lt;Text, NullWritable, DoubleWritable&gt; {
 
diff --git a/src/site/xdoc/hama_on_clouds.xml b/src/site/xdoc/hama_on_clouds.xml
index 44651ac..59fe943 100644
--- a/src/site/xdoc/hama_on_clouds.xml
+++ b/src/site/xdoc/hama_on_clouds.xml
@@ -27,7 +27,7 @@
     
     <subsection name="Deploy a Hama cluster on EC2"></subsection>
     <p>The following commands install Whirr and start a 5 node Hama cluster on Amazon EC2 in 5 minutes or less.
-    <pre class="green">
+    <pre>
   % curl -O http://www.apache.org/dist/whirr/whirr-0.x.0/whirr-0.x.0.tar.gz
   % tar zxf whirr-0.x.0.tar.gz; cd whirr-0.x.0
 
@@ -38,7 +38,7 @@
   % bin/whirr launch-cluster --config recipes/hama-ec2.properties --private-key-file ~/.ssh/id_rsa_whirr</pre>
     </p>
     <subsection name="Execute Hama Examples"></subsection>
-    <pre class="green">
+    <pre>
   % cd /usr/local/hama-0.x.0
   % bin/hama jar hama-examples-x.x.x.jar [args]</pre>
     </body>
diff --git a/src/site/xdoc/index.xml b/src/site/xdoc/index.xml
index 4aa4176..a57b0ad 100644
--- a/src/site/xdoc/index.xml
+++ b/src/site/xdoc/index.xml
@@ -26,8 +26,7 @@
       <p>
       <strong>Apache Hama</strong> is a pure BSP (Bulk Synchronous Parallel) computing framework on top of HDFS (Hadoop Distributed File System) for massive scientific computations 
       such as matrix, graph and network algorithms.</p>
-      <table class="green" width="100%" cellspacing="0" cellpadding="5" border="1"> 
-      <tr><td> 
+      <div class="green">
           <h3 align="center">Recent News</h3>
           <ul>
             <li>June 31, 2012: release 0.5.0 available [<a href="downloads.html">downloads</a>]</li>
@@ -37,7 +36,7 @@
             <li>June 2, 2011: release 0.2.0 available</li>
             <li>Apr 30, 2010: Introduced in the <a href="http://www.bsp-worldwide.org/bspww3000.html">BSP Worldwide</a></li>
           </ul>
-      </td></tr></table>
+      </div>
     <subsection name="Why Hama and BSP?"></subsection>
     <p>Today, many practical data processing applications require a more flexible programming abstraction model that is compatible to run on highly scalable and massive data systems (e.g., HDFS, HBase, etc). A message passing paradigm beyond Map-Reduce framework would increase its flexibility in its communication capability. Bulk Synchronous Parallel (BSP) model fills the bill appropriately. Some of its significant advantages over MapReduce and MPI are:</p>
     <ul>
diff --git a/yarn/pom.xml b/yarn/pom.xml
index 5f5483b..0583784 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -14,20 +14,19 @@
    See the License for the specific language governing permissions and
    limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
   <parent>
     <groupId>org.apache.hama</groupId>
     <artifactId>hama-parent</artifactId>
-    <version>0.5.0-SNAPSHOT</version>
+    <version>0.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hama</groupId>
   <artifactId>hama-yarn</artifactId>
   <name>yarn</name>
-  <version>0.5.0-SNAPSHOT</version>
+  <version>0.6.0-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <dependencies>