blob: e0a4f1adb9145b60cebd4c8710de02922c95b5fb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.operators.spec;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.Scheduler;
import org.apache.samza.operators.data.TestMessageEnvelope;
import org.apache.samza.operators.data.TestOutputMessageEnvelope;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.ScheduledFunction;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.StringSerde;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;
import static org.junit.Assert.*;
/**
* Tests for all {@link OperatorSpec} implementations.
*/
public class TestOperatorSpec {
/**
 * A {@link MapFunction} that additionally implements {@link WatermarkFunction}.
 * Both watermark callbacks are stubs that always return {@code null}.
 */
private static class MapWithWatermarkFn
    implements MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope>, WatermarkFunction<TestOutputMessageEnvelope> {

  /** Builds an output envelope from the input's key and the hash code of the input's message. */
  @Override
  public TestOutputMessageEnvelope apply(TestMessageEnvelope m) {
    return new TestOutputMessageEnvelope(m.getKey(), m.getMessage().hashCode());
  }

  /** Stub: always returns {@code null}. */
  @Override
  public Collection<TestOutputMessageEnvelope> processWatermark(long watermark) {
    return null;
  }

  /** Stub: always returns {@code null}. */
  @Override
  public Long getOutputWatermark() {
    return null;
  }
}
/**
 * A {@link MapFunction} that additionally implements {@link ScheduledFunction} with {@code String} keys.
 * Scheduling is a no-op and the callback always returns {@code null}.
 */
private static class MapWithScheduledFn
    implements MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope>, ScheduledFunction<String, TestOutputMessageEnvelope> {

  /** Intentionally empty: this function never registers anything with the scheduler. */
  @Override
  public void schedule(Scheduler<String> scheduler) {
  }

  /** Stub: always returns {@code null}. */
  @Override
  public Collection<TestOutputMessageEnvelope> onCallback(String key, long timestamp) {
    return null;
  }

  /** Builds an output envelope from the input's key and the hash code of the input's message. */
  @Override
  public TestOutputMessageEnvelope apply(TestMessageEnvelope m) {
    return new TestOutputMessageEnvelope(m.getKey(), m.getMessage().hashCode());
  }
}
/**
 * A {@link MapFunction} carrying a mutable {@link OperatorSpecTestUtils.TestEnum} field, so that tests can
 * compare the enum state of an original function with that of its copy.
 */
private static class MapWithEnum implements MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> {
  // deliberately non-final: the test mutates it through setType
  private OperatorSpecTestUtils.TestEnum type;

  MapWithEnum(OperatorSpecTestUtils.TestEnum initialType) {
    type = initialType;
  }

  /** Builds an output envelope from the input's key and the hash code of the input's message. */
  @Override
  public TestOutputMessageEnvelope apply(TestMessageEnvelope m) {
    return new TestOutputMessageEnvelope(m.getKey(), m.getMessage().hashCode());
  }

  /** Returns the enum value currently held by this function. */
  OperatorSpecTestUtils.TestEnum getType() {
    return type;
  }

  /** Replaces the enum value held by this function. */
  void setType(OperatorSpecTestUtils.TestEnum newType) {
    type = newType;
  }
}
/**
 * A {@link JoinFunction} over plain objects: both join keys are the {@code toString()} of the message, and
 * the joined output carries the first message's string form and the sum of both messages' hash codes.
 */
private static class TestJoinFunction implements JoinFunction<String, Object, Object, TestOutputMessageEnvelope> {

  @Override
  public String getFirstKey(Object message) {
    return message.toString();
  }

  @Override
  public String getSecondKey(Object message) {
    return message.toString();
  }

