blob: f33a5d22ab3b81eea8ffa252ddef17c2df28fac5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
/**
* This class is responsible for reading assignments from the 'assignments' functions internal topic.
* Only functions worker leader writes to the topic while other workers read from the topic.
* When a worker become a leader, the worker
* will read to the end of the assignments topic and close its reader to the topic.
* Then the worker and new leader will be in charge of computing new assignments when necessary.
* The leader does not need to listen to the assignments topic because it can
* just update its in memory assignments map directly
* after it computes a new scheduling. When a worker loses leadership,
* the worker is start reading from the assignments topic again.
*/
@Slf4j
public class FunctionAssignmentTailer implements AutoCloseable {
private final FunctionRuntimeManager functionRuntimeManager;
private final ReaderBuilder readerBuilder;
private final WorkerConfig workerConfig;
private final ErrorNotifier errorNotifier;
private Reader<byte[]> reader;
private volatile boolean isRunning = false;
private volatile boolean exitOnEndOfTopic = false;
private CompletableFuture<Void> exitFuture;
private Thread tailerThread;
@Getter
private MessageId lastMessageId = null;
public FunctionAssignmentTailer(
FunctionRuntimeManager functionRuntimeManager,
ReaderBuilder readerBuilder,
WorkerConfig workerConfig,
ErrorNotifier errorNotifier) {
this.functionRuntimeManager = functionRuntimeManager;
this.exitFuture = new CompletableFuture<>();
this.readerBuilder = readerBuilder;
this.workerConfig = workerConfig;
this.errorNotifier = errorNotifier;
}
public synchronized CompletableFuture<Void> triggerReadToTheEndAndExit() {
exitOnEndOfTopic = true;
return this.exitFuture;
}
public void startFromMessage(MessageId startMessageId) throws PulsarClientException {
if (!isRunning) {
isRunning = true;
if (reader == null) {
reader = createReader(startMessageId);
}
if (tailerThread == null || !tailerThread.isAlive()) {
tailerThread = getTailerThread();
}
tailerThread.start();
}
}
public synchronized void start() throws PulsarClientException {
MessageId startMessageId = lastMessageId == null ? MessageId.earliest : lastMessageId;
startFromMessage(startMessageId);
}
@Override
public synchronized void close() {
log.info("Closing function assignment tailer");
try {
isRunning = false;
if (tailerThread != null) {
while (true) {
tailerThread.interrupt();
try {
tailerThread.join(5000, 0);
} catch (InterruptedException e) {
log.warn("Waiting for assignment tailer thread to stop is interrupted", e);
}
if (tailerThread.isAlive()) {
log.warn("Assignment tailer thread is still alive. Will attempt to interrupt again.");
} else {
break;
}
}
tailerThread = null;
// complete exit future to be safe
exitFuture.complete(null);
// reset the future
exitFuture = new CompletableFuture<>();
}
if (reader != null) {
reader.close();
reader = null;
}
exitOnEndOfTopic = false;
} catch (IOException e) {
log.error("Failed to stop function assignment tailer", e);
}
}
private Reader<byte[]> createReader(MessageId startMessageId) throws PulsarClientException {
log.info("Assignment tailer will start reading from message id {}", startMessageId);
return WorkerUtils.createReader(
readerBuilder,
workerConfig.getWorkerId() + "-function-assignment-tailer",
workerConfig.getFunctionAssignmentTopic(),
startMessageId);
}
private Thread getTailerThread() {
Thread t = new Thread(() -> {
while (isRunning) {
try {
Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);
if (msg == null) {
if (exitOnEndOfTopic && !reader.hasMessageAvailableAsync().get(10, TimeUnit.SECONDS)) {
break;
}
} else {
functionRuntimeManager.processAssignmentMessage(msg);
// keep track of last message id
lastMessageId = msg.getMessageId();
}
} catch (Throwable th) {
if (isRunning) {
log.error("Encountered error in assignment tailer", th);
// trigger fatal error
isRunning = false;
errorNotifier.triggerError(th);
} else {
if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) {
log.warn("Encountered error when assignment tailer is not running", th);
}
}
}
}
log.info("assignment tailer thread exiting");
exitFuture.complete(null);
});
t.setName("assignment-tailer-thread");
return t;
}
Thread getThread() {
return tailerThread;
}
}