// blob: d0e7d33934b4bb748b186f7f4074c4c4c7f77cff [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package streamer;
import org.apache.log4j.Logger;
/**
 * Link to transfer data in bounds of single thread (synchronized transfer).
 * Must not be used to send data to elements served in different threads.
 *
 * <p>
 * The link works in one of two modes:
 * <ul>
 * <li><b>push mode</b> (default): the source element calls
 * {@link #sendData(ByteBuffer)} and the link hands the data to the sink;</li>
 * <li><b>pull mode</b>: the sink side (or the loop in {@link #run()}) calls
 * {@link #pull(boolean)} and the link asks the source element for data.</li>
 * </ul>
 *
 * <p>
 * Data that cannot be delivered yet (link paused, link on hold, packet not
 * complete, data pushed back by the sink) is kept in an internal cache buffer.
 *
 * <p>
 * Only the two flags polled by the loop in {@link #run()} are volatile, so
 * that a change made while the loop is sleeping is guaranteed to be seen. The
 * rest of the state is not synchronized.
 */
public class SyncLink implements Link {
    private static final Logger s_logger = Logger.getLogger(SyncLink.class);

    /**
     * When null packet is pulled from source element, then make slight delay to
     * avoid consuming of 100% of CPU in main loop in cases when link is paused
     * or source element cannot produce data right now.
     */
    public static final long STANDARD_DELAY_FOR_EMPTY_PACKET = 10; // Milliseconds

    /**
     * Delay used by blocking {@link #pull(boolean)} when the link is paused, to
     * avoid consuming of 100% of CPU by a caller which polls in a loop.
     */
    private static final long DELAY_FOR_PAUSED_PULL = 100; // Milliseconds

    /**
     * Delay for null packets in poll method when blocking is requested, in
     * milliseconds.
     */
    protected long delay = STANDARD_DELAY_FOR_EMPTY_PACKET;

    /**
     * Set to true to print debugging messages. Initialized from system property
     * "streamer.Link.debug".
     */
    protected boolean verbose = System.getProperty("streamer.Link.debug", "false").equals("true");

    /**
     * ID of this link.
     */
    protected String id = null;

    /**
     * Buffer with data to hold because link is paused, on hold, or data is
     * pushed back from output element.
     */
    protected ByteBuffer cacheBuffer = null;

    /**
     * Size of expected packet. Data must be held in link until full packet will
     * be read. Zero means "no expectation": any cached data can be delivered.
     */
    protected int expectedPacketSize = 0;

    /**
     * Number of packets and packet headers transferred to element.
     */
    protected int packetNumber = 0;

    /**
     * Element to pull data from, when in pull mode.
     */
    protected Element source = null;

    /**
     * Element to send data to in both pull and push modes.
     */
    protected Element sink = null;

    /**
     * When in loop, indicates that loop must be stopped.
     *
     * Volatile, because the loop in run() re-reads this flag between sleeps and
     * sleeping has no synchronization semantics.
     *
     * @see #run()
     */
    private volatile boolean shutdown = false;

    /**
     * Indicates that event STREAM_START is passed over this link, so main loop
     * can be started to pull data from source element.
     *
     * Volatile, because run() waits for this flag in a sleep loop; once the loop
     * is entered, the flag can only be changed by another thread.
     */
    protected volatile boolean started = false;

    /**
     * Set to true to hold all data in link until it will be set to false again.
     */
    protected boolean paused = false;

    /**
     * Used by pull() method to hold all data in this link to avoid recursion
     * when source element is asked to push new data to its outputs.
     */
    protected boolean hold = false;

    /**
     * Operate in pull mode instead of default push mode. In pull mode, link will
     * ask its source element for new data.
     */
    protected boolean pullMode = false;

    /**
     * Create link without ID.
     */
    public SyncLink() {
    }

    /**
     * Create link with given ID. ID is used in toString() only.
     *
     * @param id
     *          ID of this link, can be null
     */
    public SyncLink(String id) {
        this.id = id;
    }

