// blob: 3eacce68731160d2c0b0d2181a388629b0dc1244 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.messages;
import java.util.BitSet;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.api.Data;
import accord.local.CommandStore;
import accord.local.Node;
import accord.local.SafeCommandStore;
import accord.messages.ReadData.ReadNack;
import accord.primitives.PartialTxn;
import accord.primitives.Participants;
import accord.primitives.Ranges;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.topology.Topologies;
import accord.utils.Invariants;
import static accord.messages.MessageType.READ_RSP;
import static accord.messages.TxnRequest.computeWaitForEpoch;
import static accord.messages.TxnRequest.latestRelevantEpochIndex;
// TODO (required, efficiency): dedup - can currently have an unbounded number of pending reads, each executed independently
public abstract class ReadData extends AbstractEpochRequest<ReadNack>
{
private static final Logger logger = LoggerFactory.getLogger(ReadData.class);
/**
 * Enumerates the kinds of read request; every constant carries a single-byte code in {@link #val}.
 */
public enum ReadType
{
    readTxnData(0),
    waitUntilApplied(1),
    applyThenWaitUntilApplied(2);

    /** Byte code of this constant; {@link #fromValue(byte)} performs the reverse mapping. */
    public final byte val;

    ReadType(int val)
    {
        this.val = (byte) val;
    }

