blob: 4214364ce88d9d2212986f9c18e8a4cc97e3db84 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntime;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.IObjectFactory;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@PrepareForTest({FunctionRuntimeManager.class, RuntimeFactory.class})
@Slf4j
@PowerMockIgnore({
"javax.management.*",
"javax.ws.*",
"org.apache.logging.log4j.*",
"org.apache.pulsar.functions.runtime.thread"
})
public class FunctionRuntimeManagerTest {
@ObjectFactory
public IObjectFactory getObjectFactory() {
return new org.powermock.modules.testng.PowerMockObjectFactory();
}
@Test
public void testProcessAssignmentUpdateAddFunctions() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(
new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
PulsarClient pulsarClient = mock(PulsarClient.class);
ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
doReturn(mock(Reader.class)).when(readerBuilder).create();
PulsarWorkerService workerService = mock(PulsarWorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
mockStatic(RuntimeFactory.class);
PowerMockito.when(RuntimeFactory.getFuntionRuntimeFactory(eq(ThreadRuntimeFactory.class.getName())))
.thenReturn(new ThreadRuntimeFactory());
// test new assignment add functions
FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
workerConfig,
workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class)));
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
functionRuntimeManager.setFunctionActioner(functionActioner);
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();
Function.Assignment assignment1 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build())
.build();
Function.Assignment assignment2 = Function.Assignment.newBuilder()
.setWorkerId("worker-2")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function2).setInstanceId(0).build())
.build();
List<Function.Assignment> assignments = new LinkedList<>();
assignments.add(assignment1);
assignments.add(assignment2);
functionRuntimeManager.processAssignment(assignment1);
functionRuntimeManager.processAssignment(assignment2);
verify(functionRuntimeManager, times(2)).setAssignment(any(Function.Assignment.class));
verify(functionRuntimeManager, times(0)).deleteAssignment(any(Function.Assignment.class));
assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 2);
assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-1").get("test-tenant/test-namespace/func-1:0"), assignment1);
assertEquals(functionRuntimeManager.workerIdToAssignments.get("worker-2")
.get("test-tenant/test-namespace/func-2:0"), assignment2);
verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner).startFunction(argThat(
functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function1)));
verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class));
assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 1);
assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"),
new FunctionRuntimeInfo().setFunctionInstance(
Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
.build()));
}
@Test
public void testProcessAssignmentUpdateDeleteFunctions() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(
new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
PulsarClient pulsarClient = mock(PulsarClient.class);
ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
doReturn(mock(Reader.class)).when(readerBuilder).create();
PulsarWorkerService workerService = mock(PulsarWorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
mockStatic(RuntimeFactory.class);
PowerMockito.when(RuntimeFactory.getFuntionRuntimeFactory(eq(ThreadRuntimeFactory.class.getName())))
.thenReturn(new ThreadRuntimeFactory());
// test new assignment delete functions
FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
workerConfig,
workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class)));
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
functionRuntimeManager.setFunctionActioner(functionActioner);
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();
// Delete this assignment
Function.Assignment assignment1 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build())
.build();
Function.Assignment assignment2 = Function.Assignment.newBuilder()
.setWorkerId("worker-2")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function2).setInstanceId(0).build())
.build();
// add existing assignments
functionRuntimeManager.setAssignment(assignment1);
functionRuntimeManager.setAssignment(assignment2);
reset(functionRuntimeManager);
functionRuntimeManager.functionRuntimeInfos.put(
"test-tenant/test-namespace/func-1:0", new FunctionRuntimeInfo().setFunctionInstance(
Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
.build()));
functionRuntimeManager.processAssignment(assignment1);
functionRuntimeManager.processAssignment(assignment2);
functionRuntimeManager.deleteAssignment(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance()));
verify(functionRuntimeManager, times(0)).setAssignment(any(Function.Assignment.class));
verify(functionRuntimeManager, times(1)).deleteAssignment(any(String.class));
assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-2").get("test-tenant/test-namespace/func-2:0"), assignment2);
verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner, times(1)).terminateFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner).terminateFunction(argThat(
functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function1)));
assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 0);
}
@Test
public void testProcessAssignmentUpdateModifyFunctions() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(
new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
PulsarClient pulsarClient = mock(PulsarClient.class);
ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
doReturn(mock(Reader.class)).when(readerBuilder).create();
PulsarWorkerService workerService = mock(PulsarWorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
mockStatic(RuntimeFactory.class);
PowerMockito.when(RuntimeFactory.getFuntionRuntimeFactory(eq(ThreadRuntimeFactory.class.getName())))
.thenReturn(new ThreadRuntimeFactory());
// test new assignment update functions
FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
workerConfig,
workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
functionRuntimeManager.setFunctionActioner(functionActioner);
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();
Function.Assignment assignment1 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build())
.build();
Function.Assignment assignment2 = Function.Assignment.newBuilder()
.setWorkerId("worker-2")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function2).setInstanceId(0).build())
.build();
// add existing assignments
functionRuntimeManager.setAssignment(assignment1);
functionRuntimeManager.setAssignment(assignment2);
reset(functionActioner);
Function.Assignment assignment3 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function2).setInstanceId(0).build())
.build();
functionRuntimeManager.functionRuntimeInfos.put(
"test-tenant/test-namespace/func-1:0", new FunctionRuntimeInfo().setFunctionInstance(
Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
.build()));
functionRuntimeManager.functionRuntimeInfos.put(
"test-tenant/test-namespace/func-2:0", new FunctionRuntimeInfo().setFunctionInstance(
Function.Instance.newBuilder().setFunctionMetaData(function2).setInstanceId(0)
.build()));
functionRuntimeManager.processAssignment(assignment1);
functionRuntimeManager.processAssignment(assignment3);
verify(functionActioner, times(1)).stopFunction(any(FunctionRuntimeInfo.class));
// make sure terminate is not called since this is a update operation
verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner).stopFunction(argThat(
functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2)));
verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner).startFunction(argThat(
functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2)));
assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 2);
assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-1").get("test-tenant/test-namespace/func-1:0"), assignment1);
assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-1").get("test-tenant/test-namespace/func-2:0"), assignment3);
reset(functionActioner);
// add a stop
Function.FunctionMetaData.Builder function2StoppedBldr = function2.toBuilder();
function2StoppedBldr.putInstanceStates(0, Function.FunctionState.STOPPED);
Function.FunctionMetaData function2Stopped = function2StoppedBldr.build();
Function.Assignment assignment4 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function2Stopped).setInstanceId(0).build())
.build();
functionRuntimeManager.processAssignment(assignment4);
verify(functionActioner, times(1)).stopFunction(any(FunctionRuntimeInfo.class));
// make sure terminate is not called since this is a update operation
verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner).stopFunction(argThat(functionRuntimeInfo ->
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2)));
verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class));
assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 2);
assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-1").get("test-tenant/test-namespace/func-1:0"), assignment1);
assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-1").get("test-tenant/test-namespace/func-2:0"), assignment4);
}
@Test
public void testReassignment() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(
new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
PulsarClient pulsarClient = mock(PulsarClient.class);
ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
doReturn(mock(Reader.class)).when(readerBuilder).create();
PulsarWorkerService workerService = mock(PulsarWorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
mockStatic(RuntimeFactory.class);
PowerMockito.when(RuntimeFactory.getFuntionRuntimeFactory(eq(ThreadRuntimeFactory.class.getName())))
.thenReturn(new ThreadRuntimeFactory());
// test new assignment update functions
FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
workerConfig,
workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
functionRuntimeManager.setFunctionActioner(functionActioner);
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
Function.Assignment assignment1 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build())
.build();
/** Test transfer from me to other worker **/
// add existing assignments
functionRuntimeManager.setAssignment(assignment1);
// new assignment with different worker
Function.Assignment assignment2 = Function.Assignment.newBuilder()
.setWorkerId("worker-2")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build())
.build();
FunctionRuntimeInfo functionRuntimeInfo = new FunctionRuntimeInfo().setFunctionInstance(
Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
.build());
functionRuntimeManager.functionRuntimeInfos.put(
"test-tenant/test-namespace/func-1:0", functionRuntimeInfo);
functionRuntimeManager.processAssignment(assignment2);
verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner, times(1)).stopFunction(any(FunctionRuntimeInfo.class));
assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-2").get("test-tenant/test-namespace/func-1:0"), assignment2);
assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 0);
assertNull(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"));
/** Test transfer from other worker to me **/
reset(functionActioner);
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
functionRuntimeManager.setFunctionActioner(functionActioner);
Function.Assignment assignment3 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build())
.build();
functionRuntimeManager.processAssignment(assignment3);
verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class));
assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-1").get("test-tenant/test-namespace/func-1:0"), assignment3);
assertNull(functionRuntimeManager.workerIdToAssignments
.get("worker-2"));
assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 1);
assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"), functionRuntimeInfo);
}
@Test
public void testRuntimeManagerInitialize() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(
new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();
Function.Assignment assignment1 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build())
.build();
Function.Assignment assignment2 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function2).setInstanceId(0).build())
.build();
Function.Assignment assignment3 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function2).setInstanceId(0).build())
.build();
List<Message<byte[]>> messageList = new LinkedList<>();
MessageMetadata metadata = new MessageMetadata();
MessageId messageId1 = new MessageIdImpl(0, 1, -1);
Message message1 = spy(new MessageImpl("foo", messageId1.toString(),
new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null, metadata));
doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey();
MessageId messageId2 = new MessageIdImpl(0, 2, -1);
Message message2 = spy(new MessageImpl("foo", messageId2.toString(),
new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null, metadata));
doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey();
// delete function2
MessageId messageId3 = new MessageIdImpl(0, 3, -1);
Message message3 = spy(new MessageImpl("foo", messageId3.toString(),
new HashMap<>(), Unpooled.copiedBuffer("".getBytes()), null, metadata));
doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment3.getInstance())).when(message3).getKey();
messageList.add(message1);
messageList.add(message2);
messageList.add(message3);
PulsarClient pulsarClient = mock(PulsarClient.class);
Reader<byte[]> reader = mock(Reader.class);
Iterator<Message<byte[]>> it = messageList.iterator();
when(reader.readNext()).thenAnswer(new Answer<Message<byte[]>>() {
@Override
public Message<byte[]> answer(InvocationOnMock invocationOnMock) throws Throwable {
return it.next();
}
});
when(reader.readNextAsync()).thenAnswer(new Answer<CompletableFuture<Message<byte[]>>>() {
@Override
public CompletableFuture<Message<byte[]>> answer(InvocationOnMock invocationOnMock) throws Throwable {
return new CompletableFuture<>();
}
});
when(reader.hasMessageAvailable()).thenAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
return it.hasNext();
}
});
ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).readerName(anyString());
doReturn(readerBuilder).when(readerBuilder).subscriptionRolePrefix(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
doReturn(reader).when(readerBuilder).create();
PulsarWorkerService workerService = mock(PulsarWorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
ErrorNotifier errorNotifier = mock(ErrorNotifier.class);
mockStatic(RuntimeFactory.class);
PowerMockito.when(RuntimeFactory.getFuntionRuntimeFactory(eq(ThreadRuntimeFactory.class.getName())))
.thenReturn(new ThreadRuntimeFactory());
// test new assignment add functions
FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
workerConfig,
workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
errorNotifier);
FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner());
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
functionRuntimeManager.setFunctionActioner(functionActioner);
assertEquals(functionRuntimeManager.initialize(), messageId3);
assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class));
// verify stop function is called zero times because we don't want to unnecessarily restart any functions during initialization
verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner).startFunction(argThat(functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance().equals(assignment1.getInstance())));
assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 1);
assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"),
new FunctionRuntimeInfo().setFunctionInstance(
Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
.build()));
// verify no errors occurred
verify(errorNotifier, times(0)).triggerError(any());
}
@Test
public void testExternallyManagedRuntimeUpdate() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setFunctionRuntimeFactoryClassName(KubernetesRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal()
.convertValue(new KubernetesRuntimeFactoryConfig()
.setSubmittingInsidePod(false), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setPulsarFunctionsCluster("cluster");
PulsarClient pulsarClient = mock(PulsarClient.class);
ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
doReturn(mock(Reader.class)).when(readerBuilder).create();
PulsarWorkerService workerService = mock(PulsarWorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
KubernetesRuntimeFactory kubernetesRuntimeFactory = mock(KubernetesRuntimeFactory.class);
doNothing().when(kubernetesRuntimeFactory).initialize(
any(WorkerConfig.class),
any(AuthenticationConfig.class),
any(SecretsProviderConfigurator.class),
any(),
any(),
any()
);
doNothing().when(kubernetesRuntimeFactory).setupClient();
doReturn(true).when(kubernetesRuntimeFactory).externallyManaged();
KubernetesRuntime kubernetesRuntime = mock(KubernetesRuntime.class);
doReturn(kubernetesRuntime).when(kubernetesRuntimeFactory).createContainer(any(), any(), any(), any());
FunctionActioner functionActioner = spy(new FunctionActioner(
workerConfig,
kubernetesRuntimeFactory, null, null, null, null));
mockStatic(RuntimeFactory.class);
PowerMockito.when(RuntimeFactory.getFuntionRuntimeFactory(anyString()))
.thenReturn(kubernetesRuntimeFactory);
// test new assignment update functions
FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
workerConfig,
workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
functionRuntimeManager.setFunctionActioner(functionActioner);
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
.setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath("path").build())
.setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
Function.Assignment assignment1 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build())
.build();
/** Test transfer from me to other worker **/
// add existing assignments
functionRuntimeManager.setAssignment(assignment1);
// new assignment with different worker
Function.Assignment assignment2 = Function.Assignment.newBuilder()
.setWorkerId("worker-2")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build())
.build();
Function.Instance instance = Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build();
FunctionRuntimeInfo functionRuntimeInfo = new FunctionRuntimeInfo()
.setFunctionInstance(instance)
.setRuntimeSpawner(functionActioner.getRuntimeSpawner(instance, function1.getPackageLocation().getPackagePath()));
functionRuntimeManager.functionRuntimeInfos.put(
"test-tenant/test-namespace/func-1:0", functionRuntimeInfo);
functionRuntimeManager.processAssignment(assignment2);
// make sure nothing is called
verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class));
assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-2").get("test-tenant/test-namespace/func-1:0"), assignment2);
assertNull(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"));
/** Test transfer from other worker to me **/
Function.Assignment assignment3 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build())
.build();
functionRuntimeManager.processAssignment(assignment3);
// make sure nothing is called
verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class));
assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-1").get("test-tenant/test-namespace/func-1:0"), assignment3);
assertNull(functionRuntimeManager.workerIdToAssignments
.get("worker-2"));
assertEquals(
functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0").getFunctionInstance(),
functionRuntimeInfo.getFunctionInstance());
assertNotNull(
functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner());
assertEquals(
functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner().getInstanceConfig().getFunctionDetails(),
function1.getFunctionDetails());
assertEquals(
functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner().getInstanceConfig().getInstanceId(),
instance.getInstanceId());
assertTrue(
functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner().getRuntimeFactory() instanceof KubernetesRuntimeFactory);
assertNotNull(
functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner().getRuntime());
verify(kubernetesRuntime, times(1)).reinitialize();
}
@Test
public void testFunctionRuntimeSetCorrectly() {
// Function runtime not set
try {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
new FunctionRuntimeManager(
workerConfig,
mock(PulsarWorkerService.class),
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
fail();
} catch (Exception e) {
assertEquals(e.getMessage(), "A Function Runtime Factory needs to be set");
}
// Function runtime class not found
try {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setFunctionRuntimeFactoryClassName("foo");
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(new KubernetesRuntimeFactoryConfig(), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
new FunctionRuntimeManager(
workerConfig,
mock(PulsarWorkerService.class),
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
fail();
} catch (Exception e) {
assertEquals(e.getCause().getClass(), ClassNotFoundException.class);
}
// Function runtime class does not implement correct interface
try {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setFunctionRuntimeFactoryClassName(FunctionRuntimeManagerTest.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(new KubernetesRuntimeFactoryConfig(), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
new FunctionRuntimeManager(
workerConfig,
mock(PulsarWorkerService.class),
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
fail();
} catch (Exception e) {
assertEquals(e.getMessage(), "org.apache.pulsar.functions.worker.FunctionRuntimeManagerTest does not implement org.apache.pulsar.functions.runtime.RuntimeFactory");
}
// Correct runtime class
try {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(new KubernetesRuntimeFactoryConfig(), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
mockStatic(RuntimeFactory.class);
PowerMockito.when(RuntimeFactory.getFuntionRuntimeFactory(eq(ThreadRuntimeFactory.class.getName())))
.thenReturn(new ThreadRuntimeFactory());
FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
workerConfig,
mock(PulsarWorkerService.class),
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class);
} catch (Exception e) {
log.error("Failed to initialize the runtime manager : ", e);
fail();
}
}
@Test
public void testFunctionRuntimeFactoryConfigsBackwardsCompatibility() throws Exception {
// Test kubernetes runtime
WorkerConfig.KubernetesContainerFactory kubernetesContainerFactory
= new WorkerConfig.KubernetesContainerFactory();
kubernetesContainerFactory.setK8Uri("k8Uri");
kubernetesContainerFactory.setJobNamespace("jobNamespace");
kubernetesContainerFactory.setJobName("jobName");
kubernetesContainerFactory.setPulsarDockerImageName("pulsarDockerImageName");
kubernetesContainerFactory.setImagePullPolicy("imagePullPolicy");
kubernetesContainerFactory.setPulsarRootDir("pulsarRootDir");
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setKubernetesContainerFactory(kubernetesContainerFactory);
KubernetesRuntimeFactory mockedKubernetesRuntimeFactory = spy(new KubernetesRuntimeFactory());
doNothing().when(mockedKubernetesRuntimeFactory).initialize(
any(WorkerConfig.class),
any(AuthenticationConfig.class),
any(SecretsProviderConfigurator.class),
any(),
any(),
any()
);
doNothing().when(mockedKubernetesRuntimeFactory).setupClient();
doReturn(true).when(mockedKubernetesRuntimeFactory).externallyManaged();
PowerMockito.whenNew(KubernetesRuntimeFactory.class)
.withNoArguments().thenReturn(mockedKubernetesRuntimeFactory);
FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
workerConfig,
mock(PulsarWorkerService.class),
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
KubernetesRuntimeFactory kubernetesRuntimeFactory = (KubernetesRuntimeFactory) functionRuntimeManager.getRuntimeFactory();
assertEquals(kubernetesRuntimeFactory.getK8Uri(), "k8Uri");
assertEquals(kubernetesRuntimeFactory.getJobNamespace(), "jobNamespace");
assertEquals(kubernetesRuntimeFactory.getPulsarDockerImageName(), "pulsarDockerImageName");
assertEquals(kubernetesRuntimeFactory.getImagePullPolicy(), "imagePullPolicy");
assertEquals(kubernetesRuntimeFactory.getPulsarRootDir(), "pulsarRootDir");
// Test process runtime
WorkerConfig.ProcessContainerFactory processContainerFactory
= new WorkerConfig.ProcessContainerFactory();
processContainerFactory.setExtraFunctionDependenciesDir("extraDependenciesDir");
processContainerFactory.setLogDirectory("logDirectory");
processContainerFactory.setPythonInstanceLocation("pythonInstanceLocation");
processContainerFactory.setJavaInstanceJarLocation("javaInstanceJarLocation");
workerConfig = new WorkerConfig();
workerConfig.setProcessContainerFactory(processContainerFactory);
functionRuntimeManager = new FunctionRuntimeManager(
workerConfig,
mock(PulsarWorkerService.class),
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ProcessRuntimeFactory.class);
ProcessRuntimeFactory processRuntimeFactory = (ProcessRuntimeFactory) functionRuntimeManager.getRuntimeFactory();
assertEquals(processRuntimeFactory.getExtraDependenciesDir(), "extraDependenciesDir");
assertEquals(processRuntimeFactory.getLogDirectory(), "logDirectory/functions");
assertEquals(processRuntimeFactory.getPythonInstanceFile(), "pythonInstanceLocation");
assertEquals(processRuntimeFactory.getJavaInstanceJarFile(), "javaInstanceJarLocation");
// Test thread runtime
WorkerConfig.ThreadContainerFactory threadContainerFactory
= new WorkerConfig.ThreadContainerFactory();
threadContainerFactory.setThreadGroupName("threadGroupName");
workerConfig = new WorkerConfig();
workerConfig.setThreadContainerFactory(threadContainerFactory);
functionRuntimeManager = new FunctionRuntimeManager(
workerConfig,
mock(PulsarWorkerService.class),
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class));
assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class);
ThreadRuntimeFactory threadRuntimeFactory = (ThreadRuntimeFactory) functionRuntimeManager.getRuntimeFactory();
assertEquals(threadRuntimeFactory.getThreadGroup().getName(), "threadGroupName");
}
}