// blob: 80ed8863a4ff361c859121494c32a1e81f2a4188 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.rpc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
 * A variation on a countdown latch whose gate can be opened and closed again at will.
 *
 * <p>While the barrier is closed, threads calling {@code await} block. Once it is opened they are
 * allowed through. A newly constructed barrier starts out closed.
 */
public class ResettableBarrier {
  // Not referenced by any active code in this class; kept so the class surface stays unchanged.
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResettableBarrier.class);

  /** Holds the open/closed state and manages the queue of waiting threads. */
  private final InternalSynchronizer sync = new InternalSynchronizer();

  /** Creates a barrier in the closed state. */
  public ResettableBarrier() {
  }

  /**
   * Synchronizer whose state is {@code 1} while the barrier is closed and {@code 0} while it is
   * open. Shared-mode acquisition succeeds only in the open state.
   */
  private static final class InternalSynchronizer extends AbstractQueuedSynchronizer {
    /** Synchronizer state value meaning waiters may pass. */
    private static final int OPEN = 0;

    /** Synchronizer state value meaning waiters must block. */
    private static final int CLOSED = 1;

    private InternalSynchronizer() {
      setState(CLOSED);
    }

    /**
     * Reports whether a waiter may pass.
     *
     * @return {@code 1} when the state is open, {@code -1} otherwise
     */
    @Override
    protected int tryAcquireShared(int acquires) {
      assert acquires == 1;
      if (getState() != OPEN) {
        return -1;
      }
      return 1;
    }

    /**
     * Decrements the state towards the open value.
     *
     * @return {@code true} only if this call moved the state to open; {@code false} if it was
     *         already open
     */
    @Override
    protected boolean tryReleaseShared(int releases) {
      assert releases == 1;
      for (;;) {
        final int current = getState();
        if (current == OPEN) {
          // Already open: nothing to release and nobody to signal.
          return false;
        }
        final int decremented = current - 1;
        // Retry when another thread changed the state between the read and the CAS.
        if (compareAndSetState(current, decremented)) {
          return decremented == OPEN;
        }
      }
    }

    /** Puts the synchronizer back into the closed state. */
    protected void reset() {
      setState(CLOSED);
    }
  }

  /**
   * Blocks the calling thread until the barrier is open.
   *
   * @throws InterruptedException if the thread is interrupted while waiting
   */
  public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
  }

  /**
   * Blocks the calling thread until the barrier is open or the timeout elapses.
   *
   * @param timeout maximum time to wait
   * @param unit unit of {@code timeout}
   * @return {@code true} if the barrier was open before the timeout elapsed, {@code false} otherwise
   * @throws InterruptedException if the thread is interrupted while waiting
   */
  public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
    final long timeoutNanos = unit.toNanos(timeout);
    return sync.tryAcquireSharedNanos(1, timeoutNanos);
  }

  /** Opens the barrier so that waiting threads may proceed. Does nothing if it is already open. */
  public void openBarrier() {
    sync.releaseShared(1);
  }

  /** Closes the barrier so that subsequent {@code await} calls block until it is opened again. */
  public void closeBarrier() {
    sync.reset();
  }
}