blob: d1fab98b4f406038d42bdbb75b1f7e87d7f45a34 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.application.descriptors;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.context.ApplicationContainerContextFactory;
import org.apache.samza.context.ApplicationTaskContextFactory;
import org.apache.samza.operators.TableImpl;
import org.apache.samza.operators.data.TestMessageEnvelope;
import org.apache.samza.system.descriptors.ExpandingInputDescriptorProvider;
import org.apache.samza.system.descriptors.GenericInputDescriptor;
import org.apache.samza.system.descriptors.GenericOutputDescriptor;
import org.apache.samza.system.descriptors.GenericSystemDescriptor;
import org.apache.samza.system.descriptors.InputDescriptor;
import org.apache.samza.system.descriptors.SystemDescriptor;
import org.apache.samza.system.descriptors.TransformingInputDescriptorProvider;
import org.apache.samza.system.descriptors.InputTransformer;
import org.apache.samza.system.descriptors.StreamExpander;
import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec.OpCode;
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.table.descriptors.BaseTableDescriptor;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Unit test for {@link StreamApplicationDescriptorImpl}
*/
public class TestStreamApplicationDescriptorImpl {
@Test
public void testConstructor() {
StreamApplication mockApp = mock(StreamApplication.class);
Config mockConfig = getConfig();
StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(mockApp, mockConfig);
verify(mockApp).describe(appDesc);
assertEquals(mockConfig, appDesc.getConfig());
}
@Test
public void testGetInputStreamWithValueSerde() {
String streamId = "test-stream-1";
Serde mockValueSerde = mock(Serde.class);
GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mockValueSerde);
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getInputStream(isd);
}, getConfig());
InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
assertEquals(streamId, inputOpSpec.getStreamId());
assertEquals(isd, streamAppDesc.getInputDescriptors().get(streamId));
assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
}
@Test
public void testGetInputStreamWithKeyValueSerde() {
String streamId = "test-stream-1";
KVSerde mockKVSerde = mock(KVSerde.class);
Serde mockKeySerde = mock(Serde.class);
Serde mockValueSerde = mock(Serde.class);
doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mockKVSerde);
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getInputStream(isd);
}, getConfig());
InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
assertEquals(streamId, inputOpSpec.getStreamId());
assertEquals(isd, streamAppDesc.getInputDescriptors().get(streamId));
assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
}
@Test(expected = IllegalArgumentException.class)
public void testGetInputStreamWithNullSerde() {
GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
GenericInputDescriptor isd = sd.getInputDescriptor("mockStreamId", null);
new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getInputStream(isd);
}, getConfig());
}
@Test
public void testGetInputStreamWithTransformFunction() {
String streamId = "test-stream-1";
Serde mockValueSerde = mock(Serde.class);
InputTransformer transformer = ime -> ime;
MockTransformingSystemDescriptor sd = new MockTransformingSystemDescriptor("mockSystem", transformer);
MockInputDescriptor isd = sd.getInputDescriptor(streamId, mockValueSerde);
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getInputStream(isd);
}, getConfig());
InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
assertEquals(streamId, inputOpSpec.getStreamId());
assertEquals(isd, streamAppDesc.getInputDescriptors().get(streamId));
assertEquals(transformer, inputOpSpec.getTransformer());
}
@Test
public void testGetInputStreamWithExpandingSystem() {
String streamId = "test-stream-1";
String expandedStreamId = "expanded-stream";
AtomicInteger expandCallCount = new AtomicInteger();
StreamExpander expander = (sg, isd) -> {
expandCallCount.incrementAndGet();
InputDescriptor expandedISD =
new GenericSystemDescriptor("expanded-system", "mockFactoryClass")
.getInputDescriptor(expandedStreamId, new IntegerSerde());
return sg.getInputStream(expandedISD);
};
MockExpandingSystemDescriptor sd = new MockExpandingSystemDescriptor("mock-system", expander);
MockInputDescriptor isd = sd.getInputDescriptor(streamId, new IntegerSerde());
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getInputStream(isd);
}, getConfig());
InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(expandedStreamId);
assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
assertEquals(1, expandCallCount.get());
assertFalse(streamAppDesc.getInputOperators().containsKey(streamId));
assertFalse(streamAppDesc.getInputDescriptors().containsKey(streamId));
assertTrue(streamAppDesc.getInputDescriptors().containsKey(expandedStreamId));
assertEquals(expandedStreamId, inputOpSpec.getStreamId());
}
@Test
public void testGetInputStreamWithRelaxedTypes() {
String streamId = "test-stream-1";
GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mock(Serde.class));
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getInputStream(isd);
}, getConfig());
InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
assertEquals(streamId, inputOpSpec.getStreamId());
assertEquals(isd, streamAppDesc.getInputDescriptors().get(streamId));
}
@Test
public void testMultipleGetInputStreams() {
String streamId1 = "test-stream-1";
String streamId2 = "test-stream-2";
GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
GenericInputDescriptor isd1 = sd.getInputDescriptor(streamId1, mock(Serde.class));
GenericInputDescriptor isd2 = sd.getInputDescriptor(streamId2, mock(Serde.class));
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getInputStream(isd1);
appDesc.getInputStream(isd2);
}, getConfig());
InputOperatorSpec inputOpSpec1 = streamAppDesc.getInputOperators().get(streamId1);
InputOperatorSpec inputOpSpec2 = streamAppDesc.getInputOperators().get(streamId2);
assertEquals(2, streamAppDesc.getInputOperators().size());
assertEquals(streamId1, inputOpSpec1.getStreamId());
assertEquals(streamId2, inputOpSpec2.getStreamId());
assertEquals(2, streamAppDesc.getInputDescriptors().size());
assertEquals(isd1, streamAppDesc.getInputDescriptors().get(streamId1));
assertEquals(isd2, streamAppDesc.getInputDescriptors().get(streamId2));
}
@Test(expected = IllegalStateException.class)
public void testGetSameInputStreamTwice() {
String streamId = "test-stream-1";
GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
GenericInputDescriptor isd1 = sd.getInputDescriptor(streamId, mock(Serde.class));
GenericInputDescriptor isd2 = sd.getInputDescriptor(streamId, mock(Serde.class));
new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getInputStream(isd1);
// should throw exception
appDesc.getInputStream(isd2);
}, getConfig());
}
@Test
public void testMultipleSystemDescriptorForSameSystemName() {
GenericSystemDescriptor sd1 = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
GenericSystemDescriptor sd2 = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
GenericInputDescriptor isd1 = sd1.getInputDescriptor("test-stream-1", mock(Serde.class));
GenericInputDescriptor isd2 = sd2.getInputDescriptor("test-stream-2", mock(Serde.class));
GenericOutputDescriptor osd1 = sd2.getOutputDescriptor("test-stream-3", mock(Serde.class));
new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getInputStream(isd1);
try {
appDesc.getInputStream(isd2);
fail("Adding input stream with the same system name but different SystemDescriptor should have failed");
} catch (IllegalStateException e) { }
try {
appDesc.getOutputStream(osd1);
fail("adding output stream with the same system name but different SystemDescriptor should have failed");
} catch (IllegalStateException e) { }
}, getConfig());
new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.withDefaultSystem(sd2);
try {
appDesc.getInputStream(isd1);
fail("Adding input stream with the same system name as the default system but different SystemDescriptor should have failed");
} catch (IllegalStateException e) { }
}, getConfig());
}
@Test
public void testGetOutputStreamWithKeyValueSerde() {
String streamId = "test-stream-1";
KVSerde mockKVSerde = mock(KVSerde.class);
Serde mockKeySerde = mock(Serde.class);
Serde mockValueSerde = mock(Serde.class);
doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, mockKVSerde);
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getOutputStream(osd);
}, getConfig());
OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = streamAppDesc.getOutputStreams().get(streamId);
assertEquals(streamId, outputStreamImpl.getStreamId());
assertEquals(osd, streamAppDesc.getOutputDescriptors().get(streamId));
assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
}
@Test(expected = IllegalArgumentException.class)
public void testGetOutputStreamWithNullSerde() {
String streamId = "test-stream-1";
GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, null);
new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getOutputStream(osd);
}, getConfig());
}
@Test
public void testGetOutputStreamWithValueSerde() {
String streamId = "test-stream-1";
Serde mockValueSerde = mock(Serde.class);
GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, mockValueSerde);
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getOutputStream(osd);
}, getConfig());
OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = streamAppDesc.getOutputStreams().get(streamId);
assertEquals(streamId, outputStreamImpl.getStreamId());
assertEquals(osd, streamAppDesc.getOutputDescriptors().get(streamId));
assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
}
@Test(expected = IllegalStateException.class)
public void testSetDefaultSystemDescriptorAfterGettingInputStream() {
String streamId = "test-stream-1";
GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mock(Serde.class));
new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getInputStream(isd);
appDesc.withDefaultSystem(sd); // should throw exception
}, getConfig());
}
@Test(expected = IllegalStateException.class)
public void testSetDefaultSystemDescriptorAfterGettingOutputStream() {
String streamId = "test-stream-1";
GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, mock(Serde.class));
new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getOutputStream(osd);
appDesc.withDefaultSystem(sd); // should throw exception
}, getConfig());
}
@Test(expected = IllegalStateException.class)
public void testSetDefaultSystemDescriptorAfterGettingIntermediateStream() {
String streamId = "test-stream-1";
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, getConfig());
streamAppDesc.getIntermediateStream(streamId, mock(Serde.class), false);
streamAppDesc.withDefaultSystem(mock(SystemDescriptor.class)); // should throw exception
}
@Test(expected = IllegalStateException.class)
public void testGetSameOutputStreamTwice() {
String streamId = "test-stream-1";
GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
GenericOutputDescriptor osd1 = sd.getOutputDescriptor(streamId, mock(Serde.class));
GenericOutputDescriptor osd2 = sd.getOutputDescriptor(streamId, mock(Serde.class));
new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getOutputStream(osd1);
appDesc.getOutputStream(osd2); // should throw exception
}, getConfig());
}
@Test
public void testGetIntermediateStreamWithValueSerde() {
String streamId = "stream-1";
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, getConfig());
Serde mockValueSerde = mock(Serde.class);
IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
streamAppDesc.getIntermediateStream(streamId, mockValueSerde, false);
assertEquals(streamAppDesc.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec());
assertEquals(streamAppDesc.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream());
assertEquals(streamId, intermediateStreamImpl.getStreamId());
assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
assertTrue(((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
assertEquals(mockValueSerde, ((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
}
@Test
public void testGetIntermediateStreamWithKeyValueSerde() {
String streamId = "streamId";
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, getConfig());
KVSerde mockKVSerde = mock(KVSerde.class);
Serde mockKeySerde = mock(Serde.class);
Serde mockValueSerde = mock(Serde.class);
doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
streamAppDesc.getIntermediateStream(streamId, mockKVSerde, false);
assertEquals(streamAppDesc.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec());
assertEquals(streamAppDesc.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream());
assertEquals(streamId, intermediateStreamImpl.getStreamId());
assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde());
assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
assertEquals(mockKeySerde, ((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde());
assertEquals(mockValueSerde, ((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
}
@Test
public void testGetIntermediateStreamWithDefaultSystemDescriptor() {
Config mockConfig = getConfig();
String streamId = "streamId";
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
GenericSystemDescriptor sd = new GenericSystemDescriptor("mock-system", "mock-system-factory");
streamAppDesc.withDefaultSystem(sd);
IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
streamAppDesc.getIntermediateStream(streamId, mock(Serde.class), false);
assertEquals(streamAppDesc.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec());
assertEquals(streamAppDesc.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream());
assertEquals(streamId, intermediateStreamImpl.getStreamId());
}
@Test(expected = NullPointerException.class)
public void testGetIntermediateStreamWithNoSerde() {
Config mockConfig = getConfig();
String streamId = "streamId";
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
streamAppDesc.getIntermediateStream(streamId, null, false); // should throw
}
@Test(expected = IllegalStateException.class)
public void testGetSameIntermediateStreamTwice() {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, getConfig());
streamAppDesc.getIntermediateStream("test-stream-1", mock(Serde.class), false);
// should throw exception
streamAppDesc.getIntermediateStream("test-stream-1", mock(Serde.class), false);
}
@Test
public void testGetNextOpIdIncrementsId() {
HashMap<String, String> configMap = new HashMap<>();
configMap.put(ApplicationConfig.APP_NAME, "appName");
configMap.put(ApplicationConfig.APP_ID, "1234");
Config config = new MapConfig(configMap);
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, config);
assertEquals("appName-1234-merge-0", streamAppDesc.getNextOpId(OpCode.MERGE, null));
assertEquals("appName-1234-join-customName", streamAppDesc.getNextOpId(OpCode.JOIN, "customName"));
assertEquals("appName-1234-map-2", streamAppDesc.getNextOpId(OpCode.MAP, null));
}
@Test(expected = SamzaException.class)
public void testGetNextOpIdRejectsDuplicates() {
HashMap<String, String> configMap = new HashMap<>();
configMap.put(ApplicationConfig.APP_NAME, "appName");
configMap.put(ApplicationConfig.APP_ID, "1234");
Config config = new MapConfig(configMap);
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, config);
assertEquals("appName-1234-join-customName", streamAppDesc.getNextOpId(OpCode.JOIN, "customName"));
streamAppDesc.getNextOpId(OpCode.JOIN, "customName"); // should throw
}
@Test
public void testOpIdValidation() {
HashMap<String, String> configMap = new HashMap<>();
configMap.put(ApplicationConfig.APP_NAME, "appName");
configMap.put(ApplicationConfig.APP_ID, "1234");
Config config = new MapConfig(configMap);
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, config);
// null and empty userDefinedIDs should fall back to autogenerated IDs.
try {
streamAppDesc.getNextOpId(OpCode.FILTER, null);
streamAppDesc.getNextOpId(OpCode.FILTER, "");
streamAppDesc.getNextOpId(OpCode.FILTER, " ");
streamAppDesc.getNextOpId(OpCode.FILTER, "\t");
} catch (SamzaException e) {
fail("Received an error with a null or empty operator ID instead of defaulting to auto-generated ID.");
}
List<String> validOpIds = ImmutableList.of("op_id", "op-id", "1000", "op_1", "OP_ID");
for (String validOpId: validOpIds) {
try {
streamAppDesc.getNextOpId(OpCode.FILTER, validOpId);
} catch (Exception e) {
fail("Received an exception with a valid operator ID: " + validOpId);
}
}
List<String> invalidOpIds = ImmutableList.of("op id", "op#id");
for (String invalidOpId: invalidOpIds) {
try {
streamAppDesc.getNextOpId(OpCode.FILTER, invalidOpId);
fail("Did not receive an exception with an invalid operator ID: " + invalidOpId);
} catch (SamzaException e) { }
}
}
@Test
public void testGetInputStreamPreservesInsertionOrder() {
Config mockConfig = getConfig();
String testStreamId1 = "test-stream-1";
String testStreamId2 = "test-stream-2";
String testStreamId3 = "test-stream-3";
GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getInputStream(sd.getInputDescriptor(testStreamId1, mock(Serde.class)));
appDesc.getInputStream(sd.getInputDescriptor(testStreamId2, mock(Serde.class)));
appDesc.getInputStream(sd.getInputDescriptor(testStreamId3, mock(Serde.class)));
}, mockConfig);
List<InputOperatorSpec> inputSpecs = new ArrayList<>(streamAppDesc.getInputOperators().values());
assertEquals(inputSpecs.size(), 3);
assertEquals(inputSpecs.get(0).getStreamId(), testStreamId1);
assertEquals(inputSpecs.get(1).getStreamId(), testStreamId2);
assertEquals(inputSpecs.get(2).getStreamId(), testStreamId3);
}
@Test
public void testGetTable() throws Exception {
Config mockConfig = getConfig();
String tableId = "t1";
BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
when(mockTableDescriptor.getTableId()).thenReturn(tableId);
AtomicReference<TableImpl> table = new AtomicReference<>();
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
table.set((TableImpl) appDesc.getTable(mockTableDescriptor));
}, mockConfig);
assertEquals(tableId, table.get().getTableId());
}
@Test
public void testApplicationContainerContextFactory() {
ApplicationContainerContextFactory factory = mock(ApplicationContainerContextFactory.class);
StreamApplication testApp = appDesc -> appDesc.withApplicationContainerContextFactory(factory);
StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, getConfig());
assertEquals(appSpec.getApplicationContainerContextFactory(), Optional.of(factory));
}
@Test
public void testNoApplicationContainerContextFactory() {
StreamApplication testApp = appDesc -> {
};
StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, getConfig());
assertEquals(appSpec.getApplicationContainerContextFactory(), Optional.empty());
}
@Test
public void testApplicationTaskContextFactory() {
ApplicationTaskContextFactory factory = mock(ApplicationTaskContextFactory.class);
StreamApplication testApp = appDesc -> appDesc.withApplicationTaskContextFactory(factory);
StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, getConfig());
assertEquals(appSpec.getApplicationTaskContextFactory(), Optional.of(factory));
}
@Test
public void testNoApplicationTaskContextFactory() {
StreamApplication testApp = appDesc -> {
};
StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, getConfig());
assertEquals(appSpec.getApplicationTaskContextFactory(), Optional.empty());
}
@Test
public void testProcessorLifecycleListenerFactory() {
ProcessorLifecycleListenerFactory mockFactory = mock(ProcessorLifecycleListenerFactory.class);
StreamApplication testApp = appSpec -> appSpec.withProcessorLifecycleListenerFactory(mockFactory);
StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(testApp, getConfig());
assertEquals(appDesc.getProcessorLifecycleListenerFactory(), mockFactory);
}
@Test(expected = IllegalStateException.class)
public void testGetTableWithBadId() {
Config mockConfig = getConfig();
new StreamApplicationDescriptorImpl(appDesc -> {
BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
when(mockTableDescriptor.getTableId()).thenReturn("my.table");
appDesc.getTable(mockTableDescriptor);
}, mockConfig);
}
private Config getConfig() {
HashMap<String, String> configMap = new HashMap<>();
configMap.put(ApplicationConfig.APP_NAME, "test-job");
return new MapConfig(configMap);
}
class MockExpandingSystemDescriptor extends SystemDescriptor<MockExpandingSystemDescriptor> implements ExpandingInputDescriptorProvider<Integer> {
public MockExpandingSystemDescriptor(String systemName, StreamExpander expander) {
super(systemName, "factory.class", null, expander);
}
@Override
public MockInputDescriptor<Integer> getInputDescriptor(String streamId, Serde serde) {
return new MockInputDescriptor<>(streamId, this, serde);
}
}
class MockTransformingSystemDescriptor extends SystemDescriptor<MockTransformingSystemDescriptor> implements TransformingInputDescriptorProvider<Integer> {
public MockTransformingSystemDescriptor(String systemName, InputTransformer transformer) {
super(systemName, "factory.class", transformer, null);
}
@Override
public MockInputDescriptor<Integer> getInputDescriptor(String streamId, Serde serde) {
return new MockInputDescriptor<>(streamId, this, serde);
}
}
public class MockInputDescriptor<StreamMessageType> extends InputDescriptor<StreamMessageType, MockInputDescriptor<StreamMessageType>> {
MockInputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde serde) {
super(streamId, serde, systemDescriptor, null);
}
}
}