/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.io.sstable.format.big;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.EnumMap;
import java.util.Map;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.io.util.SequentialWriterOption;
import org.apache.cassandra.net.AsyncStreamingInputPlus;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadataRef;

import static java.lang.String.format;
import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
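
/**
 * A {@link SSTableMultiWriter} that writes incoming sstable components to disk verbatim, byte
 * for byte, instead of deserialising and re-appending individual partitions. It serves the
 * receiving side of zero-copy ("entire sstable") streaming: each component is streamed into its
 * own file via {@link #writeComponent(Component.Type, DataInputPlus, long)}, after which the
 * finished sstable is opened as a normal reader.
 *
 * <p>A minimal usage sketch; the {@code descriptor}, {@code tracker}, input stream {@code in}
 * and {@code sizes} array below are illustrative stand-ins, not values defined in this file:
 *
 * <pre>{@code
 * BigTableZeroCopyWriter writer = new BigTableZeroCopyWriter(descriptor, metadata, tracker, components);
 * int i = 0;
 * for (Component component : components)
 *     writer.writeComponent(component.type, in, sizes[i++]);
 * writer.finish(true);
 * }</pre>
 */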
public class BigTableZeroCopyWriter extends SSTable implements SSTableMultiWriter
{
    private static final Logger logger = LoggerFactory.getLogger(BigTableZeroCopyWriter.class);

    private final TableMetadataRef metadata;
    private volatile SSTableReader finalReader;
    private final Map<Component.Type, SequentialWriter> componentWriters;
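
    // 2 MiB off-heap write buffers; trickle fsync is disabled, data is flushed explicitly via sync()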
    private static final SequentialWriterOption WRITER_OPTION =
        SequentialWriterOption.newBuilder()
                              .trickleFsync(false)
                              .bufferSize(2 << 20)
                              .bufferType(BufferType.OFF_HEAP)
                              .build();

    private static final ImmutableSet<Component> SUPPORTED_COMPONENTS =
        ImmutableSet.of(Component.DATA,
                        Component.PRIMARY_INDEX,
                        Component.SUMMARY,
                        Component.STATS,
                        Component.COMPRESSION_INFO,
                        Component.FILTER,
                        Component.DIGEST,
                        Component.CRC);
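
    /**
     * Creates one {@link SequentialWriter} per incoming component and registers the new sstable
     * with {@code lifecycleNewTracker} so the owning transaction can clean it up on failure.
     */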
    public BigTableZeroCopyWriter(Descriptor descriptor,
                                  TableMetadataRef metadata,
                                  LifecycleNewTracker lifecycleNewTracker,
                                  final Collection<Component> components)
    {
        super(descriptor, ImmutableSet.copyOf(components), metadata, DatabaseDescriptor.getDiskOptimizationStrategy());

        lifecycleNewTracker.trackNew(this);
        this.metadata = metadata;
        this.componentWriters = new EnumMap<>(Component.Type.class);

        if (!SUPPORTED_COMPONENTS.containsAll(components))
            throw new AssertionError(format("Unsupported streaming component detected %s",
                                            Sets.difference(ImmutableSet.copyOf(components), SUPPORTED_COMPONENTS)));

        for (Component c : components)
            componentWriters.put(c.type, makeWriter(descriptor, c));
    }

    private static SequentialWriter makeWriter(Descriptor descriptor, Component component)
    {
        return new SequentialWriter(new File(descriptor.filenameFor(component)), WRITER_OPTION, false);
    }
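
    /*
     * Fallback copy path: a plain buffered read/write loop, used when the input stream is not
     * an AsyncStreamingInputPlus and so cannot hand its buffers to the file channel directly.
     */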
    private void write(DataInputPlus in, long size, SequentialWriter out) throws FSWriteError
    {
        final int BUFFER_SIZE = 1 << 20;
        long bytesRead = 0;
        byte[] buff = new byte[BUFFER_SIZE];

        try
        {
            while (bytesRead < size)
            {
                // readFully blocks until exactly toRead bytes have been consumed from the stream
                int toRead = (int) Math.min(size - bytesRead, BUFFER_SIZE);
                in.readFully(buff, 0, toRead);
                out.write(buff, 0, toRead);
                bytesRead += toRead;
            }
            out.sync(); // finish() will also sync(); syncing here flushes data to disk as early as possible
        }
        catch (IOException e)
        {
            throw new FSWriteError(e, out.getPath());
        }
    }

    @Override
    public boolean append(UnfilteredRowIterator partition)
    {
        // components arrive as whole files, so row-level appends do not apply here
        throw new UnsupportedOperationException("Operation not supported by BigTableZeroCopyWriter");
    }

    @Override
    public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult)
    {
        return finish(openResult);
    }

    @Override
    public Collection<SSTableReader> finish(boolean openResult)
    {
        setOpenResult(openResult);

        for (SequentialWriter writer : componentWriters.values())
            writer.finish();

        return finished();
    }

    @Override
    public Collection<SSTableReader> finished()
    {
        if (finalReader == null)
            finalReader = SSTableReader.open(descriptor, components, metadata);

        return ImmutableList.of(finalReader);
    }
    @Override
    public SSTableMultiWriter setOpenResult(boolean openResult)
    {
        return null;
    }

    @Override
    public long getFilePointer()
    {
        // no single data file is being appended to, so there is no meaningful position to report
        return 0;
    }

    @Override
    public TableId getTableId()
    {
        return metadata.id;
    }

    @Override
    public Throwable commit(Throwable accumulate)
    {
        for (SequentialWriter writer : componentWriters.values())
            accumulate = writer.commit(accumulate);
        return accumulate;
    }

    @Override
    public Throwable abort(Throwable accumulate)
    {
        for (SequentialWriter writer : componentWriters.values())
            accumulate = writer.abort(accumulate);
        return accumulate;
    }

    @Override
    public void prepareToCommit()
    {
        for (SequentialWriter writer : componentWriters.values())
            writer.prepareToCommit();
    }

    @Override
    public void close()
    {
        for (SequentialWriter writer : componentWriters.values())
            writer.close();
    }
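
    /**
     * Writes {@code size} bytes of the given component from the stream straight to its on-disk
     * file, taking the zero-copy channel path when the input supports it.
     */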
    public void writeComponent(Component.Type type, DataInputPlus in, long size)
    {
        logger.info("Writing component {} to {} length {}", type, componentWriters.get(type).getPath(), prettyPrintMemory(size));

        if (in instanceof AsyncStreamingInputPlus)
            write((AsyncStreamingInputPlus) in, size, componentWriters.get(type));
        else
            write(in, size, componentWriters.get(type));
    }
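
    /*
     * Zero-copy path: AsyncStreamingInputPlus passes its network buffers directly to the
     * writer's channel, avoiding the intermediate byte[] copy used by the fallback above.
     */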
    private void write(AsyncStreamingInputPlus in, long size, SequentialWriter writer)
    {
        logger.info("Block-writing component to {} length {}", writer.getPath(), prettyPrintMemory(size));

        try
        {
            in.consume(writer::writeDirectlyToChannel, size);
            writer.sync();
        }
        // FIXME: handle AsyncStreamingInputPlus exceptions properly
        catch (EOFException | AsyncStreamingInputPlus.InputTimeoutException e)
        {
            in.close();
        }
        catch (IOException e)
        {
            throw new FSWriteError(e, writer.getPath());
        }
    }
}