blob: f4dce546d3f8fd96909221e5c781b539a4a4ac98 [file] [log] [blame]
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*******************************************************************************/
package org.ofbiz.base.concurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.ofbiz.base.lang.ObjectWrapper;
import org.ofbiz.base.lang.SourceMonitored;
import org.ofbiz.base.util.UtilGenerics;
@SourceMonitored
public abstract class TTLObject<T> implements ObjectWrapper<T> {
private static final ScheduledExecutorService updateExecutor = ExecutionPool.getExecutor(new ThreadGroup("TTLObject"), "TTLObject(async-update)", -2, true);
private static final <T> T getConfigForClass(ConcurrentHashMap<String, T> config, Class<?> c) {
Class<?> ptr = c;
T value = null;
while (value == null && ptr != null) {
value = config.get(ptr.getName());
ptr = ptr.getSuperclass();
}
return value;
}
private static final ConcurrentHashMap<String, Long> ttls = new ConcurrentHashMap<String, Long>();
public static void setDefaultTTLForClass(Class<?> c, long ttl) {
ttls.putIfAbsent(c.getName(), ttl);
}
public static void setTTLForClass(Class<?> c, long ttl) {
ttls.put(c.getName(), ttl);
}
public static long getTTLForClass(Class<?> c) throws ConfigurationException {
Long ttl = getConfigForClass(ttls, c);
if (ttl != null) return ttl.longValue();
throw new ConfigurationException("No TTL defined for " + c.getName());
}
private static final ConcurrentHashMap<String, Boolean> inForeground = new ConcurrentHashMap<String, Boolean>();
public static void setDefaultForegroundForClass(Class<?> c, boolean foreground) {
inForeground.putIfAbsent(c.getName(), foreground);
}
public static void setForegroundForClass(Class<?> c, boolean foreground) {
inForeground.put(c.getName(), foreground);
}
public static boolean getForegroundForClass(Class<?> c) {
Boolean foreground = getConfigForClass(inForeground, c);
if (foreground != null) return foreground.booleanValue();
return true;
}
public static void pulseAll() {
ExecutionPool.pulseAll(Pulse.class);
}
public enum State { INVALID, REGEN, REGENERATING, GENERATE, GENERATING, GENERATING_INITIAL, VALID, ERROR, ERROR_INITIAL, SET }
// DO NOT REMOVE THIS VARIABLE: to dumb smart editors it looks unused; it's actually only referenced thru the field updater.
private volatile ValueAndState<T> object = new StandardValueAndState<T>(this, null, null, State.INVALID, 0, null, null);
@SuppressWarnings("unchecked")
private static final AtomicReferenceFieldUpdater<TTLObject<?>, ValueAndState> objectAccessor = UtilGenerics.cast(AtomicReferenceFieldUpdater.newUpdater(TTLObject.class, ValueAndState.class, "object"));
private static final AtomicIntegerFieldUpdater<TTLObject<?>> serialAccessor = UtilGenerics.cast(AtomicIntegerFieldUpdater.newUpdater(TTLObject.class, "serial"));
protected volatile int serial;
protected static abstract class ValueAndState<T> {
protected final TTLObject<T> ttlObject;
protected final FutureTask<T> future;
protected final State state;
protected final int serial;
protected final Throwable t;
protected final Pulse pulse;
protected ValueAndState(TTLObject<T> ttlObject, FutureTask<T> future, State state, int serial, Throwable t, Pulse pulse) {
this.ttlObject = ttlObject;
this.future = future;
this.state = state;
this.serial = serial;
this.t = t;
this.pulse = pulse;
}
protected abstract T getValue();
protected ValueAndState<T> refresh(State nextState) {
return ttlObject.newValueAndState(getValue(), future, nextState, serial, null, null);
}
protected ValueAndState<T> valid(T value) {
return ttlObject.newValueAndState(value, null, State.VALID, serialAccessor.incrementAndGet(ttlObject), null, new Pulse(ttlObject));
}
protected ValueAndState<T> set(T value) {
return ttlObject.newValueAndState(value, null, State.SET, serialAccessor.incrementAndGet(ttlObject), null, null);
}
protected ValueAndState<T> submit(final T oldValue, State state) {
return ttlObject.newValueAndState(getValue(), createTask(oldValue), state, serial, null, null);
}
protected FutureTask<T> createTask(final T oldValue) {
return new FutureTask<T>(new Callable<T>() {
public T call() throws Exception {
return ttlObject.load(oldValue, serial);
}
});
}
protected ValueAndState<T> error(Throwable t) {
return ttlObject.newValueAndState(null, null, state != State.GENERATING_INITIAL ? State.ERROR : State.ERROR_INITIAL, serialAccessor.incrementAndGet(ttlObject), t, new Pulse(ttlObject));
}
}
protected ValueAndState<T> newValueAndState(T value, FutureTask<T> future, State state, int serial, Throwable t, Pulse pulse) {
return new StandardValueAndState<T>(this, value, future, state, serial, t, pulse);
}
@SuppressWarnings("hiding")
private class StandardValueAndState<T> extends ValueAndState<T> {
protected final T value;
protected StandardValueAndState(TTLObject<T> ttlObject, T value, FutureTask<T> future, State state, int serial, Throwable t, Pulse pulse) {
super(ttlObject, future, state, serial, t, pulse);
this.value = value;
}
@Override
protected T getValue() {
return value;
}
}
protected final static class Pulse extends ExecutionPool.Pulse {
protected final TTLObject<?> ttlObject;
protected Pulse(TTLObject<?> ttlObject) {
super(TimeUnit.NANOSECONDS.convert(ttlObject.getTTL(), TimeUnit.MILLISECONDS));
this.ttlObject = ttlObject;
}
public void run() {
ttlObject.refresh();
}
}
public State getState() {
return getContainer().state;
}
@SuppressWarnings("unchecked")
private final ValueAndState<T> getContainer() {
return objectAccessor.get(this);
}
public void refresh() {
ValueAndState<T> container;
ValueAndState<T> nextContainer = null;
do {
container = getContainer();
if (container.state == State.INVALID) {
nextContainer = container.refresh(State.GENERATE);
} else if (container.state == State.REGENERATING) {
nextContainer = container.refresh(State.REGEN);
} else if (container.state == State.GENERATING) {
nextContainer = container.refresh(State.GENERATE);
} else if (container.state == State.ERROR_INITIAL) {
nextContainer = container.refresh(State.INVALID);
} else if (container.state == State.ERROR || container.state == State.VALID) {
nextContainer = container.refresh(getForeground() ? State.GENERATE : State.REGEN);
} else if (container.state == State.SET) {
nextContainer = container.refresh(getForeground() ? State.GENERATE : State.REGEN);
} else {
return;
}
objectAccessor.compareAndSet(this, container, nextContainer);
cancelFuture(container);
} while (true);
}
public final int getSerial() {
return getContainer().serial;
}
public final boolean checkSerial(int serial) {
return getContainer().serial != serial;
}
protected final void setObject(T newObject) {
ValueAndState<T> container = getContainer();
ValueAndState<T> nextContainer = container.set(newObject);
objectAccessor.compareAndSet(this, container, nextContainer);
cancelFuture(container);
}
private void cancelFuture(ValueAndState<T> container) {
ExecutionPool.removePulse(container.pulse);
if (container.state == State.REGENERATING || container.state == State.GENERATING) {
container.future.cancel(false);
}
}
public final T getObject() throws ObjectException {
try {
ValueAndState<T> container;
ValueAndState<T> nextContainer = null;
do {
do {
container = getContainer();
if (container.state == State.ERROR || container.state == State.ERROR_INITIAL) {
throw container.t;
} else if (container.state == State.VALID) {
return container.getValue();
} else if (container.state == State.INVALID) {
nextContainer = container.submit(getInitial(), State.GENERATING_INITIAL);
} else if (container.state == State.SET) {
nextContainer = container.valid(container.getValue());
} else if (container.state == State.REGENERATING || container.state == State.GENERATING || container.state == State.GENERATING_INITIAL) {
if (!container.future.isDone()) {
if (container.state == State.GENERATING || container.state == State.GENERATING_INITIAL) {
container.future.run();
} else {
return container.getValue();
}
}
try {
try {
nextContainer = container.valid(container.future.get());
} catch (ExecutionException e) {
throw e.getCause();
}
} catch (Throwable t) {
nextContainer = container.error(t);
}
} else if (container.state == State.REGEN) {
nextContainer = container.submit(container.getValue(), State.REGENERATING);
} else {
nextContainer = container.submit(container.getValue(), State.GENERATING);
}
} while (!objectAccessor.compareAndSet(this, container, nextContainer));
if (nextContainer.state == State.REGENERATING) {
updateExecutor.submit(nextContainer.future);
} else if (nextContainer.pulse != null) {
ExecutionPool.removePulse(container.pulse);
ExecutionPool.addPulse(nextContainer.pulse);
}
} while (true);
} catch (Throwable e) {
return ObjectException.<T>checkException(e);
}
}
protected T getInitial() throws Exception {
return null;
}
protected abstract T load(T old, int serial) throws Exception;
protected boolean getForeground() {
return getForegroundForClass(getClass());
}
protected long getTTL() throws ConfigurationException {
return getTTLForClass(getClass());
}
}