blob: 5020b45d89b9e2f1a65d4cf2510e344c28f9a4fa [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.cassandra.stress.generate;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.cassandra.stress.Operation;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.utils.LockedDynamicList;
public class SeedManager
final Distribution visits;
final Generator writes;
final Generator reads;
final ConcurrentHashMap<Long, Seed> managing = new ConcurrentHashMap<>();
final LockedDynamicList<Seed> sampleFrom;
final Distribution sample;
final long sampleOffset;
final int sampleSize;
final boolean updateSampleImmediately;
public SeedManager(StressSettings settings)
Generator writes, reads;
if (settings.generate.sequence != null)
long[] seq = settings.generate.sequence;
if (settings.generate.readlookback != null)
LookbackableWriteGenerator series = new LookbackableWriteGenerator(seq[0], seq[1], settings.generate.wrap, settings.generate.readlookback.get());
writes = series;
reads = series.reads;
writes = reads = new SeriesGenerator(seq[0], seq[1], settings.generate.wrap);
writes = reads = new RandomGenerator(settings.generate.distribution.get());
this.visits = settings.insert.visits.get();
this.writes = writes;
this.reads = reads;
Distribution sample = settings.insert.revisit.get();
this.sampleOffset = Math.min(sample.minValue(), sample.maxValue());
long sampleSize = 1 + Math.max(sample.minValue(), sample.maxValue()) - sampleOffset;
if (sampleOffset < 0 || sampleSize > Integer.MAX_VALUE)
throw new IllegalArgumentException("sample range is invalid");
this.sampleFrom = new LockedDynamicList<>((int) sampleSize);
this.sample = DistributionInverted.invert(sample);
this.sampleSize = (int) sampleSize;
this.updateSampleImmediately = visits.average() > 1;
public Seed next(Operation op)
if (!op.isWrite())
Seed seed =;
if (seed == null)
return null;
Seed managing = this.managing.get(seed);
return managing == null ? seed : managing;
while (true)
int index = (int) ( - sampleOffset);
Seed seed = sampleFrom.get(index);
if (seed != null && seed.isSaved())
return seed;
seed =;
if (seed == null)
return null;
if (managing.putIfAbsent(seed.seed, seed) == null)
if (!updateSampleImmediately ||, sampleSize))
return seed;
managing.remove(seed.seed, seed);
public void markLastWrite(Seed seed, boolean first)
// we could have multiple iterators mark the last write simultaneously,
// so we ensure we remove conditionally, and only remove the exact seed we were operating over
// this is important because, to ensure correctness, we do not support calling remove multiple
// times on the same DynamicList.Node
if (managing.remove(seed.seed, seed) && !first)
public void markFirstWrite(Seed seed, boolean last)
if (!last && !updateSampleImmediately), Integer.MAX_VALUE);
private abstract class Generator
abstract Seed next(int visits);
void finishWrite(Seed seed) { }
private class RandomGenerator extends Generator
final Distribution distribution;
public RandomGenerator(Distribution distribution)
this.distribution = distribution;
public Seed next(int visits)
return new Seed(, visits);
private class SeriesGenerator extends Generator
final long start;
final long totalCount;
final boolean wrap;
final AtomicLong next = new AtomicLong();
public SeriesGenerator(long start, long end, boolean wrap)
this.wrap = wrap;
if (start > end)
throw new IllegalStateException();
this.start = start;
this.totalCount = 1 + end - start;
public Seed next(int visits)
long next =;
if (!wrap && next >= totalCount)
return null;
return new Seed(start + (next % totalCount), visits);
private class LookbackableWriteGenerator extends SeriesGenerator
final AtomicLong writeCount = new AtomicLong();
final ConcurrentSkipListMap<Seed, Seed> afterMin = new ConcurrentSkipListMap<>();
final LookbackReadGenerator reads;
public LookbackableWriteGenerator(long start, long end, boolean wrap, Distribution readLookback)
super(start, end, wrap);
reads = new LookbackReadGenerator(readLookback);
public Seed next(int visits)
long next =;
if (!wrap && next >= totalCount)
return null;
return new Seed(start + (next % totalCount), visits);
void finishWrite(Seed seed)
if (seed.seed <= writeCount.get())
afterMin.put(seed, seed);
while (true)
Map.Entry<Seed, Seed> head = afterMin.firstEntry();
if (head == null)
long min = this.writeCount.get();
if (head.getKey().seed <= min)
if (head.getKey().seed == min + 1 && this.writeCount.compareAndSet(min, min + 1))
private class LookbackReadGenerator extends Generator
final Distribution lookback;
public LookbackReadGenerator(Distribution lookback)
this.lookback = lookback;
if (lookback.maxValue() > start + totalCount)
throw new IllegalArgumentException("Invalid lookback distribution; max value is " + lookback.maxValue()
+ ", but series only ranges from " + writeCount + " to " + (start + totalCount));
public Seed next(int visits)
long lookback =;
long range = writeCount.get();
long startOffset = range - lookback;
if (startOffset < 0)
if (range == totalCount && !wrap)
return null;
startOffset = range == 0 ? 0 : lookback % range;
return new Seed(start + startOffset, visits);