blob: b2a5004b80ac3bb7d5de1e9a4bf9f060c170ed7b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service.paxos.uncommitted;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.PeekingIterator;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.util.ChecksummedRandomAccessReader;
import org.apache.cassandra.io.util.ChecksummedSequentialWriter;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.io.util.SequentialWriterOption;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.paxos.Ballot;
import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.Throwables;
public class UncommittedDataFile
{
/** File extension (without the leading dot) used when building and matching data file names. */
static final String EXTENSION = "paxos";
/** Suffix appended to the data and crc file names while a {@link Writer} is still producing them. */
static final String TMP_SUFFIX = ".tmp";
/** Format version written as the first int of every data file and checked again when the file is read. */
private static final int VERSION = 0;
final TableId tableId;
private final File file;
private final File crcFile;
private final long generation;
// number of iterators handed out by iterator() that have not been closed yet; only mutated inside synchronized methods
private int activeReaders = 0;
// set by markDeleted(); the files are removed once this is true and activeReaders is zero
private boolean markedDeleted = false;
/**
 * Associates a data file and its checksum file with a table and a generation.
 * Only stores the references; no I/O is performed here.
 */
private UncommittedDataFile(TableId tableId, File file, File crcFile, long generation)
{
    this.generation = generation;
    this.crcFile = crcFile;
    this.file = file;
    this.tableId = tableId;
}
/**
 * Public factory wrapping the private constructor.
 *
 * @param tableId    table the file belongs to
 * @param file       the data file
 * @param crcFile    the checksum file paired with {@code file}
 * @param generation generation number of the file
 * @return a new handle over the given files
 */
public static UncommittedDataFile create(TableId tableId, File file, File crcFile, long generation)
{
    final UncommittedDataFile dataFile = new UncommittedDataFile(tableId, file, crcFile, generation);
    return dataFile;
}
/**
 * Opens a {@link Writer} that produces a new file for the given table and generation inside {@code directory}.
 *
 * @throws IOException if the writer cannot be initialised
 */
static Writer writer(File directory, String keyspace, String table, TableId tableId, long generation) throws IOException
{
    final Writer created = new Writer(directory, keyspace, table, tableId, generation);
    return created;
}
/**
 * Collects the table ids embedded in the names of the finished data files found in {@code directory}.
 * A name only counts when it matches the pattern in full, i.e. it ends with
 * {@code -<uuid>-<digits>.<EXTENSION>}; names carrying any further suffix are ignored.
 *
 * @param directory directory whose entries are inspected
 * @return the distinct table ids found; empty if nothing matches
 */
static Set<TableId> listTableIds(File directory)
{
    Set<TableId> found = new HashSet<>();
    Pattern dataFilePattern = Pattern.compile(".*-([a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12})-(\\d+)\\." + EXTENSION + '$');
    for (String candidate : directory.listNamesUnchecked())
    {
        Matcher m = dataFilePattern.matcher(candidate);
        if (!m.matches())
            continue;

        // group 1 is the textual UUID of the table
        UUID id = UUID.fromString(m.group(1));
        found.add(TableId.fromUUID(id));
    }
    return found;
}
/**
 * Builds a pattern matching every file name that belongs to {@code tableId}. Group 1 captures the
 * generation digits. The trailing {@code .*} means names with extra suffixes after the extension
 * are matched as well.
 */
static Pattern fileRegexFor(TableId tableId)
{
    final String id = tableId.toString();
    final String regex = ".*-" + id + "-(\\d+)\\." + EXTENSION + ".*";
    return Pattern.compile(regex);
}
/**
 * @return {@code true} when {@code fname} ends with {@link #TMP_SUFFIX}
 */
static boolean isTmpFile(String fname)
{
    final boolean endsWithTmpSuffix = fname.endsWith(TMP_SUFFIX);
    return endsWithTmpSuffix;
}
/**
 * @return {@code true} when {@code fname} ends with the {@code .crc} suffix
 */
static boolean isCrcFile(String fname)
{
    final String crcSuffix = ".crc";
    return fname.endsWith(crcSuffix);
}
/**
 * Builds the data file name {@code <keyspace>-<table>-<tableId>-<generation>.<EXTENSION>}.
 */
static String fileName(String keyspace, String table, TableId tableId, long generation)
{
    final String template = "%s-%s-%s-%s.%s";
    return String.format(template, keyspace, table, tableId, generation, EXTENSION);
}
/**
 * Derives the checksum file name for a data file name by appending {@code .crc}.
 */
static String crcName(String fname)
{
    final String crcSuffix = ".crc";
    return fname + crcSuffix;
}
/**
 * Flags this file for deletion. The files are removed right away when no reader is active;
 * otherwise removal happens when the last open iterator is closed.
 */
void markDeleted()
{
    synchronized (this)
    {
        markedDeleted = true;
        maybeDelete();
    }
}
/**
 * Deletes the data file and then the crc file, but only once the file has been marked deleted
 * and no reader is active. Callers in this class invoke it while holding this object's monitor.
 */
private void maybeDelete()
{
    if (!markedDeleted)
        return;
    if (activeReaders != 0)
        return;

    file.delete();
    crcFile.delete();
}
/**
 * Called when an iterator over this file is closed: releases its reader slot and deletes the
 * files if deletion was pending on that reader.
 */
private void onIteratorClose()
{
    synchronized (this)
    {
        activeReaders -= 1;
        maybeDelete();
    }
}
/** @return the data file backing this instance */
@VisibleForTesting
File file()
{
    return this.file;
}
/** @return the current count of iterators that were opened and not yet closed (read without synchronization) */
@VisibleForTesting
int getActiveReaders()
{
    return this.activeReaders;
}
/** @return whether {@link #markDeleted()} has been called (read without synchronization) */
@VisibleForTesting
boolean isMarkedDeleted()
{
    return this.markedDeleted;
}
/** @return the generation number this file was created with */
long generation()
{
    return this.generation;
}
/**
 * Return an iterator of the file contents for the given token ranges.
 *
 * The returned iterator counts as an active reader until it is closed; while any reader is
 * active the underlying files are not deleted even if {@link #markDeleted()} is called.
 *
 * @param ranges token ranges to read; must already be normalized (equal to {@code Range.normalize(ranges)})
 * @return an iterator over the matching entries, or {@code null} if this file has been marked deleted
 * @throws IllegalArgumentException if {@code ranges} is not normalized
 */
synchronized CloseableIterator<PaxosKeyState> iterator(Collection<Range<Token>> ranges)
{
    Preconditions.checkArgument(Iterables.elementsEqual(Range.normalize(ranges), ranges), "ranges must be normalized");
    if (markedDeleted)
        return null;

    // Construct first, count second: the constructor opens the file and validates it, and may throw.
    // Counting a reader that was never handed out would leave activeReaders permanently above zero
    // and prevent the files from ever being deleted. We hold the monitor throughout, so nobody can
    // observe the intermediate state.
    KeyCommitStateIterator opened = new KeyCommitStateIterator(ranges);
    activeReaders++;
    return opened;
}
private interface PeekingKeyCommitIterator extends CloseableIterator<PaxosKeyState>, PeekingIterator<PaxosKeyState>
{
static final PeekingKeyCommitIterator EMPTY = new PeekingKeyCommitIterator()
{
public PaxosKeyState peek() { throw new NoSuchElementException(); }
public void remove() { throw new NoSuchElementException(); }
public void close() { }
public boolean hasNext() { return false; }
public PaxosKeyState next() { throw new NoSuchElementException(); }
};
}
/**
 * Writes a new data file plus its checksum file for one table and generation.
 *
 * Output goes to files carrying {@link #TMP_SUFFIX}; {@link #finish()} renames them to their final
 * names. Entries must be appended in strictly increasing key order.
 */
static class Writer
{
    final File directory;
    final String keyspace;
    final String table;
    final TableId tableId;
    long generation;

