blob: 57f2dca4922d633d00eab72d68f34180b673ecba [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
public class LeaderServiceTest {
private final WorkerConfig workerConfig;
private LeaderService leaderService;
private PulsarClientImpl mockClient;
AtomicReference<ConsumerEventListener> listenerHolder;
private ConsumerImpl mockConsumer;
private FunctionAssignmentTailer functionAssignmentTailer;
private SchedulerManager schedulerManager;
private FunctionRuntimeManager functionRuntimeManager;
private FunctionMetaDataManager functionMetadataManager;
private CompletableFuture metadataManagerInitFuture;
private CompletableFuture runtimeManagerInitFuture;
private CompletableFuture readToTheEndAndExitFuture;
private MembershipManager membershipManager;
public LeaderServiceTest() {
this.workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(
new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setWorkerPort(1234);
}
@BeforeMethod
public void setup() throws PulsarClientException {
mockClient = mock(PulsarClientImpl.class);
mockConsumer = mock(ConsumerImpl.class);
ConsumerBuilder<byte[]> mockConsumerBuilder = mock(ConsumerBuilder.class);
when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder);
when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder);
when(mockConsumerBuilder.property(anyString(), anyString())).thenReturn(mockConsumerBuilder);
when(mockConsumerBuilder.consumerName(anyString())).thenReturn(mockConsumerBuilder);
when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
WorkerService workerService = mock(WorkerService.class);
doReturn(workerConfig).when(workerService).getWorkerConfig();
listenerHolder = new AtomicReference<>();
when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenAnswer(invocationOnMock -> {
ConsumerEventListener listener = invocationOnMock.getArgument(0);
listenerHolder.set(listener);
return mockConsumerBuilder;
});
when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
schedulerManager = mock(SchedulerManager.class);
functionAssignmentTailer = mock(FunctionAssignmentTailer.class);
readToTheEndAndExitFuture = mock(CompletableFuture.class);
when(functionAssignmentTailer.triggerReadToTheEndAndExit()).thenReturn(readToTheEndAndExitFuture);
functionRuntimeManager = mock(FunctionRuntimeManager.class);
functionMetadataManager = mock(FunctionMetaDataManager.class);
metadataManagerInitFuture = mock(CompletableFuture.class);
runtimeManagerInitFuture = mock(CompletableFuture.class);
when(functionMetadataManager.getIsInitialized()).thenReturn(metadataManagerInitFuture);
when(functionRuntimeManager.getIsInitialized()).thenReturn(runtimeManagerInitFuture);
membershipManager = mock(MembershipManager.class);
leaderService = spy(new LeaderService(workerService, mockClient, functionAssignmentTailer, schedulerManager,
functionRuntimeManager, functionMetadataManager, membershipManager, ErrorNotifier.getDefaultImpl()));
leaderService.start();
}
@Test
public void testLeaderService() throws Exception {
MessageId messageId = new MessageIdImpl(1, 2, -1);
when(schedulerManager.getLastMessageProduced()).thenReturn(messageId);
assertFalse(leaderService.isLeader());
verify(mockClient, times(1)).newConsumer();
listenerHolder.get().becameActive(mockConsumer, 0);
assertTrue(leaderService.isLeader());
verify(functionMetadataManager, times(1)).getIsInitialized();
verify(metadataManagerInitFuture, times(1)).get();
verify(functionRuntimeManager, times(1)).getIsInitialized();
verify(runtimeManagerInitFuture, times(1)).get();
verify(functionMetadataManager, times(1)).acquireExclusiveWrite(any());
verify(schedulerManager, times(1)).acquireExclusiveWrite(any());
verify(functionAssignmentTailer, times(1)).triggerReadToTheEndAndExit();
verify(functionAssignmentTailer, times(1)).close();
verify(schedulerManager, times((1))).initialize(any());
listenerHolder.get().becameInactive(mockConsumer, 0);
assertFalse(leaderService.isLeader());
verify(functionAssignmentTailer, times(1)).startFromMessage(messageId);
verify(schedulerManager, times(1)).close();
verify(functionMetadataManager, times(1)).giveupLeadership();
}
@Test
public void testLeaderServiceNoNewScheduling() throws Exception {
when(schedulerManager.getLastMessageProduced()).thenReturn(null);
assertFalse(leaderService.isLeader());
verify(mockClient, times(1)).newConsumer();
listenerHolder.get().becameActive(mockConsumer, 0);
assertTrue(leaderService.isLeader());
verify(functionMetadataManager, times(1)).acquireExclusiveWrite(any());
verify(schedulerManager, times(1)).acquireExclusiveWrite(any());
verify(functionAssignmentTailer, times(1)).triggerReadToTheEndAndExit();
verify(readToTheEndAndExitFuture, times(1)).get();
verify(functionAssignmentTailer, times(1)).close();
verify(schedulerManager, times((1))).initialize(any());
listenerHolder.get().becameInactive(mockConsumer, 0);
assertFalse(leaderService.isLeader());
verify(functionAssignmentTailer, times(1)).start();
verify(schedulerManager, times(1)).close();
verify(functionMetadataManager, times(1)).giveupLeadership();
}
@Test
public void testAcquireScheduleManagerExclusiveProducerNotLeaderAnymore() throws Exception {
MessageId messageId = new MessageIdImpl(1, 2, -1);
when(schedulerManager.getLastMessageProduced()).thenReturn(messageId);
assertFalse(leaderService.isLeader());
verify(mockClient, times(1)).newConsumer();
// acquire exclusive producer failed because no leader anymore
when(schedulerManager.acquireExclusiveWrite(any())).thenThrow(new WorkerUtils.NotLeaderAnymore());
listenerHolder.get().becameActive(mockConsumer, 0);
// should have failed to become leader
assertFalse(leaderService.isLeader());
verify(functionMetadataManager, times(1)).getIsInitialized();
verify(metadataManagerInitFuture, times(1)).get();
verify(functionRuntimeManager, times(1)).getIsInitialized();
verify(runtimeManagerInitFuture, times(1)).get();
verify(schedulerManager, times(1)).acquireExclusiveWrite(any());
verify(functionMetadataManager, times(0)).acquireExclusiveWrite(any());
verify(functionAssignmentTailer, times(0)).triggerReadToTheEndAndExit();
verify(functionAssignmentTailer, times(0)).close();
verify(schedulerManager, times((0))).initialize(any());
listenerHolder.get().becameInactive(mockConsumer, 0);
assertFalse(leaderService.isLeader());
verify(functionAssignmentTailer, times(0)).startFromMessage(messageId);
verify(functionAssignmentTailer, times(0)).start();
verify(schedulerManager, times(0)).close();
verify(functionMetadataManager, times(0)).giveupLeadership();
}
@Test
public void testAcquireFunctionMetadataManagerExclusiveProducerNotLeaderAnymore() throws Exception {
MessageId messageId = new MessageIdImpl(1, 2, -1);
when(schedulerManager.getLastMessageProduced()).thenReturn(messageId);
assertFalse(leaderService.isLeader());
verify(mockClient, times(1)).newConsumer();
// acquire exclusive producer failed because no leader anymore
when(functionMetadataManager.acquireExclusiveWrite(any())).thenThrow(new WorkerUtils.NotLeaderAnymore());
listenerHolder.get().becameActive(mockConsumer, 0);
// should have failed to become leader
assertFalse(leaderService.isLeader());
verify(functionMetadataManager, times(1)).getIsInitialized();
verify(metadataManagerInitFuture, times(1)).get();
verify(functionRuntimeManager, times(1)).getIsInitialized();
verify(runtimeManagerInitFuture, times(1)).get();
verify(schedulerManager, times(1)).acquireExclusiveWrite(any());
verify(functionMetadataManager, times(1)).acquireExclusiveWrite(any());
verify(functionAssignmentTailer, times(0)).triggerReadToTheEndAndExit();
verify(functionAssignmentTailer, times(0)).close();
verify(schedulerManager, times((0))).initialize(any());
listenerHolder.get().becameInactive(mockConsumer, 0);
assertFalse(leaderService.isLeader());
verify(functionAssignmentTailer, times(0)).startFromMessage(messageId);
verify(functionAssignmentTailer, times(0)).start();
verify(schedulerManager, times(0)).close();
verify(functionMetadataManager, times(0)).giveupLeadership();
}
}