blob: 78ff0a2eb58827ec7e8b8730f15f401ca837b574 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog.zk;
import com.twitter.distributedlog.util.PermitManager;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Manager to control all the log segments rolling.
*/
public class LimitedPermitManager implements PermitManager, Runnable, Watcher {
static final Logger LOG = LoggerFactory.getLogger(LimitedPermitManager.class);
static enum PermitState {
ALLOWED, DISALLOWED, DISABLED
}
class EpochPermit implements Permit {
final PermitState state;
final int epoch;
EpochPermit(PermitState state) {
this.state = state;
this.epoch = LimitedPermitManager.this.epoch.get();
}
int getEpoch() {
return epoch;
}
@Override
public boolean isAllowed() {
return PermitState.ALLOWED == state;
}
}
boolean enablePermits = true;
final Semaphore semaphore;
final int period;
final TimeUnit timeUnit;
final ScheduledExecutorService executorService;
final AtomicInteger epoch = new AtomicInteger(0);
private StatsLogger statsLogger = null;
private Gauge<Number> outstandingGauge = null;
public LimitedPermitManager(int concurrency, int period, TimeUnit timeUnit,
ScheduledExecutorService executorService) {
this(concurrency, period, timeUnit, executorService, NullStatsLogger.INSTANCE);
}
public LimitedPermitManager(final int concurrency, int period, TimeUnit timeUnit,
ScheduledExecutorService executorService, StatsLogger statsLogger) {
if (concurrency > 0) {
this.semaphore = new Semaphore(concurrency);
} else {
this.semaphore = null;
}
this.period = period;
this.timeUnit = timeUnit;
this.executorService = executorService;
this.statsLogger = statsLogger;
this.outstandingGauge = new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
}
@Override
public Number getSample() {
return null == semaphore ? 0 : concurrency - semaphore.availablePermits();
}
};
this.statsLogger.scope("permits").registerGauge("outstanding", this.outstandingGauge);
}
@Override
synchronized public Permit acquirePermit() {
if (!enablePermits) {
return new EpochPermit(PermitState.DISABLED);
}
if (null != semaphore) {
return semaphore.tryAcquire() ? new EpochPermit(PermitState.ALLOWED) :
new EpochPermit(PermitState.DISALLOWED);
} else {
return new EpochPermit(PermitState.ALLOWED);
}
}
@Override
synchronized public void releasePermit(Permit permit) {
if (null != semaphore && permit.isAllowed()) {
if (period <= 0) {
semaphore.release();
} else {
try {
executorService.schedule(this, period, timeUnit);
} catch (RejectedExecutionException ree) {
LOG.warn("Failed on scheduling releasing permit in given period ({}ms)." +
" Release it immediately : ", timeUnit.toMillis(period), ree);
semaphore.release();
}
}
}
}
@Override
synchronized public boolean disallowObtainPermits(Permit permit) {
if (!(permit instanceof EpochPermit)) {
return false;
}
if (epoch.getAndIncrement() == ((EpochPermit)permit).getEpoch()) {
this.enablePermits = false;
LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, epoch.get());
return true;
} else {
return false;
}
}
@Override
public void close() {
unregisterGauge();
}
@Override
synchronized public boolean allowObtainPermits() {
forceSetAllowPermits(true);
return true;
}
synchronized void forceSetAllowPermits(boolean allowPermits) {
epoch.getAndIncrement();
this.enablePermits = allowPermits;
LOG.info("EnablePermits = {}, Epoch = {}.", this.enablePermits, epoch.get());
}
@Override
public void run() {
semaphore.release();
}
@Override
public void process(WatchedEvent event) {
if (event.getType().equals(Event.EventType.None)) {
switch (event.getState()) {
case SyncConnected:
forceSetAllowPermits(true);
break;
case Disconnected:
forceSetAllowPermits(false);
break;
case Expired:
forceSetAllowPermits(false);
break;
default:
break;
}
}
}
public void unregisterGauge() {
if(this.statsLogger != null && this.outstandingGauge != null) {
this.statsLogger.scope("permits").unregisterGauge("outstanding", this.outstandingGauge);
}
}
}