    /**
     * Return data back to this link. When cache already contains data, cache is
     * replaced by result of joining of cached data with given buffer and old
     * cache buffer is unreferenced; otherwise given buffer becomes the cache and
     * is referenced. Cursor of resulting cache buffer is reset to 0.
     *
     * @param buf
     *          buffer to push back, must not be null
     */
    @Override
    public void pushBack(ByteBuffer buf) {
        if (verbose) {
            s_logger.debug("[" + this + "] INFO: Buffer pushed back: " + buf + ".");
        }

        if (cacheBuffer != null) {
            ByteBuffer joined = cacheBuffer.join(buf);
            cacheBuffer.unref();
            cacheBuffer = joined;
        } else {
            cacheBuffer = buf;
            cacheBuffer.ref();
        }

        resetCursor();
    }

    /**
     * Reset cursor of cache buffer to beginning of data.
     */
    private void resetCursor() {
        cacheBuffer.cursor = 0;
    }

    /**
     * Return data back to this link and hold it until at least
     * lengthOfFullPacket bytes will be available in cache.
     *
     * @param buf
     *          buffer to push back, must not be null
     * @param lengthOfFullPacket
     *          expected size of full packet
     */
    @Override
    public void pushBack(ByteBuffer buf, int lengthOfFullPacket) {
        pushBack(buf);
        expectedPacketSize = lengthOfFullPacket;
    }

    @Override
    public String toString() {
        return "SyncLink(" + ((id != null) ? id + ", " : "") + source + ":" + sink + ")";
    }

    /**
     * Push data to sink. Call with null to push cached data.
     *
     * New data is joined with cached data (if any). Then, while cache is not
     * empty, link is not paused or on hold, and cache holds at least expected
     * amount of data, cache is handed over to sink. Sink can push data back
     * during the call, in which case the loop makes another attempt.
     *
     * @param buf
     *          new data, or null to flush cache
     * @throws RuntimeException
     *           when link is in pull mode and call is not made on behalf of
     *           pull()
     * @throws NullPointerException
     *           when data is ready to be sent, but sink is not set
     */
    @Override
    public void sendData(ByteBuffer buf) {
        if (!hold && pullMode) {
            throw new RuntimeException("[" + this + "] ERROR: link is not in push mode.");
        }

        if (verbose) {
            s_logger.debug("[" + this + "] INFO: Incoming buffer: " + buf + ".");
        }

        if (buf == null && cacheBuffer == null) {
            return;
        }

        // Keep all pending data in cache field to simplify following loop
        if (buf != null) {
            if (cacheBuffer != null) {
                // Join old data with fresh data
                ByteBuffer joined = cacheBuffer.join(buf);
                cacheBuffer.unref();
                cacheBuffer = joined;
            } else {
                cacheBuffer = buf;
            }
        }

        // When data pushed back and length of data is less than length of full
        // packet, then feed data to sink element immediately
        while (cacheBuffer != null) {
            if (paused || hold) {
                if (verbose) {
                    s_logger.debug("[" + this + "] INFO: Transfer is paused. Data in cache buffer: " + cacheBuffer + ".");
                }

                // Keep data in cache until link will be resumed or released
                return;
            }

            if (expectedPacketSize > 0 && cacheBuffer.length < expectedPacketSize) {
                if (verbose) {
                    s_logger.debug("[" + this + "] INFO: Transfer is suspended because available data is less than expected packet size. Expected packet size: "
                            + expectedPacketSize + ", data in cache buffer: " + cacheBuffer + ".");
                }

                // Wait until rest of packet will be read
                return;
            }

            // Full packet or packet header is read, feed it to element
            ByteBuffer packet = cacheBuffer;
            cacheBuffer = null;
            expectedPacketSize = 0;
            packetNumber++;

            if (sink == null) {
                throw new NullPointerException("[" + this + "] ERROR: Cannot send data to sink: sink is null. Data: " + packet + ".");
            }

            sink.handleData(packet, this);
            // cacheBuffer and expectedPacketSize can be changed at this time
        }
    }

