package com.gemstone.gemfire.cache.hdfs.internal.hoplog;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.Logger;

import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.FunctionException;
import com.gemstone.gemfire.cache.execute.FunctionService;
import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
import com.gemstone.gemfire.cache.hdfs.internal.FlushObserver.AsyncFlushResult;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
import com.gemstone.gemfire.i18n.LogWriterI18n;
import com.gemstone.gemfire.internal.InternalEntity;
import com.gemstone.gemfire.internal.cache.ForceReattemptException;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
import com.gemstone.gemfire.internal.cache.execute.LocalResultCollector;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
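
/**
 * Distributed function that flushes the pending HDFS event queue contents for
 * every bucket of a partitioned region, retrying failed buckets until either
 * all buckets report a successful flush or the retry/time budget is exhausted.
 *
 * Typical use (illustrative; maxWaitTime is in seconds, 0 waits indefinitely):
 * <pre>
 *   HDFSFlushQueueFunction.flushQueue(partitionedRegion, 30);
 * </pre>
 */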
public class HDFSFlushQueueFunction implements Function, InternalEntity {
  /** Maximum number of flush attempts before giving up on the remaining buckets. */
  private static final int MAX_RETRIES = Integer.getInteger("gemfireXD.maxFlushQueueRetries", 3);

  /** Forces flush tracing at info level even when debug logging is disabled. */
  private static final boolean VERBOSE = Boolean.getBoolean("hdfsFlushQueueFunction.VERBOSE");

  private static final Logger logger = LogService.getLogger();

  private static final String ID = HDFSFlushQueueFunction.class.getName();
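
  /**
   * Flushes the queued HDFS events for all buckets of the given region.
   *
   * @param pr the partitioned region whose event queue should be flushed
   * @param maxWaitTime maximum time to wait, in seconds; 0 means no limit
   * @throws FunctionException if some buckets could not be flushed in time
   */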
  public static void flushQueue(PartitionedRegion pr, int maxWaitTime) {
    Set<Integer> buckets = new HashSet<Integer>(pr.getRegionAdvisor().getBucketSet());

    // Convert the caller-supplied timeout from seconds to milliseconds.
    maxWaitTime *= 1000;
    long start = System.currentTimeMillis();

    int retries = 0;
    long remaining = 0;
    while (retries++ < MAX_RETRIES && (remaining = waitTime(start, maxWaitTime)) > 0) {
      if (logger.isDebugEnabled() || VERBOSE) {
        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushing buckets " + buckets
            + ", attempt = " + retries
            + ", remaining = " + remaining));
      }

      HDFSFlushQueueArgs args = new HDFSFlushQueueArgs(buckets, remaining);
      HDFSFlushQueueResultCollector rc = new HDFSFlushQueueResultCollector(buckets);

      AbstractExecution exec = (AbstractExecution) FunctionService
          .onRegion(pr)
          .withArgs(args)
          .withCollector(rc);
      exec.setWaitOnExceptionFlag(true);

      try {
        exec.execute(ID);
        if (rc.getResult()) {
          if (logger.isDebugEnabled() || VERBOSE) {
            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushed all buckets successfully"));
          }
          return;
        }
      } catch (FunctionException e) {
        if (logger.isDebugEnabled() || VERBOSE) {
          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Encountered error flushing queue"), e);
        }
      }

      // Retry only the buckets that did not report a successful flush, waiting
      // for each of them to regain a writable primary before the next attempt.
      buckets.removeAll(rc.getSuccessfulBuckets());
      for (int bucketId : buckets) {
        remaining = waitTime(start, maxWaitTime);
        if (logger.isDebugEnabled() || VERBOSE) {
          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Waiting for bucket " + bucketId));
        }
        pr.getNodeForBucketWrite(bucketId, new PartitionedRegion.RetryTimeKeeper((int) remaining));
      }
    }

    pr.checkReadiness();
    throw new FunctionException("Unable to flush the following buckets: " + buckets);
  }
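
  /**
   * Returns the time remaining, in milliseconds, in the overall wait budget
   * that started at {@code start}. A {@code max} of 0 is treated as unbounded.
   */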
  private static long waitTime(long start, long max) {
    if (max == 0) {
      return Integer.MAX_VALUE;
    }
    return start + max - System.currentTimeMillis();
  }
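
  /**
   * Executes on each member hosting primary buckets: starts an asynchronous
   * flush for every locally-primary bucket named in the arguments, then waits
   * for the flushes to complete and reports each bucket that finished in time.
   */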
  @Override
  public void execute(FunctionContext context) {
    RegionFunctionContext rfc = (RegionFunctionContext) context;
    PartitionedRegion pr = (PartitionedRegion) rfc.getDataSet();

    HDFSFlushQueueArgs args = (HDFSFlushQueueArgs) rfc.getArguments();

    // Only flush the buckets for which this member is currently the primary.
    Set<Integer> buckets = new HashSet<Integer>(args.getBuckets());
    buckets.retainAll(pr.getDataStore().getAllLocalPrimaryBucketIds());

    // Kick off an asynchronous flush on each local primary bucket queue.
    Map<Integer, AsyncFlushResult> flushes = new HashMap<Integer, AsyncFlushResult>();
    for (int bucketId : buckets) {
      try {
        HDFSBucketRegionQueue brq = getQueue(pr, bucketId);
        if (brq != null) {
          if (logger.isDebugEnabled() || VERBOSE) {
            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushing bucket " + bucketId));
          }
          flushes.put(bucketId, brq.flush());
        }
      } catch (ForceReattemptException e) {
        if (logger.isDebugEnabled() || VERBOSE) {
          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Encountered error flushing bucket " + bucketId), e);
        }
      }
    }

    // Wait for each flush to finish, sending a status result for every bucket
    // that completes within the remaining time budget.
    try {
      long start = System.currentTimeMillis();
      for (Map.Entry<Integer, AsyncFlushResult> flush : flushes.entrySet()) {
        long remaining = waitTime(start, args.getMaxWaitTime());
        if (logger.isDebugEnabled() || VERBOSE) {
          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Waiting for bucket " + flush.getKey()
              + " to complete flushing, remaining = " + remaining));
        }

        if (flush.getValue().waitForFlush(remaining, TimeUnit.MILLISECONDS)) {
          if (logger.isDebugEnabled() || VERBOSE) {
            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Bucket " + flush.getKey() + " flushed successfully"));
          }
          rfc.getResultSender().sendResult(new FlushStatus(flush.getKey()));
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }

    if (logger.isDebugEnabled() || VERBOSE) {
      logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Sending final flush result"));
    }
    rfc.getResultSender().lastResult(FlushStatus.last());
  }
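
  /**
   * Returns the bucket region queue backing the region's HDFS event queue, or
   * null if the gateway sender's event processor is not running.
   */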
  private HDFSBucketRegionQueue getQueue(PartitionedRegion pr, int bucketId)
      throws ForceReattemptException {
    AsyncEventQueueImpl aeq = pr.getHDFSEventQueue();
    AbstractGatewaySender gw = (AbstractGatewaySender) aeq.getSender();
    AbstractGatewaySenderEventProcessor ep = gw.getEventProcessor();
    if (ep == null) {
      return null;
    }

    ConcurrentParallelGatewaySenderQueue queue = (ConcurrentParallelGatewaySenderQueue) ep.getQueue();
    return queue.getBucketRegionQueue(pr, bucketId);
  }
  @Override
  public String getId() {
    return ID;
  }

  @Override
  public boolean hasResult() {
    return true;
  }

  @Override
  public boolean optimizeForWrite() {
    // Route execution to the members hosting the primary buckets.
    return true;
  }

  @Override
  public boolean isHA() {
    // Failed buckets are retried by the loop in flushQueue, not re-executed.
    return false;
  }
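
  /**
   * Collects {@link FlushStatus} results from all members and reports overall
   * success once every expected bucket has been flushed.
   */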
  public static class HDFSFlushQueueResultCollector implements LocalResultCollector<Object, Boolean> {
    private final CountDownLatch complete;

    private final Set<Integer> expectedBuckets;
    private final Set<Integer> successfulBuckets;

    private volatile ReplyProcessor21 processor;

    public HDFSFlushQueueResultCollector(Set<Integer> expectedBuckets) {
      this.expectedBuckets = expectedBuckets;
      complete = new CountDownLatch(1);
      successfulBuckets = new HashSet<Integer>();
    }

    public Set<Integer> getSuccessfulBuckets() {
      synchronized (successfulBuckets) {
        return new HashSet<Integer>(successfulBuckets);
      }
    }

    @Override
    public Boolean getResult() throws FunctionException {
      try {
        // Block until endResults() signals that all members have replied.
        complete.await();
        synchronized (successfulBuckets) {
          LogWriterI18n logger = InternalDistributedSystem.getLoggerI18n();
          if (logger.fineEnabled() || VERBOSE) {
            logger.info(LocalizedStrings.DEBUG, "Expected buckets: " + expectedBuckets);
            logger.info(LocalizedStrings.DEBUG, "Successful buckets: " + successfulBuckets);
          }
          return expectedBuckets.equals(successfulBuckets);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        GemFireCacheImpl.getExisting().getCancelCriterion().checkCancelInProgress(e);
        throw new FunctionException(e);
      }
    }

    @Override
    public Boolean getResult(long timeout, TimeUnit unit)
        throws FunctionException, InterruptedException {
      throw new UnsupportedOperationException();
    }

    @Override
    public synchronized void addResult(DistributedMember memberID, Object result) {
      if (result instanceof FlushStatus) {
        FlushStatus status = (FlushStatus) result;
        // The terminal status carries no bucket id; record only real flushes.
        if (!status.isLast()) {
          synchronized (successfulBuckets) {
            successfulBuckets.add(status.getBucketId());
          }
        }
      }
    }

    @Override
    public void endResults() {
      complete.countDown();
    }

    @Override
    public void clearResults() {
    }

    @Override
    public void setProcessor(ReplyProcessor21 processor) {
      this.processor = processor;
    }

    @Override
    public ReplyProcessor21 getProcessor() {
      return processor;
    }

    @Override
    public void setException(Throwable exception) {
      // No-op: failures are detected by comparing the expected and successful
      // bucket sets in getResult().
    }
  }
}