blob: 16e6550ab555af3015c1804bbd3d470f3af0a0d5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.admin.cli;
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import com.beust.jcommander.ParameterException;
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.admin.cli.utils.CmdUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Sinks;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.Assert;
import org.testng.IObjectFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
@Slf4j
@PrepareForTest({CmdFunctions.class})
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.core.*", "org.apache.pulsar.functions.api.*" })
public class TestCmdSinks {
@ObjectFactory
public IObjectFactory getObjectFactory() {
return new org.powermock.modules.testng.PowerMockObjectFactory();
}
private static final String TENANT = "test-tenant";
private static final String NAMESPACE = "test-namespace";
private static final String NAME = "test";
private static final String CLASS_NAME = "SomeRandomClassName";
private static final String INPUTS = "test-src1,test-src2";
private static final List<String> INPUTS_LIST;
static {
INPUTS_LIST = new LinkedList<>();
INPUTS_LIST.add("test-src1");
INPUTS_LIST.add("test-src2");
}
private static final String TOPIC_PATTERN = "test-src*";
private static final String CUSTOM_SERDE_INPUT_STRING = "{\"test_src3\": \"\"}";
private static final Map<String, String> CUSTOM_SERDE_INPUT_MAP;
static {
CUSTOM_SERDE_INPUT_MAP = new HashMap<>();
CUSTOM_SERDE_INPUT_MAP.put("test_src3", "");
}
private static final FunctionConfig.ProcessingGuarantees PROCESSING_GUARANTEES
= FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE;
private static final Integer PARALLELISM = 1;
private static final String JAR_FILE_NAME = "dummy.nar";
private String JAR_FILE_PATH;
private String WRONG_JAR_PATH;
private static final Double CPU = 100.0;
private static final Long RAM = 1024L * 1024L;
private static final Long DISK = 1024L * 1024L * 1024L;
private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 0000 2018\"}";
private PulsarAdmin pulsarAdmin;
private Sinks sink;
private CmdSinks cmdSinks;
private CmdSinks.CreateSink createSink;
private CmdSinks.UpdateSink updateSink;
private CmdSinks.LocalSinkRunner localSinkRunner;
private CmdSinks.DeleteSink deleteSink;
private ClassLoader oldContextClassLoader;
private ClassLoader jarClassLoader;
@BeforeMethod
public void setup() throws Exception {
pulsarAdmin = mock(PulsarAdmin.class);
sink = mock(Sinks.class);
when(pulsarAdmin.sinks()).thenReturn(sink);
cmdSinks = spy(new CmdSinks(() -> pulsarAdmin));
createSink = spy(cmdSinks.getCreateSink());
updateSink = spy(cmdSinks.getUpdateSink());
localSinkRunner = spy(cmdSinks.getLocalSinkRunner());
deleteSink = spy(cmdSinks.getDeleteSink());
mockStatic(CmdFunctions.class);
PowerMockito.doNothing().when(localSinkRunner).runCmd();
URL file = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME);
if (file == null) {
throw new RuntimeException("Failed to file required test archive: " + JAR_FILE_NAME);
}
JAR_FILE_PATH = file.getFile();
jarClassLoader = ClassLoaderUtils.loadJar(new File(JAR_FILE_PATH));
oldContextClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(jarClassLoader);
}
@AfterMethod(alwaysRun = true)
public void cleanup() throws IOException {
if (jarClassLoader != null && jarClassLoader instanceof Closeable) {
((Closeable) jarClassLoader).close();
jarClassLoader = null;
}
if (oldContextClassLoader != null) {
Thread.currentThread().setContextClassLoader(oldContextClassLoader);
oldContextClassLoader = null;
}
}
public SinkConfig getSinkConfig() {
SinkConfig sinkConfig = new SinkConfig();
sinkConfig.setTenant(TENANT);
sinkConfig.setNamespace(NAMESPACE);
sinkConfig.setName(NAME);
sinkConfig.setInputs(INPUTS_LIST);
sinkConfig.setTopicToSerdeClassName(CUSTOM_SERDE_INPUT_MAP);
sinkConfig.setTopicsPattern(TOPIC_PATTERN);
sinkConfig.setProcessingGuarantees(PROCESSING_GUARANTEES);
sinkConfig.setParallelism(PARALLELISM);
sinkConfig.setArchive(JAR_FILE_PATH);
sinkConfig.setResources(new Resources(CPU, RAM, DISK));
sinkConfig.setConfigs(createSink.parseConfigs(SINK_CONFIG_STRING));
return sinkConfig;
}
@Test
public void testCliCorrect() throws Exception {
SinkConfig sinkConfig = getSinkConfig();
testCmdSinkCliMissingArgs(
TENANT,
NAMESPACE,
NAME,
INPUTS,
TOPIC_PATTERN,
CUSTOM_SERDE_INPUT_STRING,
PROCESSING_GUARANTEES,
PARALLELISM,
JAR_FILE_PATH,
CPU,
RAM,
DISK,
SINK_CONFIG_STRING,
sinkConfig
);
}
@Test
public void testMissingInput() throws Exception {
SinkConfig sinkConfig = getSinkConfig();
sinkConfig.setInputs(null);
testCmdSinkCliMissingArgs(
TENANT,
NAMESPACE,
NAME,
null,
TOPIC_PATTERN,
CUSTOM_SERDE_INPUT_STRING,
PROCESSING_GUARANTEES,
PARALLELISM,
JAR_FILE_PATH,
CPU,
RAM,
DISK,
SINK_CONFIG_STRING,
sinkConfig
);
}
@Test
public void testMissingCustomSerdeInput() throws Exception {
SinkConfig sinkConfig = getSinkConfig();
sinkConfig.setTopicToSerdeClassName(null);
sinkConfig.setTopicToSchemaType(null);
testCmdSinkCliMissingArgs(
TENANT,
NAMESPACE,
NAME,
INPUTS,
TOPIC_PATTERN,
null,
PROCESSING_GUARANTEES,
PARALLELISM,
JAR_FILE_PATH,
CPU,
RAM,
DISK,
SINK_CONFIG_STRING,
sinkConfig
);
}
@Test
public void testMissingTopicPattern() throws Exception {
SinkConfig sinkConfig = getSinkConfig();
sinkConfig.setTopicsPattern(null);
testCmdSinkCliMissingArgs(
TENANT,
NAMESPACE,
NAME,
INPUTS,
null,
CUSTOM_SERDE_INPUT_STRING,
PROCESSING_GUARANTEES,
PARALLELISM,
JAR_FILE_PATH,
CPU,
RAM,
DISK,
SINK_CONFIG_STRING,
sinkConfig
);
}
@Test
public void testMissingProcessingGuarantees() throws Exception {
SinkConfig sinkConfig = getSinkConfig();
sinkConfig.setProcessingGuarantees(null);
testCmdSinkCliMissingArgs(
TENANT,
NAMESPACE,
NAME,
INPUTS,
TOPIC_PATTERN,
CUSTOM_SERDE_INPUT_STRING,
null,
PARALLELISM,
JAR_FILE_PATH,
CPU,
RAM,
DISK,
SINK_CONFIG_STRING,
sinkConfig
);
}
@Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink archive not specfied")
public void testMissingArchive() throws Exception {
SinkConfig sinkConfig = getSinkConfig();
sinkConfig.setArchive(null);
testCmdSinkCliMissingArgs(
TENANT,
NAMESPACE,
NAME,
INPUTS,
TOPIC_PATTERN,
CUSTOM_SERDE_INPUT_STRING,
PROCESSING_GUARANTEES,
PARALLELISM,
null,
CPU,
RAM,
DISK,
SINK_CONFIG_STRING,
sinkConfig
);
}
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Sink Archive file /tmp/foo.jar" +
" does not exist")
public void testInvalidJar() throws Exception {
SinkConfig sinkConfig = getSinkConfig();
String fakeJar = "/tmp/foo.jar";
sinkConfig.setArchive(fakeJar);
testCmdSinkCliMissingArgs(
TENANT,
NAMESPACE,
NAME,
INPUTS,
TOPIC_PATTERN,
CUSTOM_SERDE_INPUT_STRING,
PROCESSING_GUARANTEES,
PARALLELISM,
fakeJar,
CPU,
RAM,
DISK,
SINK_CONFIG_STRING,
sinkConfig
);
}
@Test
public void testMissingConfig() throws Exception {
SinkConfig sinkConfig = getSinkConfig();
sinkConfig.setConfigs(null);
testCmdSinkCliMissingArgs(
TENANT,
NAMESPACE,
NAME,
INPUTS,
TOPIC_PATTERN,
CUSTOM_SERDE_INPUT_STRING,
PROCESSING_GUARANTEES,
PARALLELISM,
JAR_FILE_PATH,
CPU,
RAM,
DISK,
null,
sinkConfig
);
}
public void testCmdSinkCliMissingArgs(
String tenant,
String namespace,
String name,
String inputs,
String topicPattern,
String customSerdeInputString,
FunctionConfig.ProcessingGuarantees processingGuarantees,
Integer parallelism,
String jarFile,
Double cpu,
Long ram,
Long disk,
String sinkConfigString,
SinkConfig sinkConfig) throws Exception {
// test create sink
createSink.tenant = tenant;
createSink.namespace = namespace;
createSink.name = name;
createSink.inputs = inputs;
createSink.topicsPattern = topicPattern;
createSink.customSerdeInputString = customSerdeInputString;
createSink.processingGuarantees = processingGuarantees;
createSink.parallelism = parallelism;
createSink.archive = jarFile;
createSink.cpu = cpu;
createSink.ram = ram;
createSink.disk = disk;
createSink.sinkConfigString = sinkConfigString;
createSink.processArguments();
createSink.runCmd();
// test update sink
updateSink.tenant = tenant;
updateSink.namespace = namespace;
updateSink.name = name;
updateSink.inputs = inputs;
updateSink.topicsPattern = topicPattern;
updateSink.customSerdeInputString = customSerdeInputString;
updateSink.processingGuarantees = processingGuarantees;
updateSink.parallelism = parallelism;
updateSink.archive = jarFile;
updateSink.cpu = cpu;
updateSink.ram = ram;
updateSink.disk = disk;
updateSink.sinkConfigString = sinkConfigString;
updateSink.processArguments();
updateSink.runCmd();
// test local runner
localSinkRunner.tenant = tenant;
localSinkRunner.namespace = namespace;
localSinkRunner.name = name;
localSinkRunner.inputs = inputs;
localSinkRunner.topicsPattern = topicPattern;
localSinkRunner.customSerdeInputString = customSerdeInputString;
localSinkRunner.processingGuarantees = processingGuarantees;
localSinkRunner.parallelism = parallelism;
localSinkRunner.archive = jarFile;
localSinkRunner.cpu = cpu;
localSinkRunner.ram = ram;
localSinkRunner.disk = disk;
localSinkRunner.sinkConfigString = sinkConfigString;
localSinkRunner.processArguments();
localSinkRunner.runCmd();
verify(createSink).validateSinkConfigs(eq(sinkConfig));
verify(updateSink).validateSinkConfigs(eq(sinkConfig));
verify(localSinkRunner).validateSinkConfigs(eq(sinkConfig));
}
@Test
public void testCmdSinkConfigFileCorrect() throws Exception {
SinkConfig sinkConfig = getSinkConfig();
testCmdSinkConfigFile(sinkConfig, sinkConfig);
}
@Test
public void testCmdSinkConfigFileMissingTopicToSerdeClassName() throws Exception {
SinkConfig testSinkConfig = getSinkConfig();
SinkConfig expectedSinkConfig = getSinkConfig();
testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
}
@Test
public void testCmdSinkConfigFileMissingTopicsPattern() throws Exception {
SinkConfig testSinkConfig = getSinkConfig();
SinkConfig expectedSinkConfig = getSinkConfig();
testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
}
@Test
public void testCmdSinkConfigFileMissingConfig() throws Exception {
SinkConfig testSinkConfig = getSinkConfig();
testSinkConfig.setConfigs(null);
SinkConfig expectedSinkConfig = getSinkConfig();
expectedSinkConfig.setConfigs(null);
testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
}
@Test
public void testCmdSinkConfigFileMissingProcessingGuarantees() throws Exception {
SinkConfig testSinkConfig = getSinkConfig();
testSinkConfig.setProcessingGuarantees(null);
SinkConfig expectedSinkConfig = getSinkConfig();
expectedSinkConfig.setProcessingGuarantees(null);
testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
}
@Test
public void testCmdSinkConfigFileMissingResources() throws Exception {
SinkConfig testSinkConfig = getSinkConfig();
testSinkConfig.setResources(null);
SinkConfig expectedSinkConfig = getSinkConfig();
expectedSinkConfig.setResources(null);
testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
}
@Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "Sink archive not specfied")
public void testCmdSinkConfigFileMissingJar() throws Exception {
SinkConfig testSinkConfig = getSinkConfig();
testSinkConfig.setArchive(null);
SinkConfig expectedSinkConfig = getSinkConfig();
expectedSinkConfig.setArchive(null);
testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
}
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Sink Archive file /tmp/foo.jar does not exist")
public void testCmdSinkConfigFileInvalidJar() throws Exception {
SinkConfig testSinkConfig = getSinkConfig();
testSinkConfig.setArchive("/tmp/foo.jar");
SinkConfig expectedSinkConfig = getSinkConfig();
expectedSinkConfig.setArchive("/tmp/foo.jar");
testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
}
public void testCmdSinkConfigFile(SinkConfig testSinkConfig, SinkConfig expectedSinkConfig) throws Exception {
File file = Files.createTempFile("", "").toFile();
new YAMLMapper().writeValue(file, testSinkConfig);
Assert.assertEquals(testSinkConfig, CmdUtils.loadConfig(file.getAbsolutePath(), SinkConfig.class));
// test create sink
createSink.sinkConfigFile = file.getAbsolutePath();
createSink.processArguments();
createSink.runCmd();
// test update sink
updateSink.sinkConfigFile = file.getAbsolutePath();
updateSink.processArguments();
updateSink.runCmd();
// test local runner
localSinkRunner.sinkConfigFile = file.getAbsolutePath();
localSinkRunner.processArguments();
localSinkRunner.runCmd();
verify(createSink).validateSinkConfigs(eq(expectedSinkConfig));
verify(updateSink).validateSinkConfigs(eq(expectedSinkConfig));
verify(localSinkRunner).validateSinkConfigs(eq(expectedSinkConfig));
}
@Test
public void testCliOverwriteConfigFile() throws Exception {
SinkConfig testSinkConfig = new SinkConfig();
testSinkConfig.setTenant(TENANT + "-prime");
testSinkConfig.setNamespace(NAMESPACE + "-prime");
testSinkConfig.setName(NAME + "-prime");
testSinkConfig.setTopicToSerdeClassName(new HashMap<>());
testSinkConfig.getTopicToSerdeClassName().put("test-src-prime", "");
testSinkConfig.getTopicToSerdeClassName().put("test_src3-prime", "");
testSinkConfig.setTopicsPattern(TOPIC_PATTERN + "-prime");
testSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
testSinkConfig.setParallelism(PARALLELISM + 1);
testSinkConfig.setArchive(JAR_FILE_PATH + "-prime");
testSinkConfig.setResources(new Resources(CPU + 1, RAM + 1, DISK + 1));
testSinkConfig.setConfigs(createSink.parseConfigs("{\"created_at-prime\":\"Mon Jul 02 00:33:15 +0000 2018\", \"otherConfigProperties\":{\"property1.value\":\"value1\",\"property2.value\":\"value2\"}}"));
SinkConfig expectedSinkConfig = getSinkConfig();
File file = Files.createTempFile("", "").toFile();
new YAMLMapper().writeValue(file, testSinkConfig);
Assert.assertEquals(testSinkConfig, CmdUtils.loadConfig(file.getAbsolutePath(), SinkConfig.class));
testMixCliAndConfigFile(
TENANT,
NAMESPACE,
NAME,
CLASS_NAME,
INPUTS,
TOPIC_PATTERN,
CUSTOM_SERDE_INPUT_STRING,
PROCESSING_GUARANTEES,
PARALLELISM,
JAR_FILE_PATH,
CPU,
RAM,
DISK,
SINK_CONFIG_STRING,
file.getAbsolutePath(),
expectedSinkConfig
);
}
public void testMixCliAndConfigFile(
String tenant,
String namespace,
String name,
String className,
String inputs,
String topicPattern,
String customSerdeInputString,
FunctionConfig.ProcessingGuarantees processingGuarantees,
Integer parallelism,
String jarFile,
Double cpu,
Long ram,
Long disk,
String sinkConfigString,
String sinkConfigFile,
SinkConfig sinkConfig
) throws Exception {
// test create sink
createSink.tenant = tenant;
createSink.namespace = namespace;
createSink.name = name;
createSink.inputs = inputs;
createSink.topicsPattern = topicPattern;
createSink.customSerdeInputString = customSerdeInputString;
createSink.processingGuarantees = processingGuarantees;
createSink.parallelism = parallelism;
createSink.archive = jarFile;
createSink.cpu = cpu;
createSink.ram = ram;
createSink.disk = disk;
createSink.sinkConfigString = sinkConfigString;
createSink.sinkConfigFile = sinkConfigFile;
createSink.processArguments();
createSink.runCmd();
// test update sink
updateSink.tenant = tenant;
updateSink.namespace = namespace;
updateSink.name = name;
updateSink.inputs = inputs;
updateSink.topicsPattern = topicPattern;
updateSink.customSerdeInputString = customSerdeInputString;
updateSink.processingGuarantees = processingGuarantees;
updateSink.parallelism = parallelism;
updateSink.archive = jarFile;
updateSink.cpu = cpu;
updateSink.ram = ram;
updateSink.disk = disk;
updateSink.sinkConfigString = sinkConfigString;
updateSink.sinkConfigFile = sinkConfigFile;
updateSink.processArguments();
updateSink.runCmd();
// test local runner
localSinkRunner.tenant = tenant;
localSinkRunner.namespace = namespace;
localSinkRunner.name = name;
localSinkRunner.inputs = inputs;
localSinkRunner.topicsPattern = topicPattern;
localSinkRunner.customSerdeInputString = customSerdeInputString;
localSinkRunner.processingGuarantees = processingGuarantees;
localSinkRunner.parallelism = parallelism;
localSinkRunner.archive = jarFile;
localSinkRunner.cpu = cpu;
localSinkRunner.ram = ram;
localSinkRunner.disk = disk;
localSinkRunner.sinkConfigString = sinkConfigString;
localSinkRunner.sinkConfigFile = sinkConfigFile;
localSinkRunner.processArguments();
localSinkRunner.runCmd();
verify(createSink).validateSinkConfigs(eq(sinkConfig));
verify(updateSink).validateSinkConfigs(eq(sinkConfig));
verify(localSinkRunner).validateSinkConfigs(eq(sinkConfig));
}
@Test
public void testDeleteMissingTenant() throws Exception {
deleteSink.tenant = null;
deleteSink.namespace = NAMESPACE;
deleteSink.sinkName = NAME;
deleteSink.processArguments();
deleteSink.runCmd();
verify(sink).deleteSink(eq(PUBLIC_TENANT), eq(NAMESPACE), eq(NAME));
}
@Test
public void testDeleteMissingNamespace() throws Exception {
deleteSink.tenant = TENANT;
deleteSink.namespace = null;
deleteSink.sinkName = NAME;
deleteSink.processArguments();
deleteSink.runCmd();
verify(sink).deleteSink(eq(TENANT), eq(DEFAULT_NAMESPACE), eq(NAME));
}
@Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "You must specify a name for the sink")
public void testDeleteMissingName() throws Exception {
deleteSink.tenant = TENANT;
deleteSink.namespace = NAMESPACE;
deleteSink.sinkName = null;
deleteSink.processArguments();
deleteSink.runCmd();
verify(sink).deleteSink(eq(TENANT), eq(NAMESPACE), null);
}
@Test
public void testUpdateSink() throws Exception {
updateSink.name = "my-sink";
updateSink.archive = "new-archive";
updateSink.processArguments();
updateSink.runCmd();
verify(sink).updateSink(eq(SinkConfig.builder()
.tenant(PUBLIC_TENANT)
.namespace(DEFAULT_NAMESPACE)
.name(updateSink.name)
.archive(updateSink.archive)
.build()), eq(updateSink.archive), eq(new UpdateOptions()));
updateSink.archive = null;
updateSink.parallelism = 2;
updateSink.processArguments();
updateSink.updateAuthData = true;
updateSink.runCmd();
UpdateOptions updateOptions = new UpdateOptions();
updateOptions.setUpdateAuthData(true);
verify(sink).updateSink(eq(SinkConfig.builder()
.tenant(PUBLIC_TENANT)
.namespace(DEFAULT_NAMESPACE)
.name(updateSink.name)
.parallelism(2)
.build()), eq(null), eq(updateOptions));
}
}