Merge pull request #44 from chandnisingh/SPOI-5391

SPOI-5391: adding default aggregator for primitive customMetrics
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java
new file mode 100644
index 0000000..273889d
--- /dev/null
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java
@@ -0,0 +1,35 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.bufferserver.auth;
+
+import java.security.SecureRandom;
+
+/**
+ * <p>Auth Manager class.</p>
+ */
+public class AuthManager
+{
+  private final static int BUFFER_SERVER_TOKEN_LENGTH = 20;
+
+  private static SecureRandom generator = new SecureRandom();
+
+  public static byte[] generateToken()
+  {
+    byte[] token = new byte[BUFFER_SERVER_TOKEN_LENGTH];
+    generator.nextBytes(token);
+    return token;
+  }
+}
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java
new file mode 100644
index 0000000..99ff3a2
--- /dev/null
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.bufferserver.client;
+
+import java.security.AccessControlException;
+
+import com.datatorrent.netlet.AbstractLengthPrependerClient;
+
+/**
+ * <p>Auth Client class.</p>
+ */
+public abstract class AuthClient extends AbstractLengthPrependerClient
+{
+  private byte[] token;
+
+  public AuthClient()
+  {
+  }
+
+  public AuthClient(int readBufferSize, int sendBufferSize)
+  {
+    super(readBufferSize, sendBufferSize);
+  }
+
+  public AuthClient(byte[] readbuffer, int position, int sendBufferSize)
+  {
+    super(readbuffer, position, sendBufferSize);
+  }
+
+  protected void sendAuthenticate() {
+    if (token != null) {
+      write(token);
+    }
+  }
+
+  protected void authenticateMessage(byte[] buffer, int offset, int size)
+  {
+    if (token != null) {
+      boolean authenticated = false;
+      if (size == token.length) {
+        int match = 0;
+        while ((match < token.length) && (buffer[offset + match] == token[match])) {
+          ++match;
+        }
+        if (match == token.length) {
+          authenticated = true;
+        }
+      }
+      if (!authenticated) {
+        throw new AccessControlException("Buffer server security is enabled. Access is restricted without proper credentials.");
+      }
+    }
+  }
+
+  public void setToken(byte[] token)
+  {
+    this.token = token;
+  }
+}
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java
index 2b8838c..51efa67 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java
@@ -31,7 +31,7 @@
 *
  * @since 0.3.2
  */