    // temporary (TMP_SUFFIX) files being written; renamed by finish()
    private final File file;
    private final File crcFile;
    final SequentialWriter writer;

    // last key appended, used to enforce ordering; null until the first append
    DecoratedKey lastKey = null;

    /** Final (non-temporary) data file name for the given generation. */
    private String fileName(long generation)
    {
        return UncommittedDataFile.fileName(keyspace, table, tableId, generation);
    }

    /** Final (non-temporary) crc file name for the given generation. */
    private String crcName(long generation)
    {
        return UncommittedDataFile.crcName(fileName(generation));
    }

    /**
     * Creates the target directory if needed, opens the temporary data/crc files and writes the
     * format version header.
     *
     * @throws IOException if the version header cannot be written
     */
    Writer(File directory, String keyspace, String table, TableId tableId, long generation) throws IOException
    {
        this.directory = directory;
        this.keyspace = keyspace;
        this.table = table;
        this.tableId = tableId;
        this.generation = generation;

        directory.createDirectoriesIfNotExists();

        this.file = new File(this.directory, fileName(generation) + TMP_SUFFIX);
        this.crcFile = new File(this.directory, crcName(generation) + TMP_SUFFIX);
        this.writer = new ChecksummedSequentialWriter(file, crcFile, null, SequentialWriterOption.DEFAULT);
        try
        {
            this.writer.writeInt(VERSION);
        }
        catch (Throwable t)
        {
            // If the header write fails the constructor throws, so the caller never receives a Writer
            // it could abort. Roll the underlying writer back here instead of leaking it.
            // NOTE(review): relies on SequentialWriter.abort releasing the open file - confirm.
            Throwable merged = this.writer.abort(t);
            if (merged != null && merged != t)
                t.addSuppressed(merged);
            throw t;
        }
    }

