blob: 5020b45d89b9e2f1a65d4cf2510e344c28f9a4fa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.stress.generate;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.cassandra.stress.Operation;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.utils.LockedDynamicList;
public class SeedManager
{
final Distribution visits;
final Generator writes;
final Generator reads;
final ConcurrentHashMap<Long, Seed> managing = new ConcurrentHashMap<>();
final LockedDynamicList<Seed> sampleFrom;
final Distribution sample;
final long sampleOffset;
final int sampleSize;
final boolean updateSampleImmediately;
public SeedManager(StressSettings settings)
{
Generator writes, reads;
if (settings.generate.sequence != null)
{
long[] seq = settings.generate.sequence;
if (settings.generate.readlookback != null)
{
LookbackableWriteGenerator series = new LookbackableWriteGenerator(seq[0], seq[1], settings.generate.wrap, settings.generate.readlookback.get());
writes = series;
reads = series.reads;
}
else
{
writes = reads = new SeriesGenerator(seq[0], seq[1], settings.generate.wrap);
}
}
else
{
writes = reads = new RandomGenerator(settings.generate.distribution.get());
}
this.visits = settings.insert.visits.get();
this.writes = writes;
this.reads = reads;
Distribution sample = settings.insert.revisit.get();
this.sampleOffset = Math.min(sample.minValue(), sample.maxValue());
long sampleSize = 1 + Math.max(sample.minValue(), sample.maxValue()) - sampleOffset;
if (sampleOffset < 0 || sampleSize > Integer.MAX_VALUE)
throw new IllegalArgumentException("sample range is invalid");
this.sampleFrom = new LockedDynamicList<>((int) sampleSize);
this.sample = DistributionInverted.invert(sample);
this.sampleSize = (int) sampleSize;
this.updateSampleImmediately = visits.average() > 1;
}
public Seed next(Operation op)
{
if (!op.isWrite())
{
Seed seed = reads.next(-1);
if (seed == null)
return null;
Seed managing = this.managing.get(seed);
return managing == null ? seed : managing;
}
while (true)
{
int index = (int) (sample.next() - sampleOffset);
Seed seed = sampleFrom.get(index);
if (seed != null && seed.isSaved())
return seed;
seed = writes.next((int) visits.next());
if (seed == null)
return null;
if (managing.putIfAbsent(seed.seed, seed) == null)
{
if (!updateSampleImmediately || seed.save(sampleFrom, sampleSize))
return seed;
managing.remove(seed.seed, seed);
}
}
}
public void markLastWrite(Seed seed, boolean first)
{
// we could have multiple iterators mark the last write simultaneously,
// so we ensure we remove conditionally, and only remove the exact seed we were operating over
// this is important because, to ensure correctness, we do not support calling remove multiple
// times on the same DynamicList.Node
if (managing.remove(seed.seed, seed) && !first)
seed.remove(sampleFrom);
}
public void markFirstWrite(Seed seed, boolean last)
{
if (!last && !updateSampleImmediately)
seed.save(sampleFrom, Integer.MAX_VALUE);
writes.finishWrite(seed);
}
private abstract class Generator
{
abstract Seed next(int visits);
void finishWrite(Seed seed) { }
}
private class RandomGenerator extends Generator
{
final Distribution distribution;
public RandomGenerator(Distribution distribution)
{
this.distribution = distribution;
}
public Seed next(int visits)
{
return new Seed(distribution.next(), visits);
}
}
private class SeriesGenerator extends Generator
{
final long start;
final long totalCount;
final boolean wrap;
final AtomicLong next = new AtomicLong();
public SeriesGenerator(long start, long end, boolean wrap)
{
this.wrap = wrap;
if (start > end)
throw new IllegalStateException();
this.start = start;
this.totalCount = 1 + end - start;
}
public Seed next(int visits)
{
long next = this.next.getAndIncrement();
if (!wrap && next >= totalCount)
return null;
return new Seed(start + (next % totalCount), visits);
}
}
private class LookbackableWriteGenerator extends SeriesGenerator
{
final AtomicLong writeCount = new AtomicLong();
final ConcurrentSkipListMap<Seed, Seed> afterMin = new ConcurrentSkipListMap<>();
final LookbackReadGenerator reads;
public LookbackableWriteGenerator(long start, long end, boolean wrap, Distribution readLookback)
{
super(start, end, wrap);
this.writeCount.set(0);
reads = new LookbackReadGenerator(readLookback);
}
public Seed next(int visits)
{
long next = this.next.getAndIncrement();
if (!wrap && next >= totalCount)
return null;
return new Seed(start + (next % totalCount), visits);
}
void finishWrite(Seed seed)
{
if (seed.seed <= writeCount.get())
return;
afterMin.put(seed, seed);
while (true)
{
Map.Entry<Seed, Seed> head = afterMin.firstEntry();
if (head == null)
return;
long min = this.writeCount.get();
if (head.getKey().seed <= min)
return;
if (head.getKey().seed == min + 1 && this.writeCount.compareAndSet(min, min + 1))
{
afterMin.remove(head.getKey());
continue;
}
return;
}
}
private class LookbackReadGenerator extends Generator
{
final Distribution lookback;
public LookbackReadGenerator(Distribution lookback)
{
this.lookback = lookback;
if (lookback.maxValue() > start + totalCount)
throw new IllegalArgumentException("Invalid lookback distribution; max value is " + lookback.maxValue()
+ ", but series only ranges from " + writeCount + " to " + (start + totalCount));
}
public Seed next(int visits)
{
long lookback = this.lookback.next();
long range = writeCount.get();
long startOffset = range - lookback;
if (startOffset < 0)
{
if (range == totalCount && !wrap)
return null;
startOffset = range == 0 ? 0 : lookback % range;
}
return new Seed(start + startOffset, visits);
}
}
}
}