| /* |
| * Copyright © 2012-2014 Cask Data, Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| * use this file except in compliance with the License. You may obtain a copy of |
| * the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package co.cask.tephra.persist; |
| |
| import com.google.common.base.Function; |
| import com.google.common.collect.Lists; |
| import com.google.common.util.concurrent.AbstractIdleService; |
| |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NavigableMap; |
| import java.util.TreeMap; |
| import javax.annotation.Nullable; |
| |
| /** |
| * Stores the latest transaction snapshot and logs in memory. |
| */ |
| public class InMemoryTransactionStateStorage extends AbstractIdleService implements TransactionStateStorage { |
| // only keeps the most recent snapshot in memory |
| private TransactionSnapshot lastSnapshot; |
| |
| private NavigableMap<Long, TransactionLog> logs = new TreeMap<>(); |
| |
| @Override |
| protected void startUp() throws Exception { |
| } |
| |
| @Override |
| protected void shutDown() throws Exception { |
| lastSnapshot = null; |
| logs = new TreeMap<>(); |
| } |
| |
| @Override |
| public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException { |
| // no codecs in in-memory mode |
| } |
| |
| @Override |
| public void writeSnapshot(TransactionSnapshot snapshot) throws IOException { |
| lastSnapshot = snapshot; |
| } |
| |
| @Override |
| public TransactionSnapshot getLatestSnapshot() throws IOException { |
| return lastSnapshot; |
| } |
| |
| @Override |
| public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException { |
| return lastSnapshot; |
| } |
| |
| @Override |
| public long deleteOldSnapshots(int numberToKeep) throws IOException { |
| // always only keep the last snapshot |
| return lastSnapshot.getTimestamp(); |
| } |
| |
| @Override |
| public List<String> listSnapshots() throws IOException { |
| List<String> snapshots = Lists.newArrayListWithCapacity(1); |
| if (lastSnapshot != null) { |
| snapshots.add(Long.toString(lastSnapshot.getTimestamp())); |
| } |
| return snapshots; |
| } |
| |
| @Override |
| public List<TransactionLog> getLogsSince(long timestamp) throws IOException { |
| return Lists.newArrayList(logs.tailMap(timestamp).values()); |
| } |
| |
| @Override |
| public TransactionLog createLog(long timestamp) throws IOException { |
| TransactionLog log = new InMemoryTransactionLog(timestamp); |
| logs.put(timestamp, log); |
| return log; |
| } |
| |
| @Override |
| public void deleteLogsOlderThan(long timestamp) throws IOException { |
| Iterator<Map.Entry<Long, TransactionLog>> logIter = logs.entrySet().iterator(); |
| while (logIter.hasNext()) { |
| Map.Entry<Long, TransactionLog> logEntry = logIter.next(); |
| if (logEntry.getKey() < timestamp) { |
| logIter.remove(); |
| } |
| } |
| } |
| |
| @Override |
| public void setupStorage() throws IOException { |
| } |
| |
| @Override |
| public List<String> listLogs() throws IOException { |
| return Lists.transform(Lists.newArrayList(logs.keySet()), new Function<Long, String>() { |
| @Nullable |
| @Override |
| public String apply(@Nullable Long input) { |
| return input.toString(); |
| } |
| }); |
| } |
| |
| @Override |
| public String getLocation() { |
| return "in-memory"; |
| } |
| |
| public static class InMemoryTransactionLog implements TransactionLog { |
| private long timestamp; |
| private List<TransactionEdit> edits = Lists.newArrayList(); |
| boolean isClosed = false; |
| public InMemoryTransactionLog(long timestamp) { |
| this.timestamp = timestamp; |
| } |
| |
| @Override |
| public String getName() { |
| return "in-memory@" + timestamp; |
| } |
| |
| @Override |
| public long getTimestamp() { |
| return timestamp; |
| } |
| |
| @Override |
| public void append(TransactionEdit edit) throws IOException { |
| if (isClosed) { |
| throw new IOException("Log is closed"); |
| } |
| edits.add(edit); |
| } |
| |
| @Override |
| public void append(List<TransactionEdit> edits) throws IOException { |
| if (isClosed) { |
| throw new IOException("Log is closed"); |
| } |
| edits.addAll(edits); |
| } |
| |
| @Override |
| public void close() { |
| isClosed = true; |
| } |
| |
| @Override |
| public TransactionLogReader getReader() throws IOException { |
| return new InMemoryLogReader(edits.iterator()); |
| } |
| } |
| |
| public static class InMemoryLogReader implements TransactionLogReader { |
| private final Iterator<TransactionEdit> editIterator; |
| |
| public InMemoryLogReader(Iterator<TransactionEdit> editIterator) { |
| this.editIterator = editIterator; |
| } |
| |
| @Override |
| public TransactionEdit next() throws IOException { |
| if (editIterator.hasNext()) { |
| return editIterator.next(); |
| } |
| return null; |
| } |
| |
| @Override |
| public TransactionEdit next(TransactionEdit reuse) throws IOException { |
| return next(); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| } |
| } |
| } |