/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.sstable;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Throwables;
import io.netty.util.concurrent.FastThreadLocalThread;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.UnfilteredSerializer;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.utils.JVMStabilityInspector;
/**
* An SSTable writer that doesn't assume rows are in sorted order.
* <p>
* This writer buffers rows in memory and then writes them all out in sorted
* order. To avoid loading the entire data set into memory, the amount of data
* buffered is configurable; each time the threshold is reached, one SSTable is
* created (and the buffer is reset).
*
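* This class is package-private; in practice it backs {@link CQLSSTableWriter} in
* its default (unsorted) mode. A minimal sketch of that entry point, assuming an
* existing output directory (the schema and statements below are illustrative):
* <pre>{@code
*   String schema = "CREATE TABLE ks.t (k int PRIMARY KEY, v text)";
*   String insert = "INSERT INTO ks.t (k, v) VALUES (?, ?)";
*   CQLSSTableWriter writer = CQLSSTableWriter.builder()
*                                             .inDirectory("/path/to/ks/t")
*                                             .forTable(schema)
*                                             .using(insert)
*                                             .withBufferSizeInMB(64) // flush threshold used by this class
*                                             .build();
*   writer.addRow(1, "one");
*   writer.addRow(2, "two");
*   writer.close();
* }</pre>
*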
* @see SSTableSimpleWriter
*/
class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
{
private static final Buffer SENTINEL = new Buffer();
private Buffer buffer = new Buffer();
private final long bufferSize;
private long currentSize;
// Used to compute the serialized size of each row
private final SerializationHeader header;
private final BlockingQueue<Buffer> writeQueue = new SynchronousQueue<Buffer>();
private final DiskWriter diskWriter = new DiskWriter();
SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, PartitionColumns columns, long bufferSizeInMB)
{
super(directory, metadata, columns);
this.bufferSize = bufferSizeInMB * 1024L * 1024L;
this.header = new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS);
diskWriter.start();
}
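// Returns the buffered update for the given partition key, creating one on first
// access and counting its (empty) serialized size against the buffer.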
PartitionUpdate getUpdateFor(DecoratedKey key)
{
assert key != null;
PartitionUpdate previous = buffer.get(key);
if (previous == null)
{
previous = createPartitionUpdate(key);
currentSize += PartitionUpdate.serializer.serializedSize(previous, formatType.info.getLatestVersion().correspondingMessagingVersion());
previous.allowNewUpdates();
buffer.put(key, previous);
}
return previous;
}
private void countRow(Row row)
{
// Note that the accounting of a row is somewhat inaccurate (it doesn't take some of the file format
// optimizations into account), and the maintenance of bufferSize is not perfect in general. This has
// always been the case for this class, but we should improve it. In particular, what we count is closer
// to the serialized size, and it's debatable whether that is the right thing to count, since rows take
// a lot more space in memory and bufferSize is first and foremost used to avoid OOMs when using this writer.
currentSize += UnfilteredSerializer.serializer.serializedSize(row, header, 0, formatType.info.getLatestVersion().correspondingMessagingVersion());
}
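// Flushes the buffer to the writer thread once it grows past bufferSize, rethrowing
// any IOException as an unchecked SyncException (see createPartitionUpdate()).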
private void maybeSync() throws SyncException
{
try
{
if (currentSize > bufferSize)
sync();
}
catch (IOException e)
{
// PartitionUpdate.add() cannot throw a checked IOException, but we still want to report the
// failure to the user, so wrap it in a SyncException (a RuntimeException) that callers can
// catch and unwrap.
throw new SyncException(e);
}
}
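// Wraps PartitionUpdate so that every row added through it is counted against the
// buffer and may trigger a flush.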
private PartitionUpdate createPartitionUpdate(DecoratedKey key)
{
return new PartitionUpdate(metadata, key, columns, 4)
{
@Override
public void add(Row row)
{
super.add(row);
countRow(row);
maybeSync();
}
};
}
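// Flushes whatever is still buffered, signals the disk writer to stop by enqueueing
// the SENTINEL, and waits for it to finish before surfacing any pending error.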
@Override
public void close() throws IOException
{
sync();
put(SENTINEL);
try
{
diskWriter.join();
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
// Surface any failure from the writer thread now that it has exited; this lets
// an IOException propagate as such instead of being wrapped in a RuntimeException.
checkForWriterException();
}
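// Hands the current buffer off to the disk writer thread and starts a fresh one.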
protected void sync() throws IOException
{
if (buffer.isEmpty())
return;
put(buffer);
buffer = new Buffer();
currentSize = 0;
}
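// Blocks until the disk writer accepts the buffer, polling with a timeout so that a
// failure on the writer thread is noticed instead of blocking forever.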
private void put(Buffer buffer) throws IOException
{
while (true)
{
checkForWriterException();
try
{
if (writeQueue.offer(buffer, 1, TimeUnit.SECONDS))
break;
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}
}
private void checkForWriterException() throws IOException
{
// A slightly crude way to report exceptions from the writer thread, but it should be good enough
if (diskWriter.exception != null)
{
if (diskWriter.exception instanceof IOException)
throw (IOException) diskWriter.exception;
else
throw Throwables.propagate(diskWriter.exception);
}
}
static class SyncException extends RuntimeException
{
SyncException(IOException ioe)
{
super(ioe);
}
}
// typedef
static class Buffer extends TreeMap<DecoratedKey, PartitionUpdate> {}
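// Background thread that consumes buffers from writeQueue and writes each one out as
// a new SSTable, exiting when it dequeues the SENTINEL (enqueued by close()).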
private class DiskWriter extends FastThreadLocalThread
{
volatile Throwable exception = null;
@Override
public void run()
{
while (true)
{
try
{
Buffer b = writeQueue.take();
if (b == SENTINEL)
return;
try (SSTableTxnWriter writer = createWriter())
{
for (Map.Entry<DecoratedKey, PartitionUpdate> entry : b.entrySet())
writer.append(entry.getValue().unfilteredIterator());
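// finish(false): commit the SSTable without opening a reader on the result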
writer.finish(false);
}
}
catch (Throwable e)
{
JVMStabilityInspector.inspectThrowable(e);
// Keep only the first exception
if (exception == null)
exception = e;
}
}
}
}
}