blob: 8e50456d6badbe859d04b0af4bd5071bd2915a97 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.memtable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
import org.github.jamm.Unmetered;
public abstract class AbstractMemtable implements Memtable
{
protected final AtomicLong currentOperations = new AtomicLong(0);
protected final ColumnsCollector columnsCollector;
protected final StatsCollector statsCollector = new StatsCollector();
// The smallest timestamp for all partitions stored in this memtable
protected AtomicLong minTimestamp = new AtomicLong(Long.MAX_VALUE);
// The smallest local deletion time for all partitions in this memtable
protected AtomicInteger minLocalDeletionTime = new AtomicInteger(Integer.MAX_VALUE);
// Note: statsCollector has corresponding statistics to the two above, but starts with an epoch value which is not
// correct for their usage.
@Unmetered
protected TableMetadataRef metadata;
public AbstractMemtable(TableMetadataRef metadataRef)
{
this.metadata = metadataRef;
this.columnsCollector = new ColumnsCollector(metadata.get().regularAndStaticColumns());
}
@VisibleForTesting
public AbstractMemtable(TableMetadataRef metadataRef, long minTimestamp)
{
this.metadata = metadataRef;
this.columnsCollector = new ColumnsCollector(metadata.get().regularAndStaticColumns());
this.minTimestamp = new AtomicLong(minTimestamp);
}
public TableMetadata metadata()
{
return metadata.get();
}
public long operationCount()
{
return currentOperations.get();
}
public long getMinTimestamp()
{
return minTimestamp.get() != EncodingStats.NO_STATS.minTimestamp ? minTimestamp.get() : NO_MIN_TIMESTAMP;
}
public int getMinLocalDeletionTime()
{
return minLocalDeletionTime.get();
}
protected static void updateMin(AtomicLong minTracker, long newValue)
{
while (true)
{
long existing = minTracker.get();
if (existing <= newValue)
break;
if (minTracker.compareAndSet(existing, newValue))
break;
}
}
protected static void updateMin(AtomicInteger minTracker, int newValue)
{
while (true)
{
int existing = minTracker.get();
if (existing <= newValue)
break;
if (minTracker.compareAndSet(existing, newValue))
break;
}
}
RegularAndStaticColumns columns()
{
return columnsCollector.get();
}
EncodingStats encodingStats()
{
return statsCollector.get();
}
protected static class ColumnsCollector
{
private final HashMap<ColumnMetadata, AtomicBoolean> predefined = new HashMap<>();
private final ConcurrentSkipListSet<ColumnMetadata> extra = new ConcurrentSkipListSet<>();
ColumnsCollector(RegularAndStaticColumns columns)
{
for (ColumnMetadata def : columns.statics)
predefined.put(def, new AtomicBoolean());
for (ColumnMetadata def : columns.regulars)
predefined.put(def, new AtomicBoolean());
}
public void update(RegularAndStaticColumns columns)
{
for (ColumnMetadata s : columns.statics)
update(s);
for (ColumnMetadata r : columns.regulars)
update(r);
}
public void update(ColumnsCollector other)
{
for (Map.Entry<ColumnMetadata, AtomicBoolean> v : other.predefined.entrySet())
if (v.getValue().get())
update(v.getKey());
extra.addAll(other.extra);
}
private void update(ColumnMetadata definition)
{
AtomicBoolean present = predefined.get(definition);
if (present != null)
{
if (!present.get())
present.set(true);
}
else
{
extra.add(definition);
}
}
/**
* Get the current state of the columns set.
*
* Note: If this is executed while mutations are still being performed on the table (e.g. to prepare
* an sstable for streaming when Memtable.Factory.streamFromMemtable() is true), the resulting view may be
* in a somewhat inconsistent state (it may include partial updates, as well as miss updates older than
* ones it does include).
*/
public RegularAndStaticColumns get()
{
RegularAndStaticColumns.Builder builder = RegularAndStaticColumns.builder();
for (Map.Entry<ColumnMetadata, AtomicBoolean> e : predefined.entrySet())
if (e.getValue().get())
builder.add(e.getKey());
return builder.addAll(extra).build();
}
}
protected static class StatsCollector
{
private final AtomicReference<EncodingStats> stats = new AtomicReference<>(EncodingStats.NO_STATS);
public void update(EncodingStats newStats)
{
while (true)
{
EncodingStats current = stats.get();
EncodingStats updated = current.mergeWith(newStats);
if (stats.compareAndSet(current, updated))
return;
}
}
public EncodingStats get()
{
return stats.get();
}
}
protected abstract class AbstractFlushablePartitionSet<P extends Partition> implements FlushablePartitionSet<P>
{
public long dataSize()
{
return getLiveDataSize();
}
public CommitLogPosition commitLogLowerBound()
{
return AbstractMemtable.this.getCommitLogLowerBound();
}
public LastCommitLogPosition commitLogUpperBound()
{
return AbstractMemtable.this.getFinalCommitLogUpperBound();
}
public EncodingStats encodingStats()
{
return AbstractMemtable.this.encodingStats();
}
public RegularAndStaticColumns columns()
{
return AbstractMemtable.this.columns();
}
}
}