/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.flume.operator;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.flume.discovery.Discovery;
import org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery;
import org.apache.apex.malhar.flume.sink.Server;
import org.apache.flume.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Stats.OperatorStats;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.netlet.AbstractLengthPrependerClient;
import com.datatorrent.netlet.DefaultEventLoop;
import com.datatorrent.netlet.util.Slice;
import static java.lang.Thread.sleep;
/**
 * <p>
 * Abstract base class for input operators that read Flume events from a DTFlumeSink and emit converted tuples.</p>
 *
 * @param <T> Type of the output payload.
 * @since 0.9.2
 */
public abstract class AbstractFlumeInputOperator<T>
implements InputOperator, Operator.ActivationListener<OperatorContext>, Operator.IdleTimeHandler,
Operator.CheckpointListener, Partitioner<AbstractFlumeInputOperator<T>>
{
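/*
 * A concrete subclass only needs to implement convert(Event) to map a Flume event to the output
 * tuple type. A minimal sketch (the subclass name and String payload below are illustrative, not
 * part of this operator):
 *
 *   public class StringFlumeInputOperator extends AbstractFlumeInputOperator<String>
 *   {
 *     @Override
 *     public String convert(Event event)
 *     {
 *       return new String(event.getBody());
 *     }
 *   }
 */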
public final transient DefaultOutputPort<T> output = new DefaultOutputPort<T>();
public final transient DefaultOutputPort<Slice> drop = new DefaultOutputPort<Slice>();
@NotNull
private String[] connectionSpecs;
@NotNull
private StreamCodec<Event> codec;
private final ArrayList<RecoveryAddress> recoveryAddresses;
@SuppressWarnings("FieldMayBeFinal") // it's not final because that mucks with the serialization somehow
private transient ArrayBlockingQueue<Slice> handoverBuffer;
private transient int idleCounter;
private transient int eventCounter;
private transient DefaultEventLoop eventloop;
private transient volatile boolean connected;
private transient OperatorContext context;
private transient Client client;
private transient long windowId;
private transient byte[] address;
@Min(0)
private long maxEventsPerSecond;
// This is calculated from maxEventsPerSecond, the application window count and the streaming window size
private transient long maxEventsPerWindow;
public AbstractFlumeInputOperator()
{
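// Hand-over buffer between the netlet event-loop thread, which receives slices from the sink
// (see Client.onMessage below), and the operator thread, which drains them in emitTuples();
// sized at 5 * 1024 slices.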
handoverBuffer = new ArrayBlockingQueue<Slice>(1024 * 5);
connectionSpecs = new String[0];
recoveryAddresses = new ArrayList<RecoveryAddress>();
maxEventsPerSecond = Long.MAX_VALUE;
}
@Override
public void setup(OperatorContext context)
{
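// Convert the configured per-second cap into a per-application-window cap. For example, with a
// 500 ms streaming window and an application window count of 2 (illustrative values), a
// maxEventsPerSecond of 10000 yields a maxEventsPerWindow of 10000.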
long windowDurationMillis = context.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) *
context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
maxEventsPerWindow = (long)(windowDurationMillis / 1000.0 * maxEventsPerSecond);
logger.debug("max-events per-second {} per-window {}", maxEventsPerSecond, maxEventsPerWindow);
try {
eventloop = new DefaultEventLoop("EventLoop-" + context.getId());
eventloop.start();
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
@Override
@SuppressWarnings({"unchecked"})
public void activate(OperatorContext ctx)
{
if (connectionSpecs.length == 0) {
logger.info("No DTFlumeSink connection specs configured for this partition");
} else if (connectionSpecs.length == 1) {
for (String connectionSpec : connectionSpecs) {
logger.debug("Connection spec is {}", connectionSpec);
String[] parts = connectionSpec.split(":");
eventloop.connect(new InetSocketAddress(parts[1], Integer.parseInt(parts[2])), client = new Client(parts[0]));
}
} else {
throw new IllegalArgumentException(
String.format("A physical %s operator cannot connect to more than one address!",
this.getClass().getSimpleName()));
}
context = ctx;
}
@Override
public void beginWindow(long windowId)
{
this.windowId = windowId;
idleCounter = 0;
eventCounter = 0;
}
@Override
public void emitTuples()
{
int i = handoverBuffer.size();
if (i > 0 && eventCounter < maxEventsPerWindow) {
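// Each slice delivered by the sink is prefixed with an 8-byte recovery address, which is skipped
// before the codec decodes the Flume event. All but the last buffered slice are drained in the
// loop below; the last one is handled separately afterwards so its address can be captured for
// this window.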
while (--i > 0 && eventCounter < maxEventsPerWindow - 1) {
final Slice slice = handoverBuffer.poll();
slice.offset += 8;
slice.length -= 8;
T convert = convert((Event)codec.fromByteArray(slice));
if (convert == null) {
drop.emit(slice);
} else {
output.emit(convert);
}
eventCounter++;
}
final Slice slice = handoverBuffer.poll();
slice.offset += 8;
slice.length -= 8;
T convert = convert((Event)codec.fromByteArray(slice));
if (convert == null) {
drop.emit(slice);
} else {
output.emit(convert);
}
eventCounter++;
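// Remember the recovery address of the last event emitted in this window; endWindow() stores it
// in recoveryAddresses so the sink can be asked to replay from this point after a failure.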
address = Arrays.copyOfRange(slice.buffer, slice.offset - 8, slice.offset);
}
}
@Override
public void endWindow()
{
if (connected) {
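// Report this window's event and idle counts back to the sink as a WINDOWED control message:
// 1 byte for the command, 4 bytes each for eventCounter and idleCounter, followed by the
// current timestamp.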
byte[] array = new byte[Server.Request.FIXED_SIZE];
array[0] = Server.Command.WINDOWED.getOrdinal();
Server.writeInt(array, 1, eventCounter);
Server.writeInt(array, 5, idleCounter);
Server.writeLong(array, Server.Request.TIME_OFFSET, System.currentTimeMillis());
logger.debug("wrote {} with eventCounter = {} and idleCounter = {}", Server.Command.WINDOWED, eventCounter, idleCounter);
client.write(array);
}
if (address != null) {
RecoveryAddress rAddress = new RecoveryAddress();
rAddress.address = address;
address = null;
rAddress.windowId = windowId;
recoveryAddresses.add(rAddress);
}
}
@Override
public void deactivate()
{
if (connected) {
eventloop.disconnect(client);
}
context = null;
}
@Override
public void teardown()
{
eventloop.stop();
eventloop = null;
}
@Override
public void handleIdleTime()
{
idleCounter++;
try {
sleep(context.getValue(OperatorContext.SPIN_MILLIS));
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
public abstract T convert(Event event);
/**
* @return the connect addresses
*/
public String[] getConnectAddresses()
{
return connectionSpecs.clone();
}
/**
* @param specs - sinkid:host:port specification of all the sinks.
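*              Each entry has the form {@code sinkid:host:port}, for example
*              {@code "sink1:node1.example.com:9098"} (the host and port are illustrative values).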
*/
public void setConnectAddresses(String[] specs)
{
this.connectionSpecs = specs.clone();
}
/**
* @return the codec
*/
public StreamCodec<Event> getCodec()
{
return codec;
}
/**
* @param codec the codec to set
*/
public void setCodec(StreamCodec<Event> codec)
{
this.codec = codec;
}
private static class RecoveryAddress implements Serializable
{
long windowId;
byte[] address;
@Override
public String toString()
{
return "RecoveryAddress{" + "windowId=" + windowId + ", address=" + Arrays.toString(address) + '}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof RecoveryAddress)) {
return false;
}
RecoveryAddress that = (RecoveryAddress)o;
if (windowId != that.windowId) {
return false;
}
return Arrays.equals(address, that.address);
}
@Override
public int hashCode()
{
int result = (int)(windowId ^ (windowId >>> 32));
result = 31 * result + (address != null ? Arrays.hashCode(address) : 0);
return result;
}
private static final long serialVersionUID = 201312021432L;
}
@Override
public void checkpointed(long windowId)
{
/* dont do anything */
}
@Override
public void committed(long windowId)
{
if (!connected) {
return;
}
synchronized (recoveryAddresses) {
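// Discard recovery addresses for windows that are now committed, remembering the last address
// processed so the sink can be informed of it via the COMMITTED message below.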
byte[] addr = null;
Iterator<RecoveryAddress> iterator = recoveryAddresses.iterator();
while (iterator.hasNext()) {
RecoveryAddress ra = iterator.next();
if (ra.windowId > windowId) {
break;
}
iterator.remove();
if (ra.address != null) {
addr = ra.address;
}
}
if (addr != null) {
/*
* Make sure that we store the last valid address processed
*/
if (recoveryAddresses.isEmpty()) {
RecoveryAddress ra = new RecoveryAddress();
ra.address = addr;
recoveryAddresses.add(ra);
}
int arraySize = 1/* for the type of the message */
+ 8 /* for the location to commit */
+ 8 /* for storing the current time stamp*/;
byte[] array = new byte[arraySize];
array[0] = Server.Command.COMMITTED.getOrdinal();
System.arraycopy(addr, 0, array, 1, 8);
Server.writeLong(array, Server.Request.TIME_OFFSET, System.currentTimeMillis());
logger.debug("wrote {} with recoveryOffset = {}", Server.Command.COMMITTED, Arrays.toString(addr));
client.write(array);
}
}
}
@Override
public Collection<Partition<AbstractFlumeInputOperator<T>>> definePartitions(
Collection<Partition<AbstractFlumeInputOperator<T>>> partitions, PartitioningContext context)
{
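// One physical partition is created per discovered DTFlumeSink: existing connection specs are
// reconciled against the discovered services (preferring a freshly advertised spec when the old
// connection is down), and each partition carries over the recovery addresses recorded for its
// sink id. Recovery addresses of sinks that have disappeared are parked in
// abandonedRecoveryAddresses until a sink with the same id reappears.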
Collection<Discovery.Service<byte[]>> discovered = discoveredFlumeSinks.get();
if (discovered == null) {
return partitions;
}
HashMap<String, ArrayList<RecoveryAddress>> allRecoveryAddresses = abandonedRecoveryAddresses.get();
ArrayList<String> allConnectAddresses = new ArrayList<String>(partitions.size());
for (Partition<AbstractFlumeInputOperator<T>> partition: partitions) {
String[] lAddresses = partition.getPartitionedInstance().connectionSpecs;
allConnectAddresses.addAll(Arrays.asList(lAddresses));
for (int i = lAddresses.length; i-- > 0;) {
String[] parts = lAddresses[i].split(":", 2);
allRecoveryAddresses.put(parts[0], partition.getPartitionedInstance().recoveryAddresses);
}
}
HashMap<String, String> connections = new HashMap<String, String>(discovered.size());
for (Discovery.Service<byte[]> service: discovered) {
String previousSpec = connections.get(service.getId());
String newspec = service.getId() + ':' + service.getHost() + ':' + service.getPort();
if (previousSpec == null) {
connections.put(service.getId(), newspec);
} else {
boolean found = false;
for (ConnectionStatus cs: partitionedInstanceStatus.get().values()) {
if (previousSpec.equals(cs.spec) && !cs.connected) {
connections.put(service.getId(), newspec);
found = true;
break;
}
}
if (!found) {
logger.warn("2 sinks found with the same id: {} and {}... Ignoring previous.", previousSpec, newspec);
connections.put(service.getId(), newspec);
}
}
}
for (int i = allConnectAddresses.size(); i-- > 0;) {
String[] parts = allConnectAddresses.get(i).split(":");
String connection = connections.remove(parts[0]);
if (connection == null) {
allConnectAddresses.remove(i);
} else {
allConnectAddresses.set(i, connection);
}
}
allConnectAddresses.addAll(connections.values());
partitions.clear();
try {
if (allConnectAddresses.isEmpty()) {
/* return at least one partition; otherwise STRAM (the application master) becomes grumpy */
@SuppressWarnings("unchecked")
AbstractFlumeInputOperator<T> operator = getClass().newInstance();
operator.setCodec(codec);
operator.setMaxEventsPerSecond(maxEventsPerSecond);
for (ArrayList<RecoveryAddress> lRecoveryAddresses: allRecoveryAddresses.values()) {
operator.recoveryAddresses.addAll(lRecoveryAddresses);
}
operator.connectionSpecs = new String[allConnectAddresses.size()];
for (int i = operator.connectionSpecs.length; i-- > 0;) {
operator.connectionSpecs[i] = allConnectAddresses.get(i);
}
partitions.add(new DefaultPartition<AbstractFlumeInputOperator<T>>(operator));
} else {
long maxEventsPerSecondPerOperator = maxEventsPerSecond / allConnectAddresses.size();
for (int i = allConnectAddresses.size(); i-- > 0;) {
@SuppressWarnings("unchecked")
AbstractFlumeInputOperator<T> operator = getClass().newInstance();
operator.setCodec(codec);
operator.setMaxEventsPerSecond(maxEventsPerSecondPerOperator);
String connectAddress = allConnectAddresses.get(i);
operator.connectionSpecs = new String[] {connectAddress};
String[] parts = connectAddress.split(":", 2);
ArrayList<RecoveryAddress> remove = allRecoveryAddresses.remove(parts[0]);
if (remove != null) {
operator.recoveryAddresses.addAll(remove);
}
partitions.add(new DefaultPartition<AbstractFlumeInputOperator<T>>(operator));
}
}
} catch (IllegalAccessException ex) {
throw new RuntimeException(ex);
} catch (InstantiationException ex) {
throw new RuntimeException(ex);
}
logger.debug("Requesting partitions: {}", partitions);
return partitions;
}
@Override
public void partitioned(Map<Integer, Partition<AbstractFlumeInputOperator<T>>> partitions)
{
logger.debug("Partitioned Map: {}", partitions);
HashMap<Integer, ConnectionStatus> map = partitionedInstanceStatus.get();
map.clear();
for (Entry<Integer, Partition<AbstractFlumeInputOperator<T>>> entry: partitions.entrySet()) {
if (map.containsKey(entry.getKey())) {
// what can be done here?
} else {
map.put(entry.getKey(), null);
}
}
}
@Override
public String toString()
{
return "AbstractFlumeInputOperator{" + "connected=" + connected + ", connectionSpecs=" +
(connectionSpecs.length == 0 ? "empty" : connectionSpecs[0]) + ", recoveryAddresses=" + recoveryAddresses + '}';
}
class Client extends AbstractLengthPrependerClient
{
private final String id;
Client(String id)
{
this.id = id;
}
@Override
public void onMessage(byte[] buffer, int offset, int size)
{
try {
handoverBuffer.put(new Slice(buffer, offset, size));
} catch (InterruptedException ex) {
handleException(ex, eventloop);
}
}
@Override
public void connected()
{
super.connected();
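// On (re)connect, ask the sink to SEEK to the last recorded recovery address (or address zero if
// none has been recorded yet) so the sink resumes sending events from just after that point.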
byte[] address;
synchronized (recoveryAddresses) {
if (recoveryAddresses.size() > 0) {
address = recoveryAddresses.get(recoveryAddresses.size() - 1).address;
} else {
address = new byte[8];
}
}
int len = 1 /* for the message type SEEK */
+ 8 /* for the address */
+ 8 /* for storing the current time stamp*/;
byte[] array = new byte[len];
array[0] = Server.Command.SEEK.getOrdinal();
System.arraycopy(address, 0, array, 1, 8);
Server.writeLong(array, 9, System.currentTimeMillis());
write(array);
connected = true;
ConnectionStatus connectionStatus = new ConnectionStatus();
connectionStatus.connected = true;
connectionStatus.spec = connectionSpecs[0];
OperatorContext ctx = context;
synchronized (ctx) {
logger.debug("{} Submitting ConnectionStatus = {}", AbstractFlumeInputOperator.this, connectionStatus);
context.setCounters(connectionStatus);
}
}
@Override
public void disconnected()
{
connected = false;
ConnectionStatus connectionStatus = new ConnectionStatus();
connectionStatus.connected = false;
connectionStatus.spec = connectionSpecs[0];
OperatorContext ctx = context;
synchronized (ctx) {
logger.debug("{} Submitting ConnectionStatus = {}", AbstractFlumeInputOperator.this, connectionStatus);
context.setCounters(connectionStatus);
}
super.disconnected();
}
}
public static class ZKStatsListner extends ZKAssistedDiscovery implements com.datatorrent.api.StatsListener,
Serializable
{
/*
* In the current design, one input operator is able to connect
* to only one flume adapter. Sometime in the future, we should support
* any number of input operators connecting to any number of flume
* sinks and vice versa.
*
* Until that happens, the following map should be sufficient to
* keep track of which input operator is connected to which flume sink.
*/
long intervalMillis;
private final Response response;
private transient long nextMillis;
public ZKStatsListner()
{
intervalMillis = 60 * 1000L;
response = new Response();
}
@Override
public Response processStats(BatchedOperatorStats stats)
{
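// A repartition is requested when the latest ConnectionStatus counter reported by a partition
// shows it disconnected, or when periodic discovery (run at most once per intervalMillis) finds
// that the set of advertised sinks no longer matches the current set of partitions.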
final HashMap<Integer, ConnectionStatus> map = partitionedInstanceStatus.get();
response.repartitionRequired = false;
Object lastStat = null;
List<OperatorStats> lastWindowedStats = stats.getLastWindowedStats();
for (OperatorStats os: lastWindowedStats) {
if (os.counters != null) {
lastStat = os.counters;
logger.debug("Received custom stats = {}", lastStat);
}
}
if (lastStat instanceof ConnectionStatus) {
ConnectionStatus cs = (ConnectionStatus)lastStat;
map.put(stats.getOperatorId(), cs);
if (!cs.connected) {
logger.debug("setting repatitioned = true because of lastStat = {}", lastStat);
response.repartitionRequired = true;
}
}
if (System.currentTimeMillis() >= nextMillis) {
logger.debug("nextMillis = {}", nextMillis);
try {
super.setup(null);
Collection<Discovery.Service<byte[]>> addresses;
try {
addresses = discover();
} finally {
super.teardown();
}
AbstractFlumeInputOperator.discoveredFlumeSinks.set(addresses);
logger.debug("\ncurrent map = {}\ndiscovered sinks = {}", map, addresses);
switch (addresses.size()) {
case 0:
response.repartitionRequired = map.size() != 1;
break;
default:
if (addresses.size() == map.size()) {
for (ConnectionStatus value: map.values()) {
if (value == null || !value.connected) {
response.repartitionRequired = true;
break;
}
}
} else {
response.repartitionRequired = true;
}
break;
}
} catch (Error er) {
throw er;
} catch (Throwable cause) {
logger.warn("Unable to discover services, using values from last successful discovery", cause);
} finally {
nextMillis = System.currentTimeMillis() + intervalMillis;
logger.debug("Proposed NextMillis = {}", nextMillis);
}
}
return response;
}
/**
* @return the intervalMillis
*/
public long getIntervalMillis()
{
return intervalMillis;
}
/**
* @param intervalMillis the intervalMillis to set
*/
public void setIntervalMillis(long intervalMillis)
{
this.intervalMillis = intervalMillis;
}
private static final long serialVersionUID = 201312241646L;
}
public static class ConnectionStatus implements Serializable
{
int id;
String spec;
boolean connected;
@Override
public int hashCode()
{
return spec.hashCode();
}
@Override
public boolean equals(Object obj)
{
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final ConnectionStatus other = (ConnectionStatus)obj;
return spec == null ? other.spec == null : spec.equals(other.spec);
}
@Override
public String toString()
{
return "ConnectionStatus{" + "id=" + id + ", spec=" + spec + ", connected=" + connected + '}';
}
private static final long serialVersionUID = 201312261615L;
}
private static final transient ThreadLocal<HashMap<Integer, ConnectionStatus>> partitionedInstanceStatus =
new ThreadLocal<HashMap<Integer, ConnectionStatus>>()
{
@Override
protected HashMap<Integer, ConnectionStatus> initialValue()
{
return new HashMap<Integer, ConnectionStatus>();
}
};
/**
* When a sink goes away and a replacement sink is not found, we stash the recovery addresses associated
* with the sink in the hope that a new sink may show up in the near future.
*/
private static final transient ThreadLocal<HashMap<String, ArrayList<RecoveryAddress>>> abandonedRecoveryAddresses =
new ThreadLocal<HashMap<String, ArrayList<RecoveryAddress>>>()
{
@Override
protected HashMap<String, ArrayList<RecoveryAddress>> initialValue()
{
return new HashMap<String, ArrayList<RecoveryAddress>>();
}
};
private static final transient ThreadLocal<Collection<Discovery.Service<byte[]>>> discoveredFlumeSinks =
new ThreadLocal<Collection<Discovery.Service<byte[]>>>();
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof AbstractFlumeInputOperator)) {
return false;
}
AbstractFlumeInputOperator<?> that = (AbstractFlumeInputOperator<?>)o;
if (!Arrays.equals(connectionSpecs, that.connectionSpecs)) {
return false;
}
return recoveryAddresses.equals(that.recoveryAddresses);
}
@Override
public int hashCode()
{
int result = connectionSpecs != null ? Arrays.hashCode(connectionSpecs) : 0;
result = 31 * result + (recoveryAddresses.hashCode());
return result;
}
public void setMaxEventsPerSecond(long maxEventsPerSecond)
{
this.maxEventsPerSecond = maxEventsPerSecond;
}
public long getMaxEventsPerSecond()
{
return maxEventsPerSecond;
}
private static final Logger logger = LoggerFactory.getLogger(AbstractFlumeInputOperator.class);
}