-public abstract class Controller extends AbstractLengthPrependerClient
+public abstract class Controller extends AuthClient
 {
   String id;
 
@@ -43,12 +43,14 @@
 
   public void purge(String version, String sourceId, long windowId)
   {
+    sendAuthenticate();
     write(PurgeRequestTuple.getSerializedRequest(version, sourceId, windowId));
     logger.debug("Sent purge request sourceId = {}, windowId = {}", sourceId, Codec.getStringWindowId(windowId));
   }
 
   public void reset(String version, String sourceId, long windowId)
   {
+    sendAuthenticate();
     write(ResetRequestTuple.getSerializedRequest(version, sourceId, windowId));
     logger.debug("Sent reset request sourceId = {}, windowId = {}", sourceId, Codec.getStringWindowId(windowId));
   }
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Publisher.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Publisher.java
index 2a378df..571182b 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Publisher.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Publisher.java
@@ -20,14 +20,13 @@
 import org.slf4j.LoggerFactory;
 
 import com.datatorrent.bufferserver.packet.PublishRequestTuple;
-import com.datatorrent.netlet.AbstractLengthPrependerClient;
 
 /**
  * <p>Abstract Publisher class.</p>
  *
  * @since 0.3.2
  */
-public abstract class Publisher extends AbstractLengthPrependerClient
+public abstract class Publisher extends AuthClient
 {
   private final String id;
 
@@ -48,6 +47,7 @@
    */
   public void activate(String version, long windowId)
   {
+    sendAuthenticate();
     write(PublishRequestTuple.getSerializedRequest(version, id, windowId));
   }
 
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java
index 751afe8..589e685 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java
@@ -21,7 +21,6 @@
 import org.slf4j.LoggerFactory;
 
 import com.datatorrent.bufferserver.packet.SubscribeRequestTuple;
-import com.datatorrent.netlet.AbstractLengthPrependerClient;
 
 /**
  *
@@ -34,7 +33,7 @@
  *
  * @since 0.3.2
  */
-public abstract class Subscriber extends AbstractLengthPrependerClient
+public abstract class Subscriber extends AuthClient
 {
   private final String id;
 
@@ -46,6 +45,7 @@
 
   public void activate(String version, String type, String sourceId, int mask, Collection<Integer> partitions, long windowId, int bufferSize)
   {
+    sendAuthenticate();
     write(SubscribeRequestTuple.getSerializedRequest(
             version,
             id,
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 5ddf162..a8adf08 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -37,11 +37,11 @@
 import com.datatorrent.bufferserver.packet.*;
 import com.datatorrent.bufferserver.storage.Storage;
 import com.datatorrent.common.util.NameableThreadFactory;
-import com.datatorrent.netlet.util.VarInt;
 import com.datatorrent.netlet.AbstractLengthPrependerClient;
 import com.datatorrent.netlet.DefaultEventLoop;
 import com.datatorrent.netlet.EventLoop;
 import com.datatorrent.netlet.Listener.ServerListener;
+import com.datatorrent.netlet.util.VarInt;
 
 /**
  * The buffer server application<p>
@@ -61,6 +61,8 @@
   private final ExecutorService serverHelperExecutor;
   private final ExecutorService storageHelperExecutor;
 
+  private byte[] authToken;
+
   /**
    * @param port - port number to bind to or 0 to auto select a free port
    */
@@ -122,6 +124,11 @@
     return address;
   }
 
+  public void setAuthToken(byte[] authToken)
+  {
+    this.authToken = authToken;
+  }
+
   /**
    *
    * @param args
@@ -322,7 +329,15 @@
   @Override
   public ClientListener getClientConnection(SocketChannel sc, ServerSocketChannel ssc)
   {
-    return new UnidentifiedClient(sc);
+    ClientListener client;
+    if (authToken == null) {
+      client = new UnidentifiedClient();
+    } else {
+      AuthClient authClient = new AuthClient();
+      authClient.setToken(authToken);
+      client = authClient;
+    }
+    return client;
   }
 
   @Override
@@ -335,15 +350,37 @@
     throw new RuntimeException(cce);
   }
 
-  class UnidentifiedClient extends AbstractLengthPrependerClient
+  class AuthClient extends com.datatorrent.bufferserver.client.AuthClient
   {
-    SocketChannel channel;
     boolean ignore;
 
-    UnidentifiedClient(SocketChannel channel)
+    @Override
+    public void onMessage(byte[] buffer, int offset, int size)
     {
-      this.channel = channel;
+      if (ignore) {
+        return;
+      }
+
+      authenticateMessage(buffer, offset, size);
+
+      unregistered(key);
+      UnidentifiedClient client = new UnidentifiedClient();
+      key.attach(client);
+      key.interestOps(SelectionKey.OP_READ);
+      client.registered(key);
+
+      int len = writeOffset - readOffset - size;
+      if (len > 0) {
+        client.transferBuffer(buffer, readOffset + size, len);
+      }
+
+      ignore = true;
     }
+  }
+
+  class UnidentifiedClient extends SeedDataClient
+  {
+    boolean ignore;
 
     @Override
     public void onMessage(byte[] buffer, int offset, int size)
@@ -559,7 +596,7 @@
    * this is the end on the server side which handles all the communication.
    *
    */
-  class Publisher extends AbstractLengthPrependerClient
+  class Publisher extends SeedDataClient
   {
     private final DataList datalist;
     boolean dirty;
@@ -570,26 +607,6 @@
       this.datalist = dl;
     }
 
-    public void transferBuffer(byte[] array, int offset, int len)
-    {
-      int remainingCapacity;
-      do {
-        remainingCapacity = buffer.length - writeOffset;
-        if (len < remainingCapacity) {
-          remainingCapacity = len;
-          byteBuffer.position(writeOffset + remainingCapacity);
-        }
-        else {
-          byteBuffer.position(buffer.length);
-        }
-        System.arraycopy(array, offset, buffer, writeOffset, remainingCapacity);
-        read(remainingCapacity);
-
-        offset += remainingCapacity;
-      }
-      while ((len -= remainingCapacity) > 0);
-    }
-
     @Override
     public void onMessage(byte[] buffer, int offset, int size)
     {
@@ -748,5 +765,43 @@
 
   }
 
+  abstract class SeedDataClient extends AbstractLengthPrependerClient
+  {
+
+    public SeedDataClient()
+    {
+    }
+
+    public SeedDataClient(int readBufferSize, int sendBufferSize)
+    {
+      super(readBufferSize, sendBufferSize);
+    }
+
+    public SeedDataClient(byte[] readbuffer, int position, int sendBufferSize)
+    {
+      super(readbuffer, position, sendBufferSize);
+    }
+
+    public void transferBuffer(byte[] array, int offset, int len)
+    {
+      int remainingCapacity;
+      do {
+        remainingCapacity = buffer.length - writeOffset;
+        if (len < remainingCapacity) {
+          remainingCapacity = len;
+          byteBuffer.position(writeOffset + remainingCapacity);
+        }
+        else {
+          byteBuffer.position(buffer.length);
+        }
+        System.arraycopy(array, offset, buffer, writeOffset, remainingCapacity);
+        read(remainingCapacity);
+
+        offset += remainingCapacity;
+      }
+      while ((len -= remainingCapacity) > 0);
+    }
+  }
+
   private static final Logger logger = LoggerFactory.getLogger(Server.class);
 }
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java
index 8ccdb7e..600f18c 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java
@@ -15,6 +15,17 @@
  */
 package com.datatorrent.bufferserver.server;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.SecureRandom;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
 import com.datatorrent.bufferserver.packet.BeginWindowTuple;
 import com.datatorrent.bufferserver.packet.EndWindowTuple;
 import com.datatorrent.bufferserver.packet.PayloadTuple;
@@ -24,16 +35,8 @@
 import com.datatorrent.bufferserver.support.Subscriber;
 import com.datatorrent.netlet.DefaultEventLoop;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
 
 /**
  *
@@ -49,6 +52,8 @@
   static DefaultEventLoop eventloopServer;
   static DefaultEventLoop eventloopClient;
 
+  static byte[] authToken;
+
   @BeforeClass
   public static void setupServerAndClients() throws Exception
   {
@@ -65,6 +70,10 @@
     instance = new Server(0, 4096,8);
     address = instance.run(eventloopServer);
     assert (address instanceof InetSocketAddress);
+
+    SecureRandom random = new SecureRandom();
+    authToken = new byte[20];
+    random.nextBytes(authToken);
   }
 
   @AfterClass
@@ -444,5 +453,76 @@
     eventloopClient.disconnect(bss);
   }
 
+  @Test(dependsOnMethods = {"testEarlySubscriberForLaterWindow"})
+  public void testAuth() throws InterruptedException
+  {
+    instance.setAuthToken(authToken);
+
+    bsp = new Publisher("MyPublisher");
+    bsp.setToken(authToken);
+    eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp);
+
+    bss = new Subscriber("MySubscriber");
+    bss.setToken(authToken);
+    eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss);
+
+    bsp.activate(null, 0L);
+    bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0);
+
+    long resetInfo = 0x7afebabe000000faL;
+
+    bsp.publishMessage(ResetWindowTuple.getSerializedTuple((int) (resetInfo >> 32), 500));
+
+    for (int i = 0; i < spinCount; i++) {
+      Thread.sleep(10);
+      if (!bss.resetPayloads.isEmpty()) {
+        break;
+      }
+    }
+    Thread.sleep(10);
+
+    eventloopClient.disconnect(bss);
+    eventloopClient.disconnect(bsp);
+
+    assertEquals(bss.tupleCount.get(), 1);
+    Assert.assertFalse(bss.resetPayloads.isEmpty());
+  }
+
+  @Test(dependsOnMethods = {"testAuth"})
+  public void testAuthFailure() throws InterruptedException
+  {
+    byte[] authToken = ServerTest.authToken.clone();
+    authToken[0] = (byte)(authToken[0] + 1);
+
+    bsp = new Publisher("MyPublisher");
+    bsp.setToken(authToken);
+    eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp);
+
+    bss = new Subscriber("MySubscriber");
+    bss.setToken(authToken);
+    eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss);
+
+    bsp.activate(null, 0L);
+    bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0);
+
+    long resetInfo = 0x7afebabe000000faL;
+
+    bsp.publishMessage(ResetWindowTuple.getSerializedTuple((int) (resetInfo >> 32), 500));
+
+    for (int i = 0; i < spinCount; i++) {
+      Thread.sleep(10);
+      if (!bss.resetPayloads.isEmpty()) {
+        break;
+      }
+    }
+    Thread.sleep(10);
+
+    eventloopClient.disconnect(bss);
+    eventloopClient.disconnect(bsp);
+
+    assertEquals(bss.tupleCount.get(), 0);
+    Assert.assertTrue(bss.resetPayloads.isEmpty());
+  }
+
   private static final Logger logger = LoggerFactory.getLogger(ServerTest.class);
 }
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
index 848abc1..dfabb68 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
@@ -15,14 +15,15 @@
  */
 package com.datatorrent.bufferserver.support;
 
-import com.datatorrent.bufferserver.packet.Tuple;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.bufferserver.packet.Tuple;
+
 /**
  *
  */
diff --git a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
index b09c23e..c2a57fc 100644
--- a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
+++ b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
@@ -49,8 +49,8 @@
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.stram.client.StramClientUtils;
 
+import com.datatorrent.stram.client.StramClientUtils;
 import com.datatorrent.stram.engine.StreamingContainer;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.physical.PTOperator;
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index f4b38fb..47dd721 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -15,7 +15,6 @@
  */
 package com.datatorrent.stram;
 
-import com.datatorrent.stram.api.AppDataSource;
 import java.io.*;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -67,6 +66,7 @@
 import com.datatorrent.api.StringCodec;
 
 import com.datatorrent.stram.StreamingContainerManager.ContainerResource;
+import com.datatorrent.stram.api.AppDataSource;
 import com.datatorrent.stram.api.BaseContext;
 import com.datatorrent.stram.api.StramEvent;
 import com.datatorrent.stram.appdata.AppDataPushAgent;
@@ -500,7 +500,7 @@
   protected void serviceStart() throws Exception
   {
     super.serviceStart();
-    if (delegationTokenManager != null) {
+    if (UserGroupInformation.isSecurityEnabled()) {
       delegationTokenManager.startThreads();
     }
 
@@ -539,7 +539,7 @@
   protected void serviceStop() throws Exception
   {
     super.serviceStop();
-    if (delegationTokenManager != null) {
+    if (UserGroupInformation.isSecurityEnabled()) {
       delegationTokenManager.stopThreads();
     }
     if (nmClient != null) {
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
index d610818..2af056f 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
@@ -19,12 +19,13 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import com.google.common.collect.Sets;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Sets;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG.Locality;
@@ -189,6 +190,7 @@
         if (!out.isDownStreamInline()) {
           portInfo.bufferServerHost = oper.getContainer().bufferServerAddress.getHostName();
           portInfo.bufferServerPort = oper.getContainer().bufferServerAddress.getPort();
+          portInfo.bufferServerToken = oper.getContainer().getBufferServerToken();
           // Build the stream codec configuration of all sinks connected to this port
           for (PTOperator.PTInput input : out.sinks) {
             // Create mappings for all non-inline operators
@@ -254,12 +256,14 @@
 
         } else {
           // buffer server input
-          InetSocketAddress addr = sourceOutput.source.getContainer().bufferServerAddress;
+          PTContainer container = sourceOutput.source.getContainer();
+          InetSocketAddress addr = container.bufferServerAddress;
           if (addr == null) {
             throw new AssertionError("upstream address not assigned: " + sourceOutput);
           }
           inputInfo.bufferServerHost = addr.getHostName();
           inputInfo.bufferServerPort = addr.getPort();
+          inputInfo.bufferServerToken = container.getBufferServerToken();
         }
 
         // On the input side there is a unlikely scenario of partitions even for inline stream that is being
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 94a30b2..462aca0 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -15,7 +15,6 @@
  */
 package com.datatorrent.stram;
 
-import com.datatorrent.common.experimental.AppData;
 import java.io.*;
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Field;
@@ -40,6 +39,13 @@
 
 import net.engio.mbassy.bus.MBassador;
 import net.engio.mbassy.bus.config.BusConfiguration;
+
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -51,19 +57,13 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
 
-import com.datatorrent.common.util.FSStorageAgent;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.datatorrent.api.*;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Operator.InputPort;
@@ -71,9 +71,11 @@
 import com.datatorrent.api.Stats.OperatorStats;
 import com.datatorrent.api.annotation.Stateless;
 
+import com.datatorrent.bufferserver.auth.AuthManager;
 import com.datatorrent.bufferserver.util.Codec;
-import com.datatorrent.common.util.Pair;
+import com.datatorrent.common.experimental.AppData;
 import com.datatorrent.common.util.FSStorageAgent;
+import com.datatorrent.common.util.Pair;
 import com.datatorrent.stram.Journal.Recoverable;
 import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
 import com.datatorrent.stram.api.*;
@@ -1197,6 +1199,10 @@
     container.setExternalId(resource.containerId);
     container.host = resource.host;
     container.bufferServerAddress = bufferServerAddr;
+    if (UserGroupInformation.isSecurityEnabled()) {
+      byte[] token = AuthManager.generateToken();
+      container.setBufferServerToken(token);
+    }
     container.nodeHttpAddress = resource.nodeHttpAddress;
     container.setAllocatedMemoryMB(resource.memoryMB);
     container.setAllocatedVCores(resource.vCores);
@@ -1225,6 +1231,7 @@
       StreamingContainerContext scc = new StreamingContainerContext(plan.getLogicalPlan().getAttributes().clone(), null);
       scc.attributes.put(ContainerContext.IDENTIFIER, container.getExternalId());
       scc.attributes.put(ContainerContext.BUFFER_SERVER_MB, bufferServerMemory);
+      scc.attributes.put(ContainerContext.BUFFER_SERVER_TOKEN, container.getBufferServerToken());
       scc.startWindowMillis = this.vars.windowStartMillis;
       return scc;
     }
@@ -1945,6 +1952,7 @@
   private BufferServerController getBufferServerClient(PTOperator operator)
   {
     BufferServerController bsc = new BufferServerController(operator.getLogicalId());
+    bsc.setToken(operator.getContainer().getBufferServerToken());
     InetSocketAddress address = operator.getContainer().bufferServerAddress;
     StreamingContainer.eventloop.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsc);
     return bsc;
diff --git a/engine/src/main/java/com/datatorrent/stram/api/ContainerContext.java b/engine/src/main/java/com/datatorrent/stram/api/ContainerContext.java
index 7a7755d..ce1dba9 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/ContainerContext.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/ContainerContext.java
@@ -28,6 +28,7 @@
 {
   public static final Attribute<String> IDENTIFIER = new Attribute<String>("unknown_container_id");
   public static final Attribute<Integer> BUFFER_SERVER_MB = new Attribute<Integer>(8*64);
+  public static final Attribute<byte[]> BUFFER_SERVER_TOKEN = new Attribute<byte[]>(null, null);
   public static final Attribute<RequestFactory> REQUEST_FACTORY = new Attribute<RequestFactory>(null, null);
   @SuppressWarnings("FieldNameHidesFieldInSuperclass")
   long serialVersionUID = AttributeInitializer.initialize(ContainerContext.class);
diff --git a/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java b/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java
index 067eb26..2a4d758 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java
@@ -120,6 +120,7 @@
      */
     public String bufferServerHost;
     public int bufferServerPort;
+    public byte[] bufferServerToken;
     /**
      * Class name of tuple SerDe (buffer server stream only).
      */
@@ -212,6 +213,7 @@
      */
     public String bufferServerHost;
     public int bufferServerPort;
+    public byte[] bufferServerToken;
     public Map<Integer, StreamCodec<?>> streamCodecs = new HashMap<Integer, StreamCodec<?>>();
     /**
      * Context attributes for output port
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamContext.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamContext.java
index 433f059..131c825 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamContext.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamContext.java
@@ -44,6 +44,7 @@
 public class StreamContext extends DefaultAttributeMap implements Context
 {
   public static final Attribute<InetSocketAddress> BUFFER_SERVER_ADDRESS = new Attribute<InetSocketAddress>(null, null);
+  public static final Attribute<byte[]> BUFFER_SERVER_TOKEN = new Attribute<byte[]>(null, null);
   public static final Attribute<EventLoop> EVENT_LOOP = new Attribute<EventLoop>(null, null);
   public static final Attribute<StreamCodec<?>> CODEC = new Attribute<StreamCodec<?>>(new DefaultStatefulStreamCodec<Object>(), null);
 
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index ce750c7..8cf9f12 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -31,6 +31,9 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
+import net.engio.mbassy.bus.MBassador;
+import net.engio.mbassy.bus.config.BusConfiguration;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,8 +60,8 @@
 import com.datatorrent.bufferserver.storage.DiskStorage;
 import com.datatorrent.bufferserver.util.Codec;
 import com.datatorrent.common.util.ScheduledThreadPoolExecutor;
-import com.datatorrent.netlet.util.Slice;
 import com.datatorrent.netlet.DefaultEventLoop;
+import com.datatorrent.netlet.util.Slice;
 import com.datatorrent.stram.ComponentContextPair;
 import com.datatorrent.stram.RecoverableRpcProxy;
 import com.datatorrent.stram.StramUtils.YarnContainerMain;
@@ -76,9 +79,6 @@
 import com.datatorrent.stram.security.StramUserLogin;
 import com.datatorrent.stram.stream.*;
 
-import net.engio.mbassy.bus.MBassador;
-import net.engio.mbassy.bus.config.BusConfiguration;
-
 /**
  * Object which controls the container process launched by {@link com.datatorrent.stram.StreamingAppMaster}.
  *
@@ -185,6 +185,7 @@
         }
         // start buffer server, if it was not set externally
         bufferServer = new Server(0, blocksize * 1024 * 1024, blockCount);
+        bufferServer.setAuthToken(ctx.getValue(StreamingContainerContext.BUFFER_SERVER_TOKEN));
         if (ctx.getValue(Context.DAGContext.BUFFER_SPOOLING)) {
           bufferServer.setSpoolStorage(new DiskStorage());
         }
@@ -903,6 +904,7 @@
     bssc.put(StreamContext.CODEC, streamCodec);
     bssc.put(StreamContext.EVENT_LOOP, eventloop);
     bssc.setBufferServerAddress(InetSocketAddress.createUnresolved(nodi.bufferServerHost, nodi.bufferServerPort));
+    bssc.put(StreamContext.BUFFER_SERVER_TOKEN, nodi.bufferServerToken);
     if (NetUtils.isLocalAddress(bssc.getBufferServerAddress().getAddress())) {
       bssc.setBufferServerAddress(new InetSocketAddress(InetAddress.getByName(null), nodi.bufferServerPort));
     }
@@ -1097,6 +1099,7 @@
             if (NetUtils.isLocalAddress(context.getBufferServerAddress().getAddress())) {
               context.setBufferServerAddress(new InetSocketAddress(InetAddress.getByName(null), nidi.bufferServerPort));
             }
+            context.put(StreamContext.BUFFER_SERVER_TOKEN, nidi.bufferServerToken);
             String connIdentifier = sourceIdentifier + Component.CONCAT_SEPARATOR + streamCodecIdentifier;
             context.setPortId(nidi.portName);
             context.put(StreamContext.CODEC, streamCodec);
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTContainer.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTContainer.java
index 1a00b64..1dc4ae5 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTContainer.java
@@ -94,6 +94,12 @@
           }
           c.host = in.readString();
           c.nodeHttpAddress = in.readString();
+          int tokenLength = in.readInt();
+          if (tokenLength != -1) {
+            c.bufferServerToken = in.readBytes(tokenLength);
+          } else {
+            c.bufferServerToken = null;
+          }
           break;
         }
       }
@@ -128,6 +134,10 @@
       // host
       out.writeString(container.host);
       out.writeString(container.nodeHttpAddress);
+      out.writeInt((container.bufferServerToken == null) ? -1 : container.bufferServerToken.length);
+      if (container.bufferServerToken != null) {
+        out.write(container.bufferServerToken);
+      }
     }
   }
 
@@ -151,6 +161,8 @@
   private long startedTime = -1;
   private long finishedTime = -1;
 
+  private byte[] bufferServerToken;
+
   PTContainer(PhysicalPlan plan) {
     this.plan = plan;
     this.seq = plan.containerSeq.incrementAndGet();
@@ -252,6 +264,16 @@
     this.finishedTime = finishedTime;
   }
 
+  public byte[] getBufferServerToken()
+  {
+    return bufferServerToken;
+  }
+
+  public void setBufferServerToken(byte[] bufferServerToken)
+  {
+    this.bufferServerToken = bufferServerToken;
+  }
+
   public String toIdStateString() {
     return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).
         append("id", ""+seq + "(" + this.containerId + ")").
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
index a4aa568..d8e09c8 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
@@ -17,7 +17,6 @@
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.atomic.AtomicLong;
-import static java.lang.Thread.sleep;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,6 +33,8 @@
 import com.datatorrent.stram.engine.StreamContext;
 import com.datatorrent.stram.tuple.Tuple;
 
+import static java.lang.Thread.sleep;
+
 /**
  * Implements tuple flow of node to then buffer server in a logical stream<p>
  * <br>
@@ -139,6 +140,7 @@
   @SuppressWarnings("unchecked")
   public void activate(StreamContext context)
   {
+    setToken(context.get(StreamContext.BUFFER_SERVER_TOKEN));
     InetSocketAddress address = context.getBufferServerAddress();
     eventloop = context.get(StreamContext.EVENT_LOOP);
     eventloop.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, this);
@@ -150,6 +152,7 @@
   @Override
   public void deactivate()
   {
+    setToken(null);
     eventloop.disconnect(this);
   }
 
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
index 09873a1..56cb323 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
@@ -84,6 +84,7 @@
   @Override
   public void activate(StreamContext context)
   {
+    setToken(context.get(StreamContext.BUFFER_SERVER_TOKEN));
     InetSocketAddress address = context.getBufferServerAddress();
     eventloop = context.get(StreamContext.EVENT_LOOP);
     eventloop.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, this);
@@ -140,6 +141,7 @@
   public void deactivate()
   {
     eventloop.disconnect(this);
+    setToken(null);
   }
 
   @Override
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java b/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
index 57a0093..188fb7a 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
@@ -21,7 +21,6 @@
 import java.nio.ByteOrder;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
-import static java.lang.Thread.sleep;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.KryoException;
@@ -38,9 +37,13 @@
 import com.datatorrent.stram.engine.StreamContext;
 import com.datatorrent.stram.tuple.Tuple;
 
+import static java.lang.Thread.sleep;
+
 /**
  * <p>FastPublisher class.</p>
  *
+ * TODO:- Implement token security
+ *
  * @since 0.3.2
  */
 public class FastPublisher extends Kryo implements ClientListener, Stream
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/FastSubscriber.java b/engine/src/main/java/com/datatorrent/stram/stream/FastSubscriber.java
index 66a4fcc..2653adc 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/FastSubscriber.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/FastSubscriber.java
@@ -26,6 +26,8 @@
 /**
  * <p>FastSubscriber class.</p>
  *
+ * TODO:- Implement token security
+ *
  * @since 0.3.2
  */
 public class FastSubscriber extends BufferServerSubscriber