blob: 224809c8a4d5a4fa983221ac722f4636585188da [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.lowwatermark;
import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
import static org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_BEFORE_CHANGE;
import static org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_CHANGED;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEventParameters;
import org.apache.ignite.internal.lowwatermark.message.GetLowWatermarkRequest;
import org.apache.ignite.internal.lowwatermark.message.LowWatermarkMessageGroup;
import org.apache.ignite.internal.lowwatermark.message.LowWatermarkMessagesFactory;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
import org.apache.ignite.internal.schema.configuration.LowWatermarkConfigurationSchema;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
/**
 * Class to manage the low watermark.
 *
 * <p>Low watermark updating occurs in the following cases (will only be updated if the new value is greater than the existing one):</p>
 * <ul>
 *     <li>By calling {@link #updateLowWatermark(HybridTimestamp)}.</li>
 *     <li>In the background every {@link LowWatermarkConfigurationSchema#updateFrequency} milliseconds (after {@link #scheduleUpdates()}
 *     has been called), a new value will be created in {@link #createNewLowWatermarkCandidate()}.</li>
 * </ul>
 *
 * <p>Accepted candidates are applied one at a time, each one after the previous update has finished. Algorithm for updating a new low
 * watermark:</p>
 * <ul>
 *     <li>Notify all {@link LowWatermarkEvent#LOW_WATERMARK_BEFORE_CHANGE} listeners and wait until they complete processing the
 *     event.</li>
 *     <li>Write the new value in vault by {@link #LOW_WATERMARK_VAULT_KEY}.</li>
 *     <li>Set the new value in memory.</li>
 *     <li>Notify all {@link LowWatermarkEvent#LOW_WATERMARK_CHANGED} listeners that the new watermark has changed and wait until they
 *     complete processing the event.</li>
 * </ul>
 *
 * @see LowWatermarkEvent
 */
public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, LowWatermarkEventParameters> implements LowWatermark,
        IgniteComponent {
    private static final IgniteLogger LOG = Loggers.forClass(LowWatermarkImpl.class);

    static final ByteArray LOW_WATERMARK_VAULT_KEY = new ByteArray("low-watermark");

    private static final LowWatermarkMessagesFactory MESSAGES_FACTORY = new LowWatermarkMessagesFactory();

    private final LowWatermarkConfiguration lowWatermarkConfig;

    private final ClockService clockService;

    private final VaultManager vaultManager;

    /** Single-threaded pool that runs scheduled updates and the asynchronous stages of every update. */
    // TODO: IGNITE-21772 Consider using a shared pool to update a low watermark
    private final ScheduledExecutorService scheduledThreadPool;

    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();

    /** Guards against stopping the component more than once. */
    private final AtomicBoolean closeGuard = new AtomicBoolean();

    /** Current low watermark, {@code null} if none is known. Written only under the write lock of {@link #updateLowWatermarkLock}. */
    private volatile @Nullable HybridTimestamp lowWatermark;

    /** Future of the most recently scheduled background update task. */
    private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture = new AtomicReference<>();

    private final FailureProcessor failureProcessor;

    private final ReadWriteLock updateLowWatermarkLock = new ReentrantReadWriteLock();

    private final MessagingService messagingService;

    /**
     * Greatest accepted low watermark candidate together with the future of its update. A new candidate is accepted only if it is
     * greater than this one, and its update starts after this candidate's update has finished.
     */
    private final AtomicReference<LowWatermarkCandidate> lowWatermarkCandidate = new AtomicReference<>(
            new LowWatermarkCandidate(MIN_VALUE, nullCompletedFuture())
    );

    /**
     * Constructor.
     *
     * @param nodeName Node name.
     * @param lowWatermarkConfig Low watermark configuration.
     * @param clockService A hybrid logical clock.
     * @param vaultManager Vault manager.
     * @param failureProcessor Failure processor that is used to handle critical errors.
     * @param messagingService Messaging service used to answer low watermark requests from other nodes.
     */
    public LowWatermarkImpl(
            String nodeName,
            LowWatermarkConfiguration lowWatermarkConfig,
            ClockService clockService,
            VaultManager vaultManager,
            FailureProcessor failureProcessor,
            MessagingService messagingService
    ) {
        this.lowWatermarkConfig = lowWatermarkConfig;
        this.clockService = clockService;
        this.vaultManager = vaultManager;
        this.failureProcessor = failureProcessor;
        this.messagingService = messagingService;

        scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
                NamedThreadFactory.create(nodeName, "low-watermark-updater", LOG)
        );
    }

    @Override
    public CompletableFuture<Void> start() {
        return inBusyLockAsync(busyLock, () -> {
            setLowWatermarkOnRecovery(readLowWatermarkFromVault());

            messagingService.addMessageHandler(LowWatermarkMessageGroup.class, this::onReceiveNetworkMessage);

            return nullCompletedFuture();
        });
    }

    /** Schedule low watermark updates. */
    public void scheduleUpdates() {
        inBusyLock(busyLock, this::scheduleUpdateLowWatermarkBusy);
    }

    /** Reads the persisted low watermark from the vault, returning {@code null} if nothing has been stored yet. */
    private @Nullable HybridTimestamp readLowWatermarkFromVault() {
        VaultEntry vaultEntry = vaultManager.get(LOW_WATERMARK_VAULT_KEY);

        return vaultEntry == null ? null : ByteUtils.fromBytes(vaultEntry.value());
    }

    @Override
    public void stop() {
        if (!closeGuard.compareAndSet(false, true)) {
            return;
        }

        busyLock.block();

        ScheduledFuture<?> lastScheduledTaskFuture = this.lastScheduledTaskFuture.get();

        if (lastScheduledTaskFuture != null) {
            lastScheduledTaskFuture.cancel(true);
        }

        IgniteUtils.shutdownAndAwaitTermination(scheduledThreadPool, 10, TimeUnit.SECONDS);
    }

    @Override
    public @Nullable HybridTimestamp getLowWatermark() {
        return inBusyLock(busyLock, () -> lowWatermark);
    }

    @Override
    public void getLowWatermarkSafe(Consumer<@Nullable HybridTimestamp> consumer) {
        inBusyLock(busyLock, () -> {
            // The read lock keeps the value from being changed by setLowWatermark while the consumer runs.
            updateLowWatermarkLock.readLock().lock();

            try {
                consumer.accept(lowWatermark);
            } finally {
                updateLowWatermarkLock.readLock().unlock();
            }
        });
    }

    /**
     * Schedules the next background update after {@link LowWatermarkConfigurationSchema#updateFrequency} milliseconds. Must be called
     * under the busy lock and only when no other scheduled update task is pending.
     */
    private void scheduleUpdateLowWatermarkBusy() {
        ScheduledFuture<?> previousScheduledFuture = this.lastScheduledTaskFuture.get();

        assert previousScheduledFuture == null || previousScheduledFuture.isDone() : "previous scheduled task has not finished";

        ScheduledFuture<?> newScheduledFuture = scheduledThreadPool.schedule(
                this::runScheduledUpdate,
                lowWatermarkConfig.updateFrequency().value(),
                TimeUnit.MILLISECONDS
        );

        boolean casResult = lastScheduledTaskFuture.compareAndSet(previousScheduledFuture, newScheduledFuture);

        assert casResult : "only one scheduled task is expected";
    }

    /**
     * Body of a scheduled background update. It creates a new candidate, tries to apply it, and schedules the next background update
     * once the attempt finishes. The next update is scheduled regardless of whether the candidate was accepted or the update failed,
     * but not when the node is stopping.
     */
    private void runScheduledUpdate() {
        inBusyLock(busyLock, () -> {
            updateLowWatermarkBusy(createNewLowWatermarkCandidate())
                    // Rescheduling runs on the single-threaded pool, so it can only execute after this scheduled task has returned and
                    // its future is done, as scheduleUpdateLowWatermarkBusy() asserts.
                    .whenCompleteAsync((unused, throwable) -> {
                        if (throwable == null || !hasNodeStoppingCause(throwable)) {
                            inBusyLock(busyLock, this::scheduleUpdateLowWatermarkBusy);
                        }
                    }, scheduledThreadPool);
        });
    }

    /** Creates a new candidate: the current time minus the configured data availability time and the maximum clock skew. */
    HybridTimestamp createNewLowWatermarkCandidate() {
        HybridTimestamp now = clockService.now();

        return now.subtractPhysicalTime(lowWatermarkConfig.dataAvailabilityTime().value() + clockService.maxClockSkewMillis());
    }

    /** Sets the in-memory low watermark under the write lock; the new value must be strictly greater than the current one. */
    private void setLowWatermark(HybridTimestamp newLowWatermark) {
        updateLowWatermarkLock.writeLock().lock();

        try {
            assert lowWatermark == null || newLowWatermark.compareTo(lowWatermark) > 0 :
                    "Low watermark should only grow: [cur=" + lowWatermark + ", new=" + newLowWatermark + "]";

            lowWatermark = newLowWatermark;
        } finally {
            updateLowWatermarkLock.writeLock().unlock();
        }
    }

    /** Sets the low watermark recovered on start (possibly {@code null}) without the monotonicity check. */
    private void setLowWatermarkOnRecovery(@Nullable HybridTimestamp newLowWatermark) {
        updateLowWatermarkLock.writeLock().lock();

        try {
            lowWatermark = newLowWatermark;

            // Seed the candidate with the recovered value so that later updates are accepted only if strictly greater than it;
            // otherwise the low watermark could move backwards (or trip the assertion in setLowWatermark) after a restart.
            if (newLowWatermark != null) {
                lowWatermarkCandidate.set(new LowWatermarkCandidate(newLowWatermark, nullCompletedFuture()));
            }
        } finally {
            updateLowWatermarkLock.writeLock().unlock();
        }
    }

    /**
     * Handles messages of {@link LowWatermarkMessageGroup}. It answers a {@link GetLowWatermarkRequest} with the current low watermark
     * and ignores all other messages.
     */
    void onReceiveNetworkMessage(NetworkMessage message, ClusterNode sender, @Nullable Long correlationId) {
        inBusyLock(busyLock, () -> {
            if (!(message instanceof GetLowWatermarkRequest)) {
                return;
            }

            assert correlationId != null : sender;

            // NOTE(review): lowWatermark may be null here; this presumes hybridTimestampToLong accepts null - confirm.
            messagingService.respond(
                    sender,
                    MESSAGES_FACTORY.getLowWatermarkResponse().lowWatermark(hybridTimestampToLong(lowWatermark)).build(),
                    correlationId
            );
        });
    }

    @Override
    public void updateLowWatermark(HybridTimestamp newLowWatermark) {
        inBusyLock(busyLock, () -> {
            updateLowWatermarkBusy(newLowWatermark);
        });
    }

    /**
     * Tries to accept {@code newLowWatermark} as the new candidate. If accepted, its update starts once the previous candidate's update
     * has finished, successfully or not. Must be called under the busy lock.
     *
     * @param newLowWatermark New low watermark candidate.
     * @return Future of the accepted candidate's update, or a completed future if a candidate not less than {@code newLowWatermark} has
     *      already been accepted.
     */
    private CompletableFuture<?> updateLowWatermarkBusy(HybridTimestamp newLowWatermark) {
        LowWatermarkCandidate newLowWatermarkCandidate = new LowWatermarkCandidate(newLowWatermark, new CompletableFuture<>());

        LowWatermarkCandidate oldLowWatermarkCandidate;

        do {
            oldLowWatermarkCandidate = lowWatermarkCandidate.get();

            // If another candidate contains a higher low watermark, then there is no need to update.
            if (oldLowWatermarkCandidate.lowWatermark().compareTo(newLowWatermark) >= 0) {
                return nullCompletedFuture();
            }
        } while (!lowWatermarkCandidate.compareAndSet(oldLowWatermarkCandidate, newLowWatermarkCandidate));

        // We will start the update as soon as the previous one finishes. The previous outcome is deliberately ignored: otherwise a single
        // failed update would make every later candidate fail with the same error without ever being applied.
        oldLowWatermarkCandidate.updateFuture()
                .handle((unused, throwable) -> null)
                .thenComposeAsync(unused -> updateAndNotify(newLowWatermark), scheduledThreadPool)
                .whenComplete((unused, throwable) -> {
                    if (throwable != null) {
                        newLowWatermarkCandidate.updateFuture().completeExceptionally(throwable);
                    } else {
                        newLowWatermarkCandidate.updateFuture().complete(null);
                    }
                });

        return newLowWatermarkCandidate.updateFuture();
    }

    /**
     * Applies a new low watermark: fires {@link LowWatermarkEvent#LOW_WATERMARK_BEFORE_CHANGE}, writes the value to the vault, sets it in
     * memory and fires {@link LowWatermarkEvent#LOW_WATERMARK_CHANGED}. Failures not caused by the node stopping are logged and reported
     * to the failure processor as {@code CRITICAL_ERROR}.
     *
     * @param newLowWatermark New low watermark.
     * @return Future that completes when the update and both notifications have finished.
     */
    CompletableFuture<Void> updateAndNotify(HybridTimestamp newLowWatermark) {
        return inBusyLockAsync(busyLock, () ->
                fireEvent(LOW_WATERMARK_BEFORE_CHANGE, new ChangeLowWatermarkEventParameters(newLowWatermark))
                        .thenComposeAsync(unused -> {
                            vaultManager.put(LOW_WATERMARK_VAULT_KEY, ByteUtils.toBytes(newLowWatermark));

                            setLowWatermark(newLowWatermark);

                            return fireEvent(LOW_WATERMARK_CHANGED, new ChangeLowWatermarkEventParameters(newLowWatermark));
                        }, scheduledThreadPool)
                        .whenCompleteAsync((unused, throwable) -> {
                            if (throwable != null) {
                                // Previous stages typically wrap the cause (e.g. in a CompletionException), so the whole cause
                                // chain is inspected to recognize a node stop.
                                if (!hasNodeStoppingCause(throwable)) {
                                    LOG.error("Failed to update low watermark, will schedule again: {}", throwable, newLowWatermark);

                                    failureProcessor.process(new FailureContext(CRITICAL_ERROR, throwable));
                                }
                            } else {
                                LOG.info("Successful low watermark update: {}", newLowWatermark);
                            }
                        }, scheduledThreadPool)
        );
    }

    /** Returns {@code true} if {@code throwable} or any of its causes is a {@link NodeStoppingException}. */
    private static boolean hasNodeStoppingCause(Throwable throwable) {
        for (Throwable current = throwable; current != null; current = current.getCause()) {
            if (current instanceof NodeStoppingException) {
                return true;
            }
        }

        return false;
    }
}