blob: bbb70d27db26f6c16dca4019ee379ac738f3ec42 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.operators;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import org.apache.samza.SamzaException;
import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
import org.apache.samza.operators.functions.ScheduledFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OperatorSpecTestUtils;
import org.apache.samza.operators.spec.OperatorSpecs;
import org.apache.samza.operators.spec.OutputOperatorSpec;
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.serializers.NoOpSerde;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* Unit tests for {@link OperatorSpecGraph}
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(OperatorSpec.class)
public class TestOperatorSpecGraph {
private StreamApplicationDescriptorImpl mockAppDesc;
private Map<String, InputOperatorSpec> inputOpSpecMap;
private Map<String, OutputStreamImpl> outputStrmMap;
private Set<OperatorSpec> allOpSpecs;
@Before
public void setUp() {
this.mockAppDesc = mock(StreamApplicationDescriptorImpl.class);
/**
* Setup two linear transformation pipelines:
* 1) input1 --> filter --> sendTo
* 2) input2 --> map --> sink
*/
String inputStreamId1 = "test-input-1";
String outputStreamId = "test-output-1";
InputOperatorSpec testInput = new InputOperatorSpec(inputStreamId1, new NoOpSerde(), new NoOpSerde(), null, true, inputStreamId1);
StreamOperatorSpec filterOp = OperatorSpecs.createFilterOperatorSpec(m -> true, "test-filter-2");
OutputStreamImpl outputStream1 = new OutputStreamImpl(outputStreamId, null, null, true);
OutputOperatorSpec outputSpec = OperatorSpecs.createSendToOperatorSpec(outputStream1, "test-output-3");
testInput.registerNextOperatorSpec(filterOp);
filterOp.registerNextOperatorSpec(outputSpec);
String streamId2 = "test-input-2";
InputOperatorSpec testInput2 = new InputOperatorSpec(streamId2, new NoOpSerde(), new NoOpSerde(), null, true, "test-input-4");
StreamOperatorSpec testMap = OperatorSpecs.createMapOperatorSpec(m -> m, "test-map-5");
SinkOperatorSpec testSink = OperatorSpecs.createSinkOperatorSpec((m, mc, tc) -> { }, "test-sink-6");
testInput2.registerNextOperatorSpec(testMap);
testMap.registerNextOperatorSpec(testSink);
this.inputOpSpecMap = new LinkedHashMap<>();
inputOpSpecMap.put(inputStreamId1, testInput);
inputOpSpecMap.put(streamId2, testInput2);
this.outputStrmMap = new LinkedHashMap<>();
outputStrmMap.put(outputStreamId, outputStream1);
when(mockAppDesc.getInputOperators()).thenReturn(Collections.unmodifiableMap(inputOpSpecMap));
when(mockAppDesc.getOutputStreams()).thenReturn(Collections.unmodifiableMap(outputStrmMap));
this.allOpSpecs = new HashSet<OperatorSpec>() { {
this.add(testInput);
this.add(filterOp);
this.add(outputSpec);
this.add(testInput2);
this.add(testMap);
this.add(testSink);
} };
}
@After
public void tearDown() {
this.mockAppDesc = null;
this.inputOpSpecMap = null;
this.outputStrmMap = null;
this.allOpSpecs = null;
}
@Test
public void testConstructor() {
OperatorSpecGraph specGraph = new OperatorSpecGraph(mockAppDesc);
assertEquals(specGraph.getInputOperators(), inputOpSpecMap);
assertEquals(specGraph.getOutputStreams(), outputStrmMap);
assertTrue(!specGraph.hasWindowOrJoins());
assertEquals(specGraph.getAllOperatorSpecs(), this.allOpSpecs);
}
@Test
public void testClone() {
OperatorSpecGraph operatorSpecGraph = new OperatorSpecGraph(mockAppDesc);
OperatorSpecGraph clonedSpecGraph = operatorSpecGraph.clone();
OperatorSpecTestUtils.assertClonedGraph(operatorSpecGraph, clonedSpecGraph);
}
@Test(expected = NotSerializableException.class)
public void testCloneWithSerializationError() throws Throwable {
OperatorSpec mockFailedOpSpec = PowerMockito.mock(OperatorSpec.class);
when(mockFailedOpSpec.getOpId()).thenReturn("test-failed-op-4");
allOpSpecs.add(mockFailedOpSpec);
inputOpSpecMap.values().stream().findFirst().get().registerNextOperatorSpec(mockFailedOpSpec);
//failed with serialization error
try {
new OperatorSpecGraph(mockAppDesc);
fail("Should have failed with serialization error");
} catch (SamzaException se) {
throw se.getCause();
}
}
@Test(expected = IOException.class)
public void testCloneWithDeserializationError() throws Throwable {
TestDeserializeOperatorSpec testOp = new TestDeserializeOperatorSpec(OperatorSpec.OpCode.MAP, "test-failed-op-4");
this.allOpSpecs.add(testOp);
inputOpSpecMap.values().stream().findFirst().get().registerNextOperatorSpec(testOp);
OperatorSpecGraph operatorSpecGraph = new OperatorSpecGraph(mockAppDesc);
//failed with serialization error
try {
operatorSpecGraph.clone();
fail("Should have failed with serialization error");
} catch (SamzaException se) {
throw se.getCause();
}
}
private static class TestDeserializeOperatorSpec extends OperatorSpec {
public TestDeserializeOperatorSpec(OpCode opCode, String opId) {
super(opCode, opId);
}
private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
throw new IOException("Raise IOException to cause deserialization failure");
}
@Override
public WatermarkFunction getWatermarkFn() {
return null;
}
@Override
public ScheduledFunction getScheduledFn() {
return null;
}
}
}