/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.client.handler;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.tableIdNotFoundException;
import static org.apache.ignite.internal.event.EventListener.fromConsumer;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.table.LongPriorityQueue;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.TableNotFoundException;
import org.jetbrains.annotations.Nullable;
/**
* Primary partition replica tracker. Shared by all instances of {@link ClientInboundMessageHandler}.
*
* <p>Keeps up-to-date lists of primary replicas by partition for every table, avoiding expensive placement driver calls in most cases.
*
* <p>Every "assignment" (the set of primary replicas per partition) is identified by a maxStartTime - the latest known lease start time.
*
* <p>Assumptions:
* - Primary replicas do not change often.
* - Partition awareness is "best effort" - it is ok if we don't have the latest primary replicas at some point, or don't have them
* at all. What matters is that we have the correct assignment eventually.
* - It is allowed to return an incomplete assignment (null for some partitions) - better than nothing.
* - We don't know which tables the client is going to use, so we track a common maxStartTime for all tables.
*
* <p>Tracking logic:
* - Listen to election events from the placement driver and update primary replicas. This is the main source of information.
* - When we have not yet received events for all partitions of a certain table and the client requests the assignment,
* load it from the placement driver. Wait for a limited amount of time (in getPrimaryReplica) and return what we have
* (see the usage sketch below). Don't block the client for too long; it is better to miss a primary than to delay the request.
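*
* <p>A minimal usage sketch from a handler's point of view (handler-side names such as {@code sendPartitionCount} and
* {@code sendAssignment} are illustrative assumptions, not part of this class):
* <pre>{@code
* // The client passes the newest maxStartTime it has observed, or null on the first request.
* tracker.primaryReplicasAsync(tableId, clientMaxStartTime).thenAccept(res -> {
*     if (res.nodeNames() == null) {
*         // The assignment is not yet known; only the partition count is available, the client retries later.
*         sendPartitionCount(res.partitions());
*     } else {
*         // nodeNames().get(p) is the primary node for partition p; individual entries may be null.
*         sendAssignment(res.nodeNames(), res.timestamp());
*     }
* });
* }</pre>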
*/
public class ClientPrimaryReplicaTracker {
private final ConcurrentHashMap<TablePartitionId, ReplicaHolder> primaryReplicas = new ConcurrentHashMap<>();
private final AtomicLong maxStartTime = new AtomicLong();
private final PlacementDriver placementDriver;
private final ClockService clockService;
private final CatalogService catalogService;
private final SchemaSyncService schemaSyncService;
private final LowWatermark lowWatermark;
private final EventListener<ChangeLowWatermarkEventParameters> lwmListener = fromConsumer(this::onLwmChanged);
private final EventListener<DropTableEventParameters> dropTableEventListener = fromConsumer(this::onTableDrop);
private final EventListener<PrimaryReplicaEventParameters> primaryReplicaEventListener = fromConsumer(this::onPrimaryReplicaChanged);
/** A queue for deferred table destruction events. */
private final LongPriorityQueue<DestroyTableEvent> destructionEventsQueue =
new LongPriorityQueue<>(DestroyTableEvent::catalogVersion);
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
private final AtomicBoolean stopGuard = new AtomicBoolean();
/**
* Constructor.
*
* @param placementDriver Placement driver.
* @param catalogService Catalog.
* @param clockService Clock service.
* @param schemaSyncService Schema synchronization service.
* @param lowWatermark Low watermark.
*/
public ClientPrimaryReplicaTracker(
PlacementDriver placementDriver,
CatalogService catalogService,
ClockService clockService,
SchemaSyncService schemaSyncService,
LowWatermark lowWatermark
) {
this.placementDriver = placementDriver;
this.catalogService = catalogService;
this.clockService = clockService;
this.schemaSyncService = schemaSyncService;
this.lowWatermark = lowWatermark;
}
/**
* Gets primary replicas by partition for the table.
*
* @param tableId Table ID.
* @param maxStartTime Lease start time already known to the client, or {@code null} to use the latest tracked value.
* @return Future with primary replicas for the table; node names are {@code null} when the assignment is not yet known.
*/
public CompletableFuture<PrimaryReplicasResult> primaryReplicasAsync(int tableId, @Nullable Long maxStartTime) {
if (!busyLock.enterBusy()) {
return CompletableFuture.failedFuture(new NodeStoppingException());
}
try {
return primaryReplicasAsyncInternal(tableId, maxStartTime);
} finally {
busyLock.leaveBusy();
}
}
/**
* Gets primary replicas by partition for the table.
*
* @param tableId Table ID.
* @param maxStartTime Lease start time already known to the client, or {@code null} to use the latest tracked value.
* @return Future with primary replicas for the table; node names are {@code null} when the assignment is not yet known.
*/
private CompletableFuture<PrimaryReplicasResult> primaryReplicasAsyncInternal(int tableId, @Nullable Long maxStartTime) {
HybridTimestamp timestamp = clockService.now();
if (maxStartTime == null) {
maxStartTime = this.maxStartTime.get();
} else {
// If the client provides an old maxStartTime, ignore it and use the current one.
maxStartTime = Math.max(maxStartTime, this.maxStartTime.get());
}
// Check happy path: if we already have all replicas, and this.maxStartTime >= maxStartTime, return synchronously.
PrimaryReplicasResult fastRes = primaryReplicasNoWait(tableId, maxStartTime, timestamp, false);
if (fastRes != null) {
return completedFuture(fastRes);
}
// Request primary for all partitions.
CompletableFuture<Integer> partitionsFut = partitionsAsync(tableId, timestamp).thenCompose(partitions -> {
CompletableFuture<?>[] futures = new CompletableFuture<?>[partitions];
for (int partition = 0; partition < partitions; partition++) {
TablePartitionId tablePartitionId = new TablePartitionId(tableId, partition);
futures[partition] = placementDriver.getPrimaryReplica(tablePartitionId, timestamp).thenAccept(replicaMeta -> {
if (replicaMeta != null && replicaMeta.getLeaseholder() != null) {
updatePrimaryReplica(tablePartitionId, replicaMeta.getStartTime(), replicaMeta.getLeaseholder());
}
});
}
return CompletableFuture.allOf(futures).thenApply(v -> partitions);
});
// Wait for all futures, check condition again.
// Give up (return a result without node names) if we don't have replicas with the specified maxStartTime - the client will retry later.
long maxStartTime0 = maxStartTime;
return partitionsFut.handle((partitions, err) -> {
if (err != null) {
var cause = ExceptionUtils.unwrapCause(err);
if (cause instanceof TableNotFoundException) {
throw new CompletionException(cause);
}
assert false : "Unexpected error: " + err;
}
PrimaryReplicasResult res = primaryReplicasNoWait(tableId, maxStartTime0, timestamp, true);
if (res != null) {
return res;
}
// Return partition count to the client so that batching can be initialized.
return new PrimaryReplicasResult(partitions);
});
}
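/**
* Returns the currently known assignment without waiting. Returns {@code null} when the tracked data is older than
* {@code maxStartTime}, when table metadata is not available at the given timestamp without waiting, or when some
* partitions are unknown and {@code allowUnknownReplicas} is {@code false}.
*/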
@Nullable
private PrimaryReplicasResult primaryReplicasNoWait(
int tableId, long maxStartTime, HybridTimestamp timestamp, boolean allowUnknownReplicas) {
long currentMaxStartTime = this.maxStartTime.get();
if (currentMaxStartTime < maxStartTime) {
return null;
}
int partitions;
try {
partitions = partitionsNoWait(tableId, timestamp);
} catch (IllegalStateException | TableNotFoundException e) {
// Table or schema not found because we did not wait for metadata.
return null;
}
List<String> res = new ArrayList<>(partitions);
for (int partition = 0; partition < partitions; partition++) {
TablePartitionId tablePartitionId = new TablePartitionId(tableId, partition);
ReplicaHolder holder = primaryReplicas.get(tablePartitionId);
if (holder == null || holder.nodeName == null || holder.leaseStartTime == null) {
if (allowUnknownReplicas) {
res.add(null);
continue;
} else {
return null;
}
}
res.add(holder.nodeName);
}
return new PrimaryReplicasResult(res, currentMaxStartTime);
}
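/** Returns the partition count for the table, waiting for metadata completeness at the given timestamp. */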
private CompletableFuture<Integer> partitionsAsync(int tableId, HybridTimestamp timestamp) {
return schemaSyncService.waitForMetadataCompleteness(timestamp).thenApply(v -> partitionsNoWait(tableId, timestamp));
}
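/** Returns the partition count from the catalog at the given timestamp; throws if the table or its zone is not found. */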
private int partitionsNoWait(int tableId, HybridTimestamp timestamp) {
CatalogTableDescriptor table = catalogService.table(tableId, timestamp.longValue());
if (table == null) {
throw tableIdNotFoundException(tableId);
}
CatalogZoneDescriptor zone = catalogService.zone(table.zoneId(), timestamp.longValue());
if (zone == null) {
throw tableIdNotFoundException(tableId);
}
return zone.partitions();
}
long maxStartTime() {
return maxStartTime.get();
}
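/** Starts the tracker: initializes maxStartTime from the current clock and subscribes to placement driver, catalog and low watermark events. */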
void start() {
// This could be newer than the actual max start time, but we are on the safe side here.
maxStartTime.set(clockService.nowLong());
placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, primaryReplicaEventListener);
catalogService.listen(CatalogEvent.TABLE_DROP, dropTableEventListener);
lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED, lwmListener);
}
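/** Stops the tracker: removes the event listeners and clears the cached replicas. Idempotent. */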
void stop() {
if (!stopGuard.compareAndSet(false, true)) {
return;
}
busyLock.block();
lowWatermark.removeListener(LowWatermarkEvent.LOW_WATERMARK_CHANGED, lwmListener);
catalogService.removeListener(CatalogEvent.TABLE_DROP, dropTableEventListener);
placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, primaryReplicaEventListener);
primaryReplicas.clear();
}
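/** Handles primary replica election events for table partitions and updates the cached assignment. */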
private void onPrimaryReplicaChanged(PrimaryReplicaEventParameters primaryReplicaEvent) {
inBusyLock(busyLock, () -> {
if (!(primaryReplicaEvent.groupId() instanceof TablePartitionId)) {
return;
}
TablePartitionId tablePartitionId = (TablePartitionId) primaryReplicaEvent.groupId();
updatePrimaryReplica(tablePartitionId, primaryReplicaEvent.startTime(), primaryReplicaEvent.leaseholder());
});
}
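/**
* Handles table drop events. Destruction is deferred: the partition count is captured from the previous catalog version,
* and the removal is queued until the low watermark advances past the drop.
*/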
private void onTableDrop(DropTableEventParameters parameters) {
inBusyLock(busyLock, () -> {
int tableId = parameters.tableId();
int catalogVersion = parameters.catalogVersion();
int previousVersion = catalogVersion - 1;
// Retrieve the descriptor during the synchronous call, before the previous catalog version can be concurrently compacted.
int partitions = getTablePartitionsFromCatalog(catalogService, previousVersion, tableId);
destructionEventsQueue.enqueue(new DestroyTableEvent(catalogVersion, tableId, partitions));
});
}
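/** Handles low watermark changes: drains queued destruction events up to the catalog version active at the new watermark and removes the corresponding replicas. */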
private void onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
inBusyLock(busyLock, () -> {
int earliestVersion = catalogService.activeCatalogVersion(parameters.newLowWatermark().longValue());
List<DestroyTableEvent> events = destructionEventsQueue.drainUpTo(earliestVersion);
events.forEach(event -> removeTable(event.tableId(), event.partitions()));
});
}
private void removeTable(int tableId, int partitions) {
for (int partition = 0; partition < partitions; partition++) {
TablePartitionId tablePartitionId = new TablePartitionId(tableId, partition);
primaryReplicas.remove(tablePartitionId);
}
}
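/** Caches the replica for the partition if its lease start time is newer than the cached one, and advances the global maxStartTime. */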
private void updatePrimaryReplica(TablePartitionId tablePartitionId, HybridTimestamp startTime, String nodeName) {
long startTimeLong = startTime.longValue();
primaryReplicas.compute(tablePartitionId, (key, existingVal) -> {
if (existingVal != null
&& existingVal.leaseStartTime != null
&& existingVal.leaseStartTime.longValue() >= startTimeLong) {
return existingVal;
}
return new ReplicaHolder(nodeName, startTime);
});
maxStartTime.updateAndGet(value -> Math.max(value, startTimeLong));
}
private static int getTablePartitionsFromCatalog(CatalogService catalogService, int catalogVersion, int tableId) {
CatalogTableDescriptor tableDescriptor = catalogService.table(tableId, catalogVersion);
assert tableDescriptor != null : "tableId=" + tableId + ", catalogVersion=" + catalogVersion;
int zoneId = tableDescriptor.zoneId();
CatalogZoneDescriptor zoneDescriptor = catalogService.zone(zoneId, catalogVersion);
assert zoneDescriptor != null : "zoneId=" + zoneId + ", catalogVersion=" + catalogVersion;
return zoneDescriptor.partitions();
}
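/** Primary replica node name together with its lease start time. */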
private static class ReplicaHolder {
final String nodeName;
final HybridTimestamp leaseStartTime;
ReplicaHolder(String nodeName, HybridTimestamp leaseStartTime) {
this.nodeName = nodeName;
this.leaseStartTime = leaseStartTime;
}
}
/**
* Primary replicas per partition with timestamp.
*/
public static class PrimaryReplicasResult {
private final int partitions;
@Nullable
private final List<String> nodeNames;
private final long timestamp;
PrimaryReplicasResult(int partitions) {
this.partitions = partitions;
this.nodeNames = null;
this.timestamp = 0;
}
PrimaryReplicasResult(List<String> nodeNames, long timestamp) {
this.partitions = nodeNames.size();
this.nodeNames = nodeNames;
this.timestamp = timestamp;
}
public @Nullable List<String> nodeNames() {
return nodeNames;
}
public long timestamp() {
return timestamp;
}
public int partitions() {
return partitions;
}
}
/** Internal event. */
private static class DestroyTableEvent {
final int catalogVersion;
final int tableId;
final int partitions;
DestroyTableEvent(int catalogVersion, int tableId, int partitions) {
this.catalogVersion = catalogVersion;
this.tableId = tableId;
this.partitions = partitions;
}
public int catalogVersion() {
return catalogVersion;
}
public int tableId() {
return tableId;
}
public int partitions() {
return partitions;
}
}
}