blob: c8dbe88bfc63842d2edc156b9a2369c9c83b4596 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.affinity;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.configuration.internal.ConfigurationManager;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.internal.affinity.event.AffinityEvent;
import org.apache.ignite.internal.affinity.event.AffinityEventParameters;
import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.manager.Producer;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.internal.metastorage.client.Conditions;
import org.apache.ignite.internal.metastorage.client.EntryEvent;
import org.apache.ignite.internal.metastorage.client.Operations;
import org.apache.ignite.internal.metastorage.client.WatchEvent;
import org.apache.ignite.internal.metastorage.client.WatchListener;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.NotNull;
/**
* Affinity manager is responsible for affinity function related logic including calculating affinity assignments.
*/
public class AffinityManager extends Producer<AffinityEvent, AffinityEventParameters> {
/** The logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(AffinityManager.class);
/** Internal prefix for the metasorage. */
private static final String INTERNAL_PREFIX = "internal.tables.assignment.";
/**
* MetaStorage manager in order to watch private distributed affinity specific configuration, cause
* ConfigurationManger handles only public configuration.
*/
private final MetaStorageManager metaStorageMgr;
/** Configuration manager in order to handle and listen affinity specific configuration. */
private final ConfigurationManager configurationMgr;
/** Baseline manager. */
private final BaselineManager baselineMgr;
/**
* Creates a new affinity manager.
*
* @param configurationMgr Configuration module.
* @param metaStorageMgr Meta storage service.
* @param baselineMgr Baseline manager.
*/
public AffinityManager(
ConfigurationManager configurationMgr,
MetaStorageManager metaStorageMgr,
BaselineManager baselineMgr
) {
this.configurationMgr = configurationMgr;
this.metaStorageMgr = metaStorageMgr;
this.baselineMgr = baselineMgr;
metaStorageMgr.registerWatchByPrefix(new ByteArray(INTERNAL_PREFIX), new WatchListener() {
@Override public boolean onUpdate(@NotNull WatchEvent watchEvt) {
for (EntryEvent evt : watchEvt.entryEvents()) {
String keyAsString = new ByteArray(evt.newEntry().key().bytes()).toString();
String tabIdVal = keyAsString.substring(INTERNAL_PREFIX.length());
UUID tblId = UUID.fromString(tabIdVal);
if (evt.newEntry().value() == null) {
assert evt.oldEntry().value() != null : "Previous assignment is unknown";
List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)ByteUtils.fromBytes(
evt.oldEntry().value());
onEvent(AffinityEvent.REMOVED, new AffinityEventParameters(tblId, assignment), null);
}
else {
List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)ByteUtils.fromBytes(
evt.newEntry().value());
onEvent(AffinityEvent.CALCULATED, new AffinityEventParameters(tblId, assignment), null);
}
}
return true;
}
@Override public void onError(@NotNull Throwable e) {
LOG.error("Meta storage listener issue", e);
}
});
}
/**
* Calculates an assignment for a table which was specified by id.
*
* @param tblId Table identifier.
* @param tblName Table name.
* @return A future which will complete when the assignment is calculated.
*/
public CompletableFuture<Boolean> calculateAssignments(UUID tblId, String tblName) {
TableConfiguration tblConfig = configurationMgr.configurationRegistry()
.getConfiguration(TablesConfiguration.KEY).tables().get(tblName);
var key = new ByteArray(INTERNAL_PREFIX + tblId);
// TODO: https://issues.apache.org/jira/browse/IGNITE-14716 Need to support baseline changes.
return metaStorageMgr.invoke(
Conditions.notExists(key),
Operations.put(key, ByteUtils.toBytes(
RendezvousAffinityFunction.assignPartitions(
baselineMgr.nodes(),
tblConfig.partitions().value(),
tblConfig.replicas().value(),
false,
null
))),
Operations.noop());
}
/**
* Removes an assignment for a table which was specified by id.
*
* @param tblId Table identifier.
* @return A future which will complete when assignment is removed.
*/
public CompletableFuture<Boolean> removeAssignment(UUID tblId) {
var key = new ByteArray(INTERNAL_PREFIX + tblId);
return metaStorageMgr.invoke(
Conditions.exists(key),
Operations.remove(key),
Operations.noop());
}
}