blob: f2ea39651a26f4e6398b8093ff817840568c1602 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.Unpooled;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Sinks;
import org.apache.pulsar.client.admin.Sources;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntime;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.annotations.Test;
@Slf4j
public class FunctionRuntimeManagerTest {
private final String PULSAR_SERVICE_URL = "pulsar://localhost:6650";
@Test
public void testProcessAssignmentUpdateAddFunctions() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getMapper().getObjectMapper().convertValue(
new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
PulsarClient pulsarClient = mock(PulsarClient.class);
ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
doReturn(mock(Reader.class)).when(readerBuilder).create();
PulsarWorkerService workerService = mock(PulsarWorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
try (final MockedStatic<RuntimeFactory> runtimeFactoryMockedStatic = Mockito
.mockStatic(RuntimeFactory.class);) {
mockRuntimeFactory(runtimeFactoryMockedStatic);
// test new assignment add functions
@Cleanup
FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
workerConfig,
workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class)));
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
functionRuntimeManager.setFunctionActioner(functionActioner);
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();
Function.Assignment assignment1 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build())
.build();
Function.Assignment assignment2 = Function.Assignment.newBuilder()
.setWorkerId("worker-2")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function2).setInstanceId(0).build())
.build();
List<Function.Assignment> assignments = new LinkedList<>();
assignments.add(assignment1);
assignments.add(assignment2);
functionRuntimeManager.processAssignment(assignment1);
functionRuntimeManager.processAssignment(assignment2);
verify(functionRuntimeManager, times(2)).setAssignment(any(Function.Assignment.class));
verify(functionRuntimeManager, times(0)).deleteAssignment(any(Function.Assignment.class));
assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 2);
assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-1").get("test-tenant/test-namespace/func-1:0"), assignment1);
assertEquals(functionRuntimeManager.workerIdToAssignments.get("worker-2")
.get("test-tenant/test-namespace/func-2:0"), assignment2);
verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner).startFunction(argThat(
functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
.equals(function1)));
verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class));
assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 1);
assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"),
new FunctionRuntimeInfo().setFunctionInstance(
Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
.build()));
}
}
@Test
public void testProcessAssignmentUpdateDeleteFunctions() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getMapper().getObjectMapper().convertValue(
new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
PulsarClient pulsarClient = mock(PulsarClient.class);
ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
doReturn(mock(Reader.class)).when(readerBuilder).create();
PulsarWorkerService workerService = mock(PulsarWorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
try (final MockedStatic<RuntimeFactory> runtimeFactoryMockedStatic = Mockito
.mockStatic(RuntimeFactory.class);) {
mockRuntimeFactory(runtimeFactoryMockedStatic);
// test new assignment delete functions
@Cleanup
FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
workerConfig,
workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class)));
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
functionRuntimeManager.setFunctionActioner(functionActioner);
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();
// Delete this assignment
Function.Assignment assignment1 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build())
.build();
Function.Assignment assignment2 = Function.Assignment.newBuilder()
.setWorkerId("worker-2")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function2).setInstanceId(0).build())
.build();
// add existing assignments
functionRuntimeManager.setAssignment(assignment1);
functionRuntimeManager.setAssignment(assignment2);
reset(functionRuntimeManager);
functionRuntimeManager.functionRuntimeInfos.put(
"test-tenant/test-namespace/func-1:0", new FunctionRuntimeInfo().setFunctionInstance(
Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
.build()));
functionRuntimeManager.processAssignment(assignment1);
functionRuntimeManager.processAssignment(assignment2);
functionRuntimeManager
.deleteAssignment(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance()));
verify(functionRuntimeManager, times(0)).setAssignment(any(Function.Assignment.class));
verify(functionRuntimeManager, times(1)).deleteAssignment(any(String.class));
assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-2").get("test-tenant/test-namespace/func-2:0"), assignment2);
verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner, times(1)).terminateFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner).terminateFunction(argThat(
functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
.equals(function1)));
assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 0);
}
}
private void mockRuntimeFactory(MockedStatic<RuntimeFactory> runtimeFactoryMockedStatic) {
runtimeFactoryMockedStatic
.when(() -> RuntimeFactory.getFuntionRuntimeFactory(eq(ThreadRuntimeFactory.class.getName())))
.thenAnswer((Answer<ThreadRuntimeFactory>) invocation -> new ThreadRuntimeFactory());
}
@Test
public void testProcessAssignmentUpdateModifyFunctions() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getMapper().getObjectMapper().convertValue(
new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
PulsarClient pulsarClient = mock(PulsarClient.class);
ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
doReturn(mock(Reader.class)).when(readerBuilder).create();
PulsarWorkerService workerService = mock(PulsarWorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
try (final MockedStatic<RuntimeFactory> runtimeFactoryMockedStatic = Mockito
.mockStatic(RuntimeFactory.class);) {
mockRuntimeFactory(runtimeFactoryMockedStatic);
// test new assignment update functions
@Cleanup
FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
workerConfig,
workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
functionRuntimeManager.setFunctionActioner(functionActioner);
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();
Function.Assignment assignment1 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build())
.build();
Function.Assignment assignment2 = Function.Assignment.newBuilder()
.setWorkerId("worker-2")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function2).setInstanceId(0).build())
.build();
// add existing assignments
functionRuntimeManager.setAssignment(assignment1);
functionRuntimeManager.setAssignment(assignment2);
reset(functionActioner);
Function.Assignment assignment3 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function2).setInstanceId(0).build())
.build();
functionRuntimeManager.functionRuntimeInfos.put(
"test-tenant/test-namespace/func-1:0", new FunctionRuntimeInfo().setFunctionInstance(
Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
.build()));
functionRuntimeManager.functionRuntimeInfos.put(
"test-tenant/test-namespace/func-2:0", new FunctionRuntimeInfo().setFunctionInstance(
Function.Instance.newBuilder().setFunctionMetaData(function2).setInstanceId(0)
.build()));
functionRuntimeManager.processAssignment(assignment1);
functionRuntimeManager.processAssignment(assignment3);
verify(functionActioner, times(1)).stopFunction(any(FunctionRuntimeInfo.class));
// make sure terminate is not called since this is a update operation
verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner).stopFunction(argThat(
functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
.equals(function2)));
verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner).startFunction(argThat(
functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
.equals(function2)));
assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 2);
assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-1").get("test-tenant/test-namespace/func-1:0"), assignment1);
assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-1").get("test-tenant/test-namespace/func-2:0"), assignment3);
reset(functionActioner);
// add a stop
Function.FunctionMetaData.Builder function2StoppedBldr = function2.toBuilder();
function2StoppedBldr.putInstanceStates(0, Function.FunctionState.STOPPED);
Function.FunctionMetaData function2Stopped = function2StoppedBldr.build();
Function.Assignment assignment4 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function2Stopped).setInstanceId(0).build())
.build();
functionRuntimeManager.processAssignment(assignment4);
verify(functionActioner, times(1)).stopFunction(any(FunctionRuntimeInfo.class));
// make sure terminate is not called since this is a update operation
verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner).stopFunction(argThat(functionRuntimeInfo ->
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2)));
verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class));
assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 2);
assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-1").get("test-tenant/test-namespace/func-1:0"), assignment1);
assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-1").get("test-tenant/test-namespace/func-2:0"), assignment4);
}
}
@Test
public void testReassignment() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getMapper().getObjectMapper().convertValue(
new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
PulsarClient pulsarClient = mock(PulsarClient.class);
ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
doReturn(mock(Reader.class)).when(readerBuilder).create();
PulsarWorkerService workerService = mock(PulsarWorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
try (final MockedStatic<RuntimeFactory> runtimeFactoryMockedStatic = Mockito
.mockStatic(RuntimeFactory.class);) {
mockRuntimeFactory(runtimeFactoryMockedStatic);
// test new assignment update functions
@Cleanup
FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
workerConfig,
workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
functionRuntimeManager.setFunctionActioner(functionActioner);
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
Function.Assignment assignment1 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build())
.build();
/** Test transfer from me to other worker **/
// add existing assignments
functionRuntimeManager.setAssignment(assignment1);
// new assignment with different worker
Function.Assignment assignment2 = Function.Assignment.newBuilder()
.setWorkerId("worker-2")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build())
.build();
FunctionRuntimeInfo functionRuntimeInfo = new FunctionRuntimeInfo().setFunctionInstance(
Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
.build());
functionRuntimeManager.functionRuntimeInfos.put(
"test-tenant/test-namespace/func-1:0", functionRuntimeInfo);
functionRuntimeManager.processAssignment(assignment2);
verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner, times(1)).stopFunction(any(FunctionRuntimeInfo.class));
assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-2").get("test-tenant/test-namespace/func-1:0"), assignment2);
assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 0);
assertNull(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"));
/** Test transfer from other worker to me **/
reset(functionActioner);
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
functionRuntimeManager.setFunctionActioner(functionActioner);
Function.Assignment assignment3 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build())
.build();
functionRuntimeManager.processAssignment(assignment3);
verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class));
assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-1").get("test-tenant/test-namespace/func-1:0"), assignment3);
assertNull(functionRuntimeManager.workerIdToAssignments
.get("worker-2"));
assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 1);
assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"),
functionRuntimeInfo);
}
}
@Test
public void testRuntimeManagerInitialize() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getMapper().getObjectMapper().convertValue(
new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();
Function.Assignment assignment1 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build())
.build();
Function.Assignment assignment2 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function2).setInstanceId(0).build())
.build();
Function.Assignment assignment3 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function2).setInstanceId(0).build())
.build();
List<Message<byte[]>> messageList = new LinkedList<>();
MessageMetadata metadata = new MessageMetadata();
MessageId messageId1 = new MessageIdImpl(0, 1, -1);
Message message1 = spy(new MessageImpl("foo", messageId1.toString(),
new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null, metadata));
doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey();
MessageId messageId2 = new MessageIdImpl(0, 2, -1);
Message message2 = spy(new MessageImpl("foo", messageId2.toString(),
new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null, metadata));
doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey();
// delete function2
MessageId messageId3 = new MessageIdImpl(0, 3, -1);
Message message3 = spy(new MessageImpl("foo", messageId3.toString(),
new HashMap<>(), Unpooled.copiedBuffer("".getBytes()), null, metadata));
doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment3.getInstance())).when(message3).getKey();
messageList.add(message1);
messageList.add(message2);
messageList.add(message3);
PulsarClient pulsarClient = mock(PulsarClient.class);
Reader<byte[]> reader = mock(Reader.class);
Iterator<Message<byte[]>> it = messageList.iterator();
when(reader.readNext()).thenAnswer(new Answer<Message<byte[]>>() {
@Override
public Message<byte[]> answer(InvocationOnMock invocationOnMock) throws Throwable {
return it.next();
}
});
when(reader.readNextAsync()).thenAnswer(new Answer<CompletableFuture<Message<byte[]>>>() {
@Override
public CompletableFuture<Message<byte[]>> answer(InvocationOnMock invocationOnMock) throws Throwable {
return new CompletableFuture<>();
}
});
when(reader.hasMessageAvailable()).thenAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
return it.hasNext();
}
});
ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).readerName(anyString());
doReturn(readerBuilder).when(readerBuilder).subscriptionRolePrefix(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
doReturn(reader).when(readerBuilder).create();
PulsarWorkerService workerService = mock(PulsarWorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
ErrorNotifier errorNotifier = mock(ErrorNotifier.class);
try (final MockedStatic<RuntimeFactory> runtimeFactoryMockedStatic = Mockito
.mockStatic(RuntimeFactory.class);) {
mockRuntimeFactory(runtimeFactoryMockedStatic);
// test new assignment add functions
@Cleanup
FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
workerConfig,
workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
errorNotifier);
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
functionRuntimeManager.setFunctionActioner(functionActioner);
assertEquals(functionRuntimeManager.initialize(), messageId3);
assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class));
// verify stop function is called zero times because we don't want to unnecessarily restart any functions during initialization
verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner).startFunction(
argThat(functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance()
.equals(assignment1.getInstance())));
assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 1);
assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"),
new FunctionRuntimeInfo().setFunctionInstance(
Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
.build()));
// verify no errors occurred
verify(errorNotifier, times(0)).triggerError(any());
}
}
@Test
public void testExternallyManagedRuntimeUpdate() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setFunctionRuntimeFactoryClassName(KubernetesRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getMapper().getObjectMapper()
.convertValue(new KubernetesRuntimeFactoryConfig()
.setSubmittingInsidePod(false), Map.class));
workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setPulsarFunctionsCluster("cluster");
PulsarClient pulsarClient = mock(PulsarClient.class);
ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
doReturn(mock(Reader.class)).when(readerBuilder).create();
PulsarWorkerService workerService = mock(PulsarWorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
KubernetesRuntimeFactory kubernetesRuntimeFactory = mock(KubernetesRuntimeFactory.class);
doNothing().when(kubernetesRuntimeFactory).initialize(
any(WorkerConfig.class),
any(AuthenticationConfig.class),
any(SecretsProviderConfigurator.class),
any(),
any(),
any(),
any()
);
doNothing().when(kubernetesRuntimeFactory).setupClient();
doReturn(true).when(kubernetesRuntimeFactory).externallyManaged();
KubernetesRuntime kubernetesRuntime = mock(KubernetesRuntime.class);
doReturn(kubernetesRuntime).when(kubernetesRuntimeFactory).createContainer(any(), any(), any(), any(), any(), any());
FunctionActioner functionActioner = spy(new FunctionActioner(
workerConfig,
kubernetesRuntimeFactory, null, null, null, null, workerService.getPackageUrlValidator()));
try (final MockedStatic<RuntimeFactory> runtimeFactoryMockedStatic = Mockito
.mockStatic(RuntimeFactory.class);) {
runtimeFactoryMockedStatic.when(() -> RuntimeFactory.getFuntionRuntimeFactory(anyString()))
.thenAnswer(invocation -> kubernetesRuntimeFactory);
// test new assignment update functions
@Cleanup
FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
workerConfig,
workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
functionRuntimeManager.setFunctionActioner(functionActioner);
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
.setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath("path"))
.setTransformFunctionPackageLocation(Function.PackageLocationMetaData.newBuilder()
.setPackagePath("function-path"))
.setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
Function.Assignment assignment1 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build())
.build();
/** Test transfer from me to other worker **/
// add existing assignments
functionRuntimeManager.setAssignment(assignment1);
// new assignment with different worker
Function.Assignment assignment2 = Function.Assignment.newBuilder()
.setWorkerId("worker-2")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build())
.build();
Function.Instance instance = Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build();
FunctionRuntimeInfo functionRuntimeInfo = new FunctionRuntimeInfo()
.setFunctionInstance(instance)
.setRuntimeSpawner(functionActioner
.getRuntimeSpawner(instance, function1.getPackageLocation().getPackagePath(),
function1.getTransformFunctionPackageLocation().getPackagePath()));
functionRuntimeManager.functionRuntimeInfos.put(
"test-tenant/test-namespace/func-1:0", functionRuntimeInfo);
functionRuntimeManager.processAssignment(assignment2);
// make sure nothing is called
verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class));
assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-2").get("test-tenant/test-namespace/func-1:0"), assignment2);
assertNull(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"));
/** Test transfer from other worker to me **/
Function.Assignment assignment3 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build())
.build();
functionRuntimeManager.processAssignment(assignment3);
// make sure nothing is called
verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class));
assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-1").get("test-tenant/test-namespace/func-1:0"), assignment3);
assertNull(functionRuntimeManager.workerIdToAssignments
.get("worker-2"));
assertEquals(
functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0")
.getFunctionInstance(),
functionRuntimeInfo.getFunctionInstance());
assertNotNull(
functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0")
.getRuntimeSpawner());
assertEquals(
functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0")
.getRuntimeSpawner().getInstanceConfig().getFunctionDetails(),
function1.getFunctionDetails());
assertEquals(
functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0")
.getRuntimeSpawner().getInstanceConfig().getInstanceId(),
instance.getInstanceId());
assertTrue(
functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0")
.getRuntimeSpawner().getRuntimeFactory() instanceof KubernetesRuntimeFactory);
assertNotNull(
functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0")
.getRuntimeSpawner().getRuntime());
verify(kubernetesRuntime, times(1)).reinitialize();
}
}
@Test
public void testFunctionRuntimeSetCorrectly() {
// Function runtime not set
try {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
new FunctionRuntimeManager(
workerConfig,
mock(PulsarWorkerService.class),
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
fail();
} catch (Exception e) {
assertEquals(e.getMessage(), "A Function Runtime Factory needs to be set");
}
// Function runtime class not found
try {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setFunctionRuntimeFactoryClassName("foo");
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getMapper().getObjectMapper()
.convertValue(new KubernetesRuntimeFactoryConfig(), Map.class));
workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
new FunctionRuntimeManager(
workerConfig,
mock(PulsarWorkerService.class),
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
fail();
} catch (Exception e) {
assertEquals(e.getCause().getClass(), ClassNotFoundException.class);
}
// Function runtime class does not implement correct interface
try {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setFunctionRuntimeFactoryClassName(FunctionRuntimeManagerTest.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getMapper().getObjectMapper()
.convertValue(new KubernetesRuntimeFactoryConfig(), Map.class));
workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
new FunctionRuntimeManager(
workerConfig,
mock(PulsarWorkerService.class),
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
fail();
} catch (Exception e) {
assertEquals(e.getMessage(),
"org.apache.pulsar.functions.worker.FunctionRuntimeManagerTest does not implement org.apache.pulsar.functions.runtime.RuntimeFactory");
}
// Correct runtime class
try {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getMapper().getObjectMapper()
.convertValue(new KubernetesRuntimeFactoryConfig(), Map.class));
workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
try (final MockedStatic<RuntimeFactory> runtimeFactoryMockedStatic = Mockito
.mockStatic(RuntimeFactory.class);) {
mockRuntimeFactory(runtimeFactoryMockedStatic);
@Cleanup
FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
workerConfig,
mock(PulsarWorkerService.class),
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class);
}
} catch (Exception e) {
log.error("Failed to initialize the runtime manager : ", e);
fail();
}
}
@Test
public void testFunctionRuntimeFactoryConfigsBackwardsCompatibility() throws Exception {
// Test kubernetes runtime
WorkerConfig.KubernetesContainerFactory kubernetesContainerFactory
= new WorkerConfig.KubernetesContainerFactory();
kubernetesContainerFactory.setK8Uri("k8Uri");
kubernetesContainerFactory.setJobNamespace("jobNamespace");
kubernetesContainerFactory.setJobName("jobName");
kubernetesContainerFactory.setPulsarDockerImageName("pulsarDockerImageName");
kubernetesContainerFactory.setImagePullPolicy("imagePullPolicy");
kubernetesContainerFactory.setPulsarRootDir("pulsarRootDir");
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setKubernetesContainerFactory(kubernetesContainerFactory);
try (MockedConstruction<KubernetesRuntimeFactory> mocked = Mockito.mockConstruction(KubernetesRuntimeFactory.class,
withSettings().defaultAnswer(CALLS_REAL_METHODS),
(mockedKubernetesRuntimeFactory, context) -> {
doNothing().when(mockedKubernetesRuntimeFactory).initialize(
any(WorkerConfig.class),
any(AuthenticationConfig.class),
any(SecretsProviderConfigurator.class),
any(),
any(),
any(),
any()
);
doNothing().when(mockedKubernetesRuntimeFactory).setupClient();
doReturn(true).when(mockedKubernetesRuntimeFactory).externallyManaged();
})) {
@Cleanup
FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
workerConfig,
mock(PulsarWorkerService.class),
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
KubernetesRuntimeFactory kubernetesRuntimeFactory = (KubernetesRuntimeFactory) functionRuntimeManager.getRuntimeFactory();
assertEquals(kubernetesRuntimeFactory.getK8Uri(), "k8Uri");
assertEquals(kubernetesRuntimeFactory.getJobNamespace(), "jobNamespace");
assertEquals(kubernetesRuntimeFactory.getPulsarDockerImageName(), "pulsarDockerImageName");
assertEquals(kubernetesRuntimeFactory.getImagePullPolicy(), "imagePullPolicy");
assertEquals(kubernetesRuntimeFactory.getPulsarRootDir(), "pulsarRootDir");
// Test process runtime
WorkerConfig.ProcessContainerFactory processContainerFactory
= new WorkerConfig.ProcessContainerFactory();
processContainerFactory.setExtraFunctionDependenciesDir("extraDependenciesDir");
processContainerFactory.setLogDirectory("logDirectory");
processContainerFactory.setPythonInstanceLocation("pythonInstanceLocation");
processContainerFactory.setJavaInstanceJarLocation("javaInstanceJarLocation");
workerConfig = new WorkerConfig();
workerConfig.setProcessContainerFactory(processContainerFactory);
functionRuntimeManager.close();
functionRuntimeManager = new FunctionRuntimeManager(
workerConfig,
mock(PulsarWorkerService.class),
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ProcessRuntimeFactory.class);
ProcessRuntimeFactory processRuntimeFactory =
(ProcessRuntimeFactory) functionRuntimeManager.getRuntimeFactory();
assertEquals(processRuntimeFactory.getExtraDependenciesDir(), "extraDependenciesDir");
assertEquals(processRuntimeFactory.getLogDirectory(), "logDirectory/functions");
assertEquals(processRuntimeFactory.getPythonInstanceFile(), "pythonInstanceLocation");
assertEquals(processRuntimeFactory.getJavaInstanceJarFile(), "javaInstanceJarLocation");
// Test thread runtime
WorkerConfig.ThreadContainerFactory threadContainerFactory
= new WorkerConfig.ThreadContainerFactory();
threadContainerFactory.setThreadGroupName("threadGroupName");
workerConfig = new WorkerConfig();
workerConfig.setThreadContainerFactory(threadContainerFactory);
workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
functionRuntimeManager.close();
functionRuntimeManager = new FunctionRuntimeManager(
workerConfig,
mock(PulsarWorkerService.class),
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class);
ThreadRuntimeFactory threadRuntimeFactory =
(ThreadRuntimeFactory) functionRuntimeManager.getRuntimeFactory();
assertEquals(threadRuntimeFactory.getThreadGroup().getName(), "threadGroupName");
}
}
@Test
public void testThreadFunctionInstancesRestart() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getMapper().getObjectMapper().convertValue(
new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
PulsarWorkerService workerService = mock(PulsarWorkerService.class);
// mock pulsarAdmin sources sinks functions
PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class);
Sources sources = mock(Sources.class);
doNothing().when(sources).restartSource(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
doReturn(sources).when(pulsarAdmin).sources();
Sinks sinks = mock(Sinks.class);
doReturn(sinks).when(pulsarAdmin).sinks();
Functions functions = mock(Functions.class);
doNothing().when(functions)
.restartFunction(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
doReturn(functions).when(pulsarAdmin).functions();
doReturn(pulsarAdmin).when(workerService).getFunctionAdmin();
try (final MockedStatic<RuntimeFactory> runtimeFactoryMockedStatic = Mockito
.mockStatic(RuntimeFactory.class);) {
mockRuntimeFactory(runtimeFactoryMockedStatic);
List<WorkerInfo> workerInfos = new LinkedList<>();
workerInfos.add(WorkerInfo.of("worker-1", "localhost", 0));
workerInfos.add(WorkerInfo.of("worker-2", "localhost", 0));
MembershipManager membershipManager = mock(MembershipManager.class);
doReturn(workerInfos).when(membershipManager).getCurrentMembership();
// build three types of FunctionMetaData
Function.FunctionMetaData function = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("function")
.setComponentType(Function.FunctionDetails.ComponentType.FUNCTION)).build();
Function.FunctionMetaData source = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("source")
.setComponentType(Function.FunctionDetails.ComponentType.SOURCE)).build();
Function.FunctionMetaData sink = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("sink")
.setComponentType(Function.FunctionDetails.ComponentType.SINK)).build();
@Cleanup
FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
workerConfig,
workerService,
mock(Namespace.class),
membershipManager,
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class)));
// verify restart function/source/sink using different assignment
verifyRestart(functionRuntimeManager, function, "worker-1", false, false);
verifyRestart(functionRuntimeManager, function, "worker-2", false, true);
verifyRestart(functionRuntimeManager, source, "worker-1", false, false);
verifyRestart(functionRuntimeManager, source, "worker-2", false, true);
verifyRestart(functionRuntimeManager, sink, "worker-1", false, false);
verifyRestart(functionRuntimeManager, sink, "worker-2", false, true);
}
}
@Test
public void testKubernetesFunctionInstancesRestart() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
WorkerConfig.KubernetesContainerFactory kubernetesContainerFactory
= new WorkerConfig.KubernetesContainerFactory();
workerConfig.setKubernetesContainerFactory(kubernetesContainerFactory);
try (MockedConstruction<KubernetesRuntimeFactory> mocked = Mockito.mockConstruction(KubernetesRuntimeFactory.class,
(mockedKubernetesRuntimeFactory, context) -> {
doNothing().when(mockedKubernetesRuntimeFactory).initialize(
any(WorkerConfig.class),
any(AuthenticationConfig.class),
any(SecretsProviderConfigurator.class),
any(),
any(),
any(),
any()
);
doNothing().when(mockedKubernetesRuntimeFactory).setupClient();
doReturn(true).when(mockedKubernetesRuntimeFactory).externallyManaged();
})) {
PulsarWorkerService workerService = mock(PulsarWorkerService.class);
// mock pulsarAdmin sources sinks functions
PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class);
Sources sources = mock(Sources.class);
doNothing().when(sources)
.restartSource(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
doReturn(sources).when(pulsarAdmin).sources();
Sinks sinks = mock(Sinks.class);
doReturn(sinks).when(pulsarAdmin).sinks();
Functions functions = mock(Functions.class);
doNothing().when(functions)
.restartFunction(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
doReturn(functions).when(pulsarAdmin).functions();
doReturn(pulsarAdmin).when(workerService).getFunctionAdmin();
try (final MockedStatic<RuntimeFactory> runtimeFactoryMockedStatic = Mockito
.mockStatic(RuntimeFactory.class);) {
mockRuntimeFactory(runtimeFactoryMockedStatic);
List<WorkerInfo> workerInfos = new LinkedList<>();
workerInfos.add(WorkerInfo.of("worker-1", "localhost", 0));
workerInfos.add(WorkerInfo.of("worker-2", "localhost", 0));
MembershipManager membershipManager = mock(MembershipManager.class);
doReturn(workerInfos).when(membershipManager).getCurrentMembership();
// build three types of FunctionMetaData
Function.FunctionMetaData function = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("function")
.setComponentType(Function.FunctionDetails.ComponentType.FUNCTION)).build();
Function.FunctionMetaData source = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("source")
.setComponentType(Function.FunctionDetails.ComponentType.SOURCE)).build();
Function.FunctionMetaData sink = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("sink")
.setComponentType(Function.FunctionDetails.ComponentType.SINK)).build();
@Cleanup
FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
workerConfig,
workerService,
mock(Namespace.class),
membershipManager,
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class)));
// verify restart function/source/sink using different assignment
verifyRestart(functionRuntimeManager, function, "worker-1", true, false);
verifyRestart(functionRuntimeManager, function, "worker-2", true, true);
verifyRestart(functionRuntimeManager, source, "worker-1", true, false);
verifyRestart(functionRuntimeManager, source, "worker-2", true, true);
verifyRestart(functionRuntimeManager, sink, "worker-1", true, false);
verifyRestart(functionRuntimeManager, sink, "worker-2", true, true);
}
}
}
private static void verifyRestart(FunctionRuntimeManager functionRuntimeManager, Function.FunctionMetaData function,
String workerId, boolean externallyManaged, boolean expectRestartByPulsarAdmin) throws Exception {
Function.Assignment assignment = Function.Assignment.newBuilder()
.setWorkerId(workerId)
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function).setInstanceId(0).build())
.build();
doReturn(List.of(assignment)).when(functionRuntimeManager)
.findFunctionAssignments("test-tenant", "test-namespace", "function");
functionRuntimeManager.restartFunctionInstances("test-tenant", "test-namespace", "function");
if (expectRestartByPulsarAdmin) {
verify(functionRuntimeManager, times(1))
.restartFunctionUsingPulsarAdmin(eq(assignment), eq("test-tenant"),
eq("test-namespace"), eq("function"), eq(externallyManaged));
} else {
verify(functionRuntimeManager).stopFunction(eq(FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance())), eq(true));
}
}
}