blob: 14e1370691cbae1af88766f329b4ee7d4b5f414b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.omid.tso;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
import com.lmax.disruptor.TimeoutHandler;
import com.lmax.disruptor.dsl.Disruptor;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.tso.TSOStateManager.TSOState;
import org.jboss.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import static com.lmax.disruptor.dsl.ProducerType.MULTI;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.omid.tso.AbstractRequestProcessor.RequestEvent.EVENT_FACTORY;
/**
 * Base implementation of the TSO request-processing pipeline.
 *
 * <p>Incoming timestamp and commit requests are published onto a single LMAX Disruptor
 * ring buffer ({@code MULTI} producers) and consumed by exactly one handler thread
 * (the executor below is single-threaded, named "request-%d"). All conflict-detection
 * state ({@link CommitHashMap}, {@code lowWatermark}) is therefore accessed serially by
 * that one thread — see the warning in {@link #onTimeout(long)}.
 *
 * <p>Subclasses supply the forwarding behavior (what to do once a request has been
 * resolved) via the abstract {@code forward*} methods.
 */
abstract class AbstractRequestProcessor implements EventHandler<AbstractRequestProcessor.RequestEvent>, RequestProcessor, TimeoutHandler {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractRequestProcessor.class);

    // Disruptor-related attributes
    private final ExecutorService disruptorExec;
    protected final Disruptor<RequestEvent> disruptor;
    protected RingBuffer<RequestEvent> requestRing; // set by subclasses when they start the disruptor (not visible here)

    private final TimestampOracle timestampOracle;   // source of monotonically increasing timestamps
    private final CommitHashMap hashmap;             // cellId -> latest committed write, used for WW-conflict checks
    private final MetricsRegistry metrics;
    private final LowWatermarkWriter lowWatermarkWriter;

    // Transactions that started at or below this timestamp are aborted outright in handleCommit().
    // -1 until update() installs the real value from TSOState.
    private long lowWatermark = -1L;

    /**
     * Builds the disruptor pipeline and conflict-map state.
     *
     * @param metrics            metrics registry (stored; usage not visible in this class)
     * @param timestampOracle    oracle producing start/commit timestamps
     * @param panicker           receives fatal disruptor exceptions via {@link FatalExceptionHandler}
     * @param config             supplies batch-persist timeout and conflict-map size
     * @param lowWatermarkWriter persists low-watermark updates (sync on leadership change, async otherwise)
     * @throws IOException declared for subclasses/collaborators; nothing here throws it directly
     */
    AbstractRequestProcessor(MetricsRegistry metrics,
                             TimestampOracle timestampOracle,
                             Panicker panicker,
                             TSOServerConfig config, LowWatermarkWriter lowWatermarkWriter)
            throws IOException {

        // ------------------------------------------------------------------------------------------------------------
        // Disruptor initialization
        // ------------------------------------------------------------------------------------------------------------

        // The timeout wait strategy makes the disruptor invoke onTimeout() when no events
        // arrive within the batch-persist timeout window.
        TimeoutBlockingWaitStrategy timeoutStrategy = new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), MILLISECONDS);

        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("request-%d").build();
        this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory);

        // Ring size 4096 (1 << 12); MULTI because timestampRequest()/commitRequest() may be
        // called from multiple producer threads.
        this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, MULTI, timeoutStrategy);
        disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This must be before handleEventsWith()
        disruptor.handleEventsWith(this);

        // ------------------------------------------------------------------------------------------------------------
        // Attribute initialization
        // ------------------------------------------------------------------------------------------------------------

        this.metrics = metrics;
        this.timestampOracle = timestampOracle;
        this.hashmap = new CommitHashMap(config.getConflictMapSize());
        this.lowWatermarkWriter = lowWatermarkWriter;

        LOG.info("RequestProcessor initialized");

    }

    /**
     * This should be called when the TSO gets leadership
     */
    @Override
    public void update(TSOState state) throws Exception {
        LOG.info("Initializing RequestProcessor state...");
        this.lowWatermark = state.getLowWatermark();
        // Blocking get(): the low watermark must be durably persisted before this TSO serves requests.
        lowWatermarkWriter.persistLowWatermark(lowWatermark).get(); // Sync persist
        LOG.info("RequestProcessor state initialized with LWMs {} and Epoch {}", lowWatermark, state.getEpoch());
    }

    /**
     * Disruptor callback: dispatches each ring-buffer event to the matching handler.
     * Runs only on the single "request-0" consumer thread.
     */
    @Override
    public void onEvent(RequestEvent event, long sequence, boolean endOfBatch) throws Exception {

        switch (event.getType()) {
            case TIMESTAMP:
                handleTimestamp(event);
                break;
            case COMMIT:
                handleCommit(event);
                break;
            default:
                throw new IllegalStateException("Event not allowed in Request Processor: " + event);
        }

    }

    /**
     * Disruptor timeout callback; delegates to the subclass hook {@link #onTimeout()}.
     */
    @Override
    public void onTimeout(long sequence) throws Exception {

        // TODO We can not use this as a timeout trigger for flushing. This timeout is related to the time between
        // TODO (cont) arrivals of requests to the disruptor. We need another mechanism to trigger timeouts
        // TODO (cont) WARNING!!! Take care with the implementation because if there's other thread than request-0
        // TODO (cont) thread the one that calls persistProc.triggerCurrentBatchFlush(); we'll incur in concurrency issues
        // TODO (cont) This is because, in the current implementation, only the request-0 thread calls the public methods
        // TODO (cont) in persistProc and it is guaranteed that access them serially.
        onTimeout();
    }

    /**
     * Producer side: claims a ring-buffer slot, fills it with a timestamp request and publishes it.
     * May be called concurrently from multiple threads (ring is MULTI-producer).
     */
    @Override
    public void timestampRequest(Channel c, MonitoringContext monCtx) {

        monCtx.timerStart("request.processor.timestamp.latency");
        long seq = requestRing.next();
        RequestEvent e = requestRing.get(seq);
        RequestEvent.makeTimestampRequest(e, c, monCtx);
        requestRing.publish(seq);

    }

    /**
     * Producer side: claims a ring-buffer slot, fills it with a commit request and publishes it.
     *
     * @param startTimestamp the transaction's start timestamp
     * @param writeSet       cell ids written by the transaction (checked for WW conflicts)
     * @param isRetry        true when the client is retrying a commit reply it may have missed
     * @param c              channel to reply on
     */
    @Override
    public void commitRequest(long startTimestamp, Collection<Long> writeSet, boolean isRetry, Channel c,
                              MonitoringContext monCtx) {

        monCtx.timerStart("request.processor.commit.latency");
        long seq = requestRing.next();
        RequestEvent e = requestRing.get(seq);
        RequestEvent.makeCommitRequest(e, startTimestamp, monCtx, writeSet, isRetry, c);
        requestRing.publish(seq);

    }

    // Consumer side: assign a fresh timestamp and forward it to the client.
    private void handleTimestamp(RequestEvent requestEvent) throws Exception {

        long timestamp = timestampOracle.next();
        requestEvent.getMonCtx().timerStop("request.processor.timestamp.latency");
        forwardTimestamp(timestamp, requestEvent.getChannel(), requestEvent.getMonCtx());

    }

    /**
     * Consumer side: decides commit vs abort for a transaction.
     *
     * <p>Algorithm: (0) abort if the transaction started at or below the low watermark
     * (its conflict information may already have been evicted from the hash map);
     * (1) otherwise scan the write set for a cell whose latest committed write is at or
     * after the transaction's start timestamp (write-write conflict); (2) if conflict-free,
     * take a commit timestamp, record each written cell in the hash map, and raise the low
     * watermark to the largest entry evicted by those insertions.
     */
    private void handleCommit(RequestEvent event) throws Exception {

        long startTimestamp = event.getStartTimestamp();
        Iterable<Long> writeSet = event.writeSet();
        boolean isCommitRetry = event.isCommitRetry();
        Channel c = event.getChannel();

        boolean txCanCommit;

        int numCellsInWriteset = 0;
        // 0. check if it should abort
        if (startTimestamp <= lowWatermark) {
            txCanCommit = false;
        } else {
            // 1. check the write-write conflicts
            txCanCommit = true;
            for (long cellId : writeSet) {
                long value = hashmap.getLatestWriteForCell(cellId);
                // 0 means "no entry"; a later-or-equal committed write means conflict.
                if (value != 0 && value >= startTimestamp) {
                    txCanCommit = false;
                    break; // numCellsInWriteset is partial here, but unused on the abort path
                }
                numCellsInWriteset++;
            }
        }

        if (txCanCommit) {
            // 2. commit

            long commitTimestamp = timestampOracle.next();

            if (numCellsInWriteset > 0) {
                long newLowWatermark = lowWatermark;

                // Each insertion may evict an older entry; the low watermark must cover every
                // evicted timestamp so that evicted transactions can no longer pass the check in step 0.
                for (long r : writeSet) {
                    long removed = hashmap.putLatestWriteForCell(r, commitTimestamp);
                    newLowWatermark = Math.max(removed, newLowWatermark);
                }

                if (newLowWatermark != lowWatermark) {
                    LOG.trace("Setting new low Watermark to {}", newLowWatermark);
                    lowWatermark = newLowWatermark;
                    // Fire-and-forget by design (contrast with the sync persist in update()).
                    lowWatermarkWriter.persistLowWatermark(newLowWatermark); // Async persist
                }
            }
            event.getMonCtx().timerStop("request.processor.commit.latency");
            forwardCommit(startTimestamp, commitTimestamp, c, event.getMonCtx());

        } else {

            event.getMonCtx().timerStop("request.processor.commit.latency");
            if (isCommitRetry) { // Re-check if it was already committed but the client retried due to a lag replying
                forwardCommitRetry(startTimestamp, c, event.getMonCtx());
            } else {
                forwardAbort(startTimestamp, c, event.getMonCtx());
            }

        }

    }

    // Subclass hooks: invoked on the single consumer thread once a request is resolved.

    /** Forwards a successful commit decision to the next stage/client. */
    public abstract void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) throws Exception;

    /** Handles a retried commit whose transaction can no longer commit here (may already be committed). */
    public abstract void forwardCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;

    /** Forwards an abort decision to the next stage/client. */
    public abstract void forwardAbort(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;

    /** Forwards a freshly assigned timestamp to the next stage/client. */
    public abstract void forwardTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;

    /** Hook invoked when the disruptor wait strategy times out (see TODO in onTimeout(long)). */
    public abstract void onTimeout() throws Exception;

    /**
     * Stops the disruptor and its executor. Waits up to 3 seconds for the consumer
     * thread to finish, re-interrupting the current thread if interrupted while waiting.
     */
    @Override
    public void close() throws IOException {

        LOG.info("Terminating Request Processor...");
        disruptor.halt();
        disruptor.shutdown();
        LOG.info("\tRequest Processor Disruptor shutdown");
        disruptorExec.shutdownNow();
        try {
            // NOTE(review): the boolean result of awaitTermination is ignored, so a timeout
            // is logged as a successful shutdown — consider checking the return value.
            disruptorExec.awaitTermination(3, SECONDS);
            LOG.info("\tRequest Processor Disruptor executor shutdown");
        } catch (InterruptedException e) {
            LOG.error("Interrupted whilst finishing Request Processor Disruptor executor");
            Thread.currentThread().interrupt();
        }
        LOG.info("Request Processor terminated");

    }

    /**
     * Mutable, pooled event used as the ring-buffer slot type. Instances are created once
     * by {@link #EVENT_FACTORY} and reused for the life of the ring, so every field is
     * overwritten by the {@code make*Request} fillers rather than by constructors.
     *
     * <p>Write sets of up to {@link #MAX_INLINE} cells are copied into a preallocated
     * array to avoid retaining the caller's collection; larger write sets keep a
     * reference to the caller's collection instead.
     */
    final static class RequestEvent implements Iterable<Long> {

        enum Type {
            TIMESTAMP, COMMIT
        }

        private Type type = null;
        private Channel channel = null;

        private boolean isCommitRetry = false;
        private long startTimestamp = 0;
        private MonitoringContext monCtx;
        private long numCells = 0;

        private static final int MAX_INLINE = 40;
        // NOTE(review): boxed Long[] (not long[]) — presumably to allow nulls/erasure reuse,
        // but it costs boxing per cell; stale elements also linger after reuse (benign, pooled).
        private Long writeSet[] = new Long[MAX_INLINE];
        private Collection<Long> writeSetAsCollection = null; // for the case where there's more than MAX_INLINE

        // Fills a pooled event as a TIMESTAMP request. Commit-only fields from a previous
        // use are left stale; they are never read on the TIMESTAMP path.
        static void makeTimestampRequest(RequestEvent e, Channel c, MonitoringContext monCtx) {
            e.type = Type.TIMESTAMP;
            e.channel = c;
            e.monCtx = monCtx;
        }

        // Fills a pooled event as a COMMIT request, inlining the write set when small enough.
        static void makeCommitRequest(RequestEvent e,
                                      long startTimestamp,
                                      MonitoringContext monCtx,
                                      Collection<Long> writeSet,
                                      boolean isRetry,
                                      Channel c) {
            e.monCtx = monCtx;
            e.type = Type.COMMIT;
            e.channel = c;
            e.startTimestamp = startTimestamp;
            e.isCommitRetry = isRetry;
            if (writeSet.size() > MAX_INLINE) {
                e.numCells = writeSet.size();
                e.writeSetAsCollection = writeSet;
            } else {
                e.writeSetAsCollection = null; // ensures iterator() uses the inline array
                e.numCells = writeSet.size();
                int i = 0;
                for (Long cellId : writeSet) {
                    e.writeSet[i] = cellId;
                    i++;
                }
            }

        }

        MonitoringContext getMonCtx() {
            return monCtx;
        }

        Type getType() {
            return type;
        }

        long getStartTimestamp() {
            return startTimestamp;
        }

        Channel getChannel() {
            return channel;
        }

        /**
         * Iterates the write set: delegates to the stored collection for large sets,
         * otherwise walks the first {@code numCells} slots of the inline array.
         */
        @Override
        public Iterator<Long> iterator() {

            if (writeSetAsCollection != null) {
                return writeSetAsCollection.iterator();
            }

            return new Iterator<Long>() {
                int i = 0;

                @Override
                public boolean hasNext() {
                    return i < numCells;
                }

                @Override
                public Long next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    return writeSet[i++];
                }

                @Override
                public void remove() {
                    throw new UnsupportedOperationException();
                }
            };

        }

        // Exposes the write set as an Iterable without allocating a view object.
        Iterable<Long> writeSet() {
            return this;
        }

        boolean isCommitRetry() {
            return isCommitRetry;
        }

        // Preallocates events for the ring buffer; see the class comment about reuse.
        final static EventFactory<RequestEvent> EVENT_FACTORY = new EventFactory<RequestEvent>() {
            @Override
            public RequestEvent newInstance() {
                return new RequestEvent();
            }
        };

    }

}