blob: 05c67da3f1f10b83d95ba00911f738dfb820940f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service.paxos;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.utils.Throwables;
import static org.apache.cassandra.service.paxos.AbstractPaxosRepair.Result.Outcome;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
public abstract class AbstractPaxosRepair
{
private static final Logger logger = LoggerFactory.getLogger(AbstractPaxosRepair.class);
public static class State {}
public interface StateUpdater<S, I, T extends Throwable>
{
State apply(S in, I param) throws T;
}
public interface Listener
{
void onComplete(AbstractPaxosRepair repair, Result result);
}
abstract class ConsumerState<T> extends State implements Consumer<T>
{
public void accept(T input)
{
updateState(this, input, ConsumerState::execute);
}
abstract State execute(T input) throws Throwable;
}
public static class Result extends State
{
enum Outcome { DONE, CANCELLED, FAILURE }
final Outcome outcome;
public Result(Outcome outcome)
{
this.outcome = outcome;
}
public String toString()
{
return outcome.toString();
}
public boolean wasSuccessful()
{
return outcome == Outcome.DONE;
}
}
static boolean isResult(State state)
{
return state instanceof Result;
}
public static final Result DONE = new Result(Outcome.DONE);
public static final Result CANCELLED = new Result(Outcome.CANCELLED);
public static final class Failure extends Result
{
public final Throwable failure;
Failure(Throwable failure)
{
super(Outcome.FAILURE);
this.failure = failure;
}
public String toString()
{
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
if (failure != null)
failure.printStackTrace(pw);
return outcome.toString() + ": " + sw;
}
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Failure failure1 = (Failure) o;
return Objects.equals(failure, failure1.failure);
}
public int hashCode()
{
return Objects.hash(failure);
}
}
private final DecoratedKey partitionKey;
private final Ballot incompleteBallot;
private List<Listener> listeners = null;
private volatile State state;
private volatile long startedNanos = Long.MIN_VALUE;
public AbstractPaxosRepair(DecoratedKey partitionKey, Ballot incompleteBallot)
{
this.partitionKey = partitionKey;
this.incompleteBallot = incompleteBallot;
}
public State state()
{
return state;
}
public long startedNanos()
{
return startedNanos;
}
public boolean isStarted()
{
return startedNanos != Long.MIN_VALUE;
}
public boolean isComplete()
{
return isResult(state);
}
public Ballot incompleteBallot()
{
return incompleteBallot;
}
/**
* add a listener to this repair, or if the repair has already completed, call the listener with the result
*/
public AbstractPaxosRepair addListener(Listener listener)
{
Result result = null;
synchronized (this)
{
if (isResult(state))
{
result = (Result) state;
}
else
{
if (listeners == null)
listeners = new ArrayList<>();
listeners.add(listener);
}
}
if (result != null)
listener.onComplete(this, result);
return this;
}
public AbstractPaxosRepair addListener(Consumer<Result> listener)
{
return addListener(((repair, result) -> listener.accept(result)));
}
public final DecoratedKey partitionKey()
{
return partitionKey;
}
public State restart(State state) { return restart(state, Long.MIN_VALUE); }
public abstract State restart(State state, long waitUntil);
public final synchronized AbstractPaxosRepair start()
{
updateState(null, null, (state, i2) -> {
Preconditions.checkState(!isStarted());
startedNanos = Math.max(Long.MIN_VALUE + 1, nanoTime());
return restart(state);
});
return this;
}
public final void cancel()
{
set(CANCELLED);
}
public final void cancelUnexceptionally()
{
try
{
cancel();
}
catch (Throwable t)
{
logger.error("Exception cancelling paxos repair", t);
}
}
public final synchronized Result await() throws InterruptedException
{
while (!isResult(state))
wait();
return (Result) state;
}
protected void set(Result result)
{
updateState(state(), null, (i1, i2) -> result);
}
protected <S extends State, I, T extends Throwable> void updateState(S expect, I param, StateUpdater<S, I, T> transform)
{
Result result = null;
List<Listener> listeners = null;
synchronized (this)
{
State next;
try
{
if (state != expect)
return;
state = next = transform.apply(expect, param);
}
catch (Throwable t)
{
state = next = new Failure(t);
}
if (isResult(next))
{
notifyAll();
result = (Result) next;
listeners = this.listeners;
this.listeners = null;
}
}
if (result != null && listeners != null)
{
Throwable t = null;
for (int i=0, mi=listeners.size(); i<mi; i++)
{
try
{
listeners.get(i).onComplete(this, result);
}
catch (Throwable throwable)
{
t = Throwables.merge(t, throwable);
}
}
Throwables.maybeFail(t);
}
}
}