Merge pull request #119 from 243826/semantic-versioning-patch
enable semantic versioning
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index a8adf08..683eb8d 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -126,7 +126,7 @@
public void setAuthToken(byte[] authToken)
{
- this.authToken = authToken;
+ this.authToken = authToken == null? null: Arrays.copyOf(authToken, authToken.length);
}
/**
@@ -352,15 +352,9 @@
class AuthClient extends com.datatorrent.bufferserver.client.AuthClient
{
- boolean ignore;
-
@Override
public void onMessage(byte[] buffer, int offset, int size)
{
- if (ignore) {
- return;
- }
-
authenticateMessage(buffer, offset, size);
unregistered(key);
@@ -374,21 +368,18 @@
client.transferBuffer(buffer, readOffset + size, len);
}
- ignore = true;
+ // The remaining data has been transferred to the next client in the chain and will be processed there, so it is
+ // not processed here; discard it.
+ discardReadBuffer();
}
}
class UnidentifiedClient extends SeedDataClient
{
- boolean ignore;
@Override
public void onMessage(byte[] buffer, int offset, int size)
{
- if (ignore) {
- return;
- }
-
Tuple request = Tuple.getTuple(buffer, offset, size);
switch (request.getType()) {
case PUBLISHER_REQUEST:
@@ -432,7 +423,10 @@
if (len > 0) {
publisher.transferBuffer(this.buffer, readOffset + size, len);
}
- ignore = true;
+
+ // The remaining data has been transferred to the next client and is processed there, not here anymore, so
+ // discard it.
+ discardReadBuffer();
break;
@@ -441,7 +435,9 @@
* unregister the unidentified client since its job is done!
*/
unregistered(key);
- ignore = true;
+ // Control is being transferred to the next client in the chain, so this client does no more processing after
+ // this message.
+ discardReadBuffer();
logger.info("Received subscriber request: {}", request);
SubscribeRequestTuple subscriberRequest = (SubscribeRequestTuple)request;
@@ -765,24 +761,24 @@
}
- abstract class SeedDataClient extends AbstractLengthPrependerClient
+ static abstract class SeedDataClient extends AbstractLengthPrependerClient
{
- public SeedDataClient()
+ SeedDataClient()
{
}
- public SeedDataClient(int readBufferSize, int sendBufferSize)
+ SeedDataClient(int readBufferSize, int sendBufferSize)
{
super(readBufferSize, sendBufferSize);
}
- public SeedDataClient(byte[] readbuffer, int position, int sendBufferSize)
+ SeedDataClient(byte[] readbuffer, int position, int sendBufferSize)
{
super(readbuffer, position, sendBufferSize);
}
- public void transferBuffer(byte[] array, int offset, int len)
+ void transferBuffer(byte[] array, int offset, int len)
{
int remainingCapacity;
do {
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 35861f1..71364a2 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -905,7 +905,8 @@
bssc.put(StreamContext.EVENT_LOOP, eventloop);
bssc.setBufferServerAddress(InetSocketAddress.createUnresolved(nodi.bufferServerHost, nodi.bufferServerPort));
bssc.put(StreamContext.BUFFER_SERVER_TOKEN, nodi.bufferServerToken);
- if (NetUtils.isLocalAddress(bssc.getBufferServerAddress().getAddress())) {
+ InetAddress inetAddress = bssc.getBufferServerAddress().getAddress();
+ if (inetAddress != null && NetUtils.isLocalAddress(inetAddress)) {
bssc.setBufferServerAddress(new InetSocketAddress(InetAddress.getByName(null), nodi.bufferServerPort));
}
@@ -1096,7 +1097,8 @@
StreamContext context = new StreamContext(nidi.declaredStreamId);
context.setBufferServerAddress(InetSocketAddress.createUnresolved(nidi.bufferServerHost, nidi.bufferServerPort));
- if (NetUtils.isLocalAddress(context.getBufferServerAddress().getAddress())) {
+ InetAddress inetAddress = context.getBufferServerAddress().getAddress();
+ if (inetAddress != null && NetUtils.isLocalAddress(inetAddress)) {
context.setBufferServerAddress(new InetSocketAddress(InetAddress.getByName(null), nidi.bufferServerPort));
}
context.put(StreamContext.BUFFER_SERVER_TOKEN, nidi.bufferServerToken);
diff --git a/pom.xml b/pom.xml
index 6ee1580..7812c30 100644
--- a/pom.xml
+++ b/pom.xml
@@ -264,6 +264,10 @@
<version>2.7</version>
</plugin>
<plugin>
+ <artifactId>maven-versions-plugin</artifactId>
+ <version>2.1</version>
+ </plugin>
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<executions>