| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.functions.worker; |
| |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.ArgumentMatchers.anyString; |
| import static org.mockito.Mockito.*; |
| |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.pulsar.client.api.*; |
| import org.apache.pulsar.functions.proto.Function; |
| import org.apache.pulsar.functions.proto.Request; |
| import org.testng.Assert; |
| import org.testng.annotations.Test; |
| |
| public class FunctionMetaDataManagerTest { |
| |
| static byte[] producerByteArray; |
| |
| private static PulsarClient mockPulsarClient() throws PulsarClientException { |
| ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class); |
| when(builder.topic(anyString())).thenReturn(builder); |
| when(builder.producerName(anyString())).thenReturn(builder); |
| when(builder.enableBatching(anyBoolean())).thenReturn(builder); |
| when(builder.blockIfQueueFull(anyBoolean())).thenReturn(builder); |
| when(builder.compressionType(any(CompressionType.class))).thenReturn(builder); |
| when(builder.sendTimeout(anyInt(), any(TimeUnit.class))).thenReturn(builder); |
| when(builder.accessMode(any())).thenReturn(builder); |
| |
| Producer producer = mock(Producer.class); |
| TypedMessageBuilder messageBuilder = mock(TypedMessageBuilder.class); |
| when(messageBuilder.key(anyString())).thenReturn(messageBuilder); |
| doAnswer(invocation -> { |
| Object arg0 = invocation.getArgument(0); |
| FunctionMetaDataManagerTest.producerByteArray = (byte[])arg0; |
| return messageBuilder; |
| }).when(messageBuilder).value(any()); |
| when(messageBuilder.property(anyString(), anyString())).thenReturn(messageBuilder); |
| when(producer.newMessage()).thenReturn(messageBuilder); |
| |
| when(builder.create()).thenReturn(producer); |
| when(builder.createAsync()).thenReturn(CompletableFuture.completedFuture(producer)); |
| |
| PulsarClient client = mock(PulsarClient.class); |
| when(client.newProducer()).thenReturn(builder); |
| |
| return client; |
| } |
| |
| @Test |
| public void testListFunctions() throws PulsarClientException { |
| FunctionMetaDataManager functionMetaDataManager = spy( |
| new FunctionMetaDataManager(new WorkerConfig(), |
| mock(SchedulerManager.class), |
| mockPulsarClient(), ErrorNotifier.getDefaultImpl())); |
| |
| Map<String, Function.FunctionMetaData> functionMetaDataMap1 = new HashMap<>(); |
| Function.FunctionMetaData f1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( |
| Function.FunctionDetails.newBuilder().setName("func-1")).build(); |
| functionMetaDataMap1.put("func-1", f1); |
| Function.FunctionMetaData f2 = Function.FunctionMetaData.newBuilder().setFunctionDetails( |
| Function.FunctionDetails.newBuilder().setName("func-2")).build(); |
| functionMetaDataMap1.put("func-2", f2); |
| Function.FunctionMetaData f3 = Function.FunctionMetaData.newBuilder().setFunctionDetails( |
| Function.FunctionDetails.newBuilder().setName("func-3")).build(); |
| Map<String, Function.FunctionMetaData> functionMetaDataInfoMap2 = new HashMap<>(); |
| functionMetaDataInfoMap2.put("func-3", f3); |
| |
| |
| functionMetaDataManager.functionMetaDataMap.put("tenant-1", new HashMap<>()); |
| functionMetaDataManager.functionMetaDataMap.get("tenant-1").put("namespace-1", functionMetaDataMap1); |
| functionMetaDataManager.functionMetaDataMap.get("tenant-1").put("namespace-2", functionMetaDataInfoMap2); |
| |
| Assert.assertEquals(0, functionMetaDataManager.listFunctions( |
| "tenant", "namespace").size()); |
| Assert.assertEquals(2, functionMetaDataManager.listFunctions( |
| "tenant-1", "namespace-1").size()); |
| Assert.assertTrue(functionMetaDataManager.listFunctions( |
| "tenant-1", "namespace-1").contains(f1)); |
| Assert.assertTrue(functionMetaDataManager.listFunctions( |
| "tenant-1", "namespace-1").contains(f2)); |
| Assert.assertEquals(1, functionMetaDataManager.listFunctions( |
| "tenant-1", "namespace-2").size()); |
| Assert.assertTrue(functionMetaDataManager.listFunctions( |
| "tenant-1", "namespace-2").contains(f3)); |
| } |
| |
| @Test |
| public void testUpdateIfLeaderFunctionWithoutCompaction() throws Exception { |
| testUpdateIfLeaderFunction(false); |
| } |
| |
| @Test |
| public void testUpdateIfLeaderFunctionWithCompaction() throws Exception { |
| testUpdateIfLeaderFunction(true); |
| } |
| |
| private void testUpdateIfLeaderFunction(boolean compact) throws Exception { |
| |
| WorkerConfig workerConfig = new WorkerConfig(); |
| workerConfig.setWorkerId("worker-1"); |
| workerConfig.setUseCompactedMetadataTopic(compact); |
| FunctionMetaDataManager functionMetaDataManager = spy( |
| new FunctionMetaDataManager(workerConfig, |
| mock(SchedulerManager.class), |
| mockPulsarClient(), ErrorNotifier.getDefaultImpl())); |
| Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder() |
| .setVersion(1) |
| .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")).build(); |
| |
| // update when you are not the leader |
| try { |
| functionMetaDataManager.updateFunctionOnLeader(m1, false); |
| Assert.assertTrue(false); |
| } catch (IllegalStateException e) { |
| Assert.assertEquals(e.getMessage(), "Not the leader"); |
| } |
| |
| // become leader |
| Producer<byte[]> exclusiveProducer = functionMetaDataManager.acquireExclusiveWrite(() -> true); |
| functionMetaDataManager.acquireLeadership(exclusiveProducer); |
| // Now w should be able to really update |
| functionMetaDataManager.updateFunctionOnLeader(m1, false); |
| if (compact) { |
| Assert.assertTrue(Arrays.equals(m1.toByteArray(), producerByteArray)); |
| } else { |
| Assert.assertFalse(Arrays.equals(m1.toByteArray(), producerByteArray)); |
| } |
| |
| // outdated request |
| try { |
| functionMetaDataManager.updateFunctionOnLeader(m1, false); |
| Assert.assertTrue(false); |
| } catch (IllegalArgumentException e) { |
| Assert.assertEquals(e.getMessage(), "Update request ignored because it is out of date. Please try again."); |
| } |
| // udpate with new version |
| Function.FunctionMetaData m2 = m1.toBuilder().setVersion(2).build(); |
| functionMetaDataManager.updateFunctionOnLeader(m2, false); |
| if (compact) { |
| Assert.assertTrue(Arrays.equals(m2.toByteArray(), producerByteArray)); |
| } else { |
| Assert.assertFalse(Arrays.equals(m2.toByteArray(), producerByteArray)); |
| } |
| } |
| |
| @Test |
| public void deregisterFunctionWithoutCompaction() throws Exception { |
| deregisterFunction(false); |
| } |
| |
| @Test |
| public void deregisterFunctionWithCompaction() throws Exception { |
| deregisterFunction(true); |
| } |
| |
| private void deregisterFunction(boolean compact) throws Exception { |
| SchedulerManager mockedScheduler = mock(SchedulerManager.class); |
| WorkerConfig workerConfig = new WorkerConfig(); |
| workerConfig.setWorkerId("worker-1"); |
| workerConfig.setUseCompactedMetadataTopic(compact); |
| FunctionMetaDataManager functionMetaDataManager = spy( |
| new FunctionMetaDataManager(workerConfig, |
| mockedScheduler, |
| mockPulsarClient(), ErrorNotifier.getDefaultImpl())); |
| Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder() |
| .setVersion(1) |
| .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1") |
| .setNamespace("namespace-1").setTenant("tenant-1")).build(); |
| |
| // Try deleting when you are not the leader |
| try { |
| functionMetaDataManager.updateFunctionOnLeader(m1, true); |
| Assert.assertTrue(false); |
| } catch (IllegalStateException e) { |
| Assert.assertEquals(e.getMessage(), "Not the leader"); |
| } |
| |
| // become leader |
| Producer<byte[]> exclusiveProducer = functionMetaDataManager.acquireExclusiveWrite(() -> true); |
| functionMetaDataManager.acquireLeadership(exclusiveProducer); |
| verify(mockedScheduler, times(0)).schedule(); |
| // Now try deleting |
| functionMetaDataManager.updateFunctionOnLeader(m1, true); |
| // make sure schedule was not called because function didn't exist. |
| verify(mockedScheduler, times(0)).schedule(); |
| |
| // insert function |
| functionMetaDataManager.updateFunctionOnLeader(m1, false); |
| verify(mockedScheduler, times(1)).schedule(); |
| |
| // outdated request |
| try { |
| functionMetaDataManager.updateFunctionOnLeader(m1, true); |
| Assert.assertTrue(false); |
| } catch (IllegalArgumentException e) { |
| Assert.assertEquals(e.getMessage(), "Delete request ignored because it is out of date. Please try again."); |
| } |
| verify(mockedScheduler, times(1)).schedule(); |
| |
| // udpate with new version |
| m1 = m1.toBuilder().setVersion(2).build(); |
| functionMetaDataManager.updateFunctionOnLeader(m1, true); |
| verify(mockedScheduler, times(2)).schedule(); |
| if (compact) { |
| Assert.assertTrue(Arrays.equals("".getBytes(), producerByteArray)); |
| } else { |
| Assert.assertFalse(Arrays.equals(m1.toByteArray(), producerByteArray)); |
| } |
| } |
| |
| @Test |
| public void testProcessRequest() throws PulsarClientException, IOException { |
| WorkerConfig workerConfig = new WorkerConfig(); |
| FunctionMetaDataManager functionMetaDataManager = spy( |
| new FunctionMetaDataManager(workerConfig, |
| mock(SchedulerManager.class), |
| mockPulsarClient(), ErrorNotifier.getDefaultImpl())); |
| |
| doReturn(true).when(functionMetaDataManager).processUpdate(any(Function.FunctionMetaData.class)); |
| doReturn(true).when(functionMetaDataManager).proccessDeregister(any(Function.FunctionMetaData.class)); |
| |
| Request.ServiceRequest serviceRequest |
| = Request.ServiceRequest.newBuilder().setServiceRequestType( |
| Request.ServiceRequest.ServiceRequestType.UPDATE).build(); |
| Message msg = mock(Message.class); |
| doReturn(serviceRequest.toByteArray()).when(msg).getData(); |
| functionMetaDataManager.processMetaDataTopicMessage(msg); |
| |
| verify(functionMetaDataManager, times(1)).processUpdate |
| (any(Function.FunctionMetaData.class)); |
| verify(functionMetaDataManager).processUpdate(serviceRequest.getFunctionMetaData()); |
| |
| serviceRequest |
| = Request.ServiceRequest.newBuilder().setServiceRequestType( |
| Request.ServiceRequest.ServiceRequestType.INITIALIZE).build(); |
| doReturn(serviceRequest.toByteArray()).when(msg).getData(); |
| functionMetaDataManager.processMetaDataTopicMessage(msg); |
| |
| serviceRequest |
| = Request.ServiceRequest.newBuilder().setServiceRequestType( |
| Request.ServiceRequest.ServiceRequestType.DELETE).build(); |
| doReturn(serviceRequest.toByteArray()).when(msg).getData(); |
| functionMetaDataManager.processMetaDataTopicMessage(msg); |
| |
| verify(functionMetaDataManager, times(1)).proccessDeregister( |
| any(Function.FunctionMetaData.class)); |
| verify(functionMetaDataManager).proccessDeregister(serviceRequest.getFunctionMetaData()); |
| } |
| |
| @Test |
| public void processUpdateTest() throws PulsarClientException { |
| SchedulerManager schedulerManager = mock(SchedulerManager.class); |
| WorkerConfig workerConfig = new WorkerConfig(); |
| workerConfig.setWorkerId("worker-1"); |
| FunctionMetaDataManager functionMetaDataManager = spy( |
| new FunctionMetaDataManager(workerConfig, |
| schedulerManager, |
| mockPulsarClient(), ErrorNotifier.getDefaultImpl())); |
| Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder() |
| .setVersion(1) |
| .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1") |
| .setNamespace("namespace-1").setTenant("tenant-1")).build(); |
| |
| Assert.assertTrue(functionMetaDataManager.processUpdate(m1)); |
| verify(functionMetaDataManager, times(1)) |
| .setFunctionMetaData(any(Function.FunctionMetaData.class)); |
| verify(schedulerManager, times(0)).schedule(); |
| Assert.assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get( |
| "tenant-1").get("namespace-1").get("func-1")); |
| Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get( |
| "tenant-1").get("namespace-1").size()); |
| |
| // outdated request |
| try { |
| functionMetaDataManager.processUpdate(m1); |
| Assert.assertTrue(false); |
| } catch (IllegalArgumentException e) { |
| Assert.assertEquals(e.getMessage(), "Update request ignored because it is out of date. Please try again."); |
| } |
| verify(functionMetaDataManager, times(1)) |
| .setFunctionMetaData(any(Function.FunctionMetaData.class)); |
| verify(schedulerManager, times(0)).schedule(); |
| Assert.assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get( |
| "tenant-1").get("namespace-1").get("func-1")); |
| Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get( |
| "tenant-1").get("namespace-1").size()); |
| |
| // udpate with new version |
| m1 = m1.toBuilder().setVersion(2).build(); |
| Assert.assertTrue(functionMetaDataManager.processUpdate(m1)); |
| verify(functionMetaDataManager, times(2)) |
| .setFunctionMetaData(any(Function.FunctionMetaData.class)); |
| verify(schedulerManager, times(0)).schedule(); |
| Assert.assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get( |
| "tenant-1").get("namespace-1").get("func-1")); |
| Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get( |
| "tenant-1").get("namespace-1").size()); |
| } |
| |
| @Test |
| public void processDeregister() throws PulsarClientException { |
| SchedulerManager schedulerManager = mock(SchedulerManager.class); |
| WorkerConfig workerConfig = new WorkerConfig(); |
| workerConfig.setWorkerId("worker-1"); |
| FunctionMetaDataManager functionMetaDataManager = spy( |
| new FunctionMetaDataManager(workerConfig, |
| schedulerManager, |
| mockPulsarClient(), ErrorNotifier.getDefaultImpl())); |
| Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder() |
| .setVersion(1) |
| .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1") |
| .setNamespace("namespace-1").setTenant("tenant-1")).build(); |
| |
| Assert.assertFalse(functionMetaDataManager.proccessDeregister(m1)); |
| verify(functionMetaDataManager, times(0)) |
| .setFunctionMetaData(any(Function.FunctionMetaData.class)); |
| verify(schedulerManager, times(0)).schedule(); |
| Assert.assertEquals(0, functionMetaDataManager.functionMetaDataMap.size()); |
| |
| // insert something |
| Assert.assertTrue(functionMetaDataManager.processUpdate(m1)); |
| verify(functionMetaDataManager, times(1)) |
| .setFunctionMetaData(any(Function.FunctionMetaData.class)); |
| verify(schedulerManager, times(0)).schedule(); |
| Assert.assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get( |
| "tenant-1").get("namespace-1").get("func-1")); |
| Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get( |
| "tenant-1").get("namespace-1").size()); |
| |
| // outdated delete request |
| try { |
| functionMetaDataManager.proccessDeregister(m1); |
| Assert.assertTrue(false); |
| } catch (IllegalArgumentException e) { |
| Assert.assertEquals(e.getMessage(), "Delete request ignored because it is out of date. Please try again."); |
| } |
| verify(functionMetaDataManager, times(1)) |
| .setFunctionMetaData(any(Function.FunctionMetaData.class)); |
| verify(schedulerManager, times(0)).schedule(); |
| Assert.assertEquals(m1, functionMetaDataManager.functionMetaDataMap.get( |
| "tenant-1").get("namespace-1").get("func-1")); |
| Assert.assertEquals(1, functionMetaDataManager.functionMetaDataMap.get( |
| "tenant-1").get("namespace-1").size()); |
| |
| // delete now |
| m1 = m1.toBuilder().setVersion(2).build(); |
| Assert.assertTrue(functionMetaDataManager.proccessDeregister(m1)); |
| verify(functionMetaDataManager, times(1)) |
| .setFunctionMetaData(any(Function.FunctionMetaData.class)); |
| verify(schedulerManager, times(0)).schedule(); |
| Assert.assertEquals(0, functionMetaDataManager.functionMetaDataMap.get( |
| "tenant-1").get("namespace-1").size()); |
| } |
| } |