    /**
     * Appends one entry: the key (short-length prefixed), the ballot, then the committed flag.
     *
     * @throws IllegalArgumentException if {@code state.key} is not strictly greater than the previously appended key
     * @throws IOException if writing fails
     */
    void append(PaxosKeyState state) throws IOException
    {
        if (lastKey != null)
            Preconditions.checkArgument(state.key.compareTo(lastKey) > 0);
        lastKey = state.key;
        ByteBufferUtil.writeWithShortLength(state.key.getKey(), writer);
        state.ballot.serialize(writer);
        writer.writeBoolean(state.committed);
    }

    /**
     * Aborts the underlying writer.
     *
     * @param accumulate a failure seen so far, passed through to the underlying writer's abort
     * @return whatever the underlying writer's abort returns
     */
    Throwable abort(Throwable accumulate)
    {
        return writer.abort(accumulate);
    }

    /**
     * Finishes the underlying writer and renames the temporary files to their final names, the crc
     * file first and the data file second.
     *
     * If either rename fails, every temporary or final file that exists is deleted on a best-effort
     * basis and the failure is rethrown.
     *
     * @return a handle over the finished files
     */
    UncommittedDataFile finish()
    {
        writer.finish();
        File finalCrc = new File(directory, crcName(generation));
        File finalData = new File(directory, fileName(generation));
        try
        {
            crcFile.move(finalCrc);
            file.move(finalData);
            return new UncommittedDataFile(tableId, finalData, finalCrc, generation);
        }
        catch (Throwable e)
        {
            // Do not leave a half-published pair behind: remove whichever of the four files exist,
            // collecting any cleanup failures alongside the original one.
            Throwable merged = e;
            for (File f : new File[]{crcFile, finalCrc, file, finalData})
            {
                try
                {
                    if (f.exists())
                        Files.delete(f.toPath());
                }
                catch (Throwable t)
                {
                    merged = Throwables.merge(merged, t);
                }
            }
            if (merged != e)
                throw new RuntimeException(merged);
            throw e;
        }
    }
}
/**
 * Iterates the entries of the enclosing file whose keys fall inside a sequence of token ranges.
 *
 * The file is read front to back once. Both the file contents and the supplied ranges are walked
 * forward only, so the ranges must be supplied in order. Closing the iterator closes the reader and
 * notifies the enclosing file through {@code onIteratorClose()}.
 */
class KeyCommitStateIterator extends AbstractIterator<PaxosKeyState> implements PeekingKeyCommitIterator
{
    private final Iterator<Range<Token>> rangeIterator;
    private final RandomAccessReader reader;
    private Range<PartitionPosition> currentRange;
    // guarded by this; makes close() idempotent so the enclosing file's reader count is released exactly once
    private boolean closed = false;