    /**
     * Finds the constant whose {@link #val} equals the supplied byte.
     *
     * @param val the byte code to decode
     * @return the constant carrying that code
     * @throws IllegalArgumentException if no constant carries {@code val}
     */
    @SuppressWarnings("unused")
    public static ReadType fromValue(byte val)
    {
        for (ReadType candidate : values())
        {
            if (candidate.val == val)
                return candidate;
        }
        throw new IllegalArgumentException("Unrecognized ReadType value " + val);
    }
}
// TODO (expected, cleanup): should this be a Route?
// Participants this request reads. Fixed at construction; passed to node.mapReduceConsumeLocal in process()
// and used by ack() to narrow reported unavailable ranges via intersecting(readScope).
public final Participants<?> readScope;
// Value returned by waitForEpoch(); either computed from the topologies or supplied directly at construction.
private final long waitForEpoch;
// Accumulated result of the local reads completed so far: readComplete() merges each non-null result into it,
// and accept() resets it to null when it is handed a failure.
private Data data;
// Indexed by command store id: readComplete() requires the store's bit to be set and clears it afterwards.
// Created empty in process(); no bit is set anywhere in this class - presumably subclasses do so (TODO confirm).
transient BitSet waitingOn;
// Counter decremented by every ack(); the final reply is sent when a decrement brings it to -1.
// It is never incremented in this class - presumably subclasses do so as reads are registered (TODO confirm).
transient int waitingOnCount;
// Unavailable ranges recorded by ack() so far (each narrowed with intersecting(readScope) and combined with with());
// stays null until ack() is given a non-null, non-empty set.
transient Ranges unavailable;
/**
 * Creates a read request for {@code to}, deriving the stored read scope and the wait-for epoch from the
 * supplied topologies via the {@code TxnRequest} helpers.
 *
 * @param to         the node the scope and epoch are computed for (presumably the recipient - confirm with callers)
 * @param topologies the topologies consulted by the {@code TxnRequest} helpers
 * @param txnId      the transaction id, handed to the superclass
 * @param readScope  the participants to read; the value stored is the result of {@code TxnRequest.computeScope}
 */
public ReadData(Node.Id to, Topologies topologies, TxnId txnId, Participants<?> readScope)
{
    super(txnId);
    // both derived values are computed relative to the same epoch index
    final int epochIndex = TxnRequest.latestRelevantEpochIndex(to, topologies, readScope);
    this.readScope = TxnRequest.computeScope(to, topologies, readScope, epochIndex, Participants::slice, Participants::with);
    this.waitForEpoch = TxnRequest.computeWaitForEpoch(to, topologies, epochIndex);
}
/**
 * Creates a read request from an already-determined scope and wait-for epoch; both are stored exactly as given.
 *
 * @param txnId        the transaction id, handed to the superclass
 * @param readScope    the participants this request reads
 * @param waitForEpoch the value later returned by {@link #waitForEpoch()}
 */
protected ReadData(TxnId txnId, Participants<?> readScope, long waitForEpoch)
{
    super(txnId);
    this.waitForEpoch = waitForEpoch;
    this.readScope = readScope;
}
/**
 * @return the {@link ReadType} of this request
 */
abstract public ReadType kind();
/**
 * Invoked by this class after a failure has been sent back to the requester (from accept() and from the
 * read callback in read()). What cancelling entails is defined by the subclass.
 */
protected abstract void cancel();
/**
 * @return the epoch that process() passes as both epoch arguments to {@code node.mapReduceConsumeLocal}
 */
protected abstract long executeAtEpoch();
/**
 * Delivers the outcome of the request. Within this class it is only invoked from ack(), once the outstanding
 * count reaches -1, with the accumulated unavailable ranges, the merged data and a null failure.
 *
 * @param unavailable ranges recorded as unavailable, or null if none were recorded
 * @param data        the merged read result, or null if no read produced data
 * @param fail        a failure to report; this class always passes null
 */
protected abstract void reply(@Nullable Ranges unavailable, @Nullable Data data, @Nullable Throwable fail);
/**
 * Hook invoked by ack() immediately before reply() when the last acknowledgement arrives; does nothing by default.
 */
protected void onAllReadsComplete() {}
/**
 * @return the wait-for epoch fixed when this request was constructed
 */
@Override
public long waitForEpoch()
{
    return this.waitForEpoch;
}
/**
 * Resets the waiting-on bit set and starts the local map-reduce over {@code readScope}, with this object
 * supplied as both the first and the last argument of {@code node.mapReduceConsumeLocal}.
 */
@Override
protected void process()
{
    this.waitingOn = new BitSet();
    // executeAtEpoch() is intentionally invoked once per epoch argument, exactly as many times as before.
    // NOTE(review): presumably these are the lower/upper epoch bounds - confirm against Node.mapReduceConsumeLocal
    final long fromEpoch = executeAtEpoch();
    final long toEpoch = executeAtEpoch();
    node.mapReduceConsumeLocal(this, readScope, fromEpoch, toEpoch, this);
}
/**
 * Combines two nacks into one. A null argument yields the other argument (which may itself be null);
 * otherwise the argument that compares greater wins, with {@code r1} returned on a tie. As {@code ReadNack}
 * is an enum, "greater" means declared later.
 *
 * @param r1 first nack, may be null
 * @param r2 second nack, may be null
 * @return the surviving nack, or null if both are null
 */
@Override
public ReadNack reduce(ReadNack r1, ReadNack r2)
{
    if (r1 == null)
        return r2;
    if (r2 == null)
        return r1;
    return r1.compareTo(r2) >= 0 ? r1 : r2;
}
/**
 * Handles the final (reply, failure) pair handed to this object; presumably this is the completion callback of
 * the map-reduce started in process().
 * <ul>
 *   <li>non-null reply: forwarded to the requester together with whatever failure was supplied;</li>
 *   <li>null reply, null failure: counted as one acknowledgement via ack(null);</li>
 *   <li>null reply, non-null failure: the failure is sent to the requester, accumulated data is dropped,
 *       the agent is notified and the request is cancelled.</li>
 * </ul>
 *
 * @param reply   the reduced nack, or null if there was none
 * @param failure the failure, or null if there was none
 */
@Override
public synchronized void accept(ReadNack reply, Throwable failure)
{
    if (reply != null)
    {
        node.reply(replyTo, replyContext, reply, failure);
        return;
    }

    if (failure == null)
    {
        ack(null);
        return;
    }

    // TODO (expected, testing): test
    node.reply(replyTo, replyContext, null, failure);
    data = null;
    // TODO (expected, exceptions): probably a better way to handle this, as might not be uncaught
    node.agent().onUncaughtException(failure);
    cancel();
}
/**
 * Records one acknowledgement, folding in any newly reported unavailable ranges, and sends the final reply
 * once the outstanding counter drops to -1.
 *
 * @param newUnavailable ranges reported unavailable by this acknowledgement; null or empty means none
 */
private void ack(@Nullable Ranges newUnavailable)
{
    boolean reportsUnavailable = newUnavailable != null && !newUnavailable.isEmpty();
    if (reportsUnavailable)
    {
        // only the part touching our read scope is retained
        Ranges inScope = newUnavailable.intersecting(readScope);
        if (this.unavailable != null) this.unavailable = inScope.with(this.unavailable);
        else this.unavailable = inScope;
    }

    // The reply is triggered at -1 rather than 0: the setup phase contributes an ack of its own from its callback,
    // so that we cannot respond before every required read has been dispatched (reads may complete faster than
    // they can be set up on all required shards).
    waitingOnCount -= 1;
    if (waitingOnCount != -1)
        return;

    onAllReadsComplete();
    reply(this.unavailable, data, null);
}
/**
 * Registers the completion of the read on one command store: checks that the store was being waited on,
 * merges its result into the accumulated data, clears the store's waiting-on bit and acknowledges.
 *
 * @param commandStore the store whose read has finished
 * @param result       the data read on that store, or null if it produced none
 * @param unavailable  ranges that store reported as unavailable, or null
 */
protected synchronized void readComplete(CommandStore commandStore, @Nullable Data result, @Nullable Ranges unavailable)
{
    final int storeId = commandStore.id();
    Invariants.checkState(waitingOn.get(storeId), "Txn %s's waiting on does not contain store %d; waitingOn=%s", txnId, storeId, waitingOn);
    logger.trace("{}: read completed on {}", txnId, commandStore);

    if (result != null)
    {
        // first result is adopted as-is, later ones are merged into it
        if (data == null) data = result;
        else data = data.merge(result);
    }

    waitingOn.clear(storeId);
    ack(unavailable);
}
/**
 * Starts the transaction's read on the given store. The ranges that are unsafe to read at {@code executeAt}
 * are captured before the read begins and passed to readComplete() on success; on failure the error is sent
 * to the requester and the request is cancelled.
 *
 * @param safeStore the store to read from
 * @param executeAt the timestamp the read is performed at
 * @param txn       the transaction whose read is executed
 */
void read(SafeCommandStore safeStore, Timestamp executeAt, PartialTxn txn)
{
    final CommandStore unsafeStore = safeStore.commandStore();
    final Ranges unavailable = safeStore.ranges().unsafeToReadAt(executeAt);

    txn.read(safeStore, executeAt).begin((result, failure) -> {
        if (failure == null)
        {
            readComplete(unsafeStore, result, unavailable);
            return;
        }

        // TODO (expected, exceptions): should send exception to client, and consistency handle/propagate locally
        logger.trace("{}: read failed for {}: {}", txnId, unsafeStore, failure);
        node.reply(replyTo, replyContext, null, failure);
        cancel();
    });
}
/**
 * @return the id of the transaction this request reads for
 */
@Override
public TxnId primaryTxnId()
{
    return this.txnId;
}
/**
 * Common type of the replies to a read request; implemented in this file by {@code ReadNack} and {@code ReadOk}.
 */
public interface ReadReply extends Reply
{
/**
 * @return whether this reply is a success; of the implementations in this file, {@code ReadOk} returns true
 * and every {@code ReadNack} returns false
 */
boolean isOk();
}
/**
 * Negative replies to a read request. Declaration order is significant: {@code ReadData.reduce} keeps
 * whichever of two nacks compares greater, i.e. the one declared later.
 */
public enum ReadNack implements ReadReply
{
    Invalid, NotCommitted, Redundant;