    /**
     * Process event locally and then pass it to source element (direction IN)
     * or to sink element (direction OUT).
     *
     * STREAM_START and STREAM_CLOSE are passed over this link only once:
     * repeated event is silently dropped. LINK_SWITCH_TO_PULL_MODE switches this
     * link to pull mode before it is passed further.
     *
     * @param event
     *          event to process
     * @param direction
     *          direction to pass event to
     */
    @SuppressWarnings("incomplete-switch")
    @Override
    public void sendEvent(Event event, Direction direction) {
        if (verbose) {
            s_logger.debug("[" + this + "] INFO: Event " + event + " is received.");
        }

        // Shutdown main loop (if any) when STREAM_CLOSE event is received.
        switch (event) {
        case STREAM_START: {
            if (started) {
                // Event already sent through this link
                return;
            }
            started = true;
            break;
        }
        case STREAM_CLOSE: {
            if (shutdown) {
                // Event already sent through this link
                return;
            }
            shutdown = true;
            break;
        }
        case LINK_SWITCH_TO_PULL_MODE: {
            setPullMode();
            break;
        }
        }

        switch (direction) {
        case IN:
            source.handleEvent(event, direction);
            break;
        case OUT:
            sink.handleEvent(event, direction);
            break;
        }
    }

    /**
     * Pull data from this link. Cached data is returned first, when it can be
     * delivered; otherwise source element is polled once, while link is on
     * hold, so data sent by source is stored in cache instead of being
     * forwarded to sink.
     *
     * @param block
     *          passed to source element poll; additionally, when link is paused
     *          and block is true, call sleeps for a short time before return
     * @return data, or null when link is paused or when full packet is not
     *         available yet
     * @throws RuntimeException
     *           when link is not in pull mode or when link is already on hold
     *           (pull() is called recursively)
     */
    @Override
    public ByteBuffer pull(boolean block) {
        if (!pullMode) {
            throw new RuntimeException("[" + this + "] ERROR: This link is not in pull mode.");
        }

        if (hold) {
            throw new RuntimeException("[" + this + "] ERROR: This link is already on hold, waiting for data to be pulled in. Circular reference?");
        }

        if (paused) {
            if (verbose) {
                s_logger.debug("[" + this + "] INFO: Cannot pull, link is paused.");
            }

            // Make slight delay in such case, to avoid consuming 100% of CPU
            if (block) {
                try {
                    Thread.sleep(DELAY_FOR_PAUSED_PULL);
                } catch (InterruptedException e) {
                    s_logger.info("[ignored] interrupted during pull", e);
                    // Do not lose interruption request: let caller notice it
                    Thread.currentThread().interrupt();
                }
            }

            return null;
        }

        // If data in cache can be sent immediately,
        // then return it instead of asking for more data from source
        if (isCachedPacketReady()) {
            if (verbose) {
                s_logger.debug("[" + this + "] INFO: Data pulled from cache buffer: " + cacheBuffer + ".");
            }
            return takeCachedBuffer();
        }

        // Pause this link, so incoming data will not be sent to sink
        // immediately, then ask source element for more data
        try {
            hold = true;
            source.poll(block);
        } finally {
            hold = false;
        }

        // Can return something only when data was stored in buffer
        if (isCachedPacketReady()) {
            if (verbose) {
                s_logger.debug("[" + this + "] INFO: Data pulled from source: " + cacheBuffer + ".");
            }
            return takeCachedBuffer();
        }

        return null;
    }

    /**
     * Check that cache holds data which can be returned from pull(): cache is
     * not empty and either no packet size is expected or at least expected
     * amount of data is cached.
     */
    private boolean isCachedPacketReady() {
        return cacheBuffer != null && (expectedPacketSize == 0 || (expectedPacketSize > 0 && cacheBuffer.length >= expectedPacketSize));
    }

    /**
     * Detach cache buffer from this link and return it.
     */
    private ByteBuffer takeCachedBuffer() {
        ByteBuffer taken = cacheBuffer;
        cacheBuffer = null;
        return taken;
    }

