| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.coordinator.stream; |
| |
| import org.apache.samza.SamzaException; |
| import org.apache.samza.config.Config; |
| import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage; |
| import org.apache.samza.execution.JobPlanner; |
| import org.apache.samza.serializers.model.SamzaObjectMapper; |
| import org.apache.samza.system.OutgoingMessageEnvelope; |
| import org.codehaus.jackson.type.TypeReference; |
| import org.junit.Test; |
| |
| import java.lang.reflect.Field; |
| import java.lang.reflect.InvocationTargetException; |
| import java.lang.reflect.Method; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import static org.junit.Assert.*; |
| |
| /** |
| * This class is a unit test for the CoordinatorStreamWriter class. |
| */ |
| public class TestCoordinatorStreamWriter { |
| |
| private CoordinatorStreamWriter coordinatorStreamWriter; |
| private MockCoordinatorStreamSystemFactory.MockSystemProducer systemProducer; |
| |
| @Test |
| public void testCoordinatorStream() { |
| |
| Map<String, String> userConfigs = new HashMap<>(); |
| userConfigs.put("systems.coordinatorStreamWriter.samza.factory", "org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory"); |
| userConfigs.put("app.name", "coordinator-stream-writer-test"); |
| userConfigs.put("job.coordinator.system", "coordinatorStreamWriter"); |
| Config generatedConfig = JobPlanner.generateSingleJobConfig(userConfigs); |
| coordinatorStreamWriter = new CoordinatorStreamWriter(generatedConfig); |
| boolean exceptionHappened = false; |
| |
| try { |
| |
| //get coordinator system producer |
| Field coordinatorProducerField = coordinatorStreamWriter.getClass().getDeclaredField("coordinatorStreamSystemProducer"); |
| coordinatorProducerField.setAccessible(true); |
| assertNotNull(coordinatorProducerField.get(coordinatorStreamWriter)); |
| CoordinatorStreamSystemProducer coordinatorStreamSystemProducer = (CoordinatorStreamSystemProducer) coordinatorProducerField.get(coordinatorStreamWriter); |
| |
| //get mock system producer |
| Field systemProducerField = coordinatorStreamSystemProducer.getClass().getDeclaredField("systemProducer"); |
| systemProducerField.setAccessible(true); |
| systemProducer = (MockCoordinatorStreamSystemFactory.MockSystemProducer) systemProducerField.get(coordinatorStreamSystemProducer); |
| |
| testStart(); |
| testSendMessage(); |
| testStop(); |
| |
| |
| } catch (NoSuchFieldException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { |
| e.printStackTrace(); |
| exceptionHappened = true; |
| } |
| |
| assertFalse(exceptionHappened); |
| |
| |
| } |
| |
| |
| public void testStart() throws NoSuchFieldException, IllegalAccessException { |
| |
| //checks before starting |
| assertFalse(systemProducer.isStarted()); |
| |
| //start and check if start has been done successfully |
| coordinatorStreamWriter.start(); |
| assertTrue(systemProducer.isStarted()); |
| |
| } |
| |
| public void testStop() throws NoSuchFieldException, IllegalAccessException { |
| |
| //checks before stopping |
| assertTrue(systemProducer.isStarted()); |
| |
| //stop and check if stop has been done correctly |
| coordinatorStreamWriter.stop(); |
| assertTrue(systemProducer.isStopped()); |
| } |
| |
| @SuppressWarnings({ "unchecked", "rawtypes" }) |
| public void testSendMessage() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { |
| |
| //check a correct message |
| assertEquals(0, systemProducer.getEnvelopes().size()); |
| coordinatorStreamWriter.sendMessage("set-config", "key0", "value0"); |
| assertEquals(1, systemProducer.getEnvelopes().size()); |
| |
| //check invalid input is handled |
| boolean exceptionHappened = false; |
| try { |
| coordinatorStreamWriter.sendMessage("invalid-type", "key-invalid", "value-invalid"); |
| } catch (IllegalArgumentException e) { |
| exceptionHappened = true; |
| } |
| assertTrue(exceptionHappened); |
| assertEquals(1, systemProducer.getEnvelopes().size()); |
| |
| |
| //check sendSetConfigMessage method works correctly |
| Class[] sendArgs = {String.class, String.class}; |
| Method sendSetConfigMethod = coordinatorStreamWriter.getClass().getDeclaredMethod("sendSetConfigMessage", sendArgs); |
| sendSetConfigMethod.setAccessible(true); |
| sendSetConfigMethod.invoke(coordinatorStreamWriter, "key1", "value1"); |
| assertEquals(2, systemProducer.getEnvelopes().size()); |
| |
| |
| //check the messages are correct |
| List<OutgoingMessageEnvelope> envelopes = systemProducer.getEnvelopes(); |
| OutgoingMessageEnvelope envelope0 = envelopes.get(0); |
| OutgoingMessageEnvelope envelope1 = envelopes.get(1); |
| TypeReference<Object[]> keyRef = new TypeReference<Object[]>() { |
| }; |
| TypeReference<Map<String, Object>> msgRef = new TypeReference<Map<String, Object>>() { |
| }; |
| assertEquals(2, envelopes.size()); |
| |
| assertEquals("key0", deserialize((byte[]) envelope0.getKey(), keyRef)[CoordinatorStreamMessage.KEY_INDEX]); |
| Map<String, String> values = (Map<String, String>) deserialize((byte[]) envelope0.getMessage(), msgRef).get("values"); |
| assertEquals("value0", values.get("value")); |
| |
| assertEquals("key1", deserialize((byte[]) envelope1.getKey(), keyRef)[CoordinatorStreamMessage.KEY_INDEX]); |
| values = (Map<String, String>) deserialize((byte[]) envelope1.getMessage(), msgRef).get("values"); |
| assertEquals("value1", values.get("value")); |
| } |
| |
| private <T> T deserialize(byte[] bytes, TypeReference<T> reference) { |
| try { |
| if (bytes != null) { |
| String valueStr = new String((byte[]) bytes, "UTF-8"); |
| return SamzaObjectMapper.getObjectMapper().readValue(valueStr, reference); |
| } |
| } catch (Exception e) { |
| throw new SamzaException(e); |
| } |
| |
| return null; |
| } |
| |
| } |
| |