| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.system.eventhub; |
| |
| import com.microsoft.azure.eventhubs.*; |
| import org.apache.samza.system.SystemStreamPartition; |
| import org.apache.samza.system.eventhub.consumer.EventHubSystemConsumer; |
| import org.junit.Assert; |
| import org.mockito.stubbing.Answer; |
| import org.powermock.api.mockito.PowerMockito; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.TimeUnit; |
| |
| import static org.mockito.Matchers.*; |
| |
| public class MockEventHubClientManagerFactory extends EventHubClientManagerFactory { |
| private Map<SystemStreamPartition, List<EventData>> eventData; |
| private Map<String, Map<String, Map<Integer, List<EventData>>>> receivedData; |
| private Map<String, EventPosition> startingOffsets = new HashMap<>(); |
| |
| public MockEventHubClientManagerFactory() { |
| this.receivedData = new HashMap<>(); |
| } |
| |
| public MockEventHubClientManagerFactory(Map<SystemStreamPartition, List<EventData>> eventData) { |
| this.eventData = eventData; |
| } |
| |
| @Override |
| public EventHubClientManager getEventHubClientManager(String systemName, String streamName, EventHubConfig config) { |
| if (receivedData != null) { |
| if (!receivedData.containsKey(systemName)) { |
| receivedData.put(systemName, new HashMap<>()); |
| } |
| |
| if (!receivedData.get(systemName).containsKey(streamName)) { |
| receivedData.get(systemName).put(streamName, new HashMap<>()); |
| receivedData.get(systemName).get(streamName).put(0, new ArrayList<>()); |
| receivedData.get(systemName).get(streamName).put(1, new ArrayList<>()); |
| } |
| } |
| return new MockEventHubClientManager(systemName, streamName); |
| } |
| |
| // Emulate EventHub sending data |
| public void sendToHandlers(Map<SystemStreamPartition, PartitionReceiveHandler> handlers) { |
| if (eventData == null) return; |
| handlers.forEach((ssp, value) -> value.onReceive(eventData.get(ssp))); |
| } |
| |
| public void triggerError(Map<SystemStreamPartition, PartitionReceiveHandler> handlers, Throwable e) { |
| handlers.forEach((ssp, value) -> value.onError(e)); |
| } |
| |
| public EventPosition getPartitionOffset(String partitionId) { |
| return startingOffsets.getOrDefault(partitionId, null); |
| } |
| |
| public List<EventData> getSentData(String systemName, String streamName, Integer partitionId) { |
| if (receivedData.containsKey(systemName) && receivedData.get(systemName).containsKey(streamName)) { |
| return receivedData.get(systemName).get(streamName).get(partitionId); |
| } |
| return null; |
| } |
| |
| private class MockEventHubClientManager implements EventHubClientManager { |
| Boolean initiated = false; |
| EventHubClient mockEventHubClient = PowerMockito.mock(EventHubClient.class); |
| String systemName; |
| String streamName; |
| |
| MockEventHubClientManager(String systemName, String streamName) { |
| this.systemName = systemName; |
| this.streamName = streamName; |
| |
| // Consumer mocks |
| PartitionReceiver mockPartitionReceiver = PowerMockito.mock(PartitionReceiver.class); |
| PowerMockito.when(mockPartitionReceiver.setReceiveHandler(any())).then((Answer<Void>) invocationOnMock -> { |
| PartitionReceiveHandler handler = invocationOnMock.getArgumentAt(0, PartitionReceiveHandler.class); |
| if (handler == null) { |
| Assert.fail("Handler for setReceiverHandler was null"); |
| } |
| return null; |
| }); |
| PartitionRuntimeInformation mockPartitionRuntimeInfo = PowerMockito.mock(PartitionRuntimeInformation.class); |
| PowerMockito.when(mockPartitionRuntimeInfo.getLastEnqueuedOffset()) |
| .thenReturn(EventHubSystemConsumer.START_OF_STREAM); |
| CompletableFuture<PartitionRuntimeInformation> partitionFuture = new MockPartitionFuture(mockPartitionRuntimeInfo); |
| |
| // Producer mocks |
| PartitionSender mockPartitionSender0 = PowerMockito.mock(PartitionSender.class); |
| PartitionSender mockPartitionSender1 = PowerMockito.mock(PartitionSender.class); |
| PowerMockito.when(mockPartitionSender0.send(any(EventData.class))) |
| .then((Answer<CompletableFuture<Void>>) invocationOnMock -> { |
| EventData data = invocationOnMock.getArgumentAt(0, EventData.class); |
| receivedData.get(systemName).get(streamName).get(0).add(data); |
| return new CompletableFuture<>(); |
| }); |
| PowerMockito.when(mockPartitionSender1.send(any(EventData.class))) |
| .then((Answer<CompletableFuture<Void>>) invocationOnMock -> { |
| EventData data = invocationOnMock.getArgumentAt(0, EventData.class); |
| receivedData.get(systemName).get(streamName).get(1).add(data); |
| return new CompletableFuture<>(); |
| }); |
| |
| EventHubRuntimeInformation mockRuntimeInfo = PowerMockito.mock(EventHubRuntimeInformation.class); |
| CompletableFuture<EventHubRuntimeInformation> future = new MockFuture(mockRuntimeInfo); |
| PowerMockito.when(mockRuntimeInfo.getPartitionCount()).thenReturn(2); |
| |
| try { |
| // Consumer calls |
| PowerMockito.when(mockEventHubClient.createReceiver(anyString(), anyString(), anyObject())) |
| .then((Answer<CompletableFuture<PartitionReceiver>>) invocationOnMock -> { |
| String partitionId = invocationOnMock.getArgumentAt(1, String.class); |
| startingOffsets.put(partitionId, EventPosition.fromEndOfStream()); |
| return CompletableFuture.completedFuture(mockPartitionReceiver); |
| }); |
| |
| PowerMockito.when(mockEventHubClient.createReceiver(anyString(), anyString(), anyObject())) |
| .then((Answer<CompletableFuture<PartitionReceiver>>) invocationOnMock -> { |
| String partitionId = invocationOnMock.getArgumentAt(1, String.class); |
| EventPosition offset = invocationOnMock.getArgumentAt(2, EventPosition.class); |
| startingOffsets.put(partitionId, offset); |
| return CompletableFuture.completedFuture(mockPartitionReceiver); |
| }); |
| |
| PowerMockito.when(mockEventHubClient.getPartitionRuntimeInformation(anyString())).thenReturn(partitionFuture); |
| |
| // Producer calls |
| PowerMockito.when(mockEventHubClient.createPartitionSender("0")).thenReturn(CompletableFuture.completedFuture(mockPartitionSender0)); |
| PowerMockito.when(mockEventHubClient.createPartitionSender("1")).thenReturn(CompletableFuture.completedFuture(mockPartitionSender1)); |
| |
| PowerMockito.when(mockEventHubClient.getRuntimeInformation()).thenReturn(future); |
| |
| PowerMockito.when(mockEventHubClient.send(any(EventData.class), anyString())) |
| .then((Answer<CompletableFuture<Void>>) invocationOnMock -> { |
| EventData data = invocationOnMock.getArgumentAt(0, EventData.class); |
| String key = invocationOnMock.getArgumentAt(1, String.class); |
| Integer intKey = Integer.valueOf(key); |
| receivedData.get(systemName).get(streamName).get(intKey % 2).add(data); |
| return new CompletableFuture<>(); |
| }); |
| } catch (Exception e) { |
| Assert.fail("Failed to create create mock methods for EventHubClient"); |
| } |
| } |
| |
| @Override |
| public void init() { |
| initiated = true; |
| } |
| |
| @Override |
| public EventHubClient getEventHubClient() { |
| if (!initiated) { |
| Assert.fail("Should have called init() on EventHubClient before getEventHubClient()"); |
| } |
| return mockEventHubClient; |
| } |
| |
| @Override |
| public void close(long timeoutMS) { |
| if (!initiated) { |
| Assert.fail("Should have called init() on EventHubClient before close()"); |
| } |
| initiated = false; |
| } |
| |
| private class MockFuture extends CompletableFuture<EventHubRuntimeInformation> { |
| EventHubRuntimeInformation runtimeInformation; |
| |
| MockFuture(EventHubRuntimeInformation runtimeInformation) { |
| this.runtimeInformation = runtimeInformation; |
| } |
| |
| @Override |
| public EventHubRuntimeInformation get(long timeout, TimeUnit unit) { |
| return runtimeInformation; |
| } |
| } |
| |
| private class MockPartitionFuture extends CompletableFuture<PartitionRuntimeInformation> { |
| PartitionRuntimeInformation runtimeInformation; |
| |
| MockPartitionFuture(PartitionRuntimeInformation runtimeInformation) { |
| this.runtimeInformation = runtimeInformation; |
| } |
| |
| @Override |
| public PartitionRuntimeInformation get(long timeout, TimeUnit unit) { |
| return runtimeInformation; |
| } |
| } |
| |
| } |
| |
| } |