Merge pull request #44 from chandnisingh/SPOI-5391
SPOI-5391: adding default aggregator for primitive customMetrics
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java
new file mode 100644
index 0000000..273889d
--- /dev/null
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java
@@ -0,0 +1,35 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.bufferserver.auth;
+
+import java.security.SecureRandom;
+
+/**
+ * <p>Auth Manager class.</p>
+ */
+public class AuthManager
+{
+ private final static int BUFFER_SERVER_TOKEN_LENGTH = 20;
+
+ private static SecureRandom generator = new SecureRandom();
+
+ public static byte[] generateToken()
+ {
+ byte[] token = new byte[BUFFER_SERVER_TOKEN_LENGTH];
+ generator.nextBytes(token);
+ return token;
+ }
+}
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java
new file mode 100644
index 0000000..99ff3a2
--- /dev/null
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.bufferserver.client;
+
+import java.security.AccessControlException;
+
+import com.datatorrent.netlet.AbstractLengthPrependerClient;
+
+/**
+ * <p>Auth Client class.</p>
+ */
+public abstract class AuthClient extends AbstractLengthPrependerClient
+{
+ private byte[] token;
+
+ public AuthClient()
+ {
+ }
+
+ public AuthClient(int readBufferSize, int sendBufferSize)
+ {
+ super(readBufferSize, sendBufferSize);
+ }
+
+ public AuthClient(byte[] readbuffer, int position, int sendBufferSize)
+ {
+ super(readbuffer, position, sendBufferSize);
+ }
+
+ protected void sendAuthenticate() {
+ if (token != null) {
+ write(token);
+ }
+ }
+
+ protected void authenticateMessage(byte[] buffer, int offset, int size)
+ {
+ if (token != null) {
+ boolean authenticated = false;
+ if (size == token.length) {
+ int match = 0;
+ while ((match < token.length) && (buffer[offset + match] == token[match])) {
+ ++match;
+ }
+ if (match == token.length) {
+ authenticated = true;
+ }
+ }
+ if (!authenticated) {
+ throw new AccessControlException("Buffer server security is enabled. Access is restricted without proper credentials.");
+ }
+ }
+ }
+
+ public void setToken(byte[] token)
+ {
+ this.token = token;
+ }
+}
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java
index 2b8838c..51efa67 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java
@@ -31,7 +31,7 @@
*
* @since 0.3.2
*/
-public abstract class Controller extends AbstractLengthPrependerClient
+public abstract class Controller extends AuthClient
{
String id;
@@ -43,12 +43,14 @@
public void purge(String version, String sourceId, long windowId)
{
+ sendAuthenticate();
write(PurgeRequestTuple.getSerializedRequest(version, sourceId, windowId));
logger.debug("Sent purge request sourceId = {}, windowId = {}", sourceId, Codec.getStringWindowId(windowId));
}
public void reset(String version, String sourceId, long windowId)
{
+ sendAuthenticate();
write(ResetRequestTuple.getSerializedRequest(version, sourceId, windowId));
logger.debug("Sent reset request sourceId = {}, windowId = {}", sourceId, Codec.getStringWindowId(windowId));
}
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Publisher.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Publisher.java
index 2a378df..571182b 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Publisher.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Publisher.java
@@ -20,14 +20,13 @@
import org.slf4j.LoggerFactory;
import com.datatorrent.bufferserver.packet.PublishRequestTuple;
-import com.datatorrent.netlet.AbstractLengthPrependerClient;
/**
* <p>Abstract Publisher class.</p>
*
* @since 0.3.2
*/
-public abstract class Publisher extends AbstractLengthPrependerClient
+public abstract class Publisher extends AuthClient
{
private final String id;
@@ -48,6 +47,7 @@
*/
public void activate(String version, long windowId)
{
+ sendAuthenticate();
write(PublishRequestTuple.getSerializedRequest(version, id, windowId));
}
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java
index 751afe8..589e685 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java
@@ -21,7 +21,6 @@
import org.slf4j.LoggerFactory;
import com.datatorrent.bufferserver.packet.SubscribeRequestTuple;
-import com.datatorrent.netlet.AbstractLengthPrependerClient;
/**
*
@@ -34,7 +33,7 @@
*
* @since 0.3.2
*/
-public abstract class Subscriber extends AbstractLengthPrependerClient
+public abstract class Subscriber extends AuthClient
{
private final String id;
@@ -46,6 +45,7 @@
public void activate(String version, String type, String sourceId, int mask, Collection<Integer> partitions, long windowId, int bufferSize)
{
+ sendAuthenticate();
write(SubscribeRequestTuple.getSerializedRequest(
version,
id,
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 5ddf162..a8adf08 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -37,11 +37,11 @@
import com.datatorrent.bufferserver.packet.*;
import com.datatorrent.bufferserver.storage.Storage;
import com.datatorrent.common.util.NameableThreadFactory;
-import com.datatorrent.netlet.util.VarInt;
import com.datatorrent.netlet.AbstractLengthPrependerClient;
import com.datatorrent.netlet.DefaultEventLoop;
import com.datatorrent.netlet.EventLoop;
import com.datatorrent.netlet.Listener.ServerListener;
+import com.datatorrent.netlet.util.VarInt;
/**
* The buffer server application<p>
@@ -61,6 +61,8 @@
private final ExecutorService serverHelperExecutor;
private final ExecutorService storageHelperExecutor;
+ private byte[] authToken;
+
/**
* @param port - port number to bind to or 0 to auto select a free port
*/
@@ -122,6 +124,11 @@
return address;
}
+ public void setAuthToken(byte[] authToken)
+ {
+ this.authToken = authToken;
+ }
+
/**
*
* @param args
@@ -322,7 +329,15 @@
@Override
public ClientListener getClientConnection(SocketChannel sc, ServerSocketChannel ssc)
{
- return new UnidentifiedClient(sc);
+ ClientListener client;
+ if (authToken == null) {
+ client = new UnidentifiedClient();
+ } else {
+ AuthClient authClient = new AuthClient();
+ authClient.setToken(authToken);
+ client = authClient;
+ }
+ return client;
}
@Override
@@ -335,15 +350,37 @@
throw new RuntimeException(cce);
}
- class UnidentifiedClient extends AbstractLengthPrependerClient
+ class AuthClient extends com.datatorrent.bufferserver.client.AuthClient
{
- SocketChannel channel;
boolean ignore;
- UnidentifiedClient(SocketChannel channel)
+ @Override
+ public void onMessage(byte[] buffer, int offset, int size)
{
- this.channel = channel;
+ if (ignore) {
+ return;
+ }
+
+ authenticateMessage(buffer, offset, size);
+
+ unregistered(key);
+ UnidentifiedClient client = new UnidentifiedClient();
+ key.attach(client);
+ key.interestOps(SelectionKey.OP_READ);
+ client.registered(key);
+
+ int len = writeOffset - readOffset - size;
+ if (len > 0) {
+ client.transferBuffer(buffer, readOffset + size, len);
+ }
+
+ ignore = true;
}
+ }
+
+ class UnidentifiedClient extends SeedDataClient
+ {
+ boolean ignore;
@Override
public void onMessage(byte[] buffer, int offset, int size)
@@ -559,7 +596,7 @@
* this is the end on the server side which handles all the communication.
*
*/
- class Publisher extends AbstractLengthPrependerClient
+ class Publisher extends SeedDataClient
{
private final DataList datalist;
boolean dirty;
@@ -570,26 +607,6 @@
this.datalist = dl;
}
- public void transferBuffer(byte[] array, int offset, int len)
- {
- int remainingCapacity;
- do {
- remainingCapacity = buffer.length - writeOffset;
- if (len < remainingCapacity) {
- remainingCapacity = len;
- byteBuffer.position(writeOffset + remainingCapacity);
- }
- else {
- byteBuffer.position(buffer.length);
- }
- System.arraycopy(array, offset, buffer, writeOffset, remainingCapacity);
- read(remainingCapacity);
-
- offset += remainingCapacity;
- }
- while ((len -= remainingCapacity) > 0);
- }
-
@Override
public void onMessage(byte[] buffer, int offset, int size)
{
@@ -748,5 +765,43 @@
}
+ abstract class SeedDataClient extends AbstractLengthPrependerClient
+ {
+
+ public SeedDataClient()
+ {
+ }
+
+ public SeedDataClient(int readBufferSize, int sendBufferSize)
+ {
+ super(readBufferSize, sendBufferSize);
+ }
+
+ public SeedDataClient(byte[] readbuffer, int position, int sendBufferSize)
+ {
+ super(readbuffer, position, sendBufferSize);
+ }
+
+ public void transferBuffer(byte[] array, int offset, int len)
+ {
+ int remainingCapacity;
+ do {
+ remainingCapacity = buffer.length - writeOffset;
+ if (len < remainingCapacity) {
+ remainingCapacity = len;
+ byteBuffer.position(writeOffset + remainingCapacity);
+ }
+ else {
+ byteBuffer.position(buffer.length);
+ }
+ System.arraycopy(array, offset, buffer, writeOffset, remainingCapacity);
+ read(remainingCapacity);
+
+ offset += remainingCapacity;
+ }
+ while ((len -= remainingCapacity) > 0);
+ }
+ }
+
private static final Logger logger = LoggerFactory.getLogger(Server.class);
}
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java
index 8ccdb7e..600f18c 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java
@@ -15,6 +15,17 @@
*/
package com.datatorrent.bufferserver.server;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.SecureRandom;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
import com.datatorrent.bufferserver.packet.BeginWindowTuple;
import com.datatorrent.bufferserver.packet.EndWindowTuple;
import com.datatorrent.bufferserver.packet.PayloadTuple;
@@ -24,16 +35,8 @@
import com.datatorrent.bufferserver.support.Subscriber;
import com.datatorrent.netlet.DefaultEventLoop;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
/**
*
@@ -49,6 +52,8 @@
static DefaultEventLoop eventloopServer;
static DefaultEventLoop eventloopClient;
+ static byte[] authToken;
+
@BeforeClass
public static void setupServerAndClients() throws Exception
{
@@ -65,6 +70,10 @@
instance = new Server(0, 4096,8);
address = instance.run(eventloopServer);
assert (address instanceof InetSocketAddress);
+
+ SecureRandom random = new SecureRandom();
+ authToken = new byte[20];
+ random.nextBytes(authToken);
}
@AfterClass
@@ -444,5 +453,76 @@
eventloopClient.disconnect(bss);
}
+ @Test(dependsOnMethods = {"testEarlySubscriberForLaterWindow"})
+ public void testAuth() throws InterruptedException
+ {
+ instance.setAuthToken(authToken);
+
+ bsp = new Publisher("MyPublisher");
+ bsp.setToken(authToken);
+ eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp);
+
+ bss = new Subscriber("MySubscriber");
+ bss.setToken(authToken);
+ eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss);
+
+ bsp.activate(null, 0L);
+ bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0);
+
+ long resetInfo = 0x7afebabe000000faL;
+
+ bsp.publishMessage(ResetWindowTuple.getSerializedTuple((int) (resetInfo >> 32), 500));
+
+ for (int i = 0; i < spinCount; i++) {
+ Thread.sleep(10);
+ if (!bss.resetPayloads.isEmpty()) {
+ break;
+ }
+ }
+ Thread.sleep(10);
+
+ eventloopClient.disconnect(bss);
+ eventloopClient.disconnect(bsp);
+
+ assertEquals(bss.tupleCount.get(), 1);
+ Assert.assertFalse(bss.resetPayloads.isEmpty());
+ }
+
+ @Test(dependsOnMethods = {"testAuth"})
+ public void testAuthFailure() throws InterruptedException
+ {
+ byte[] authToken = ServerTest.authToken.clone();
+ authToken[0] = (byte)(authToken[0] + 1);
+
+ bsp = new Publisher("MyPublisher");
+ bsp.setToken(authToken);
+ eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp);
+
+ bss = new Subscriber("MySubscriber");
+ bss.setToken(authToken);
+ eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss);
+
+ bsp.activate(null, 0L);
+ bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0);
+
+ long resetInfo = 0x7afebabe000000faL;
+
+ bsp.publishMessage(ResetWindowTuple.getSerializedTuple((int) (resetInfo >> 32), 500));
+
+ for (int i = 0; i < spinCount; i++) {
+ Thread.sleep(10);
+ if (!bss.resetPayloads.isEmpty()) {
+ break;
+ }
+ }
+ Thread.sleep(10);
+
+ eventloopClient.disconnect(bss);
+ eventloopClient.disconnect(bsp);
+
+ assertEquals(bss.tupleCount.get(), 0);
+ Assert.assertTrue(bss.resetPayloads.isEmpty());
+ }
+
private static final Logger logger = LoggerFactory.getLogger(ServerTest.class);
}
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
index 848abc1..dfabb68 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java
@@ -15,14 +15,15 @@
*/
package com.datatorrent.bufferserver.support;
-import com.datatorrent.bufferserver.packet.Tuple;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.bufferserver.packet.Tuple;
+
/**
*
*/
diff --git a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
index b09c23e..c2a57fc 100644
--- a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
+++ b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
@@ -49,8 +49,8 @@
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.stram.client.StramClientUtils;
+import com.datatorrent.stram.client.StramClientUtils;
import com.datatorrent.stram.engine.StreamingContainer;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTOperator;
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index f4b38fb..47dd721 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -15,7 +15,6 @@
*/
package com.datatorrent.stram;
-import com.datatorrent.stram.api.AppDataSource;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.URI;
@@ -67,6 +66,7 @@
import com.datatorrent.api.StringCodec;
import com.datatorrent.stram.StreamingContainerManager.ContainerResource;
+import com.datatorrent.stram.api.AppDataSource;
import com.datatorrent.stram.api.BaseContext;
import com.datatorrent.stram.api.StramEvent;
import com.datatorrent.stram.appdata.AppDataPushAgent;
@@ -500,7 +500,7 @@
protected void serviceStart() throws Exception
{
super.serviceStart();
- if (delegationTokenManager != null) {
+ if (UserGroupInformation.isSecurityEnabled()) {
delegationTokenManager.startThreads();
}
@@ -539,7 +539,7 @@
protected void serviceStop() throws Exception
{
super.serviceStop();
- if (delegationTokenManager != null) {
+ if (UserGroupInformation.isSecurityEnabled()) {
delegationTokenManager.stopThreads();
}
if (nmClient != null) {
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
index d610818..2af056f 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
@@ -19,12 +19,13 @@
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import com.google.common.collect.Sets;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Sets;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG.Locality;
@@ -189,6 +190,7 @@
if (!out.isDownStreamInline()) {
portInfo.bufferServerHost = oper.getContainer().bufferServerAddress.getHostName();
portInfo.bufferServerPort = oper.getContainer().bufferServerAddress.getPort();
+ portInfo.bufferServerToken = oper.getContainer().getBufferServerToken();
// Build the stream codec configuration of all sinks connected to this port
for (PTOperator.PTInput input : out.sinks) {
// Create mappings for all non-inline operators
@@ -254,12 +256,14 @@
} else {
// buffer server input
- InetSocketAddress addr = sourceOutput.source.getContainer().bufferServerAddress;
+ PTContainer container = sourceOutput.source.getContainer();
+ InetSocketAddress addr = container.bufferServerAddress;
if (addr == null) {
throw new AssertionError("upstream address not assigned: " + sourceOutput);
}
inputInfo.bufferServerHost = addr.getHostName();
inputInfo.bufferServerPort = addr.getPort();
+ inputInfo.bufferServerToken = container.getBufferServerToken();
}
// On the input side there is a unlikely scenario of partitions even for inline stream that is being
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 94a30b2..462aca0 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -15,7 +15,6 @@
*/
package com.datatorrent.stram;
-import com.datatorrent.common.experimental.AppData;
import java.io.*;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Field;
@@ -40,6 +39,13 @@
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.config.BusConfiguration;
+
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -51,19 +57,13 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.webapp.NotFoundException;
-import com.datatorrent.common.util.FSStorageAgent;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.datatorrent.api.*;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Operator.InputPort;
@@ -71,9 +71,11 @@
import com.datatorrent.api.Stats.OperatorStats;
import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.bufferserver.auth.AuthManager;
import com.datatorrent.bufferserver.util.Codec;
-import com.datatorrent.common.util.Pair;
+import com.datatorrent.common.experimental.AppData;
import com.datatorrent.common.util.FSStorageAgent;
+import com.datatorrent.common.util.Pair;
import com.datatorrent.stram.Journal.Recoverable;
import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
import com.datatorrent.stram.api.*;
@@ -1197,6 +1199,10 @@
container.setExternalId(resource.containerId);
container.host = resource.host;
container.bufferServerAddress = bufferServerAddr;
+ if (UserGroupInformation.isSecurityEnabled()) {
+ byte[] token = AuthManager.generateToken();
+ container.setBufferServerToken(token);
+ }
container.nodeHttpAddress = resource.nodeHttpAddress;
container.setAllocatedMemoryMB(resource.memoryMB);
container.setAllocatedVCores(resource.vCores);
@@ -1225,6 +1231,7 @@
StreamingContainerContext scc = new StreamingContainerContext(plan.getLogicalPlan().getAttributes().clone(), null);
scc.attributes.put(ContainerContext.IDENTIFIER, container.getExternalId());
scc.attributes.put(ContainerContext.BUFFER_SERVER_MB, bufferServerMemory);
+ scc.attributes.put(ContainerContext.BUFFER_SERVER_TOKEN, container.getBufferServerToken());
scc.startWindowMillis = this.vars.windowStartMillis;
return scc;
}
@@ -1945,6 +1952,7 @@
private BufferServerController getBufferServerClient(PTOperator operator)
{
BufferServerController bsc = new BufferServerController(operator.getLogicalId());
+ bsc.setToken(operator.getContainer().getBufferServerToken());
InetSocketAddress address = operator.getContainer().bufferServerAddress;
StreamingContainer.eventloop.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsc);
return bsc;
diff --git a/engine/src/main/java/com/datatorrent/stram/api/ContainerContext.java b/engine/src/main/java/com/datatorrent/stram/api/ContainerContext.java
index 7a7755d..ce1dba9 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/ContainerContext.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/ContainerContext.java
@@ -28,6 +28,7 @@
{
public static final Attribute<String> IDENTIFIER = new Attribute<String>("unknown_container_id");
public static final Attribute<Integer> BUFFER_SERVER_MB = new Attribute<Integer>(8*64);
+ public static final Attribute<byte[]> BUFFER_SERVER_TOKEN = new Attribute<byte[]>(null, null);
public static final Attribute<RequestFactory> REQUEST_FACTORY = new Attribute<RequestFactory>(null, null);
@SuppressWarnings("FieldNameHidesFieldInSuperclass")
long serialVersionUID = AttributeInitializer.initialize(ContainerContext.class);
diff --git a/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java b/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java
index 067eb26..2a4d758 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/OperatorDeployInfo.java
@@ -120,6 +120,7 @@
*/
public String bufferServerHost;
public int bufferServerPort;
+ public byte[] bufferServerToken;
/**
* Class name of tuple SerDe (buffer server stream only).
*/
@@ -212,6 +213,7 @@
*/
public String bufferServerHost;
public int bufferServerPort;
+ public byte[] bufferServerToken;
public Map<Integer, StreamCodec<?>> streamCodecs = new HashMap<Integer, StreamCodec<?>>();
/**
* Context attributes for output port
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamContext.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamContext.java
index 433f059..131c825 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamContext.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamContext.java
@@ -44,6 +44,7 @@
public class StreamContext extends DefaultAttributeMap implements Context
{
public static final Attribute<InetSocketAddress> BUFFER_SERVER_ADDRESS = new Attribute<InetSocketAddress>(null, null);
+ public static final Attribute<byte[]> BUFFER_SERVER_TOKEN = new Attribute<byte[]>(null, null);
public static final Attribute<EventLoop> EVENT_LOOP = new Attribute<EventLoop>(null, null);
public static final Attribute<StreamCodec<?>> CODEC = new Attribute<StreamCodec<?>>(new DefaultStatefulStreamCodec<Object>(), null);
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index ce750c7..8cf9f12 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -31,6 +31,9 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import net.engio.mbassy.bus.MBassador;
+import net.engio.mbassy.bus.config.BusConfiguration;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,8 +60,8 @@
import com.datatorrent.bufferserver.storage.DiskStorage;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.common.util.ScheduledThreadPoolExecutor;
-import com.datatorrent.netlet.util.Slice;
import com.datatorrent.netlet.DefaultEventLoop;
+import com.datatorrent.netlet.util.Slice;
import com.datatorrent.stram.ComponentContextPair;
import com.datatorrent.stram.RecoverableRpcProxy;
import com.datatorrent.stram.StramUtils.YarnContainerMain;
@@ -76,9 +79,6 @@
import com.datatorrent.stram.security.StramUserLogin;
import com.datatorrent.stram.stream.*;
-import net.engio.mbassy.bus.MBassador;
-import net.engio.mbassy.bus.config.BusConfiguration;
-
/**
* Object which controls the container process launched by {@link com.datatorrent.stram.StreamingAppMaster}.
*
@@ -185,6 +185,7 @@
}
// start buffer server, if it was not set externally
bufferServer = new Server(0, blocksize * 1024 * 1024, blockCount);
+ bufferServer.setAuthToken(ctx.getValue(StreamingContainerContext.BUFFER_SERVER_TOKEN));
if (ctx.getValue(Context.DAGContext.BUFFER_SPOOLING)) {
bufferServer.setSpoolStorage(new DiskStorage());
}
@@ -903,6 +904,7 @@
bssc.put(StreamContext.CODEC, streamCodec);
bssc.put(StreamContext.EVENT_LOOP, eventloop);
bssc.setBufferServerAddress(InetSocketAddress.createUnresolved(nodi.bufferServerHost, nodi.bufferServerPort));
+ bssc.put(StreamContext.BUFFER_SERVER_TOKEN, nodi.bufferServerToken);
if (NetUtils.isLocalAddress(bssc.getBufferServerAddress().getAddress())) {
bssc.setBufferServerAddress(new InetSocketAddress(InetAddress.getByName(null), nodi.bufferServerPort));
}
@@ -1097,6 +1099,7 @@
if (NetUtils.isLocalAddress(context.getBufferServerAddress().getAddress())) {
context.setBufferServerAddress(new InetSocketAddress(InetAddress.getByName(null), nidi.bufferServerPort));
}
+ context.put(StreamContext.BUFFER_SERVER_TOKEN, nidi.bufferServerToken);
String connIdentifier = sourceIdentifier + Component.CONCAT_SEPARATOR + streamCodecIdentifier;
context.setPortId(nidi.portName);
context.put(StreamContext.CODEC, streamCodec);
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTContainer.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTContainer.java
index 1a00b64..1dc4ae5 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTContainer.java
@@ -94,6 +94,12 @@
}
c.host = in.readString();
c.nodeHttpAddress = in.readString();
+ int tokenLength = in.readInt();
+ if (tokenLength != -1) {
+ c.bufferServerToken = in.readBytes(tokenLength);
+ } else {
+ c.bufferServerToken = null;
+ }
break;
}
}
@@ -128,6 +134,10 @@
// host
out.writeString(container.host);
out.writeString(container.nodeHttpAddress);
+ out.writeInt((container.bufferServerToken == null) ? -1 : container.bufferServerToken.length);
+ if (container.bufferServerToken != null) {
+ out.write(container.bufferServerToken);
+ }
}
}
@@ -151,6 +161,8 @@
private long startedTime = -1;
private long finishedTime = -1;
+ private byte[] bufferServerToken;
+
PTContainer(PhysicalPlan plan) {
this.plan = plan;
this.seq = plan.containerSeq.incrementAndGet();
@@ -252,6 +264,16 @@
this.finishedTime = finishedTime;
}
+ public byte[] getBufferServerToken()
+ {
+ return bufferServerToken;
+ }
+
+ public void setBufferServerToken(byte[] bufferServerToken)
+ {
+ this.bufferServerToken = bufferServerToken;
+ }
+
public String toIdStateString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).
append("id", ""+seq + "(" + this.containerId + ")").
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
index a4aa568..d8e09c8 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
@@ -17,7 +17,6 @@
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicLong;
-import static java.lang.Thread.sleep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +33,8 @@
import com.datatorrent.stram.engine.StreamContext;
import com.datatorrent.stram.tuple.Tuple;
+import static java.lang.Thread.sleep;
+
/**
* Implements tuple flow of node to then buffer server in a logical stream<p>
* <br>
@@ -139,6 +140,7 @@
@SuppressWarnings("unchecked")
public void activate(StreamContext context)
{
+ setToken(context.get(StreamContext.BUFFER_SERVER_TOKEN));
InetSocketAddress address = context.getBufferServerAddress();
eventloop = context.get(StreamContext.EVENT_LOOP);
eventloop.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, this);
@@ -150,6 +152,7 @@
@Override
public void deactivate()
{
+ setToken(null);
eventloop.disconnect(this);
}
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
index 09873a1..56cb323 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
@@ -84,6 +84,7 @@
@Override
public void activate(StreamContext context)
{
+ setToken(context.get(StreamContext.BUFFER_SERVER_TOKEN));
InetSocketAddress address = context.getBufferServerAddress();
eventloop = context.get(StreamContext.EVENT_LOOP);
eventloop.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, this);
@@ -140,6 +141,7 @@
public void deactivate()
{
eventloop.disconnect(this);
+ setToken(null);
}
@Override
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java b/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
index 57a0093..188fb7a 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
@@ -21,7 +21,6 @@
import java.nio.ByteOrder;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
-import static java.lang.Thread.sleep;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
@@ -38,9 +37,13 @@
import com.datatorrent.stram.engine.StreamContext;
import com.datatorrent.stram.tuple.Tuple;
+import static java.lang.Thread.sleep;
+
/**
* <p>FastPublisher class.</p>
*
+ * TODO:- Implement token security
+ *
* @since 0.3.2
*/
public class FastPublisher extends Kryo implements ClientListener, Stream
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/FastSubscriber.java b/engine/src/main/java/com/datatorrent/stram/stream/FastSubscriber.java
index 66a4fcc..2653adc 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/FastSubscriber.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/FastSubscriber.java
@@ -26,6 +26,8 @@
/**
* <p>FastSubscriber class.</p>
*
+ * TODO:- Implement token security
+ *
* @since 0.3.2
*/
public class FastSubscriber extends BufferServerSubscriber