  @Override
  public TestOutputMessageEnvelope apply(Object message, Object otherMessage) {
    final String outputKey = message.toString();
    final int combinedHash = message.hashCode() + otherMessage.hashCode();
    return new TestOutputMessageEnvelope(outputKey, combinedHash);
  }
}
/**
 * A {@link StreamTableJoinFunction} over plain objects: the message key and the record key are the
 * {@code toString()} of their argument, and the joined output carries the message's string form and the
 * sum of the message's and the record's hash codes.
 */
private static class TestStreamTableJoinFunction
    implements StreamTableJoinFunction<String, Object, Object, TestOutputMessageEnvelope> {

  @Override
  public String getMessageKey(Object message) {
    return message.toString();
  }

  @Override
  public String getRecordKey(Object record) {
    return record.toString();
  }

  @Override
  public TestOutputMessageEnvelope apply(Object message, Object record) {
    final String outputKey = message.toString();
    final int combinedHash = message.hashCode() + record.hashCode();
    return new TestOutputMessageEnvelope(outputKey, combinedHash);
  }
}
/**
 * Copies a flat-map operator spec and checks that the copy is a distinct clone with its own transform
 * function, and that neither spec exposes a watermark or scheduled function.
 */
@Test
public void testStreamOperatorSpecWithFlatMap() {
  FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> flatMapFn = envelope -> {
    List<TestOutputMessageEnvelope> outputs = new ArrayList<>();
    outputs.add(new TestOutputMessageEnvelope(envelope.getKey(), envelope.getMessage().hashCode()));
    return outputs;
  };
  StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> originalSpec =
      OperatorSpecs.createFlatMapOperatorSpec(flatMapFn, "op0");
  StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> copiedSpec =
      (StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) OperatorSpecTestUtils.copyOpSpec(originalSpec);

  // the copy is not equal to the original, yet is recognized as its clone
  assertNotEquals(originalSpec, copiedSpec);
  assertTrue(originalSpec.isClone(copiedSpec));

  // the copy holds its own flat-map transform function
  assertNotEquals(originalSpec.getTransformFn(), copiedSpec.getTransformFn());
  assertTrue(copiedSpec.getTransformFn() instanceof FlatMapFunction);

  // a plain lambda contributes neither a watermark nor a scheduled function
  assertNull(originalSpec.getWatermarkFn());
  assertNull(copiedSpec.getWatermarkFn());
  assertNull(originalSpec.getScheduledFn());
  assertNull(copiedSpec.getScheduledFn());
}
/**
 * Copies a map operator spec built from a lambda and checks that the copy is a distinct clone holding its
 * own transform function and its own user map function, with no watermark or scheduled function.
 */
@Test
public void testStreamOperatorSpecWithMap() {
  MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mapFn =
      envelope -> new TestOutputMessageEnvelope(envelope.getKey(), envelope.getMessage().hashCode());
  StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> originalSpec =
      OperatorSpecs.createMapOperatorSpec(mapFn, "op0");
  StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> copiedSpec =
      (StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) OperatorSpecTestUtils.copyOpSpec(originalSpec);

  // the copy is not equal to the original, yet is recognized as its clone
  assertNotEquals(originalSpec, copiedSpec);
  assertTrue(originalSpec.isClone(copiedSpec));

  // the original spec stores the user function in its private "mapFn" field
  MapFunction originalUserFn = (MapFunction) Whitebox.getInternalState(originalSpec, "mapFn");
  assertEquals(originalUserFn, mapFn);
  assertNotEquals(originalSpec.getTransformFn(), copiedSpec.getTransformFn());

  // the copy exposes a flat-map transform function and carries a different user function
  MapFunction copiedUserFn = (MapFunction) Whitebox.getInternalState(copiedSpec, "mapFn");
  assertTrue(copiedSpec.getTransformFn() instanceof FlatMapFunction);
  assertTrue(copiedUserFn instanceof MapFunction);
  assertNotEquals(originalUserFn, copiedUserFn);

  // a plain lambda contributes neither a watermark nor a scheduled function
  assertNull(originalSpec.getWatermarkFn());
  assertNull(copiedSpec.getWatermarkFn());
  assertNull(originalSpec.getScheduledFn());
  assertNull(copiedSpec.getScheduledFn());
}
/**
 * Copies a filter operator spec built from a lambda and checks that the copy is a distinct clone holding
 * its own transform function and its own user filter function, with no watermark or scheduled function.
 */
@Test
public void testStreamOperatorSpecWithFilter() {
  FilterFunction<TestMessageEnvelope> filterFn = m -> m.getKey().equals("key1");
  StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> streamOperatorSpec =
      OperatorSpecs.createFilterOperatorSpec(filterFn, "op0");
  // the copy is declared with the same type parameters as the spec it was copied from
  StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> cloneOperatorSpec =
      (StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) OperatorSpecTestUtils.copyOpSpec(streamOperatorSpec);

  // the copy is not equal to the original, yet is recognized as its clone
  assertNotEquals(streamOperatorSpec, cloneOperatorSpec);
  assertTrue(streamOperatorSpec.isClone(cloneOperatorSpec));

  // the original spec stores the user function in its private "filterFn" field
  FilterFunction<?> userFn = (FilterFunction<?>) Whitebox.getInternalState(streamOperatorSpec, "filterFn");
  assertEquals(filterFn, userFn);
  assertNotEquals(streamOperatorSpec.getTransformFn(), cloneOperatorSpec.getTransformFn());

  // the copy exposes a flat-map transform function and carries a different user function
  FilterFunction<?> clonedUserFn = (FilterFunction<?>) Whitebox.getInternalState(cloneOperatorSpec, "filterFn");
  assertTrue(cloneOperatorSpec.getTransformFn() instanceof FlatMapFunction);
  // the cast above lets null through, so this also guards against a missing user function
  assertTrue(clonedUserFn instanceof FilterFunction);
  assertNotEquals(userFn, clonedUserFn);

  // a plain lambda contributes neither a watermark nor a scheduled function
  assertNull(streamOperatorSpec.getWatermarkFn());
  assertNull(cloneOperatorSpec.getWatermarkFn());
  assertNull(streamOperatorSpec.getScheduledFn());
  assertNull(cloneOperatorSpec.getScheduledFn());
}
/**
 * Copies an {@link InputOperatorSpec} configured with a string key serde and a stub value serde, and
 * checks that the copy is not equal to the original but is recognized as its clone.
 */
@Test
public void testInputOperatorSpec() {
  // stub value serde: deserializes everything to null and serializes everything to an empty array
  Serde<Object> stubValueSerde = new Serde<Object>() {
    @Override
    public Object fromBytes(byte[] bytes) {
      return null;
    }

    @Override
    public byte[] toBytes(Object object) {
      return new byte[0];
    }
  };
  InputOperatorSpec originalSpec =
      new InputOperatorSpec("mockStreamId", new StringSerde("UTF-8"), stubValueSerde, null, true, "op0");

  InputOperatorSpec copiedSpec = (InputOperatorSpec) OperatorSpecTestUtils.copyOpSpec(originalSpec);

  assertNotEquals(
      "Expected deserialized copy of operator spec should not be the same as the original operator spec",
      originalSpec,
      copiedSpec);
  assertTrue(originalSpec.isClone(copiedSpec));
}
/**
 * Copies an {@link OutputOperatorSpec} wrapping a keyed output stream with a stub value serde, and checks
 * that the copy is not equal to the original but is recognized as its clone.
 */
@Test
public void testOutputOperatorSpec() {
  // stub value serde: deserializes everything to null and serializes everything to an empty array
  Serde<Object> stubValueSerde = new Serde<Object>() {
    @Override
    public Object fromBytes(byte[] bytes) {
      return null;
    }

    @Override
    public byte[] toBytes(Object object) {
      return new byte[0];
    }
  };
  OutputStreamImpl<KV<String, Object>> outputStream =
      new OutputStreamImpl<>("mockStreamId", new StringSerde("UTF-8"), stubValueSerde, true);
  OutputOperatorSpec<KV<String, Object>> originalSpec = new OutputOperatorSpec<>(outputStream, "op0");

  OutputOperatorSpec<KV<String, Object>> copiedSpec =
      (OutputOperatorSpec<KV<String, Object>>) OperatorSpecTestUtils.copyOpSpec(originalSpec);

  assertNotEquals(
      "Expected deserialized copy of operator spec should not be the same as the original operator spec",
      originalSpec,
      copiedSpec);
  assertTrue(originalSpec.isClone(copiedSpec));
}
/**
 * Copies a {@link SinkOperatorSpec} built from a printing sink lambda and checks that the copy is not
 * equal to the original but is recognized as its clone.
 */
@Test
public void testSinkOperatorSpec() {
  SinkFunction<TestMessageEnvelope> printingSink = (m, c, tc) -> System.out.print(m.toString());
  SinkOperatorSpec<TestMessageEnvelope> originalSpec = new SinkOperatorSpec<>(printingSink, "op0");

  SinkOperatorSpec<TestMessageEnvelope> copiedSpec =
      (SinkOperatorSpec<TestMessageEnvelope>) OperatorSpecTestUtils.copyOpSpec(originalSpec);

  assertNotEquals(
      "Expected deserialized copy of operator spec should not be the same as the original operator spec",
      originalSpec,
      copiedSpec);
  assertTrue(originalSpec.isClone(copiedSpec));
}
/**
 * Copies a {@link JoinOperatorSpec} joining two input specs and checks that the copy is a clone of the
 * original, and that the copy's left and right inputs are clones of the original input specs.
 */
@Test
public void testJoinOperatorSpec() {
  InputOperatorSpec leftInput =
      new InputOperatorSpec("test-input-1", new NoOpSerde<>(), new NoOpSerde<>(), null, false, "op0");
  InputOperatorSpec rightInput =
      new InputOperatorSpec("test-input-2", new NoOpSerde<>(), new NoOpSerde<>(), null, false, "op1");
  // stub message serde: deserializes everything to null and serializes everything to an empty array
  Serde<Object> stubMessageSerde = new Serde<Object>() {
    @Override
    public Object fromBytes(byte[] bytes) {
      return null;
    }

    @Override
    public byte[] toBytes(Object object) {
      return new byte[0];
    }
  };
  JoinFunction<String, Object, Object, TestOutputMessageEnvelope> joinFn = new TestJoinFunction();
  JoinOperatorSpec<String, Object, Object, TestOutputMessageEnvelope> originalSpec = new JoinOperatorSpec<>(
      leftInput, rightInput, joinFn, new StringSerde("UTF-8"), stubMessageSerde, stubMessageSerde, 50000, "op2");

  JoinOperatorSpec<String, Object, Object, TestOutputMessageEnvelope> copiedSpec =
      (JoinOperatorSpec<String, Object, Object, TestOutputMessageEnvelope>) OperatorSpecTestUtils.copyOpSpec(originalSpec);

  assertNotEquals(
      "Expected deserialized copy of operator spec should not be the same as the original operator spec",
      originalSpec,
      copiedSpec);
  assertTrue(originalSpec.isClone(copiedSpec));
  // the inputs referenced by the copy must themselves be clones of the original inputs
  assertTrue(copiedSpec.getLeftInputOpSpec().isClone(leftInput));
  assertTrue(copiedSpec.getRightInputOpSpec().isClone(rightInput));
}
/**
 * Copies a {@link StreamTableJoinOperatorSpec} and checks that the copy is not equal to the original
 * while keeping the same operator id, table id and args.
 */
@Test
public void testStreamTableJoinOperatorSpec() {
  String tableId = "t1";
  StreamTableJoinFunction<String, Object, Object, TestOutputMessageEnvelope> joinFn = new TestStreamTableJoinFunction();
  StreamTableJoinOperatorSpec<String, Object, Object, TestOutputMessageEnvelope> originalSpec =
      new StreamTableJoinOperatorSpec<>(tableId, joinFn, "join-3", 1, null, "3");

  StreamTableJoinOperatorSpec<String, Object, Object, TestOutputMessageEnvelope> copiedSpec =
      (StreamTableJoinOperatorSpec<String, Object, Object, TestOutputMessageEnvelope>) OperatorSpecTestUtils.copyOpSpec(originalSpec);

  assertNotEquals(copiedSpec, originalSpec);
  // identifying attributes must carry over to the copy
  assertEquals(copiedSpec.getOpId(), originalSpec.getOpId());
  assertEquals(copiedSpec.getTableId(), originalSpec.getTableId());
  assertArrayEquals(copiedSpec.getArgs(), originalSpec.getArgs());
}
/**
 * Copies a {@link SendToTableOperatorSpec} and checks that the copy is not equal to the original while
 * keeping the same operator id, args and table id.
 */
@Test
public void testSendToTableOperatorSpec() {
  String tableId = "t1";
  SendToTableOperatorSpec<String, Integer> sendOpSpec =
      new SendToTableOperatorSpec<>(tableId, "output-1", 1, null, "3");
  SendToTableOperatorSpec<String, Integer> sendToCopy =
      (SendToTableOperatorSpec<String, Integer>) OperatorSpecTestUtils.copyOpSpec(sendOpSpec);

  assertNotEquals(sendOpSpec, sendToCopy);
  // expected value (from the original spec) first, so failure messages read correctly
  assertEquals(sendOpSpec.getOpId(), sendToCopy.getOpId());
  assertArrayEquals(sendOpSpec.getArgs(), sendToCopy.getArgs());
  // assertEquals reports both values on failure and tolerates null, unlike assertTrue(a.equals(b))
  assertEquals(sendOpSpec.getTableId(), sendToCopy.getTableId());
}
/**
 * Copies a {@link BroadcastOperatorSpec} and checks that the copy is not equal to the original, keeps the
 * same operator id, and holds its own output stream instance with the same stream id and keyed flag.
 */
@Test
public void testBroadcastOperatorSpec() {
  OutputStreamImpl<TestOutputMessageEnvelope> outputStream =
      new OutputStreamImpl<>("output-0", new StringSerde("UTF-8"), new JsonSerdeV2<TestOutputMessageEnvelope>(), true);
  BroadcastOperatorSpec<TestOutputMessageEnvelope> broadcastOpSpec = new BroadcastOperatorSpec<>(outputStream, "broadcast-1");
  BroadcastOperatorSpec<TestOutputMessageEnvelope> broadcastOpCopy =
      (BroadcastOperatorSpec<TestOutputMessageEnvelope>) OperatorSpecTestUtils.copyOpSpec(broadcastOpSpec);

  assertNotEquals(broadcastOpSpec, broadcastOpCopy);
  // expected value (from the original spec) first, so failure messages read correctly
  assertEquals(broadcastOpSpec.getOpId(), broadcastOpCopy.getOpId());
  // the copy must not share the original's output stream object
  assertNotSame(broadcastOpSpec.getOutputStream(), broadcastOpCopy.getOutputStream());
  assertEquals(broadcastOpSpec.getOutputStream().getStreamId(), broadcastOpCopy.getOutputStream().getStreamId());
  assertEquals(broadcastOpSpec.getOutputStream().isKeyed(), broadcastOpCopy.getOutputStream().isKeyed());
}
/**
 * Copies a map operator spec whose user function also implements {@link WatermarkFunction} and checks
 * that the original exposes that function as its watermark function, that the copy exposes a non-null
 * watermark function distinct from its transform function, and that neither has a scheduled function.
 */
@Test
public void testMapStreamOperatorSpecWithWatermark() {
  MapWithWatermarkFn watermarkMapFn = new MapWithWatermarkFn();
  StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> originalSpec =
      OperatorSpecs.createMapOperatorSpec(watermarkMapFn, "op0");
  StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> copiedSpec =
      (StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) OperatorSpecTestUtils.copyOpSpec(originalSpec);

  // the copy is not equal to the original, yet is recognized as its clone
  assertNotEquals(originalSpec, copiedSpec);
  assertTrue(originalSpec.isClone(copiedSpec));
  assertNotEquals(originalSpec.getTransformFn(), copiedSpec.getTransformFn());

  // the user function doubles as the watermark function
  assertEquals(originalSpec.getWatermarkFn(), watermarkMapFn);
  assertNotNull(copiedSpec.getWatermarkFn());
  assertNotEquals(copiedSpec.getTransformFn(), copiedSpec.getWatermarkFn());

  // the user function is not a scheduled function
  assertNull(originalSpec.getScheduledFn());
  assertNull(copiedSpec.getScheduledFn());
}
/**
 * Copies a map operator spec whose user function also implements {@link ScheduledFunction} and checks
 * that the original exposes that function as its scheduled function, that the copy exposes its own
 * non-null scheduled function distinct from its transform function, and that neither has a watermark
 * function.
 */
@Test
public void testMapStreamOperatorSpecWithScheduledFunction() {
  MapWithScheduledFn testMapFn = new MapWithScheduledFn();
  StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> streamOperatorSpec =
      OperatorSpecs.createMapOperatorSpec(testMapFn, "op0");
  StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> cloneOperatorSpec =
      (StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) OperatorSpecTestUtils.copyOpSpec(streamOperatorSpec);

  // the copy is not equal to the original, yet is recognized as its clone
  assertNotEquals(streamOperatorSpec, cloneOperatorSpec);
  assertTrue(streamOperatorSpec.isClone(cloneOperatorSpec));
  assertNotEquals(streamOperatorSpec.getTransformFn(), cloneOperatorSpec.getTransformFn());

  // the user function is not a watermark function
  assertNull(streamOperatorSpec.getWatermarkFn());
  assertNull(cloneOperatorSpec.getWatermarkFn());

  // the user function doubles as the scheduled function; compare against getScheduledFn() here, since
  // comparing the transform function with the (null) watermark function would assert nothing useful
  assertEquals(testMapFn, streamOperatorSpec.getScheduledFn());
  assertNotNull(cloneOperatorSpec.getScheduledFn());
  assertNotEquals(cloneOperatorSpec.getTransformFn(), cloneOperatorSpec.getScheduledFn());
  assertNotEquals(streamOperatorSpec.getScheduledFn(), cloneOperatorSpec.getScheduledFn());
}
/**
 * Copies a map operator spec whose lambda captures two lists, then checks that mutating the captured
 * lists changes the output of the original user function but not the output of the copied one.
 */
@Test
public void testStreamOperatorSpecWithMapAndListInClosure() {
  List<Integer> integers = new ArrayList<>(1);
  integers.add(0, 100);
  List<String> keys = new ArrayList<>(1);
  keys.add(0, "test-1");
  // "% 1" always yields index 0, so the lambda always reads the single element of each captured list
  MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mapFn =
      m -> new TestOutputMessageEnvelope(keys.get(m.getKey().hashCode() % 1), integers.get(m.getMessage().hashCode() % 1));
  StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> streamOperatorSpec =
      OperatorSpecs.createMapOperatorSpec(mapFn, "op0");
  StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> cloneOperatorSpec =
      (StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) OperatorSpecTestUtils.copyOpSpec(streamOperatorSpec);

  // the copy is not equal to the original, yet is recognized as its clone
  assertNotEquals(streamOperatorSpec, cloneOperatorSpec);
  assertTrue(streamOperatorSpec.isClone(cloneOperatorSpec));

  // the original spec stores the user function in its private "mapFn" field
  MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> userFn =
      (MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope>) Whitebox.getInternalState(streamOperatorSpec, "mapFn");
  assertEquals(mapFn, userFn);
  assertNotEquals(streamOperatorSpec.getTransformFn(), cloneOperatorSpec.getTransformFn());

  // the copy exposes a flat-map transform function and carries a different user function
  MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> clonedUserFn =
      (MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope>) Whitebox.getInternalState(cloneOperatorSpec, "mapFn");
  assertTrue(cloneOperatorSpec.getTransformFn() instanceof FlatMapFunction);
  // the cast above lets null through, so this also guards against a missing user function
  assertTrue(clonedUserFn instanceof MapFunction);
  assertNotEquals(userFn, clonedUserFn);

  // verify changing the values in the original keys and integers list will change the result of the original map function
  TestMessageEnvelope mockImsg = new TestMessageEnvelope("input-key-x", "value-x");
  assertEquals(new TestOutputMessageEnvelope("test-1", 100), userFn.apply(mockImsg));
  integers.set(0, 200);
  keys.set(0, "test-2");
  assertEquals(new TestOutputMessageEnvelope("test-2", 200), userFn.apply(mockImsg));

  // verify that the cloned map function uses a different copy of lists and still yields the same result
  assertEquals(new TestOutputMessageEnvelope("test-1", 100), clonedUserFn.apply(mockImsg));
}
/**
 * Copies a map operator spec built from the method reference {@code KV::getValue} and checks that the
 * copy is a distinct clone holding its own transform function and its own user map function.
 */
@Test
public void testStreamOperatorSpecWithMapWithFunctionReference() {
  MapFunction<KV<String, Object>, Object> mapFn = KV::getValue;
  StreamOperatorSpec<KV<String, Object>, Object> streamOperatorSpec =
      OperatorSpecs.createMapOperatorSpec(mapFn, "op0");
  // the copy is declared with the same type parameters as the spec it was copied from
  StreamOperatorSpec<KV<String, Object>, Object> cloneOperatorSpec =
      (StreamOperatorSpec<KV<String, Object>, Object>) OperatorSpecTestUtils.copyOpSpec(streamOperatorSpec);

  // the copy is not equal to the original, yet is recognized as its clone
  assertNotEquals(streamOperatorSpec, cloneOperatorSpec);
  assertTrue(streamOperatorSpec.isClone(cloneOperatorSpec));

  // the original spec stores the user function in its private "mapFn" field
  MapFunction<?, ?> userFn = (MapFunction<?, ?>) Whitebox.getInternalState(streamOperatorSpec, "mapFn");
  assertEquals(mapFn, userFn);
  assertNotEquals(streamOperatorSpec.getTransformFn(), cloneOperatorSpec.getTransformFn());

  // the copy exposes a flat-map transform function and carries a different user function
  MapFunction<?, ?> clonedUserFn = (MapFunction<?, ?>) Whitebox.getInternalState(cloneOperatorSpec, "mapFn");
  assertTrue(cloneOperatorSpec.getTransformFn() instanceof FlatMapFunction);
  // the cast above lets null through, so this also guards against a missing user function
  assertTrue(clonedUserFn instanceof MapFunction);
  assertNotEquals(userFn, clonedUserFn);
}
/**
 * Copies a map operator spec whose user function holds an enum field and checks that the copy carries
 * the same enum value, and that changing the value on the copy leaves the original untouched.
 */
@Test
public void testStreamOperatorSpecWithMapWithEnum() {
  MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mapFn = new MapWithEnum(OperatorSpecTestUtils.TestEnum.One);
  StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> streamOperatorSpec =
      OperatorSpecs.createMapOperatorSpec(mapFn, "op0");
  assertTrue(streamOperatorSpec instanceof MapOperatorSpec);
  StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> cloneOperatorSpec =
      (StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) OperatorSpecTestUtils.copyOpSpec(streamOperatorSpec);

  // the copy is not equal to the original, yet is recognized as its clone
  assertNotEquals(streamOperatorSpec, cloneOperatorSpec);
  assertTrue(streamOperatorSpec.isClone(cloneOperatorSpec));

  // the original spec stores the user function in its private "mapFn" field
  Object userFn = Whitebox.getInternalState(streamOperatorSpec, "mapFn");
  assertEquals(mapFn, userFn);
  assertNotEquals(streamOperatorSpec.getTransformFn(), cloneOperatorSpec.getTransformFn());

  // the copy exposes a flat-map transform function and carries a different MapWithEnum instance
  Object clonedUserFn = Whitebox.getInternalState(cloneOperatorSpec, "mapFn");
  assertTrue(cloneOperatorSpec.getTransformFn() instanceof FlatMapFunction);
  assertTrue(clonedUserFn instanceof MapWithEnum);
  assertNotEquals(userFn, clonedUserFn);

  // cast once, after the instanceof assertion above has established the concrete type of the copy
  MapWithEnum originalEnumFn = (MapWithEnum) userFn;
  MapWithEnum clonedEnumFn = (MapWithEnum) clonedUserFn;

  // originally the types should be the same
  assertSame(originalEnumFn.getType(), clonedEnumFn.getType());
  // after changing the type of the cloned user function, the types are different now
  clonedEnumFn.setType(OperatorSpecTestUtils.TestEnum.Two);
  assertNotSame(originalEnumFn.getType(), clonedEnumFn.getType());
}
}