blob: 14d9ff406b002170158c62a659650a0027d93b67 [file] [log] [blame]
/**
* Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.flume.sink;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.flume.discovery.Discovery;
import com.datatorrent.flume.discovery.Discovery.Service;
import com.datatorrent.netlet.AbstractLengthPrependerClient;
import com.datatorrent.netlet.AbstractServer;
import com.datatorrent.netlet.EventLoop;
import com.datatorrent.netlet.util.Slice;
/**
* <p>
* Server class.</p>
*
* @author Chetan Narsude <chetan@datatorrent.com>
* @since 0.9.2
*/
public class Server extends AbstractServer
{
private final String id;
private final Discovery<byte[]> discovery;
private final long acceptedTolerance;
public Server(String id, Discovery<byte[]> discovery, long acceptedTolerance)
{
this.id = id;
this.discovery = discovery;
this.acceptedTolerance = acceptedTolerance;
}
@Override
public void handleException(Exception cce, EventLoop el)
{
logger.error("Server Error", cce);
Request r = new Request(Command.SERVER_ERROR, null)
{
@Override
public Slice getAddress()
{
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int getEventCount()
{
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int getIdleCount()
{
throw new UnsupportedOperationException("Not supported yet.");
}
};
synchronized (requests) {
requests.add(r);
}
}
private final Service<byte[]> service = new Service<byte[]>()
{
@Override
public String getHost()
{
return ((InetSocketAddress)getServerAddress()).getHostName();
}
@Override
public int getPort()
{
return ((InetSocketAddress)getServerAddress()).getPort();
}
@Override
public byte[] getPayload()
{
return null;
}
@Override
public String getId()
{
return id;
}
@Override
public String toString()
{
return "Server.Service{id=" + id + ", host=" + getHost() + ", port=" + getPort() + ", payload=" +
Arrays.toString(getPayload()) + '}';
}
};
@Override
public void unregistered(final SelectionKey key)
{
discovery.unadvertise(service);
super.unregistered(key);
}
@Override
public void registered(final SelectionKey key)
{
super.registered(key);
discovery.advertise(service);
}
public enum Command
{
ECHO((byte)0),
SEEK((byte)1),
COMMITTED((byte)2),
CHECKPOINTED((byte)3),
CONNECTED((byte)4),
DISCONNECTED((byte)5),
WINDOWED((byte)6),
SERVER_ERROR((byte)7);
Command(byte b)
{
this.ord = b;
}
public byte getOrdinal()
{
return ord;
}
public static Command getCommand(byte b)
{
Command c;
switch (b) {
case 0:
c = ECHO;
break;
case 1:
c = SEEK;
break;
case 2:
c = COMMITTED;
break;
case 3:
c = CHECKPOINTED;
break;
case 4:
c = CONNECTED;
break;
case 5:
c = DISCONNECTED;
break;
case 6:
c = WINDOWED;
break;
case 7:
c = SERVER_ERROR;
break;
default:
throw new IllegalArgumentException(String.format("No Command defined for ordinal %b", b));
}
assert (b == c.ord);
return c;
}
private final byte ord;
}
public final ArrayList<Request> requests = new ArrayList<Request>(4);
@Override
public ClientListener getClientConnection(SocketChannel sc, ServerSocketChannel ssc)
{
Client lClient = new Client();
lClient.connected();
return lClient;
}
public class Client extends AbstractLengthPrependerClient
{
@Override
public void onMessage(byte[] buffer, int offset, int size)
{
if (size != Request.FIXED_SIZE) {
logger.warn("Invalid Request Received: {} from {}", Arrays.copyOfRange(buffer, offset, offset + size),
key.channel());
return;
}
long requestTime = Server.readLong(buffer, offset + Request.TIME_OFFSET);
if (System.currentTimeMillis() > (requestTime + acceptedTolerance)) {
logger.warn("Expired Request Received: {} from {}", Arrays.copyOfRange(buffer, offset, offset + size),
key.channel());
return;
}
try {
if (Command.getCommand(buffer[offset]) == Command.ECHO) {
write(buffer, offset, size);
return;
}
} catch (IllegalArgumentException ex) {
logger.warn("Invalid Request Received: {} from {}!", Arrays.copyOfRange(buffer, offset, offset + size),
key.channel(), ex);
return;
}
Request r = Request.getRequest(buffer, offset, this);
synchronized (requests) {
requests.add(r);
}
}
@Override
public void disconnected()
{
synchronized (requests) {
requests.add(Request.getRequest(
new byte[] {Command.DISCONNECTED.getOrdinal(), 0, 0, 0, 0, 0, 0, 0, 0}, 0, this));
}
super.disconnected();
}
public boolean write(byte[] address, Slice event)
{
if (event.offset == 0 && event.length == event.buffer.length) {
return write(address, event.buffer);
}
// a better method would be to replace the write implementation and allow it to natively support writing slices
return write(address, event.toByteArray());
}
}
public abstract static class Request
{
public static final int FIXED_SIZE = 17;
public static final int TIME_OFFSET = 9;
public final Command type;
public final Client client;
public Request(Command type, Client client)
{
this.type = type;
this.client = client;
}
public abstract Slice getAddress();
public abstract int getEventCount();
public abstract int getIdleCount();
@Override
public String toString()
{
return "Request{" + "type=" + type + '}';
}
public static Request getRequest(final byte[] buffer, final int offset, Client client)
{
Command command = Command.getCommand(buffer[offset]);
switch (command) {
case WINDOWED:
return new Request(Command.WINDOWED, client)
{
final int eventCount;
final int idleCount;
{
eventCount = Server.readInt(buffer, offset + 1);
idleCount = Server.readInt(buffer, offset + 5);
}
@Override
public Slice getAddress()
{
throw new UnsupportedOperationException();
}
@Override
public int getEventCount()
{
return eventCount;
}
@Override
public int getIdleCount()
{
return idleCount;
}
@Override
public String toString()
{
return "Request{" + "type=" + type + ", eventCount=" + eventCount + ", idleCount=" + idleCount + '}';
}
};
default:
return new Request(command, client)
{
final Slice address;
{
address = new Slice(buffer, offset + 1, 8);
}
@Override
public Slice getAddress()
{
return address;
}
@Override
public int getEventCount()
{
throw new UnsupportedOperationException();
}
@Override
public int getIdleCount()
{
throw new UnsupportedOperationException();
}
@Override
public String toString()
{
return "Request{" + "type=" + type + ", address=" + address + '}';
}
};
}
}
}
public static int readInt(byte[] buffer, int offset)
{
return buffer[offset++] & 0xff
| (buffer[offset++] & 0xff) << 8
| (buffer[offset++] & 0xff) << 16
| (buffer[offset++] & 0xff) << 24;
}
public static void writeInt(byte[] buffer, int offset, int i)
{
buffer[offset++] = (byte)i;
buffer[offset++] = (byte)(i >>> 8);
buffer[offset++] = (byte)(i >>> 16);
buffer[offset++] = (byte)(i >>> 24);
}
public static long readLong(byte[] buffer, int offset)
{
return (long)buffer[offset++] & 0xff
| (long)(buffer[offset++] & 0xff) << 8
| (long)(buffer[offset++] & 0xff) << 16
| (long)(buffer[offset++] & 0xff) << 24
| (long)(buffer[offset++] & 0xff) << 32
| (long)(buffer[offset++] & 0xff) << 40
| (long)(buffer[offset++] & 0xff) << 48
| (long)(buffer[offset++] & 0xff) << 56;
}
public static void writeLong(byte[] buffer, int offset, long l)
{
buffer[offset++] = (byte)l;
buffer[offset++] = (byte)(l >>> 8);
buffer[offset++] = (byte)(l >>> 16);
buffer[offset++] = (byte)(l >>> 24);
buffer[offset++] = (byte)(l >>> 32);
buffer[offset++] = (byte)(l >>> 40);
buffer[offset++] = (byte)(l >>> 48);
buffer[offset++] = (byte)(l >>> 56);
}
private static final Logger logger = LoggerFactory.getLogger(Server.class);
}