| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package com.datatorrent.bufferserver.server; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.SelectionKey; |
| import java.nio.channels.ServerSocketChannel; |
| import java.nio.channels.SocketChannel; |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.Map.Entry; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.datatorrent.bufferserver.internal.DataList; |
| import com.datatorrent.bufferserver.internal.FastDataList; |
| import com.datatorrent.bufferserver.internal.LogicalNode; |
| import com.datatorrent.bufferserver.packet.PayloadTuple; |
| import com.datatorrent.bufferserver.packet.PublishRequestTuple; |
| import com.datatorrent.bufferserver.packet.PurgeRequestTuple; |
| import com.datatorrent.bufferserver.packet.ResetRequestTuple; |
| import com.datatorrent.bufferserver.packet.SubscribeRequestTuple; |
| import com.datatorrent.bufferserver.packet.Tuple; |
| import com.datatorrent.bufferserver.storage.Storage; |
| import com.datatorrent.common.util.NameableThreadFactory; |
| import com.datatorrent.netlet.AbstractLengthPrependerClient; |
| import com.datatorrent.netlet.AbstractServer; |
| import com.datatorrent.netlet.DefaultEventLoop; |
| import com.datatorrent.netlet.EventLoop; |
| import com.datatorrent.netlet.WriteOnlyLengthPrependerClient; |
| import com.datatorrent.netlet.util.VarInt; |
| |
| /** |
| * The buffer server application<p> |
| * <br> |
| * |
| * @since 0.3.2 |
| */ |
| public class Server extends AbstractServer |
| { |
| public static final int DEFAULT_BUFFER_SIZE = 64 * 1024 * 1024; |
| public static final int DEFAULT_NUMBER_OF_CACHED_BLOCKS = 8; |
| private final int port; |
| private String identity; |
| private Storage storage; |
| private final EventLoop eventloop; |
| private final ExecutorService serverHelperExecutor; |
| private final ExecutorService storageHelperExecutor; |
| private volatile CountDownLatch latch; |
| |
| private byte[] authToken; |
| |
| private static final boolean BACK_PRESSURE_ENABLED = !Boolean.getBoolean("org.apache.apex.bufferserver.backpressure.disable"); |
| |
| /** |
| * @param port - port number to bind to or 0 to auto select a free port |
| */ |
| public Server(EventLoop eventloop, int port) |
| { |
| this(eventloop, port, DEFAULT_BUFFER_SIZE, DEFAULT_NUMBER_OF_CACHED_BLOCKS); |
| } |
| |
| public Server(EventLoop eventloop, int port, int blocksize, int numberOfCacheBlocks) |
| { |
| this.eventloop = eventloop; |
| this.port = port; |
| this.blockSize = blocksize; |
| this.numberOfCacheBlocks = numberOfCacheBlocks; |
| serverHelperExecutor = Executors.newSingleThreadExecutor(new NameableThreadFactory("ServerHelper")); |
| final ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(numberOfCacheBlocks); |
| final NameableThreadFactory threadFactory = new NameableThreadFactory("StorageHelper"); |
| storageHelperExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory, |
| new ThreadPoolExecutor.CallerRunsPolicy()); |
| } |
| |
| public void setSpoolStorage(Storage storage) |
| { |
| this.storage = storage; |
| } |
| |
| @Override |
| public void registered(SelectionKey key) |
| { |
| super.registered(key); |
| logger.info("Server started listening at {}", getServerAddress()); |
| latch.countDown(); |
| latch = null; |
| } |
| |
| @Override |
| public void unregistered(SelectionKey key) |
| { |
| for (LogicalNode ln : subscriberGroups.values()) { |
| ln.boot(); |
| } |
| /* |
| * There may be un-register tasks scheduled to run on the event loop that use serverHelperExecutor. |
| */ |
| eventloop.submit(new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| serverHelperExecutor.shutdown(); |
| storageHelperExecutor.shutdown(); |
| try { |
| serverHelperExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS); |
| storageHelperExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS); |
| } catch (InterruptedException ex) { |
| logger.debug("Executor Termination", ex); |
| } |
| logger.info("Server stopped listening at {}", getServerAddress()); |
| latch.countDown(); |
| latch = null; |
| } |
| }); |
| } |
| |
| public InetSocketAddress run() |
| { |
| final CountDownLatch latch = new CountDownLatch(1); |
| this.latch = latch; |
| eventloop.start(null, port, this); |
| try { |
| latch.await(); |
| } catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } |
| return (InetSocketAddress)getServerAddress(); |
| } |
| |
| public InetSocketAddress run(long time) |
| { |
| if (time < 0) { |
| throw new IllegalArgumentException(String.format("Wait time %d can't be negative", time)); |
| } |
| final CountDownLatch latch = new CountDownLatch(1); |
| this.latch = latch; |
| eventloop.start(null, port, this); |
| final long deadline = System.currentTimeMillis() + time; |
| try { |
| while (latch.getCount() != 0 && time > 0 && latch.await(time, TimeUnit.MILLISECONDS)) { |
| time = deadline - System.currentTimeMillis(); |
| } |
| } catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } |
| return (InetSocketAddress)getServerAddress(); |
| } |
| |
| public void stop() |
| { |
| final CountDownLatch latch = new CountDownLatch(1); |
| this.latch = latch; |
| eventloop.stop(this); |
| try { |
| latch.await(); |
| } catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } finally { |
| shutdownExecutors(latch.getCount() == 0); |
| } |
| } |
| |
| public void stop(long time) |
| { |
| if (time < 0) { |
| throw new IllegalArgumentException(String.format("Wait time %d can't be negative", time)); |
| } |
| final CountDownLatch latch = new CountDownLatch(1); |
| this.latch = latch; |
| eventloop.stop(this); |
| final long deadline = System.currentTimeMillis() + time; |
| try { |
| while (latch.getCount() != 0 && time > 0 && latch.await(time, TimeUnit.MILLISECONDS)) { |
| time = deadline - System.currentTimeMillis(); |
| } |
| } catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } finally { |
| shutdownExecutors(latch.getCount() == 0); |
| } |
| } |
| |
| private void shutdownExecutors(boolean isTerminated) |
| { |
| if (!isTerminated) { |
| logger.warn("Buffer server {} did not terminate.", this); |
| try { |
| if (!serverHelperExecutor.isTerminated()) { |
| logger.warn("Forcing termination of {}", serverHelperExecutor); |
| serverHelperExecutor.shutdownNow(); |
| } |
| if (!storageHelperExecutor.isTerminated()) { |
| logger.warn("Forcing termination of {}", storageHelperExecutor); |
| storageHelperExecutor.shutdownNow(); |
| } |
| } catch (RuntimeException e) { |
| logger.error("Exception while terminating executors", e); |
| } |
| } |
| } |
| |
| public void setAuthToken(byte[] authToken) |
| { |
| this.authToken = authToken; |
| } |
| |
| /** |
| * |
| * @param args |
| * @throws Exception |
| */ |
| public static void main(String[] args) throws Exception |
| { |
| int port; |
| if (args.length > 0) { |
| port = Integer.parseInt(args[0]); |
| } else { |
| port = 0; |
| } |
| |
| DefaultEventLoop eventloop = DefaultEventLoop.createEventLoop("alone"); |
| Thread thread = eventloop.start(); |
| new Server(eventloop, port).run(); |
| thread.join(); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode()) + "{address=" + getServerAddress() + "}"; |
| } |
| |
| private final ConcurrentHashMap<String, DataList> publisherBuffers = new ConcurrentHashMap<>(1, 0.75f, 1); |
| private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap<>(); |
| private final ConcurrentHashMap<String, AbstractLengthPrependerClient> publisherChannels = new ConcurrentHashMap<>(); |
| private final int blockSize; |
| private final int numberOfCacheBlocks; |
| |
| private void handlePurgeRequest(PurgeRequestTuple request, final AbstractLengthPrependerClient ctx) throws IOException |
| { |
| DataList dl; |
| dl = publisherBuffers.get(request.getIdentifier()); |
| |
| byte[] message; |
| if (dl == null) { |
| message = ("Invalid identifier '" + request.getIdentifier() + "'").getBytes(); |
| } else { |
| dl.purge((long)request.getBaseSeconds() << 32 | request.getWindowId()); |
| message = ("Request sent for processing: " + request).getBytes(); |
| } |
| |
| final byte[] tuple = PayloadTuple.getSerializedTuple(0, message.length); |
| System.arraycopy(message, 0, tuple, tuple.length - message.length, message.length); |
| if (ctx.write(tuple)) { |
| ctx.write(); |
| } else { |
| logger.error("Failed to deliver purge ack message. {} send buffers are full.", ctx); |
| throw new RuntimeException("Failed to deliver purge ack message. " + ctx + "send buffers are full."); |
| } |
| } |
| |
| public void purge(long windowId) |
| { |
| for (DataList dataList: publisherBuffers.values()) { |
| dataList.purge(windowId); |
| } |
| } |
| |
| private void handleResetRequest(ResetRequestTuple request, final AbstractLengthPrependerClient ctx) throws IOException |
| { |
| DataList dl; |
| dl = publisherBuffers.remove(request.getIdentifier()); |
| |
| byte[] message; |
| if (dl == null) { |
| message = ("Invalid identifier '" + request.getIdentifier() + "'").getBytes(); |
| } else { |
| AbstractLengthPrependerClient channel = publisherChannels.remove(request.getIdentifier()); |
| if (channel != null) { |
| eventloop.disconnect(channel); |
| } |
| dl.reset(); |
| message = ("Request sent for processing: " + request).getBytes(); |
| } |
| |
| final byte[] tuple = PayloadTuple.getSerializedTuple(0, message.length); |
| System.arraycopy(message, 0, tuple, tuple.length - message.length, message.length); |
| if (ctx.write(tuple)) { |
| ctx.write(); |
| } else { |
| logger.error("Failed to deliver reset ack message. {} send buffers are full.", ctx); |
| throw new RuntimeException("Failed to deliver reset ack message. " + ctx + "send buffers are full."); |
| } |
| } |
| |
| /** |
| * |
| * @param request |
| * @param key |
| */ |
| private void handleSubscriberRequest(final SubscribeRequestTuple request, final SelectionKey key) |
| { |
| try { |
| serverHelperExecutor.submit(new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| final String upstream_identifier = request.getUpstreamIdentifier(); |
| |
| /* |
| * if there is already a datalist registered for the type in which this client is interested, |
| * then get a iterator on the data items of that data list. If the datalist is not registered, |
| * then create one and register it. Hopefully this one would be used by future upstream nodes. |
| */ |
| DataList dl = publisherBuffers.get(upstream_identifier); |
| if (dl == null) { |
| dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? |
| new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks, BACK_PRESSURE_ENABLED) : |
| new DataList(upstream_identifier, blockSize, numberOfCacheBlocks, BACK_PRESSURE_ENABLED); |
| DataList odl = publisherBuffers.putIfAbsent(upstream_identifier, dl); |
| if (odl != null) { |
| dl = odl; |
| } |
| } |
| |
| final String identifier = request.getIdentifier(); |
| final String type = request.getStreamType(); |
| final long skipWindowId = (long)request.getBaseSeconds() << 32 | request.getWindowId(); |
| final LogicalNode ln = new LogicalNode(identifier, upstream_identifier, type, dl |
| .newIterator(skipWindowId), skipWindowId, eventloop); |
| |
| int mask = request.getMask(); |
| if (mask != 0) { |
| for (Integer bs : request.getPartitions()) { |
| ln.addPartition(bs, mask); |
| } |
| } |
| final LogicalNode oln = subscriberGroups.put(type, ln); |
| if (oln != null) { |
| oln.boot(); |
| } |
| final Subscriber subscriber = new Subscriber(ln, request.getBufferSize()); |
| eventloop.submit(new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| key.attach(subscriber); |
| subscriber.registered(key); |
| subscriber.connected(); |
| } |
| }); |
| } |
| }); |
| } catch (RejectedExecutionException e) { |
| logger.error("Received subscriber request {} after server {} termination. Disconnecting {}.", request, this, key.channel(), e); |
| if (key.isValid()) { |
| try { |
| key.channel().close(); |
| } catch (IOException ioe) { |
| logger.error("Failed to close channel {}", key.channel(), ioe); |
| } |
| } |
| } |
| } |
| |
| private void handleSubscriberTeardown(final SelectionKey key) |
| { |
| try { |
| final Subscriber subscriber = (Subscriber)key.attachment(); |
| if (subscriber != null) { |
| serverHelperExecutor.submit(new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| try { |
| final LogicalNode ln = subscriber.ln; |
| if (ln != null) { |
| ln.removeChannel(subscriber); |
| if (ln.getPhysicalNodeCount() == 0) { |
| DataList dl = publisherBuffers.get(ln.getUpstream()); |
| if (dl != null) { |
| logger.info("Removing ln {} from dl {}", ln, dl); |
| dl.removeDataListener(ln); |
| } |
| subscriberGroups.remove(ln.getGroup(), ln); |
| ln.getIterator().close(); |
| } |
| subscriber.ln = null; |
| } |
| } catch (Throwable t) { |
| logger.error("Buffer server {} failed to tear down subscriber {}.", Server.this, subscriber, t); |
| } |
| } |
| |
| @Override |
| public String toString() |
| { |
| return subscriber + " teardown task."; |
| } |
| }); |
| } else { |
| logger.error("Selection key {} has unexpected attachment {}.", key, key.attachment()); |
| } |
| } catch (ClassCastException e) { |
| logger.error("Selection key {} has unexpected attachment {}.", key, key.attachment()); |
| } catch (RejectedExecutionException e) { |
| logger.error("Subscriber {} teardown after server {} termination.", key.attachment(), this, e); |
| } |
| } |
| |
| /** |
| * |
| * @param request |
| * @param connection |
| * @return |
| */ |
| public DataList handlePublisherRequest(PublishRequestTuple request, AbstractLengthPrependerClient connection) |
| { |
| String identifier = request.getIdentifier(); |
| |
| DataList dl = publisherBuffers.get(identifier); |
| |
| if (dl != null) { |
| /* |
| * close previous connection with the same identifier which is guaranteed to be unique. |
| */ |
| AbstractLengthPrependerClient previous = publisherChannels.put(identifier, connection); |
| if (previous != null) { |
| eventloop.disconnect(previous); |
| } |
| |
| try { |
| dl.rewind(request.getBaseSeconds(), request.getWindowId()); |
| } catch (IOException ie) { |
| throw new RuntimeException(ie); |
| } |
| } else { |
| dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? |
| new FastDataList(identifier, blockSize, numberOfCacheBlocks, BACK_PRESSURE_ENABLED) : |
| new DataList(identifier, blockSize, numberOfCacheBlocks, BACK_PRESSURE_ENABLED); |
| DataList odl = publisherBuffers.putIfAbsent(identifier, dl); |
| if (odl != null) { |
| dl = odl; |
| } |
| } |
| dl.setSecondaryStorage(storage, storageHelperExecutor); |
| |
| return dl; |
| } |
| |
| @Override |
| public ClientListener getClientConnection(SocketChannel sc, ServerSocketChannel ssc) |
| { |
| ClientListener client; |
| if (authToken == null) { |
| client = new UnidentifiedClient(); |
| } else { |
| AuthClient authClient = new AuthClient(); |
| authClient.setToken(authToken); |
| client = authClient; |
| } |
| return client; |
| } |
| |
| @Override |
| public void handleException(Exception cce, EventLoop el) |
| { |
| if (cce instanceof RuntimeException) { |
| throw (RuntimeException)cce; |
| } |
| |
| throw new RuntimeException(cce); |
| } |
| |
| class AuthClient extends com.datatorrent.bufferserver.client.AuthClient |
| { |
| boolean ignore; |
| |
| @Override |
| public void onMessage(byte[] buffer, int offset, int size) |
| { |
| if (ignore) { |
| return; |
| } |
| |
| authenticateMessage(buffer, offset, size); |
| |
| unregistered(key); |
| UnidentifiedClient client = new UnidentifiedClient(); |
| key.attach(client); |
| key.interestOps(SelectionKey.OP_READ); |
| client.registered(key); |
| client.connected(); |
| |
| int len = writeOffset - readOffset - size; |
| if (len > 0) { |
| client.transferBuffer(buffer, readOffset + size, len); |
| } |
| |
| ignore = true; |
| } |
| } |
| |
| class UnidentifiedClient extends SeedDataClient |
| { |
| boolean ignore; |
| |
| @Override |
| public void onMessage(byte[] buffer, int offset, int size) |
| { |
| if (ignore) { |
| return; |
| } |
| |
| Tuple request = Tuple.getTuple(buffer, offset, size); |
| switch (request.getType()) { |
| case PUBLISHER_REQUEST: |
| |
| /* |
| * unregister the unidentified client since its job is done! |
| */ |
| unregistered(key); |
| logger.info("Received publisher request: {}", request); |
| PublishRequestTuple publisherRequest = (PublishRequestTuple)request; |
| |
| DataList dl = handlePublisherRequest(publisherRequest, this); |
| dl.setAutoFlushExecutor(serverHelperExecutor); |
| |
| Publisher publisher; |
| if (publisherRequest.getVersion().equals(Tuple.FAST_VERSION)) { |
| publisher = new Publisher(dl, (long)request.getBaseSeconds() << 32 | request.getWindowId()) |
| { |
| @Override |
| public int readSize() |
| { |
| if (writeOffset - readOffset < 2) { |
| return -1; |
| } |
| |
| short s = buffer[readOffset++]; |
| return s | (buffer[readOffset++] << 8); |
| } |
| |
| }; |
| } else { |
| publisher = new Publisher(dl, (long)request.getBaseSeconds() << 32 | request.getWindowId()); |
| } |
| |
| key.attach(publisher); |
| key.interestOps(SelectionKey.OP_READ); |
| publisher.registered(key); |
| |
| int len = writeOffset - readOffset - size; |
| if (len > 0) { |
| publisher.transferBuffer(this.buffer, readOffset + size, len); |
| } |
| ignore = true; |
| |
| break; |
| |
| case SUBSCRIBER_REQUEST: |
| /* |
| * unregister the unidentified client since its job is done! |
| */ |
| unregistered(key.interestOps(0)); |
| ignore = true; |
| logger.info("Received subscriber request: {}", request); |
| |
| handleSubscriberRequest((SubscribeRequestTuple)request, key); |
| break; |
| |
| case PURGE_REQUEST: |
| logger.info("Received purge request: {}", request); |
| try { |
| handlePurgeRequest((PurgeRequestTuple)request, this); |
| } catch (IOException io) { |
| throw new RuntimeException(io); |
| } |
| break; |
| |
| case RESET_REQUEST: |
| logger.info("Received reset all request: {}", request); |
| try { |
| handleResetRequest((ResetRequestTuple)request, this); |
| } catch (IOException io) { |
| throw new RuntimeException(io); |
| } |
| break; |
| |
| default: |
| throw new RuntimeException("unexpected message: " + request.toString()); |
| } |
| } |
| |
| } |
| |
| private class Subscriber extends WriteOnlyLengthPrependerClient |
| { |
| private LogicalNode ln; |
| |
| Subscriber(LogicalNode ln, int bufferSize) |
| { |
| super(1024 * 1024, bufferSize == 0 ? 256 * 1024 : bufferSize); |
| this.ln = ln; |
| ln.addConnection(this); |
| } |
| |
| @Override |
| public void connected() |
| { |
| super.connected(); |
| serverHelperExecutor.submit(new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| final DataList dl = publisherBuffers.get(ln.getUpstream()); |
| if (dl != null) { |
| dl.addDataListener(ln); |
| } else { |
| logger.error("Disconnecting {} with no matching data list.", this); |
| ln.boot(); |
| } |
| } |
| }); |
| } |
| |
| @Override |
| public void unregistered(final SelectionKey key) |
| { |
| handleSubscriberTeardown(key); |
| super.unregistered(key); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode()) + "{ln=" + ln + "}"; |
| } |
| } |
| |
| /** |
| * When the publisher connects to the server and starts publishing the data, |
| * this is the end on the server side which handles all the communication. |
| * |
| */ |
| class Publisher extends SeedDataClient |
| { |
| private final DataList datalist; |
| boolean dirty; |
| |
| Publisher(DataList dl, long windowId) |
| { |
| super(dl.getBuffer(windowId), dl.getPosition(), 1024); |
| this.datalist = dl; |
| } |
| |
| @Override |
| public void onMessage(byte[] buffer, int offset, int size) |
| { |
| //if (buffer[offset] == MessageType.BEGIN_WINDOW_VALUE || buffer[offset] == MessageType.END_WINDOW_VALUE) { |
| // logger.debug("server received {}", Tuple.getTuple(buffer, offset, size)); |
| //} |
| dirty = true; |
| } |
| |
| /** |
| * Schedules a task to conditionally resume I/O channel read operations. |
| * No-op if {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ} |
| * is already set in the key {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}. |
| * Otherwise, calls {@linkplain #read(int) read(0)} to process data |
| * left in the Publisher read buffer and registers {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ} |
| * in the key {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}. |
| * @return true |
| */ |
| @Override |
| public boolean resumeReadIfSuspended() |
| { |
| eventloop.submit(new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| final int interestOps = key.interestOps(); |
| if ((interestOps & SelectionKey.OP_READ) == 0) { |
| if (readExt(0)) { |
| logger.debug("Resuming read on key {} with attachment {}", key, key.attachment()); |
| key.interestOps(interestOps | SelectionKey.OP_READ); |
| } else { |
| logger.debug("Keeping read on key {} with attachment {} suspended. ", key, key.attachment(), datalist); |
| datalist.notifyListeners(); |
| } |
| } |
| } |
| }); |
| return true; |
| } |
| |
| @Override |
| public void read(int len) |
| { |
| readExt(len); |
| } |
| |
| private boolean readExt(int len) |
| { |
| //logger.debug("read {} bytes", len); |
| writeOffset += len; |
| do { |
| if (size <= 0) { |
| switch (size = readSize()) { |
| case -1: |
| if (writeOffset == buffer.length) { |
| if (readOffset > writeOffset - 5) { |
| dirty = false; |
| datalist.flush(writeOffset); |
| /* |
| * if the data is not corrupt, we are limited by space to receive full varint. |
| * so we allocate a new byteBuffer and copy over the partially written data to the |
| * new byteBuffer and start as if we always had full room but not enough data. |
| */ |
| if (!switchToNewBufferOrSuspendRead(buffer, readOffset, size + VarInt.getSize(size))) { |
| return false; |
| } |
| } |
| } else if (dirty) { |
| dirty = false; |
| datalist.flush(writeOffset); |
| } |
| return true; |
| |
| case 0: |
| continue; |
| |
| default: |
| break; |
| } |
| } |
| |
| if (writeOffset - readOffset >= size) { |
| onMessage(buffer, readOffset, size); |
| readOffset += size; |
| size = 0; |
| } else { |
| if (writeOffset == buffer.length) { |
| dirty = false; |
| datalist.flush(writeOffset); |
| /* |
| * hit wall while writing serialized data, so have to allocate a new byteBuffer. |
| */ |
| if (!switchToNewBufferOrSuspendRead(buffer, readOffset - VarInt.getSize(size), size + VarInt.getSize(size))) { |
| readOffset -= VarInt.getSize(size); |
| size = 0; |
| return false; |
| } |
| size = 0; |
| } else if (dirty) { |
| dirty = false; |
| datalist.flush(writeOffset); |
| } |
| return true; |
| } |
| } while (true); |
| } |
| |
| private boolean switchToNewBufferOrSuspendRead(final byte[] array, final int offset, final int size) |
| { |
| if (switchToNewBuffer(array, offset, size)) { |
| return true; |
| } |
| datalist.suspendRead(this); |
| return false; |
| } |
| |
| private boolean switchToNewBuffer(final byte[] array, final int offset, final int size) |
| { |
| if ((datalist.isMemoryBlockAvailable() || ((storage == null)) && !datalist.areSubscribersBehindByMax())) { |
| final byte[] newBuffer = datalist.newBuffer(size); |
| byteBuffer = ByteBuffer.wrap(newBuffer); |
| if (array == null || array.length - offset == 0) { |
| writeOffset = 0; |
| } else { |
| writeOffset = array.length - offset; |
| System.arraycopy(buffer, offset, newBuffer, 0, writeOffset); |
| byteBuffer.position(writeOffset); |
| } |
| buffer = newBuffer; |
| readOffset = 0; |
| datalist.addBuffer(buffer); |
| return true; |
| } |
| return false; |
| } |
| |
| @Override |
| public void unregistered(final SelectionKey key) |
| { |
| super.unregistered(key); |
| teardown(); |
| } |
| |
| @Override |
| public void handleException(Exception cce, EventLoop el) |
| { |
| teardown(); |
| |
| if (cce instanceof RejectedExecutionException && serverHelperExecutor.isTerminated()) { |
| logger.warn("Terminated Executor Exception for {}.", this, cce); |
| el.disconnect(this); |
| } else { |
| super.handleException(cce, el); |
| } |
| } |
| |
| @Override |
| public String toString() |
| { |
| return getClass().getName() + '@' + Integer.toHexString(hashCode()) + " {datalist=" + datalist + '}'; |
| } |
| |
| private volatile boolean torndown; |
| |
| private void teardown() |
| { |
| //logger.debug("Teardown is being called {}", torndown, new Exception()); |
| if (torndown) { |
| return; |
| } |
| torndown = true; |
| |
| /* |
| * if the publisher unregistered, all the downstream guys are going to be unregistered anyways |
| * in our world. So it makes sense to kick them out proactively. Otherwise these clients since |
| * are not being written to, just stick around till the next publisher shows up and eat into |
| * the data it's publishing for the new subscribers. |
| */ |
| |
| /** |
| * since the publisher server died, the queue which it was using would stop pumping the data unless |
| * a new publisher comes up with the same name. We leave it to the stream to decide when to bring up a new node |
| * with the same identifier as the one which just died. |
| */ |
| if (publisherChannels.containsValue(this)) { |
| final Iterator<Entry<String, AbstractLengthPrependerClient>> i = publisherChannels.entrySet().iterator(); |
| while (i.hasNext()) { |
| if (i.next().getValue() == this) { |
| i.remove(); |
| break; |
| } |
| } |
| } |
| |
| ArrayList<LogicalNode> list = new ArrayList<>(); |
| String publisherIdentifier = datalist.getIdentifier(); |
| Iterator<LogicalNode> iterator = subscriberGroups.values().iterator(); |
| while (iterator.hasNext()) { |
| LogicalNode ln = iterator.next(); |
| if (publisherIdentifier.equals(ln.getUpstream())) { |
| list.add(ln); |
| } |
| } |
| |
| for (LogicalNode ln : list) { |
| ln.boot(); |
| } |
| } |
| |
| } |
| |
| abstract class SeedDataClient extends AbstractLengthPrependerClient |
| { |
| |
| public SeedDataClient() |
| { |
| } |
| |
| public SeedDataClient(int readBufferSize, int sendBufferSize) |
| { |
| super(readBufferSize, sendBufferSize); |
| } |
| |
| public SeedDataClient(byte[] readbuffer, int position, int sendBufferSize) |
| { |
| super(readbuffer, position, sendBufferSize); |
| } |
| |
| public void transferBuffer(byte[] array, int offset, int len) |
| { |
| int remainingCapacity; |
| do { |
| remainingCapacity = buffer.length - writeOffset; |
| if (len < remainingCapacity) { |
| remainingCapacity = len; |
| byteBuffer.position(writeOffset + remainingCapacity); |
| } else { |
| byteBuffer.position(buffer.length); |
| } |
| System.arraycopy(array, offset, buffer, writeOffset, remainingCapacity); |
| read(remainingCapacity); |
| |
| offset += remainingCapacity; |
| } while ((len -= remainingCapacity) > 0); |
| } |
| } |
| |
| private static final Logger logger = LoggerFactory.getLogger(Server.class); |
| } |