blob: 3718b60cff8604a80f8cbbf519df913fbca86813 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.fate;
import java.io.Serializable;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This store removes Repos, in the store it wraps, that are in a finished or new state for more
* than a configurable time period.
*
* No external time source is used. It starts tracking idle time when its created.
*/
public class AgeOffStore<T> implements TStore<T> {
public interface TimeSource {
long currentTimeMillis();
}
private static final Logger log = LoggerFactory.getLogger(AgeOffStore.class);
private TStore<T> store;
private Map<Long,Long> candidates;
private long ageOffTime;
private long minTime;
private TimeSource timeSource;
private synchronized void updateMinTime() {
minTime = Long.MAX_VALUE;
for (Long time : candidates.values()) {
if (time < minTime)
minTime = time;
}
}
private synchronized void addCandidate(long txid) {
long time = timeSource.currentTimeMillis();
candidates.put(txid, time);
if (time < minTime)
minTime = time;
}
private synchronized void removeCandidate(long txid) {
Long time = candidates.remove(txid);
if (time != null && time <= minTime)
updateMinTime();
}
public void ageOff() {
HashSet<Long> oldTxs = new HashSet<>();
synchronized (this) {
long time = timeSource.currentTimeMillis();
if (minTime < time && time - minTime >= ageOffTime) {
for (Entry<Long,Long> entry : candidates.entrySet()) {
if (time - entry.getValue() >= ageOffTime) {
oldTxs.add(entry.getKey());
}
}
candidates.keySet().removeAll(oldTxs);
updateMinTime();
}
}
for (Long txid : oldTxs) {
try {
store.reserve(txid);
try {
switch (store.getStatus(txid)) {
case NEW:
case FAILED:
case SUCCESSFUL:
store.delete(txid);
log.debug("Aged off FATE tx {}", String.format("%016x", txid));
break;
default:
break;
}
} finally {
store.unreserve(txid, 0);
}
} catch (Exception e) {
log.warn("Failed to age off FATE tx " + String.format("%016x", txid), e);
}
}
}
public AgeOffStore(TStore<T> store, long ageOffTime, TimeSource timeSource) {
this.store = store;
this.ageOffTime = ageOffTime;
this.timeSource = timeSource;
candidates = new HashMap<>();
minTime = Long.MAX_VALUE;
List<Long> txids = store.list();
for (Long txid : txids) {
store.reserve(txid);
try {
switch (store.getStatus(txid)) {
case NEW:
case FAILED:
case SUCCESSFUL:
addCandidate(txid);
break;
default:
break;
}
} finally {
store.unreserve(txid, 0);
}
}
}
public AgeOffStore(TStore<T> store, long ageOffTime) {
this(store, ageOffTime, System::currentTimeMillis);
}
@Override
public long create() {
long txid = store.create();
addCandidate(txid);
return txid;
}
@Override
public long reserve() {
return store.reserve();
}
@Override
public void reserve(long tid) {
store.reserve(tid);
}
@Override
public void unreserve(long tid, long deferTime) {
store.unreserve(tid, deferTime);
}
@Override
public Repo<T> top(long tid) {
return store.top(tid);
}
@Override
public void push(long tid, Repo<T> repo) throws StackOverflowException {
store.push(tid, repo);
}
@Override
public void pop(long tid) {
store.pop(tid);
}
@Override
public org.apache.accumulo.fate.TStore.TStatus getStatus(long tid) {
return store.getStatus(tid);
}
@Override
public void setStatus(long tid, org.apache.accumulo.fate.TStore.TStatus status) {
store.setStatus(tid, status);
switch (status) {
case IN_PROGRESS:
case FAILED_IN_PROGRESS:
removeCandidate(tid);
break;
case FAILED:
case SUCCESSFUL:
addCandidate(tid);
break;
default:
break;
}
}
@Override
public org.apache.accumulo.fate.TStore.TStatus waitForStatusChange(long tid,
EnumSet<org.apache.accumulo.fate.TStore.TStatus> expected) {
return store.waitForStatusChange(tid, expected);
}
@Override
public void setProperty(long tid, String prop, Serializable val) {
store.setProperty(tid, prop, val);
}
@Override
public Serializable getProperty(long tid, String prop) {
return store.getProperty(tid, prop);
}
@Override
public void delete(long tid) {
store.delete(tid);
removeCandidate(tid);
}
@Override
public List<Long> list() {
return store.list();
}
@Override
public List<ReadOnlyRepo<T>> getStack(long tid) {
return store.getStack(tid);
}
}