blob: d9969de209676b578c932b64c63942f09617bee5 [file] [log] [blame]
/*
* Copyright © 2012-2014 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package co.cask.tephra.persist;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractIdleService;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import javax.annotation.Nullable;
/**
* Stores the latest transaction snapshot and logs in memory.
*/
public class InMemoryTransactionStateStorage extends AbstractIdleService implements TransactionStateStorage {
// only keeps the most recent snapshot in memory
private TransactionSnapshot lastSnapshot;
private NavigableMap<Long, TransactionLog> logs = new TreeMap<>();
@Override
protected void startUp() throws Exception {
}
@Override
protected void shutDown() throws Exception {
lastSnapshot = null;
logs = new TreeMap<>();
}
@Override
public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException {
// no codecs in in-memory mode
}
@Override
public void writeSnapshot(TransactionSnapshot snapshot) throws IOException {
lastSnapshot = snapshot;
}
@Override
public TransactionSnapshot getLatestSnapshot() throws IOException {
return lastSnapshot;
}
@Override
public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException {
return lastSnapshot;
}
@Override
public long deleteOldSnapshots(int numberToKeep) throws IOException {
// always only keep the last snapshot
return lastSnapshot.getTimestamp();
}
@Override
public List<String> listSnapshots() throws IOException {
List<String> snapshots = Lists.newArrayListWithCapacity(1);
if (lastSnapshot != null) {
snapshots.add(Long.toString(lastSnapshot.getTimestamp()));
}
return snapshots;
}
@Override
public List<TransactionLog> getLogsSince(long timestamp) throws IOException {
return Lists.newArrayList(logs.tailMap(timestamp).values());
}
@Override
public TransactionLog createLog(long timestamp) throws IOException {
TransactionLog log = new InMemoryTransactionLog(timestamp);
logs.put(timestamp, log);
return log;
}
@Override
public void deleteLogsOlderThan(long timestamp) throws IOException {
Iterator<Map.Entry<Long, TransactionLog>> logIter = logs.entrySet().iterator();
while (logIter.hasNext()) {
Map.Entry<Long, TransactionLog> logEntry = logIter.next();
if (logEntry.getKey() < timestamp) {
logIter.remove();
}
}
}
@Override
public void setupStorage() throws IOException {
}
@Override
public List<String> listLogs() throws IOException {
return Lists.transform(Lists.newArrayList(logs.keySet()), new Function<Long, String>() {
@Nullable
@Override
public String apply(@Nullable Long input) {
return input.toString();
}
});
}
@Override
public String getLocation() {
return "in-memory";
}
public static class InMemoryTransactionLog implements TransactionLog {
private long timestamp;
private List<TransactionEdit> edits = Lists.newArrayList();
boolean isClosed = false;
public InMemoryTransactionLog(long timestamp) {
this.timestamp = timestamp;
}
@Override
public String getName() {
return "in-memory@" + timestamp;
}
@Override
public long getTimestamp() {
return timestamp;
}
@Override
public void append(TransactionEdit edit) throws IOException {
if (isClosed) {
throw new IOException("Log is closed");
}
edits.add(edit);
}
@Override
public void append(List<TransactionEdit> edits) throws IOException {
if (isClosed) {
throw new IOException("Log is closed");
}
edits.addAll(edits);
}
@Override
public void close() {
isClosed = true;
}
@Override
public TransactionLogReader getReader() throws IOException {
return new InMemoryLogReader(edits.iterator());
}
}
public static class InMemoryLogReader implements TransactionLogReader {
private final Iterator<TransactionEdit> editIterator;
public InMemoryLogReader(Iterator<TransactionEdit> editIterator) {
this.editIterator = editIterator;
}
@Override
public TransactionEdit next() throws IOException {
if (editIterator.hasNext()) {
return editIterator.next();
}
return null;
}
@Override
public TransactionEdit next(TransactionEdit reuse) throws IOException {
return next();
}
@Override
public void close() throws IOException {
}
}
}