    /** @return always false: a nack is never a successful reply */
    @Override
    public boolean isOk()
    {
        return false;
    }

    /** @return false only for {@code NotCommitted}; every other nack is reported as final */
    @Override
    public boolean isFinal()
    {
        if (this == NotCommitted)
            return false;
        return true;
    }

    @Override
    public MessageType type()
    {
        return READ_RSP;
    }

    /** @return the constant's name prefixed with "Read" */
    @Override
    public String toString()
    {
        return "Read".concat(name());
    }
}
/**
 * Successful reply to a read request, carrying the data read and, optionally, the ranges that could not be served.
 */
public static class ReadOk implements ReadReply
{
    /**
     * Set to the ranges that were unavailable when the contacted replica could not fully answer the query
     * because it was bootstrapping some portion of them.
     *
     * TODO (required): narrow to only the *intersecting* ranges that are unavailable, or do so on the recipient
     */
    public final @Nullable Ranges unavailable;

    /** The data read, if any. */
    public final @Nullable Data data;

    public ReadOk(@Nullable Ranges unavailable, @Nullable Data data)
    {
        this.data = data;
        this.unavailable = unavailable;
    }

    /** @return always true */
    @Override
    public boolean isOk()
    {
        return true;
    }

    @Override
    public MessageType type()
    {
        return READ_RSP;
    }

    /** @return {@code ReadOk{<data>}}, with {@code , unavailable:<ranges>} inserted before the brace when set */
    @Override
    public String toString()
    {
        StringBuilder text = new StringBuilder("ReadOk{").append(data);
        if (unavailable != null)
            text.append(", unavailable:").append(unavailable);
        return text.append('}').toString();
    }
}
}