blob: ecc4f27840d2d2f9ac854ef05007029d5edf938d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.hints;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import javax.annotation.Nullable;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.UnknownColumnFamilyException;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.NativeLibrary;
/**
* A paged non-compressed hints reader that provides two iterators:
* - a 'raw' ByteBuffer iterator that doesn't deserialize the hints, but returns the pre-encoded hints verbatim
* - a decoded iterator, that deserializes the underlying bytes into {@link Hint} instances.
*
* The former is an optimisation for when the messaging version of the file matches the messaging version of the destination
* node. Extra decoding and reencoding is a waste of effort in this scenario, so we avoid it.
*
* The latter is required for dispatch of hints to nodes that have a different messaging version, and in general is just an
* easy way to enable backward and future compatibilty.
*/
class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
{
private static final Logger logger = LoggerFactory.getLogger(HintsReader.class);
// don't read more than 512 KB of hints at a time.
private static final int PAGE_SIZE = 512 << 10;
private final HintsDescriptor descriptor;
private final File file;
private final ChecksummedDataInput input;
// we pass the RateLimiter into HintsReader itself because it's cheaper to calculate the size before the hint is deserialized
@Nullable
private final RateLimiter rateLimiter;
protected HintsReader(HintsDescriptor descriptor, File file, ChecksummedDataInput reader, RateLimiter rateLimiter)
{
this.descriptor = descriptor;
this.file = file;
this.input = reader;
this.rateLimiter = rateLimiter;
}
@SuppressWarnings("resource") // HintsReader owns input
static HintsReader open(File file, RateLimiter rateLimiter)
{
ChecksummedDataInput reader = ChecksummedDataInput.open(file);
try
{
HintsDescriptor descriptor = HintsDescriptor.deserialize(reader);
if (descriptor.isCompressed())
{
// since the hints descriptor is always uncompressed, it needs to be read with the normal ChecksummedDataInput.
// The compressed input is instantiated with the uncompressed input's position
reader = CompressedChecksummedDataInput.upgradeInput(reader, descriptor.createCompressor());
}
else if (descriptor.isEncrypted())
reader = EncryptedChecksummedDataInput.upgradeInput(reader, descriptor.getCipher(), descriptor.createCompressor());
return new HintsReader(descriptor, file, reader, rateLimiter);
}
catch (IOException e)
{
reader.close();
throw new FSReadError(e, file);
}
}
static HintsReader open(File file)
{
return open(file, null);
}
public void close()
{
input.close();
}
public HintsDescriptor descriptor()
{
return descriptor;
}
void seek(InputPosition newPosition)
{
input.seek(newPosition);
}
public Iterator<Page> iterator()
{
return new PagesIterator();
}
public ChecksummedDataInput getInput()
{
return input;
}
final class Page
{
public final InputPosition position;
private Page(InputPosition inputPosition)
{
this.position = inputPosition;
}
Iterator<Hint> hintsIterator()
{
return new HintsIterator(position);
}
Iterator<ByteBuffer> buffersIterator()
{
return new BuffersIterator(position);
}
}
final class PagesIterator extends AbstractIterator<Page>
{
@SuppressWarnings("resource")
protected Page computeNext()
{
input.tryUncacheRead();
if (input.isEOF())
return endOfData();
return new Page(input.getSeekPosition());
}
}
/**
* A decoding iterator that deserializes the hints as it goes.
*/
final class HintsIterator extends AbstractIterator<Hint>
{
private final InputPosition offset;
HintsIterator(InputPosition offset)
{
super();
this.offset = offset;
}
protected Hint computeNext()
{
Hint hint;
do
{
InputPosition position = input.getSeekPosition();
if (input.isEOF())
return endOfData(); // reached EOF
if (position.subtract(offset) >= PAGE_SIZE)
return endOfData(); // read page size or more bytes
try
{
hint = computeNextInternal();
}
catch (EOFException e)
{
logger.warn("Unexpected EOF replaying hints ({}), likely due to unflushed hint file on shutdown; continuing", descriptor.fileName(), e);
return endOfData();
}
catch (IOException e)
{
throw new FSReadError(e, file);
}
}
while (hint == null);
return hint;
}
private Hint computeNextInternal() throws IOException
{
input.resetCrc();
input.resetLimit();
int size = input.readInt();
// if we cannot corroborate the size via crc, then we cannot safely skip this hint
if (!input.checkCrc())
throw new IOException("Digest mismatch exception");
return readHint(size);
}
private Hint readHint(int size) throws IOException
{
if (rateLimiter != null)
rateLimiter.acquire(size);
input.limit(size);
Hint hint;
try
{
hint = Hint.serializer.deserialize(input, descriptor.messagingVersion());
input.checkLimit(0);
}
catch (UnknownColumnFamilyException e)
{
logger.warn("Failed to read a hint for {}: {} - table with id {} is unknown in file {}",
StorageService.instance.getEndpointForHostId(descriptor.hostId),
descriptor.hostId,
e.cfId,
descriptor.fileName());
input.skipBytes(Ints.checkedCast(size - input.bytesPastLimit()));
hint = null; // set the return value to null and let following code to update/check the CRC
}
if (input.checkCrc())
return hint;
// log a warning and skip the corrupted entry
logger.warn("Failed to read a hint for {}: {} - digest mismatch for hint at position {} in file {}",
StorageService.instance.getEndpointForHostId(descriptor.hostId),
descriptor.hostId,
input.getPosition() - size - 4,
descriptor.fileName());
return null;
}
}
/**
* A verbatim iterator that simply returns the underlying ByteBuffers.
*/
final class BuffersIterator extends AbstractIterator<ByteBuffer>
{
private final InputPosition offset;
BuffersIterator(InputPosition offset)
{
super();
this.offset = offset;
}
protected ByteBuffer computeNext()
{
ByteBuffer buffer;
do
{
InputPosition position = input.getSeekPosition();
if (input.isEOF())
return endOfData(); // reached EOF
if (position.subtract(offset) >= PAGE_SIZE)
return endOfData(); // read page size or more bytes
try
{
buffer = computeNextInternal();
}
catch (EOFException e)
{
logger.warn("Unexpected EOF replaying hints ({}), likely due to unflushed hint file on shutdown; continuing", descriptor.fileName(), e);
return endOfData();
}
catch (IOException e)
{
throw new FSReadError(e, file);
}
}
while (buffer == null);
return buffer;
}
private ByteBuffer computeNextInternal() throws IOException
{
input.resetCrc();
input.resetLimit();
int size = input.readInt();
// if we cannot corroborate the size via crc, then we cannot safely skip this hint
if (!input.checkCrc())
throw new IOException("Digest mismatch exception");
return readBuffer(size);
}
private ByteBuffer readBuffer(int size) throws IOException
{
if (rateLimiter != null)
rateLimiter.acquire(size);
input.limit(size);
ByteBuffer buffer = ByteBufferUtil.read(input, size);
if (input.checkCrc())
return buffer;
// log a warning and skip the corrupted entry
logger.warn("Failed to read a hint for {} - digest mismatch for hint at position {} in file {}",
descriptor.hostId,
input.getPosition() - size - 4,
descriptor.fileName());
return null;
}
}
}