    /**
     * Set or clear sink element of this link.
     *
     * @param sink
     *          new sink element, or null to clear sink
     * @return the sink argument
     * @throws RuntimeException
     *           when sink is already set and new sink is not null, or when sink
     *           is cleared while cache is not empty
     */
    @Override
    public Element setSink(Element sink) {
        if (sink != null && this.sink != null) {
            throw new RuntimeException("[" + this + "] ERROR: This link sink element is already set. Link: " + this + ", new sink: " + sink + ", existing sink: "
                    + this.sink + ".");
        }

        if (sink == null && cacheBuffer != null) {
            throw new RuntimeException("[" + this + "] ERROR: Cannot drop link: cache is not empty. Link: " + this + ", cache: " + cacheBuffer);
        }

        this.sink = sink;

        return sink;
    }

    /**
     * Set or clear source element of this link.
     *
     * @param source
     *          new source element, or null to clear source
     * @return the source argument
     * @throws RuntimeException
     *           when source is already set and new source is not null
     */
    @Override
    public Element setSource(Element source) {
        if (this.source != null && source != null) {
            throw new RuntimeException("[" + this + "] ERROR: This link source element is already set. Link: " + this + ", new source: " + source
                    + ", existing source: " + this.source + ".");
        }

        this.source = source;
        return source;
    }

    @Override
    public Element getSource() {
        return source;
    }

    @Override
    public Element getSink() {
        return sink;
    }

    /**
     * Hold all data in this link until resume() will be called.
     *
     * @throws RuntimeException
     *           when link is already paused
     */
    @Override
    public void pause() {
        if (paused) {
            throw new RuntimeException("[" + this + "] ERROR: Link is already paused.");
        }

        paused = true;
    }

    /**
     * Clear paused flag. Cached data is not sent by this call; it is delivered
     * by a later sendData() or pull() call.
     */
    @Override
    public void resume() {
        paused = false;
    }

    /**
     * Run pull loop to actively pull data from source and push it to sink. It
     * must be only one pull loop per thread.
     *
     * Pull loop will start after event STREAM_START. This link and source
     * element incoming links will be switched to pull mode before pull loop will
     * be started using event LINK_SWITCH_TO_PULL_MODE.
     *
     * Loop is finished after event STREAM_CLOSE.
     */
    @Override
    public void run() {
        // Wait until event STREAM_START will arrive
        while (!started) {
            delay();
        }

        sendEvent(Event.LINK_SWITCH_TO_PULL_MODE, Direction.IN);

        if (verbose) {
            s_logger.debug("[" + this + "] INFO: Starting pull loop.");
        }

        // Pull source in loop
        while (!shutdown) {
            // Pull data from source element and send it to sink element
            ByteBuffer data = pull(false);
            if (data != null) {
                sink.handleData(data, this);
            }

            if (!shutdown && data == null) {
                // Make slight delay to avoid consuming of 100% of CPU
                delay();
            }
        }

        if (verbose) {
            s_logger.debug("[" + this + "] INFO: Pull loop finished.");
        }
    }

    /**
     * Sleep for configured delay.
     *
     * @throws RuntimeException
     *           when thread is interrupted; interrupt status of thread is
     *           restored before exception is thrown
     */
    protected void delay() {
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            // Restore interrupt status, so code up the stack can detect it
            Thread.currentThread().interrupt();
            throw new RuntimeException("[" + this + "] ERROR: Interrupted in main loop.", e);
        }
    }

    /**
     * Switch this link to pull mode.
     */
    @Override
    public void setPullMode() {
        if (verbose) {
            s_logger.debug("[" + this + "] INFO: Switching to PULL mode.");
        }

        pullMode = true;
    }

    /**
     * Ask source and sink elements to drop this link.
     *
     * @throws RuntimeException
     *           when link is in pull mode or when cache contains data
     */
    @Override
    public void drop() {
        if (pullMode) {
            throw new RuntimeException("[" + this + "] ERROR: Cannot drop link in pull mode.");
        }

        if (cacheBuffer != null) {
            throw new RuntimeException("[" + this + "] ERROR: Cannot drop link when cache contains data: " + cacheBuffer + ".");
        }

        source.dropLink(this);
        sink.dropLink(this);
    }
}