blob: e891f8d456dd426309c5b3ace4bc5f784452e6bf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.s4.comm;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.s4.base.Emitter;
import org.apache.s4.base.EventMessage;
import com.google.inject.Inject;
import com.google.inject.name.Named;
public class QueueingEmitter implements Emitter, Runnable {
private Emitter emitter;
private BlockingQueue<MessageHolder> queue;
private long dropCount = 0;
private volatile Thread thread;
@Inject
public QueueingEmitter(@Named("ll") Emitter emitter, @Named("comm.queue_emmiter_size") int queueSize) {
this.emitter = emitter;
queue = new LinkedBlockingQueue<MessageHolder>(queueSize);
}
public long getDropCount() {
return dropCount;
}
public void start() {
if (thread != null) {
throw new IllegalStateException("QueueingEmitter is already started");
}
thread = new Thread(this, "QueueingEmitter");
thread.start();
}
public void stop() {
if (thread == null) {
throw new IllegalStateException("QueueingEmitter is already stopped");
}
thread.interrupt();
thread = null;
}
@Override
public boolean send(int partitionId, EventMessage message) {
MessageHolder mh = new MessageHolder(partitionId, message);
if (!queue.offer(mh)) {
dropCount++;
return true;
} else {
return false;
}
}
public void run() {
while (!Thread.interrupted()) {
try {
MessageHolder mh = queue.take();
// System.out.println("QueueingEmitter: Sending message on low-level emitter");
emitter.send(mh.getPartitionId(), mh.getMessage());
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
public int getPartitionCount() {
return emitter.getPartitionCount();
}
class MessageHolder {
private int partitionId;
private EventMessage message;
public int getPartitionId() {
return partitionId;
}
public void setPartitionId(int partitionId) {
this.partitionId = partitionId;
}
public EventMessage getMessage() {
return message;
}
public void setMessage(EventMessage message) {
this.message = message;
}
public MessageHolder(int partitionId, EventMessage message) {
super();
this.partitionId = partitionId;
this.message = message;
}
}
@Override
public void close() {
// TODO Auto-generated method stub
}
}