blob: 338ba99d896cf469df6e5460cd2f3270f221e82e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.partitioned.rebalance;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.geode.CancelException;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
/**
* A bucket operator that will perform operations on a bucket asynchronously.
*
* This class wraps a delegate bucket operator that is synchronous. That is, the delegate bucket
* operator is expected to move the bucket and notify the Completion within the scope of the call to
* create bucket.
*
* What this class does in make that call asynchronous. A task to create the bucket is handed to the
* thread pool and executed there. After it is done, the completion is notified.
*
* Calling waitForOperations waits for all previously submitted operations and ensures the
* completions are notified.
*
* Note that only createRedundantBucket is asynchronous, the rest of the operations are synchronous.
*
*/
public class ParallelBucketOperator implements BucketOperator {
private final BucketOperator delegate;
private final ExecutorService executor;
private final Semaphore operationSemaphore;
private final int maxParallelOperations;
private final ConcurrentLinkedQueue<Completion> pendingSuccess =
new ConcurrentLinkedQueue<BucketOperator.Completion>();
private final ConcurrentLinkedQueue<Completion> pendingFailure =
new ConcurrentLinkedQueue<BucketOperator.Completion>();
/**
* Create a parallel bucket operator
*
* @param maxParallelOperations The number of operations that can execute concurrently. Futher
* calls to createRedundantBucket will block.
* @param executor the executor to submit tasks to. This executor should be able to create at
* least maxParallelOperations threads.
* @param operator A bucket operator that is synchronous that will do the actual work of creating
* a bucket.
*/
public ParallelBucketOperator(int maxParallelOperations, ExecutorService executor,
BucketOperator operator) {
this.maxParallelOperations = maxParallelOperations;
this.operationSemaphore = new Semaphore(maxParallelOperations);
this.delegate = operator;
this.executor = executor;
}
/**
* Create a redundant bucket asynchronously. If maxParallelOperations is not reached, this call
* will submit a task and return immediately. Otherwise, it will block until an executor thread is
* available to take a task.
*
* The completion will not be notified until the caller makes another call to createRedundant
* bucket or waitForOperations.
*/
@Override
public void createRedundantBucket(final InternalDistributedMember targetMember,
final int bucketId, final Map<String, Long> colocatedRegionBytes,
final Completion completion) {
drainCompletions();
operationSemaphore.acquireUninterruptibly();
executor.execute(new Runnable() {
@Override
public void run() {
try {
delegate.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes,
new Completion() {
@Override
public void onSuccess() {
pendingSuccess.add(completion);
}
@Override
public void onFailure() {
pendingFailure.add(completion);
}
});
} catch (CancelException e) {
// ignore
} catch (RegionDestroyedException e) {
// ignore
} finally {
operationSemaphore.release();
}
}
});
}
@Override
public boolean removeBucket(InternalDistributedMember memberId, int id,
Map<String, Long> colocatedRegionSizes) {
return delegate.removeBucket(memberId, id, colocatedRegionSizes);
}
@Override
public boolean moveBucket(InternalDistributedMember sourceMember,
InternalDistributedMember targetMember, int bucketId,
Map<String, Long> colocatedRegionBytes) {
return delegate.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
}
@Override
public boolean movePrimary(InternalDistributedMember source, InternalDistributedMember target,
int bucketId) {
return delegate.movePrimary(source, target, bucketId);
}
public void drainCompletions() {
Completion next = null;
while ((next = pendingSuccess.poll()) != null) {
next.onSuccess();
}
while ((next = pendingFailure.poll()) != null) {
next.onFailure();
}
}
/**
* Wait for any pending operations, and notify the the completions that the operations and done.
*/
@Override
public void waitForOperations() {
boolean interrupted = false;
while (!executor.isShutdown()) {
try {
if (operationSemaphore.tryAcquire(maxParallelOperations, 1, TimeUnit.SECONDS)) {
operationSemaphore.release(maxParallelOperations);
drainCompletions();
if (interrupted) {
Thread.currentThread().interrupt();
}
return;
}
} catch (InterruptedException e) {
interrupted = true;
}
}
}
}