blob: 71e8d2a7d617d2542f123706f64a221f58c47163 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.cache.asyncqueue.internal;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
import com.gemstone.gemfire.cache.wan.GatewayEventSubstitutionFilter;
import com.gemstone.gemfire.cache.wan.GatewaySender;
import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
import com.gemstone.gemfire.internal.cache.RegionQueue;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import com.gemstone.gemfire.internal.cache.wan.serial.ConcurrentSerialGatewaySenderEventProcessor;
public class AsyncEventQueueImpl implements AsyncEventQueue {
private GatewaySender sender = null;
private AsyncEventListener asyncEventListener = null;
public static final String ASYNC_EVENT_QUEUE_PREFIX = "AsyncEventQueue_";
public AsyncEventQueueImpl(GatewaySender sender, AsyncEventListener eventListener) {
this.sender = sender;
this.asyncEventListener = eventListener;
}
@Override
public String getId() {
return getAsyncEventQueueIdFromSenderId(this.sender.getId());
}
@Override
public AsyncEventListener getAsyncEventListener() {
return asyncEventListener;
}
@Override
public List<GatewayEventFilter> getGatewayEventFilters() {
return sender.getGatewayEventFilters();
}
@Override
public GatewayEventSubstitutionFilter getGatewayEventSubstitutionFilter() {
return sender.getGatewayEventSubstitutionFilter();
}
@Override
public int getBatchSize() {
return sender.getBatchSize();
}
@Override
public String getDiskStoreName() {
return sender.getDiskStoreName();
}
@Override
public int getBatchTimeInterval() {
return sender.getBatchTimeInterval();
}
@Override
public boolean isBatchConflationEnabled() {
return sender.isBatchConflationEnabled();
}
@Override
public int getMaximumQueueMemory() {
return sender.getMaximumQueueMemory();
}
@Override
public boolean isPersistent() {
return sender.isPersistenceEnabled();
}
@Override
public boolean isDiskSynchronous() {
return sender.isDiskSynchronous();
}
@Override
public int getDispatcherThreads() {
return sender.getDispatcherThreads();
}
@Override
public OrderPolicy getOrderPolicy() {
return sender.getOrderPolicy();
}
@Override
public boolean isPrimary() {
return ((AbstractGatewaySender) sender).isPrimary();
}
@Override
public int size() {
AbstractGatewaySenderEventProcessor eventProcessor =
((AbstractGatewaySender) sender).getEventProcessor();
int size = 0;
if (eventProcessor instanceof ConcurrentSerialGatewaySenderEventProcessor) {
Set<RegionQueue> queues =
((ConcurrentSerialGatewaySenderEventProcessor) eventProcessor).getQueues();
Iterator<RegionQueue> itr = queues.iterator();
while (itr.hasNext()) {
size = size + itr.next().size();
}
} else {
size = eventProcessor.getQueue().size();
}
return size;
}
public GatewaySender getSender() {
return this.sender;
}
public AsyncEventQueueStats getStatistics() {
AbstractGatewaySender abstractSender = (AbstractGatewaySender) this.sender;
return ((AsyncEventQueueStats) abstractSender.getStatistics());
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (this == obj) {
return true;
}
if (!(obj instanceof AsyncEventQueue)) {
return false;
}
AsyncEventQueueImpl asyncEventQueue = (AsyncEventQueueImpl) obj;
if (asyncEventQueue.getId().equals(this.getId())) {
return true;
}
return false;
}
@Override
public int hashCode() {
return getId().hashCode();
}
public static String getSenderIdFromAsyncEventQueueId(String asyncQueueId) {
StringBuilder builder = new StringBuilder();
builder.append(ASYNC_EVENT_QUEUE_PREFIX);
builder.append(asyncQueueId);
return builder.toString();
}
public static String getAsyncEventQueueIdFromSenderId(String senderId) {
if (!senderId.startsWith(ASYNC_EVENT_QUEUE_PREFIX)) {
return senderId;
}
else {
return senderId.substring(ASYNC_EVENT_QUEUE_PREFIX.length());
}
}
public static boolean isAsyncEventQueue(String senderId) {
return senderId.startsWith(ASYNC_EVENT_QUEUE_PREFIX);
}
public boolean isParallel() {
return sender.isParallel();
}
public void destroy() {
((AbstractGatewaySender)this.sender).destroy();
}
public boolean isBucketSorted() {
// TODO Auto-generated method stub
return false;
}
}