    /**
     * @param ranges non-empty collection of token ranges to read
     * @throws IllegalArgumentException if {@code ranges} is empty or the file version is unsupported
     * @throws FSReadError if the file cannot be opened or its header cannot be read
     */
    KeyCommitStateIterator(Collection<Range<Token>> ranges)
    {
        this.rangeIterator = ranges.iterator();
        // Validate and consume the first range before touching the file so that a bad argument
        // cannot leave an open reader behind.
        Preconditions.checkArgument(rangeIterator.hasNext());
        currentRange = convertRange(rangeIterator.next());

        RandomAccessReader opened;
        try
        {
            opened = ChecksummedRandomAccessReader.open(file, crcFile);
        }
        catch (IOException e)
        {
            throw new FSReadError(e, file);
        }

        try
        {
            validateVersion(opened);
        }
        catch (Throwable t)
        {
            // the constructor is about to fail, so nobody will ever call close(): release the reader here
            try
            {
                opened.close();
            }
            catch (Throwable closeFailure)
            {
                t.addSuppressed(closeFailure);
            }
            throw t;
        }
        this.reader = opened;
    }

    /** Converts a token range into a partition-position range using the max key bound of both ends. */
    private Range<PartitionPosition> convertRange(Range<Token> tokenRange)
    {
        return new Range<>(tokenRange.left.maxKeyBound(), tokenRange.right.maxKeyBound());
    }

    /**
     * Reads the leading version int and checks that it equals {@link #VERSION}.
     *
     * @throws IllegalArgumentException if the version differs
     * @throws FSReadError if the header cannot be read
     */
    private void validateVersion(RandomAccessReader reader)
    {
        try
        {
            int version = reader.readInt();
            Preconditions.checkArgument(version == VERSION, "unsupported file version: %s", version);
        }
        catch (IOException e)
        {
            throw new FSReadError(e, file);
        }
    }

    /** Reads the ballot and committed flag that follow a key and combines them with {@code key}. */
    PaxosKeyState createKeyState(DecoratedKey key, RandomAccessReader reader) throws IOException
    {
        return new PaxosKeyState(tableId, key,
                                 Ballot.deserialize(reader),
                                 reader.readBoolean());
    }

    /**
     * skip any bytes after the key
     */
    void skipEntryRemainder(RandomAccessReader reader) throws IOException
    {
        reader.skipBytes((int) Ballot.sizeInBytes());
        reader.readBoolean();
    }

    /**
     * Advances through the file until an entry inside the current or a later range is found.
     *
     * @return the next matching entry, or the end-of-data marker when the file or the ranges are exhausted
     * @throws FSReadError if reading the file fails
     */
    @Override
    protected synchronized PaxosKeyState computeNext()
    {
        try
        {
            nextKey:
            while (!reader.isEOF())
            {
                DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(reader));
                while (!currentRange.contains(key))
                {
                    // if this falls before our current target range, just keep going
                    if (currentRange.left.compareTo(key) >= 0)
                    {
                        skipEntryRemainder(reader);
                        continue nextKey;
                    }

                    // otherwise check against subsequent ranges and end iteration if there are none
                    if (!rangeIterator.hasNext())
                        return endOfData();

                    currentRange = convertRange(rangeIterator.next());
                }

                return createKeyState(key, reader);
            }
            return endOfData();
        }
        catch (IOException e)
        {
            throw new FSReadError(e, file);
        }
    }

    /**
     * Closes the reader and releases this iterator's reader slot on the enclosing file.
     * Calling it more than once has no further effect.
     */
    @Override
    public void close()
    {
        synchronized (this)
        {
            if (closed)
                return;
            closed = true;
        }

        try
        {
            synchronized (this)
            {
                reader.close();
            }
        }
        finally
        {
            // always release the slot, even if closing the reader failed, so a pending delete is not blocked forever;
            // done outside this iterator's monitor, as before
            onIteratorClose();
        }
    }
}
}