blob: 892ba4a171c918bc92ba299f4591fd283cbdb832 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.fluo.core.impl;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.fluo.accumulo.util.LongUtil;
import org.apache.fluo.accumulo.util.ZookeeperPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Provides cache of all Fluo transactors. Used by clients to determine if transactor is running.
 *
 * <p>
 * Two pieces of state are maintained:
 * <ul>
 * <li>a Curator {@link PathChildrenCache} over {@link ZookeeperPath#TRANSACTOR_NODES}, consulted by
 * {@link #checkExists(Long)};</li>
 * <li>an in-memory cache mapping a transactor id to the highest lock timestamp for which that
 * transactor has been reported as timed out, consulted by {@link #checkTimedout(Long, long)}.</li>
 * </ul>
 */
public class TransactorCache implements AutoCloseable {

  /** Lifecycle state of a {@link TransactorCache}. */
  public enum TcStatus {
    OPEN, CLOSED
  }

  private static final Logger log = LoggerFactory.getLogger(TransactorCache.class);

  private final Environment env;
  private final PathChildrenCache cache;
  private final Cache<Long, AtomicLong> timeoutCache;
  // volatile so that a close() performed on one thread is seen by getStatus() callers on others
  private volatile TcStatus status;

  /**
   * Creates the cache and starts watching the transactor nodes in ZooKeeper.
   *
   * @param env environment supplying the shared Curator client
   * @throws RuntimeException wrapping whatever exception prevented the ZooKeeper cache from
   *         starting
   */
  public TransactorCache(Environment env) {
    timeoutCache = CacheBuilder.newBuilder().maximumSize(1 << 15)
        .expireAfterAccess(FluoConfigurationImpl.TX_INFO_CACHE_TIMEOUT_DEFAULT,
            TimeUnit.MILLISECONDS)
        .concurrencyLevel(10).build();
    this.env = env;
    cache = new PathChildrenCache(env.getSharedResources().getCurator(),
        ZookeeperPath.TRANSACTOR_NODES, true);
    try {
      cache.start(StartMode.BUILD_INITIAL_CACHE);
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        // keep the interrupt visible to callers further up the stack
        Thread.currentThread().interrupt();
      }
      // Do not leak the partially started ZooKeeper cache when construction fails.
      try {
        cache.close();
      } catch (IOException closeFailure) {
        e.addSuppressed(closeFailure);
      }
      throw new RuntimeException(e);
    }
    status = TcStatus.OPEN;
  }

  /**
   * Emits the warning recording that a transactor has been marked dead up to {@code lockTs}.
   *
   * @param transactorId id of the unresponsive transactor
   * @param lockTs highest lock timestamp now considered timed out
   * @param startTime wall-clock time in milliseconds used to compute how long the transactor was
   *        unresponsive
   */
  private void logTimedoutTransactor(Long transactorId, long lockTs, Long startTime) {
    log.warn("Transactor ID {} was unresponsive for {} secs, marking as dead for lockTs <= {}",
        LongUtil.toMaxRadixString(transactorId), (System.currentTimeMillis() - startTime) / 1000.0,
        lockTs);
  }

  /**
   * Records that the given transactor timed out for locks with timestamp {@code <= lockTs}. The
   * stored timestamp only ever increases; a call with a smaller {@code lockTs} than the one
   * already recorded leaves the entry unchanged.
   *
   * @param transactorId id of the transactor that timed out
   * @param lockTs lock timestamp up to which the transactor is considered dead
   * @param startTime wall-clock time in milliseconds, used only for the log message
   * @throws RuntimeException if the cache loader fails
   */
  public void addTimedoutTransactor(final Long transactorId, final long lockTs,
      final Long startTime) {
    try {
      // The loader only runs when no entry exists yet, so the first report is logged exactly once.
      AtomicLong cachedLockTs = timeoutCache.get(transactorId, new Callable<AtomicLong>() {
        @Override
        public AtomicLong call() throws Exception {
          logTimedoutTransactor(transactorId, lockTs, startTime);
          return new AtomicLong(lockTs);
        }
      });

      // Raise the stored timestamp to lockTs if it is currently lower.
      long currVal = cachedLockTs.get();
      while (lockTs > currVal) {
        if (cachedLockTs.compareAndSet(currVal, lockTs)) {
          logTimedoutTransactor(transactorId, lockTs, startTime);
        }
        // It's possible another thread updated the value and the compare-and-set above failed.
        // Re-reading ends the loop in that case when the new value is already >= lockTs, and it
        // also ends the loop after a successful compare-and-set.
        currVal = cachedLockTs.get();
      }
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Checks whether a lock held by the given transactor has been marked as timed out.
   *
   * @param transactorId id of the transactor holding the lock
   * @param lockTs timestamp of the lock
   * @return {@code true} if a timeout entry exists for the transactor and {@code lockTs} is less
   *         than or equal to the recorded timestamp
   */
  public boolean checkTimedout(Long transactorId, long lockTs) {
    AtomicLong timedoutLockTs = timeoutCache.getIfPresent(transactorId);
    return timedoutLockTs != null && lockTs <= timedoutLockTs.get();
  }

  /**
   * Checks whether the node for the given transactor is present in the ZooKeeper cache.
   *
   * @param transactorId id of the transactor
   * @return {@code true} if the cache currently holds data for the transactor's node path
   */
  public boolean checkExists(Long transactorId) {
    return cache.getCurrentData(TransactorNode.getNodePath(env, transactorId)) != null;
  }

  /**
   * @return {@link TcStatus#OPEN} after successful construction, {@link TcStatus#CLOSED} once
   *         {@link #close()} has been called
   */
  public TcStatus getStatus() {
    return status;
  }

  /**
   * Marks this cache as closed and closes the underlying ZooKeeper cache.
   *
   * @throws IllegalStateException if closing the ZooKeeper cache fails
   */
  @Override
  public void close() {
    // Flip the status first so that it reads CLOSED even if the close below fails.
    status = TcStatus.CLOSED;
    try {
      cache.close();
    } catch (IOException e) {
      // pass the exception so the log keeps the cause and stack trace
      log.error("Failed to close cache", e);
      throw new IllegalStateException(e);
    }
  }
}