/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.tcm;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.MetaStrategy;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.net.CMSIdentifierMismatchException;
import org.apache.cassandra.schema.DistributedSchema;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.Keyspaces;
import org.apache.cassandra.schema.ReplicationParams;
import org.apache.cassandra.tcm.extensions.ExtensionKey;
import org.apache.cassandra.tcm.extensions.ExtensionValue;
import org.apache.cassandra.tcm.membership.Directory;
import org.apache.cassandra.tcm.membership.Location;
import org.apache.cassandra.tcm.membership.NodeAddresses;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
import org.apache.cassandra.tcm.membership.NodeVersion;
import org.apache.cassandra.tcm.ownership.DataPlacement;
import org.apache.cassandra.tcm.ownership.DataPlacements;
import org.apache.cassandra.tcm.ownership.PrimaryRangeComparator;
import org.apache.cassandra.tcm.ownership.ReplicaGroups;
import org.apache.cassandra.tcm.ownership.TokenMap;
import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
import org.apache.cassandra.tcm.sequences.InProgressSequences;
import org.apache.cassandra.tcm.sequences.LockedRanges;
import org.apache.cassandra.tcm.serialization.MetadataSerializer;
import org.apache.cassandra.tcm.serialization.Version;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import static org.apache.cassandra.config.CassandraRelevantProperties.LINE_SEPARATOR;
import static org.apache.cassandra.db.TypeSizes.sizeof;
public class ClusterMetadata
{
public static final int EMPTY_METADATA_IDENTIFIER = 0;
public static final Serializer serializer = new Serializer();
public final int metadataIdentifier;
public final Epoch epoch;
public final IPartitioner partitioner; // Set during (initial) construction and not modifiable via Transformer
public final DistributedSchema schema;
public final Directory directory;
public final TokenMap tokenMap;
public final DataPlacements placements;
public final LockedRanges lockedRanges;
public final InProgressSequences inProgressSequences;
public final ImmutableMap<ExtensionKey<?,?>, ExtensionValue<?>> extensions;
// These two fields are lazily initialized, but only for test purposes, since computing them requires the log keyspace to be initialized
private EndpointsForRange fullCMSReplicas;
private Set<InetAddressAndPort> fullCMSEndpoints;
public ClusterMetadata(IPartitioner partitioner)
{
this(partitioner, Directory.EMPTY);
}
@VisibleForTesting
public ClusterMetadata(IPartitioner partitioner, Directory directory)
{
this(partitioner, directory, DistributedSchema.first(directory.knownDatacenters()));
}
@VisibleForTesting
public ClusterMetadata(IPartitioner partitioner, Directory directory, DistributedSchema schema)
{
this(EMPTY_METADATA_IDENTIFIER,
Epoch.EMPTY,
partitioner,
schema,
directory,
new TokenMap(partitioner),
DataPlacements.EMPTY,
LockedRanges.EMPTY,
InProgressSequences.EMPTY,
ImmutableMap.of());
}
public ClusterMetadata(Epoch epoch,
IPartitioner partitioner,
DistributedSchema schema,
Directory directory,
TokenMap tokenMap,
DataPlacements placements,
LockedRanges lockedRanges,
InProgressSequences inProgressSequences,
Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions)
{
this(EMPTY_METADATA_IDENTIFIER,
epoch,
partitioner,
schema,
directory,
tokenMap,
placements,
lockedRanges,
inProgressSequences,
extensions);
}
private ClusterMetadata(int metadataIdentifier,
Epoch epoch,
IPartitioner partitioner,
DistributedSchema schema,
Directory directory,
TokenMap tokenMap,
DataPlacements placements,
LockedRanges lockedRanges,
InProgressSequences inProgressSequences,
Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions)
{
// TODO: token map is a feature of the specific placement strategy, and so may not be a relevant component of
// ClusterMetadata in the long term. We need to consider how the actual components of metadata can be evolved
// over time.
assert tokenMap == null || tokenMap.partitioner().getClass().equals(partitioner.getClass()) : "Partitioner for TokenMap doesn't match base partitioner";
this.metadataIdentifier = metadataIdentifier;
this.epoch = epoch;
this.partitioner = partitioner;
this.schema = schema;
this.directory = directory;
this.tokenMap = tokenMap;
this.placements = placements;
this.lockedRanges = lockedRanges;
this.inProgressSequences = inProgressSequences;
this.extensions = ImmutableMap.copyOf(extensions);
}
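/**
* Endpoints of the full CMS replica set, derived lazily from the read placement
* of the metadata keyspace (see the note on the lazy fields above).
*/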
public Set<InetAddressAndPort> fullCMSMembers()
{
if (fullCMSEndpoints == null)
this.fullCMSEndpoints = ImmutableSet.copyOf(placements.get(ReplicationParams.meta(this)).reads.byEndpoint().keySet());
return fullCMSEndpoints;
}
public EndpointsForRange fullCMSMembersAsReplicas()
{
if (fullCMSReplicas == null)
fullCMSReplicas = placements.get(ReplicationParams.meta(this)).reads.forRange(MetaStrategy.entireRange).get();
return fullCMSReplicas;
}
public boolean isCMSMember(InetAddressAndPort endpoint)
{
return fullCMSMembers().contains(endpoint);
}
public Transformer transformer()
{
return new Transformer(this, this.nextEpoch());
}
public ClusterMetadata forceEpoch(Epoch epoch)
{
// In certain circumstances, the last modified epoch of the individual
// components may have been updated beyond the epoch we're specifying here.
// An example is the execution of an UnsafeJoin transformation, where the
// sub-steps (Start/Mid/Finish) are executed in series, each updating a
// single ClusterMetadata and its individual components. At the end of that
// sequence, the CM epoch is then set forcibly to ensure the UnsafeJoin only
// increments the published epoch by one. As each component has its own last
// modified epoch, we may also need to coerce those, but only if they are
// greater than the epoch we're forcing here.
return new ClusterMetadata(metadataIdentifier,
epoch,
partitioner,
capLastModified(schema, epoch),
capLastModified(directory, epoch),
capLastModified(tokenMap, epoch),
capLastModified(placements, epoch),
capLastModified(lockedRanges, epoch),
capLastModified(inProgressSequences, epoch),
capLastModified(extensions, epoch));
}
public ClusterMetadata initializeClusterIdentifier(int clusterIdentifier)
{
if (this.metadataIdentifier != EMPTY_METADATA_IDENTIFIER)
throw new IllegalStateException(String.format("Can only initialize cluster identifier once, but it was already set to %d", this.metadataIdentifier));
if (clusterIdentifier == EMPTY_METADATA_IDENTIFIER)
throw new IllegalArgumentException("Cannot initialize cluster with an empty cluster identifier");
return new ClusterMetadata(clusterIdentifier,
epoch,
partitioner,
schema,
directory,
tokenMap,
placements,
lockedRanges,
inProgressSequences,
extensions);
}
private static Map<ExtensionKey<?,?>, ExtensionValue<?>> capLastModified(Map<ExtensionKey<?,?>, ExtensionValue<?>> original, Epoch maxEpoch)
{
Map<ExtensionKey<?, ?>, ExtensionValue<?>> updated = new HashMap<>();
original.forEach((key, value) -> {
ExtensionValue<?> newValue = value == null || value.lastModified().isEqualOrBefore(maxEpoch)
? value
: (ExtensionValue<?>)value.withLastModified(maxEpoch);
updated.put(key, newValue);
});
return updated;
}
@SuppressWarnings("unchecked")
private static <V> V capLastModified(MetadataValue<V> value, Epoch maxEpoch)
{
return value == null || value.lastModified().isEqualOrBefore(maxEpoch)
? (V)value
: value.withLastModified(maxEpoch);
}
public Epoch nextEpoch()
{
return epoch.nextEpoch();
}
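/**
* Computes the placement for the given keyspace as it would be once every
* in-progress sequence has run to completion, by applying each sequence in
* turn to a working copy of this metadata.
*/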
public DataPlacement writePlacementAllSettled(KeyspaceMetadata ksm)
{
ClusterMetadata metadata = this;
Iterator<MultiStepOperation<?>> iter = metadata.inProgressSequences.iterator();
while (iter.hasNext())
{
Transformation.Result result = iter.next().applyTo(metadata);
assert result.isSuccess();
metadata = result.success().metadata;
}
return metadata.placements.get(ksm.params.replication);
}
// TODO Remove this as it isn't really an equivalent to the previous concept of pending ranges
public boolean hasPendingRangesFor(KeyspaceMetadata ksm, Token token)
{
ReplicaGroups writes = placements.get(ksm.params.replication).writes;
ReplicaGroups reads = placements.get(ksm.params.replication).reads;
if (ksm.params.replication.isMeta())
return !reads.equals(writes);
return !reads.forToken(token).equals(writes.forToken(token));
}
// TODO Remove this as it isn't really an equivalent to the previous concept of pending ranges
public boolean hasPendingRangesFor(KeyspaceMetadata ksm, InetAddressAndPort endpoint)
{
ReplicaGroups writes = placements.get(ksm.params.replication).writes;
ReplicaGroups reads = placements.get(ksm.params.replication).reads;
return !writes.byEndpoint().get(endpoint).equals(reads.byEndpoint().get(endpoint));
}
public Collection<Range<Token>> localWriteRanges(KeyspaceMetadata metadata)
{
return writeRanges(metadata, FBUtilities.getBroadcastAddressAndPort());
}
public Collection<Range<Token>> writeRanges(KeyspaceMetadata metadata, InetAddressAndPort peer)
{
return placements.get(metadata.params.replication).writes.byEndpoint().get(peer).ranges();
}
// TODO Remove this as it isn't really an equivalent to the previous concept of pending ranges
public Map<Range<Token>, VersionedEndpoints.ForRange> pendingRanges(KeyspaceMetadata metadata)
{
Map<Range<Token>, VersionedEndpoints.ForRange> map = new HashMap<>();
ReplicaGroups writes = placements.get(metadata.params.replication).writes;
ReplicaGroups reads = placements.get(metadata.params.replication).reads;
// first, pending ranges as the result of range splitting or merging
// i.e. new ranges being created through join/leave
List<Range<Token>> pending = new ArrayList<>(writes.ranges());
pending.removeAll(reads.ranges());
for (Range<Token> p : pending)
map.put(p, placements.get(metadata.params.replication).writes.forRange(p));
// next, ranges which are themselves unchanged but whose replicas are changing
// i.e. replacement or RF increase
writes.forEach((range, endpoints) -> {
VersionedEndpoints.ForRange readGroup = reads.forRange(range);
if (!readGroup.equals(endpoints))
map.put(range, VersionedEndpoints.forRange(endpoints.lastModified(),
endpoints.get().filter(r -> !readGroup.get().contains(r))));
});
return map;
}
// TODO Remove this as it isn't really an equivalent to the previous concept of pending endpoints
public VersionedEndpoints.ForToken pendingEndpointsFor(KeyspaceMetadata metadata, Token t)
{
VersionedEndpoints.ForToken writeEndpoints = placements.get(metadata.params.replication).writes.forToken(t);
VersionedEndpoints.ForToken readEndpoints = placements.get(metadata.params.replication).reads.forToken(t);
EndpointsForToken.Builder endpointsForToken = writeEndpoints.get().newBuilder(writeEndpoints.size() - readEndpoints.size());
for (Replica writeReplica : writeEndpoints.get())
{
if (!readEndpoints.get().contains(writeReplica))
endpointsForToken.add(writeReplica);
}
return VersionedEndpoints.forToken(writeEndpoints.lastModified(), endpointsForToken.build());
}
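/**
* Fluent helper for deriving a successor ClusterMetadata at the next epoch.
* A minimal usage sketch (identifiers assumed for illustration):
* <pre>{@code
* ClusterMetadata.Transformer.Transformed transformed =
*     metadata.transformer()
*             .withNodeState(nodeId, NodeState.JOINED)
*             .build();
* ClusterMetadata next = transformed.metadata;
* }</pre>
* Components left untouched keep their previous lastModified epoch; any
* component that was replaced is stamped with the new epoch in build().
*/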
public static class Transformer
{
private final ClusterMetadata base;
private final Epoch epoch;
private final IPartitioner partitioner;
private DistributedSchema schema;
private Directory directory;
private TokenMap tokenMap;
private DataPlacements placements;
private LockedRanges lockedRanges;
private InProgressSequences inProgressSequences;
private final Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions;
private final Set<MetadataKey> modifiedKeys;
private Transformer(ClusterMetadata metadata, Epoch epoch)
{
this.base = metadata;
this.epoch = epoch;
this.partitioner = metadata.partitioner;
this.schema = metadata.schema;
this.directory = metadata.directory;
this.tokenMap = metadata.tokenMap;
this.placements = metadata.placements;
this.lockedRanges = metadata.lockedRanges;
this.inProgressSequences = metadata.inProgressSequences;
extensions = new HashMap<>(metadata.extensions);
modifiedKeys = new HashSet<>();
}
public Transformer with(DistributedSchema schema)
{
this.schema = schema;
return this;
}
public Transformer with(Directory directory)
{
this.directory = directory;
return this;
}
public Transformer register(NodeAddresses addresses, Location location, NodeVersion version)
{
directory = directory.with(addresses, location, version);
return this;
}
public Transformer unregister(NodeId nodeId)
{
directory = directory.without(nodeId);
return this;
}
public Transformer withNewAddresses(NodeId nodeId, NodeAddresses addresses)
{
directory = directory.withNodeAddresses(nodeId, addresses);
return this;
}
public Transformer withVersion(NodeId nodeId, NodeVersion version)
{
directory = directory.withNodeVersion(nodeId, version);
return this;
}
public Transformer withNodeState(NodeId id, NodeState state)
{
directory = directory.withNodeState(id, state);
return this;
}
public Transformer proposeToken(NodeId nodeId, Collection<Token> tokens)
{
tokenMap = tokenMap.assignTokens(nodeId, tokens);
return this;
}
public Transformer addToRackAndDC(NodeId nodeId)
{
directory = directory.withRackAndDC(nodeId);
return this;
}
public Transformer unproposeTokens(NodeId nodeId)
{
tokenMap = tokenMap.unassignTokens(nodeId);
directory = directory.withoutRackAndDC(nodeId);
return this;
}
public Transformer moveTokens(NodeId nodeId, Collection<Token> tokens)
{
tokenMap = tokenMap.unassignTokens(nodeId)
.assignTokens(nodeId, tokens);
return this;
}
public Transformer join(NodeId nodeId)
{
directory = directory.withNodeState(nodeId, NodeState.JOINED);
return this;
}
public Transformer replaced(NodeId replaced, NodeId replacement)
{
Collection<Token> transferringTokens = tokenMap.tokens(replaced);
tokenMap = tokenMap.unassignTokens(replaced)
.assignTokens(replacement, transferringTokens);
directory = directory.without(replaced)
.withRackAndDC(replacement)
.withNodeState(replacement, NodeState.JOINED);
return this;
}
public Transformer proposeRemoveNode(NodeId id)
{
tokenMap = tokenMap.unassignTokens(id);
return this;
}
public Transformer left(NodeId id)
{
tokenMap = tokenMap.unassignTokens(id);
directory = directory.withNodeState(id, NodeState.LEFT)
.withoutRackAndDC(id);
return this;
}
public Transformer with(DataPlacements placements)
{
this.placements = placements;
return this;
}
public Transformer with(LockedRanges lockedRanges)
{
this.lockedRanges = lockedRanges;
return this;
}
public Transformer with(InProgressSequences sequences)
{
this.inProgressSequences = sequences;
return this;
}
public Transformer with(ExtensionKey<?, ?> key, ExtensionValue<?> obj)
{
if (MetadataKeys.CORE_METADATA.contains(key))
throw new IllegalArgumentException("Core cluster metadata objects should be addressed directly, " +
"not using the associated MetadataKey");
if (!key.valueType.isInstance(obj))
throw new IllegalArgumentException("Value of type " + obj.getClass() +
" is incompatible with type for key " + key +
" (" + key.valueType + ")");
extensions.put(key, obj);
modifiedKeys.add(key);
return this;
}
public Transformer withIfAbsent(ExtensionKey<?, ?> key, ExtensionValue<?> obj)
{
if (extensions.containsKey(key))
return this;
return with(key, obj);
}
public Transformer without(ExtensionKey<?, ?> key)
{
if (MetadataKeys.CORE_METADATA.contains(key))
throw new IllegalArgumentException("Core cluster metadata objects should be addressed directly, " +
"not using the associated MetadataKey");
if (extensions.remove(key) != null)
modifiedKeys.add(key);
return this;
}
public Transformed build()
{
// Process extensions first as a) these are actually mutable and b) they are
// added to the set of modified keys when added/updated/removed
for (MetadataKey key : modifiedKeys)
{
ExtensionValue<?> mutable = extensions.get(key);
if (null != mutable)
mutable.withLastModified(epoch);
}
if (schema != base.schema)
{
modifiedKeys.add(MetadataKeys.SCHEMA);
schema = schema.withLastModified(epoch);
}
if (directory != base.directory)
{
modifiedKeys.add(MetadataKeys.NODE_DIRECTORY);
directory = directory.withLastModified(epoch);
}
if (tokenMap != base.tokenMap)
{
modifiedKeys.add(MetadataKeys.TOKEN_MAP);
tokenMap = tokenMap.withLastModified(epoch);
}
if (placements != base.placements)
{
modifiedKeys.add(MetadataKeys.DATA_PLACEMENTS);
// sort all endpoint lists to preserve primary replica
if (CassandraRelevantProperties.TCM_SORT_REPLICA_GROUPS.getBoolean())
{
PrimaryRangeComparator comparator = new PrimaryRangeComparator(tokenMap, directory);
placements = DataPlacements.sortReplicaGroups(placements, comparator);
}
placements = placements.withLastModified(epoch);
}
if (lockedRanges != base.lockedRanges)
{
modifiedKeys.add(MetadataKeys.LOCKED_RANGES);
lockedRanges = lockedRanges.withLastModified(epoch);
}
if (inProgressSequences != base.inProgressSequences)
{
modifiedKeys.add(MetadataKeys.IN_PROGRESS_SEQUENCES);
inProgressSequences = inProgressSequences.withLastModified(epoch);
}
return new Transformed(new ClusterMetadata(base.metadataIdentifier,
epoch,
partitioner,
schema,
directory,
tokenMap,
placements,
lockedRanges,
inProgressSequences,
extensions),
ImmutableSet.copyOf(modifiedKeys));
}
public ClusterMetadata buildForGossipMode()
{
return new ClusterMetadata(base.metadataIdentifier,
Epoch.UPGRADE_GOSSIP,
partitioner,
schema,
directory,
tokenMap,
placements,
lockedRanges,
inProgressSequences,
extensions);
}
@Override
public String toString()
{
return "Transformer{" +
"baseEpoch=" + base.epoch +
", epoch=" + epoch +
", partitioner=" + partitioner +
", schema=" + schema +
", directory=" + schema +
", tokenMap=" + tokenMap +
", placement=" + placements +
", lockedRanges=" + lockedRanges +
", inProgressSequences=" + inProgressSequences +
", extensions=" + extensions +
", modifiedKeys=" + modifiedKeys +
'}';
}
public static class Transformed
{
public final ClusterMetadata metadata;
public final ImmutableSet<MetadataKey> modifiedKeys;
public Transformed(ClusterMetadata metadata, ImmutableSet<MetadataKey> modifiedKeys)
{
this.metadata = metadata;
this.modifiedKeys = modifiedKeys;
}
}
}
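/**
* Renders token ownership in the legacy textual format: normal tokens,
* bootstrapping tokens and leaving endpoints, as derived from the directory
* and token map.
*/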
public String legacyToString()
{
StringBuilder sb = new StringBuilder();
Set<Pair<Token, InetAddressAndPort>> normal = new HashSet<>();
Set<Pair<Token, InetAddressAndPort>> bootstrapping = new HashSet<>();
Set<InetAddressAndPort> leaving = new HashSet<>();
for (Map.Entry<NodeId, NodeState> entry : directory.states.entrySet())
{
InetAddressAndPort endpoint = directory.endpoint(entry.getKey());
switch (entry.getValue())
{
case BOOTSTRAPPING:
for (Token t : tokenMap.tokens(entry.getKey()))
bootstrapping.add(Pair.create(t, endpoint));
break;
case LEAVING:
leaving.add(endpoint);
break;
case JOINED:
for (Token t : tokenMap.tokens(entry.getKey()))
normal.add(Pair.create(t, endpoint));
break;
case MOVING:
// TODO: handle when MOVE is added
break;
}
}
if (!normal.isEmpty())
{
sb.append("Normal Tokens:");
sb.append(LINE_SEPARATOR.getString());
for (Pair<Token, InetAddressAndPort> ep : normal)
{
sb.append(ep.right);
sb.append(':');
sb.append(ep.left);
sb.append(LINE_SEPARATOR.getString());
}
}
if (!bootstrapping.isEmpty())
{
sb.append("Bootstrapping Tokens:");
sb.append(LINE_SEPARATOR.getString());
for (Pair<Token, InetAddressAndPort> entry : bootstrapping)
{
sb.append(entry.right).append(':').append(entry.left);
sb.append(LINE_SEPARATOR.getString());
}
}
if (!leaving.isEmpty())
{
sb.append("Leaving Endpoints:");
sb.append(LINE_SEPARATOR.getString());
for (InetAddressAndPort ep : leaving)
{
sb.append(ep);
sb.append(LINE_SEPARATOR.getString());
}
}
return sb.toString();
}
@Override
public String toString()
{
return "ClusterMetadata{" +
"epoch=" + epoch +
", schema=" + schema +
", directory=" + directory +
", tokenMap=" + tokenMap +
", placements=" + placements +
", lockedRanges=" + lockedRanges +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (!(o instanceof ClusterMetadata)) return false;
ClusterMetadata that = (ClusterMetadata) o;
return epoch.equals(that.epoch) &&
schema.equals(that.schema) &&
directory.equals(that.directory) &&
tokenMap.equals(that.tokenMap) &&
placements.equals(that.placements) &&
lockedRanges.equals(that.lockedRanges) &&
inProgressSequences.equals(that.inProgressSequences) &&
extensions.equals(that.extensions);
}
private static final Logger logger = LoggerFactory.getLogger(ClusterMetadata.class);
public void dumpDiff(ClusterMetadata other)
{
if (!epoch.equals(other.epoch))
{
logger.warn("Epoch {} != {}", epoch, other.epoch);
}
if (!schema.equals(other.schema))
{
Keyspaces.KeyspacesDiff diff = Keyspaces.diff(schema.getKeyspaces(), other.schema.getKeyspaces());
logger.warn("Schemas differ {}", diff);
}
if (!directory.equals(other.directory))
{
logger.warn("Directories differ:");
directory.dumpDiff(other.directory);
}
if (!tokenMap.equals(other.tokenMap))
{
logger.warn("Token maps differ:");
tokenMap.dumpDiff(other.tokenMap);
}
if (!placements.equals(other.placements))
{
logger.warn("Placements differ:");
placements.dumpDiff(other.placements);
}
if (!lockedRanges.equals(other.lockedRanges))
{
logger.warn("Locked ranges differ: {} != {}", lockedRanges, other.lockedRanges);
}
if (!inProgressSequences.equals(other.inProgressSequences))
{
logger.warn("In progress sequences differ: {} != {}", inProgressSequences, other.inProgressSequences);
}
if (!extensions.equals(other.extensions))
{
logger.warn("Extensions differ: {} != {}", extensions, other.extensions);
}
}
@Override
public int hashCode()
{
return Objects.hash(epoch, schema, directory, tokenMap, placements, lockedRanges, inProgressSequences, extensions);
}
public static ClusterMetadata current()
{
return ClusterMetadataService.instance().metadata();
}
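/**
* Verifies that a peer's metadata identifier matches our own. A mismatch means
* the peer belongs to a different cluster, so CMSIdentifierMismatchException is
* thrown; if either side has not yet fully joined the CMS (identifier still
* EMPTY_METADATA_IDENTIFIER), the check is skipped.
*/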
public static void checkIdentifier(int remoteIdentifier)
{
ClusterMetadata metadata = currentNullable();
if (metadata != null)
{
int currentIdentifier = metadata.metadataIdentifier;
// We haven't yet joined CMS fully
if (currentIdentifier == EMPTY_METADATA_IDENTIFIER)
return;
// Peer hasn't yet joined CMS fully
if (remoteIdentifier == EMPTY_METADATA_IDENTIFIER)
return;
if (currentIdentifier != remoteIdentifier)
throw new CMSIdentifierMismatchException(String.format("Cluster Metadata Identifier mismatch. Node is attempting to communicate with a node from a different cluster. Current identifier: %d, remote identifier: %d", currentIdentifier, remoteIdentifier));
}
}
/**
* Startup of some services may race with cluster metadata initialization. We allow those services to
* gracefully handle scenarios where it is not yet initialized.
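* A minimal caller sketch (hypothetical usage, identifiers assumed):
* <pre>{@code
* ClusterMetadata metadata = ClusterMetadata.currentNullable();
* if (metadata == null)
*     return; // not yet initialized; caller backs off gracefully
* }</pre>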
*/
public static ClusterMetadata currentNullable()
{
ClusterMetadataService service = ClusterMetadataService.instance();
if (service == null)
return null;
return service.metadata();
}
public NodeId myNodeId()
{
return directory.peerId(FBUtilities.getBroadcastAddressAndPort());
}
public NodeState myNodeState()
{
NodeId nodeId = myNodeId();
if (nodeId != null)
return directory.peerState(nodeId);
return null;
}
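/**
* True while nodes in the cluster disagree on the metadata serialization
* version, i.e. while the directory's min and max versions differ.
*/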
public boolean metadataSerializationUpgradeInProgress()
{
return !directory.clusterMaxVersion.serializationVersion().equals(directory.clusterMinVersion.serializationVersion());
}
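/**
* Version-aware serializer for ClusterMetadata. A round-trip sketch (buffer
* classes assumed from org.apache.cassandra.io.util):
* <pre>{@code
* DataOutputBuffer out = new DataOutputBuffer();
* ClusterMetadata.serializer.serialize(metadata, out, Version.V2);
* ClusterMetadata roundTripped = ClusterMetadata.serializer.deserialize(
*     new DataInputBuffer(out.buffer(), false), Version.V2);
* }</pre>
*/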
public static class Serializer implements MetadataSerializer<ClusterMetadata>
{
@Override
public void serialize(ClusterMetadata metadata, DataOutputPlus out, Version version) throws IOException
{
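// From V1 the partitioner name leads the stream so getPartitioner below can
// recover it without deserializing the rest; pre-V1 it follows the epoch.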
if (version.isAtLeast(Version.V1))
out.writeUTF(metadata.partitioner.getClass().getCanonicalName());
if (version.isAtLeast(Version.V2))
out.writeUnsignedVInt32(metadata.metadataIdentifier);
Epoch.serializer.serialize(metadata.epoch, out);
if (version.isBefore(Version.V1))
out.writeUTF(metadata.partitioner.getClass().getCanonicalName());
DistributedSchema.serializer.serialize(metadata.schema, out, version);
Directory.serializer.serialize(metadata.directory, out, version);
TokenMap.serializer.serialize(metadata.tokenMap, out, version);
DataPlacements.serializer.serialize(metadata.placements, out, version);
LockedRanges.serializer.serialize(metadata.lockedRanges, out, version);
InProgressSequences.serializer.serialize(metadata.inProgressSequences, out, version);
out.writeInt(metadata.extensions.size());
for (Map.Entry<ExtensionKey<?, ?>, ExtensionValue<?>> entry : metadata.extensions.entrySet())
{
ExtensionKey<?, ?> key = entry.getKey();
ExtensionValue<?> value = entry.getValue();
ExtensionKey.serializer.serialize(key, out, version);
assert key.valueType.isInstance(value);
value.serialize(out, version);
}
}
@Override
public ClusterMetadata deserialize(DataInputPlus in, Version version) throws IOException
{
IPartitioner partitioner = null;
if (version.isAtLeast(Version.V1))
partitioner = FBUtilities.newPartitioner(in.readUTF());
int clusterIdentifier = EMPTY_METADATA_IDENTIFIER;
if (version.isAtLeast(Version.V2))
{
clusterIdentifier = in.readUnsignedVInt32();
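// fail fast if this stream originated in a different cluster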
checkIdentifier(clusterIdentifier);
}
Epoch epoch = Epoch.serializer.deserialize(in);
if (version.isBefore(Version.V1))
partitioner = FBUtilities.newPartitioner(in.readUTF());
DistributedSchema schema = DistributedSchema.serializer.deserialize(in, version);
Directory dir = Directory.serializer.deserialize(in, version);
TokenMap tokenMap = TokenMap.serializer.deserialize(in, version);
DataPlacements placements = DataPlacements.serializer.deserialize(in, version);
LockedRanges lockedRanges = LockedRanges.serializer.deserialize(in, version);
InProgressSequences ips = InProgressSequences.serializer.deserialize(in, version);
int items = in.readInt();
Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions = new HashMap<>(items);
for (int i = 0; i < items; i++)
{
ExtensionKey<?, ?> key = ExtensionKey.serializer.deserialize(in, version);
ExtensionValue<?> value = key.newValue();
value.deserialize(in, version);
extensions.put(key, value);
}
return new ClusterMetadata(clusterIdentifier,
epoch,
partitioner,
schema,
dir,
tokenMap,
placements,
lockedRanges,
ips,
extensions);
}
@Override
public long serializedSize(ClusterMetadata metadata, Version version)
{
long size = TypeSizes.INT_SIZE;
for (Map.Entry<ExtensionKey<?, ?>, ExtensionValue<?>> entry : metadata.extensions.entrySet())
size += ExtensionKey.serializer.serializedSize(entry.getKey(), version) +
entry.getValue().serializedSize(version);
if (version.isAtLeast(Version.V2))
size += TypeSizes.sizeofUnsignedVInt(metadata.metadataIdentifier);
size += Epoch.serializer.serializedSize(metadata.epoch) +
sizeof(metadata.partitioner.getClass().getCanonicalName()) +
DistributedSchema.serializer.serializedSize(metadata.schema, version) +
Directory.serializer.serializedSize(metadata.directory, version) +
TokenMap.serializer.serializedSize(metadata.tokenMap, version) +
DataPlacements.serializer.serializedSize(metadata.placements, version) +
LockedRanges.serializer.serializedSize(metadata.lockedRanges, version) +
InProgressSequences.serializer.serializedSize(metadata.inProgressSequences, version);
return size;
}
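/**
* Reads just enough of a serialized ClusterMetadata to recover its partitioner.
* From V1 the partitioner name is the first field; in older versions it follows
* the epoch and two legacy fields, which are read and discarded here.
*/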
public static IPartitioner getPartitioner(DataInputPlus in, Version version) throws IOException
{
if (version.isAtLeast(Version.V1))
return FBUtilities.newPartitioner(in.readUTF());
Epoch.serializer.deserialize(in);
in.readUnsignedVInt();
in.readBoolean();
return FBUtilities.newPartitioner(in.readUTF());
}
}
}