blob: 4a9c5b5951d868251fb012a53d021f16cc32be7d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.omid.tso;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.omid.metrics.Gauge;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.timestamp.storage.TimestampStorage;
import org.apache.omid.transaction.AbstractTransactionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.omid.metrics.MetricsUtils.name;
/**
* The Timestamp Oracle that gives monotonically increasing timestamps based on world time
*/
@Singleton
public class WorldClockOracleImpl implements TimestampOracle {
private static final Logger LOG = LoggerFactory.getLogger(WorldClockOracleImpl.class);
static final long MAX_TX_PER_MS = 1_000_000; // 1 million
static final long TIMESTAMP_INTERVAL_MS = 10_000; // 10 seconds interval
private static final long TIMESTAMP_ALLOCATION_INTERVAL_MS = 7_000; // 7 seconds
private long lastTimestamp;
private long maxTimestamp;
private TimestampStorage storage;
private Panicker panicker;
private volatile long maxAllocatedTime;
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("ts-persist-%d").build());
private Runnable allocateTimestampsBatchTask;
private class AllocateTimestampBatchTask implements Runnable {
long previousMaxTime;
AllocateTimestampBatchTask(long previousMaxTime) {
this.previousMaxTime = previousMaxTime;
}
@Override
public void run() {
long newMaxTime = (System.currentTimeMillis() + TIMESTAMP_INTERVAL_MS) * MAX_TX_PER_MS;
try {
storage.updateMaxTimestamp(previousMaxTime, newMaxTime);
maxAllocatedTime = newMaxTime;
previousMaxTime = newMaxTime;
} catch (Throwable e) {
panicker.panic("Can't store the new max timestamp", e);
}
}
}
@Inject
public WorldClockOracleImpl(MetricsRegistry metrics,
TimestampStorage tsStorage,
Panicker panicker) throws IOException {
this.storage = tsStorage;
this.panicker = panicker;
metrics.gauge(name("tso", "maxTimestamp"), new Gauge<Long>() {
@Override
public Long getValue() {
return maxTimestamp;
}
});
}
@Override
public void initialize() throws IOException {
this.lastTimestamp = this.maxTimestamp = storage.getMaxTimestamp();
this.allocateTimestampsBatchTask = new AllocateTimestampBatchTask(lastTimestamp);
// Trigger first allocation of timestamps
scheduler.schedule(allocateTimestampsBatchTask, 0, TimeUnit.MILLISECONDS);
// Waiting for the current epoch to start. Occurs in case of failover when the previous TSO allocated the current time frame.
while ((System.currentTimeMillis() * MAX_TX_PER_MS) < this.lastTimestamp) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
continue;
}
}
// Launch the periodic timestamp interval allocation. In this case, the timestamp interval is extended even though the TSO is idle.
// Because we are world time based, this guarantees that the first request after a long time does not need to wait for new interval allocation.
scheduler.scheduleAtFixedRate(allocateTimestampsBatchTask, TIMESTAMP_ALLOCATION_INTERVAL_MS, TIMESTAMP_ALLOCATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
}
/**
* Returns the next timestamp if available. Otherwise spins till the ts-persist thread allocates a new timestamp.
*/
@Override
public long next() {
long currentMsFirstTimestamp = System.currentTimeMillis() * MAX_TX_PER_MS;
lastTimestamp += AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
// Return the next timestamp in case we are still in the same millisecond as the previous timestamp was.
if (lastTimestamp >= currentMsFirstTimestamp) {
return lastTimestamp;
}
if (currentMsFirstTimestamp >= maxTimestamp) { // Intentional race to reduce synchronization overhead in every access to maxTimestamp
while (maxAllocatedTime <= currentMsFirstTimestamp) { // Waiting for the interval allocation
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
continue;
}
}
assert (maxAllocatedTime > maxTimestamp);
maxTimestamp = maxAllocatedTime;
}
lastTimestamp = currentMsFirstTimestamp;
return lastTimestamp;
}
@Override
public long getLast() {
return lastTimestamp;
}
@Override
public String toString() {
return String.format("TimestampOracle -> LastTimestamp: %d, MaxTimestamp: %d", lastTimestamp, maxTimestamp);
}
@VisibleForTesting
static class InMemoryTimestampStorage implements TimestampStorage {
long maxTime = 0;
@Override
public void updateMaxTimestamp(long previousMaxTime, long nextMaxTime) {
maxTime = nextMaxTime;
LOG.info("Updating max timestamp: (previous:{}, new:{})", previousMaxTime, nextMaxTime);
}
@Override
public long getMaxTimestamp() {
return maxTime;
}
}
}