/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.tcm.sequences;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.ReplicationParams;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.MetadataValue;
import org.apache.cassandra.tcm.membership.Directory;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.ownership.DataPlacement;
import org.apache.cassandra.tcm.ownership.DataPlacements;
import org.apache.cassandra.tcm.serialization.MetadataSerializer;
import org.apache.cassandra.tcm.serialization.Version;
import static org.apache.cassandra.db.TypeSizes.sizeof;
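/**
 * Tracks the token ranges currently locked by in-flight cluster metadata sequences
 * (for example a bootstrap, decommission or move), so that a new operation whose
 * affected ranges overlap an in-flight one can be rejected. Each lock is identified
 * by a {@link Key} derived from the epoch of the event that started the operation
 * and maps to the {@link AffectedRanges} it covers. Instances are immutable:
 * {@link #lock} and {@link #unlock} return new copies.
 *
 * An illustrative flow (surrounding plumbing elided; {@code epoch} and {@code ranges}
 * are assumed to come from the running sequence):
 * <pre>{@code
 * LockedRanges.Key key = LockedRanges.keyFor(epoch);
 * if (lockedRanges.intersects(ranges).equals(LockedRanges.NOT_LOCKED))
 *     lockedRanges = lockedRanges.lock(key, ranges);
 * // ... operation runs to completion ...
 * lockedRanges = lockedRanges.unlock(key);
 * }</pre>
 */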
public class LockedRanges implements MetadataValue<LockedRanges>
{
public static final Serializer serializer = new Serializer();
public static final LockedRanges EMPTY = new LockedRanges(Epoch.EMPTY, ImmutableMap.of());
public static final Key NOT_LOCKED = new Key(Epoch.EMPTY);
public final ImmutableMap<Key, AffectedRanges> locked;
private final Epoch lastModified;
private LockedRanges(Epoch lastModified, ImmutableMap<Key, AffectedRanges> locked)
{
this.lastModified = lastModified;
this.locked = locked;
}
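/**
 * Returns a copy of this instance with {@code ranges} locked under {@code key}.
 * Locking an empty set of ranges is a no-op and returns this instance unchanged;
 * locking with {@link #NOT_LOCKED} is not permitted.
 */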
public LockedRanges lock(Key key, AffectedRanges ranges)
{
assert !key.equals(NOT_LOCKED) : "Can't lock ranges with noop key";
if (ranges == AffectedRanges.EMPTY)
return this;
// TODO might we need the ability for the holder of a key to lock multiple sets over time?
return new LockedRanges(lastModified,
ImmutableMap.<Key, AffectedRanges>builderWithExpectedSize(locked.size() + 1)
.putAll(locked)
.put(key, ranges)
.build());
}
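/**
 * Returns a copy of this instance with any lock held under {@code key} released.
 * Unlocking with {@link #NOT_LOCKED} is a no-op and returns this instance unchanged.
 */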
public LockedRanges unlock(Key key)
{
if (key.equals(NOT_LOCKED))
return this;
ImmutableMap.Builder<Key, AffectedRanges> builder = ImmutableMap.builderWithExpectedSize(locked.size());
locked.forEach((k, r) -> {
if (!k.equals(key)) builder.put(k, r);
});
return new LockedRanges(lastModified, builder.build());
}
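/**
 * Returns the key of the first held lock whose ranges intersect {@code ranges},
 * or {@link #NOT_LOCKED} if no held lock overlaps them.
 */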
public Key intersects(AffectedRanges ranges)
{
for (Map.Entry<Key, AffectedRanges> e : locked.entrySet())
{
if (ranges.intersects(e.getValue()))
return e.getKey();
}
return NOT_LOCKED;
}
@Override
public LockedRanges withLastModified(Epoch epoch)
{
return new LockedRanges(epoch, locked);
}
@Override
public Epoch lastModified()
{
return lastModified;
}
@Override
public String toString()
{
return "LockedRanges{" +
"lastModified=" + lastModified +
", locked=" + locked +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (!(o instanceof LockedRanges)) return false;
LockedRanges that = (LockedRanges) o;
// check the last modified epoch and set of lock keys match first
if (!Objects.equals(lastModified, that.lastModified) || !Objects.equals(locked.keySet(), that.locked.keySet()))
return false;
// now for each lock key, compare the AffectedRanges
for (Map.Entry<LockedRanges.Key, AffectedRanges> entry : locked.entrySet())
{
// AffectedRanges is conceptually a Map<ReplicationParams, Set<Range<Token>>>,
// so first check the keysets are the same, then do a pairwise compare on the sets of ranges
LockedRanges.AffectedRanges otherAffected = that.locked.get(entry.getKey());
Map<ReplicationParams, Set<Range<Token>>> thisRangesByReplication = entry.getValue().asMap();
Map<ReplicationParams, Set<Range<Token>>> thatRangesByReplication = otherAffected.asMap();
if (!thisRangesByReplication.keySet().equals(thatRangesByReplication.keySet()))
return false;
for (ReplicationParams replication : thisRangesByReplication.keySet())
if (!thisRangesByReplication.get(replication).equals(thatRangesByReplication.get(replication)))
return false;
}
return true;
}
@Override
public int hashCode()
{
// hash only the lock keys alongside lastModified; AffectedRanges implementations do not
// override hashCode, so including the values would make hashCode inconsistent with equals
return Objects.hash(lastModified, locked.keySet());
}
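/**
 * Creates a lock key from the given epoch. Key equality is epoch equality, so the
 * operation started at that epoch can always re-derive its own key.
 */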
public static Key keyFor(Epoch epoch)
{
return new Key(epoch);
}
public interface AffectedRangesBuilder
{
AffectedRangesBuilder add(ReplicationParams params, Range<Token> range);
AffectedRanges build();
}
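/**
 * The set of token ranges affected by an operation, grouped by {@link ReplicationParams}.
 * Built via {@link #builder()}; for example, with hypothetical {@code params} and
 * {@code range} values:
 * <pre>{@code
 * AffectedRanges affected = AffectedRanges.builder()
 *                                         .add(params, range)
 *                                         .build();
 * }</pre>
 */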
public interface AffectedRanges
{
AffectedRanges EMPTY = new AffectedRanges()
{
@Override
public boolean intersects(AffectedRanges other)
{
return false;
}
@Override
public void foreach(BiConsumer<ReplicationParams, Set<Range<Token>>> fn) {}
@Override
public String toString()
{
return "EMPTY";
}
@Override
public Map<ReplicationParams, Set<Range<Token>>> asMap()
{
return Collections.emptyMap();
}
};
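/**
 * Resolves the ids of all peers that replicate the locked ranges for the given
 * replication params, by looking each range up in the corresponding data placement.
 * Assumes this instance actually holds ranges for {@code replication}.
 */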
default ImmutableSet<NodeId> toPeers(ReplicationParams replication, DataPlacements placements, Directory directory)
{
ImmutableSet.Builder<NodeId> peers = ImmutableSet.builder();
DataPlacement placement = placements.get(replication);
asMap().get(replication).stream()
.flatMap(range -> placement.affectedReplicas(range).stream())
.map(directory::peerId)
.forEach(peers::add);
return peers.build();
}
static AffectedRanges singleton(ReplicationParams replicationParams, Range<Token> tokenRange)
{
return builder().add(replicationParams, tokenRange).build();
}
static AffectedRangesBuilder builder()
{
return new AffectedRangesImpl();
}
boolean intersects(AffectedRanges other);
void foreach(BiConsumer<ReplicationParams, Set<Range<Token>>> fn);
Map<ReplicationParams, Set<Range<Token>>> asMap();
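/**
 * Wire format: an int count of replication entries; for each entry, its
 * {@link ReplicationParams} followed by an int range count and the left and right
 * token of every range.
 */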
final class Serializer implements MetadataSerializer<AffectedRanges>
{
public static final Serializer instance = new Serializer();
public void serialize(AffectedRanges t, DataOutputPlus out, Version version) throws IOException
{
Map<ReplicationParams, Set<Range<Token>>> map = t.asMap();
out.writeInt(map.size());
for (Map.Entry<ReplicationParams, Set<Range<Token>>> rangeEntry : map.entrySet())
{
ReplicationParams params = rangeEntry.getKey();
Set<Range<Token>> ranges = rangeEntry.getValue();
ReplicationParams.serializer.serialize(params, out, version);
out.writeInt(ranges.size());
for (Range<Token> range : ranges)
{
Token.metadataSerializer.serialize(range.left, out, version);
Token.metadataSerializer.serialize(range.right, out, version);
}
}
}
public AffectedRanges deserialize(DataInputPlus in, Version version) throws IOException
{
int size = in.readInt();
Map<ReplicationParams, Set<Range<Token>>> map = Maps.newHashMapWithExpectedSize(size);
IPartitioner partitioner = ClusterMetadata.current().partitioner;
for (int x = 0; x < size; x++)
{
ReplicationParams params = ReplicationParams.serializer.deserialize(in, version);
int rangeSize = in.readInt();
Set<Range<Token>> range = Sets.newHashSetWithExpectedSize(rangeSize);
for (int y = 0; y < rangeSize; y++)
{
range.add(new Range<>(Token.metadataSerializer.deserialize(in, partitioner, version),
Token.metadataSerializer.deserialize(in, partitioner, version)));
}
map.put(params, range);
}
return new AffectedRangesImpl(map);
}
public long serializedSize(AffectedRanges t, Version version)
{
Map<ReplicationParams, Set<Range<Token>>> map = t.asMap();
long size = sizeof(map.size());
for (Map.Entry<ReplicationParams, Set<Range<Token>>> rangeEntry : map.entrySet())
{
ReplicationParams params = rangeEntry.getKey();
Set<Range<Token>> ranges = rangeEntry.getValue();
size += ReplicationParams.serializer.serializedSize(params, version);
size += sizeof(ranges.size());
for (Range<Token> range : ranges)
{
size += Token.metadataSerializer.serializedSize(range.left, version);
size += Token.metadataSerializer.serializedSize(range.right, version);
}
}
return size;
}
}
}
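/**
 * Default implementation, doubling as its own builder: {@link #add} accumulates
 * ranges into a mutable map and {@link #build} simply returns {@code this}, so
 * instances must not be mutated once published.
 */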
private static final class AffectedRangesImpl implements AffectedRangesBuilder, AffectedRanges
{
private final Map<ReplicationParams, Set<Range<Token>>> map;
public AffectedRangesImpl()
{
this(new HashMap<>());
}
public AffectedRangesImpl(Map<ReplicationParams, Set<Range<Token>>> map)
{
this.map = map;
}
@Override
public AffectedRangesBuilder add(ReplicationParams params, Range<Token> range)
{
// create the range set for these replication params on first use
map.computeIfAbsent(params, p -> new HashSet<>()).add(range);
return this;
}
@Override
public Map<ReplicationParams, Set<Range<Token>>> asMap()
{
return map;
}
@Override
public AffectedRanges build()
{
return this;
}
@Override
public void foreach(BiConsumer<ReplicationParams, Set<Range<Token>>> fn)
{
map.forEach((k, v) -> fn.accept(k, Collections.unmodifiableSet(v)));
}
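/**
 * Two instances intersect if, for any replication params present in both, a range
 * from one overlaps a range from the other. Ranges touching the minimum token are
 * conservatively treated as overlapping each other; see the comment on wraparound
 * ranges below.
 */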
@Override
public boolean intersects(AffectedRanges other)
{
if (other == EMPTY)
return false;
for (Map.Entry<ReplicationParams, Set<Range<Token>>> e : other.asMap().entrySet())
{
// if we hold no ranges for these replication params, nothing can overlap
Set<Range<Token>> thisRanges = map.get(e.getKey());
if (thisRanges == null)
continue;
for (Range<Token> otherRange : e.getValue())
{
for (Range<Token> thisRange : thisRanges)
{
if (thisRange.intersects(otherRange))
return true;
// Since we allow ownership of the MIN_TOKEN, we need to lock both sides of the
// wraparound range in case it transitions from non-wraparound to wraparound and back.
if ((thisRange.left.isMinimum() || thisRange.right.isMinimum()) &&
(otherRange.left.isMinimum() || otherRange.right.isMinimum()))
return true;
}
}
}
return false;
}
@Override
public String toString()
{
return "AffectedRangesImpl{" +
"map=" + map +
'}';
}
}
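/**
 * Identifies the holder of a lock: a value object wrapping the epoch of the event
 * that acquired it. Two keys are equal iff their epochs are equal.
 */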
public static class Key
{
public static final Serializer serializer = new Serializer();
private final Epoch epoch;
private Key(Epoch epoch)
{
this.epoch = epoch;
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Key key1 = (Key) o;
return epoch.equals(key1.epoch);
}
@Override
public int hashCode()
{
return Objects.hash(epoch);
}
@Override
public String toString()
{
return "Key{" +
"key=" + epoch +
'}';
}
public static final class Serializer
{
public void serialize(Key t, DataOutputPlus out, Version version) throws IOException
{
Epoch.serializer.serialize(t.epoch, out, version);
}
public Key deserialize(DataInputPlus in, Version version) throws IOException
{
return new Key(Epoch.serializer.deserialize(in, version));
}
public long serializedSize(Key t, Version version)
{
return Epoch.serializer.serializedSize(t.epoch, version);
}
}
}
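/**
 * Wire format: the last modified epoch, an int count of held locks, then for each
 * lock its key's epoch followed by its {@link AffectedRanges}.
 */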
public static class Serializer implements MetadataSerializer<LockedRanges>
{
public void serialize(LockedRanges t, DataOutputPlus out, Version version) throws IOException
{
Epoch.serializer.serialize(t.lastModified, out, version);
out.writeInt(t.locked.size());
for (Map.Entry<Key, AffectedRanges> entry : t.locked.entrySet())
{
Key key = entry.getKey();
Epoch.serializer.serialize(key.epoch, out, version);
AffectedRanges.Serializer.instance.serialize(entry.getValue(), out, version);
}
}
public LockedRanges deserialize(DataInputPlus in, Version version) throws IOException
{
Epoch lastModified = Epoch.serializer.deserialize(in, version);
int size = in.readInt();
if (size == 0) return new LockedRanges(lastModified, ImmutableMap.of());
ImmutableMap.Builder<Key, AffectedRanges> result = ImmutableMap.builderWithExpectedSize(size);
for (int i = 0; i < size; i++)
{
Key key = new Key(Epoch.serializer.deserialize(in, version));
AffectedRanges ranges = AffectedRanges.Serializer.instance.deserialize(in, version);
result.put(key, ranges);
}
return new LockedRanges(lastModified, result.build());
}
public long serializedSize(LockedRanges t, Version version)
{
long size = Epoch.serializer.serializedSize(t.lastModified, version);
size += sizeof(t.locked.size());
for (Map.Entry<Key, AffectedRanges> entry : t.locked.entrySet())
{
Key key = entry.getKey();
size += Epoch.serializer.serializedSize(key.epoch, version);
size += AffectedRanges.Serializer.instance.serializedSize(entry.getValue(), version);
}
return size;
}
}
}