blob: 3cae6edb0b57a68cc62ce5dad9aade1760ab6392 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.cdap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import io.cdap.plugin.common.Constants;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.io.cdap.context.BatchSinkContextImpl;
import org.apache.beam.sdk.io.cdap.context.BatchSourceContextImpl;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
/**
 * Test class for {@link CdapIO}.
 *
 * <p>Covers three areas for both {@code CdapIO.read()} and {@code CdapIO.write()}:
 *
 * <ul>
 *   <li>builder wiring (the values passed to {@code withXxx} are returned by the getters),
 *   <li>argument validation (null or unsupported arguments are rejected), and
 *   <li>end-to-end execution on a {@link TestPipeline} using the Employee test plugin classes.
 * </ul>
 */
@RunWith(JUnit4.class)
public class CdapIOTest {

  /** Pipeline used by the end-to-end read and write tests. */
  @Rule public final transient TestPipeline p = TestPipeline.create();

  /** Temporary directory whose root is passed to {@code withLocksDirPath} in write tests. */
  @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder();

  /** Parameters used to build the {@link EmployeeConfig} plugin configuration in every test. */
  private static final Map<String, Object> TEST_EMPLOYEE_PARAMS_MAP =
      ImmutableMap.<String, Object>builder()
          .put(EmployeeConfig.OBJECT_TYPE, "employee")
          .put(Constants.Reference.REFERENCE_NAME, "referenceName")
          .build();

  /**
   * Hands a fresh mocked {@link OutputCommitter} to {@link EmployeeOutputFormat} before each test,
   * so that {@link #testWritingData()} can later verify interactions with it.
   */
  @Before
  public void setUp() {
    OutputCommitter mockedOutputCommitter = Mockito.mock(OutputCommitter.class);
    // NOTE(review): presumably this also resets the static written-output buffer - confirm in
    // EmployeeOutputFormat.
    EmployeeOutputFormat.initWrittenOutput(mockedOutputCommitter);
  }

  /** Checks that every value supplied to the read builder is exposed unchanged by its getters. */
  @Test
  public void testReadBuildsCorrectly() {
    EmployeeConfig pluginConfig =
        new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();

    CdapIO.Read<String, String> read =
        CdapIO.<String, String>read()
            .withCdapPlugin(
                Plugin.create(
                    EmployeeBatchSource.class,
                    EmployeeInputFormat.class,
                    EmployeeInputFormatProvider.class))
            .withPluginConfig(pluginConfig)
            .withKeyClass(String.class)
            .withValueClass(String.class);

    Plugin cdapPlugin = read.getCdapPlugin();
    assertNotNull(cdapPlugin);
    assertEquals(EmployeeBatchSource.class, cdapPlugin.getPluginClass());
    assertEquals(EmployeeInputFormat.class, cdapPlugin.getFormatClass());
    assertEquals(EmployeeInputFormatProvider.class, cdapPlugin.getFormatProviderClass());
    assertNotNull(cdapPlugin.getContext());
    assertEquals(BatchSourceContextImpl.class, cdapPlugin.getContext().getClass());
    assertEquals(PluginConstants.PluginType.SOURCE, cdapPlugin.getPluginType());
    assertNotNull(cdapPlugin.getHadoopConfiguration());
    assertEquals(pluginConfig, read.getPluginConfig());
    assertEquals(String.class, read.getKeyClass());
    assertEquals(String.class, read.getValueClass());
  }

  /** A null plugin class must be rejected by the read builder. */
  @Test
  public void testReadObjectCreationFailsIfCdapPluginClassIsNull() {
    assertThrows(
        IllegalArgumentException.class,
        () -> CdapIO.<String, String>read().withCdapPluginClass(null));
  }

  /** A null plugin config must be rejected by the read builder. */
  @Test
  public void testReadObjectCreationFailsIfPluginConfigIsNull() {
    assertThrows(
        IllegalArgumentException.class, () -> CdapIO.<String, String>read().withPluginConfig(null));
  }

  /** A null key class must be rejected by the read builder. */
  @Test
  public void testReadObjectCreationFailsIfKeyClassIsNull() {
    assertThrows(
        IllegalArgumentException.class, () -> CdapIO.<String, String>read().withKeyClass(null));
  }

  /** A null value class must be rejected by the read builder. */
  @Test
  public void testReadObjectCreationFailsIfValueClassIsNull() {
    assertThrows(
        IllegalArgumentException.class, () -> CdapIO.<String, String>read().withValueClass(null));
  }

  /** Validating a read transform on which nothing has been configured must fail. */
  @Test
  public void testReadValidationFailsMissingCdapPluginClass() {
    CdapIO.Read<String, String> read = CdapIO.read();
    assertThrows(IllegalArgumentException.class, read::validateTransform);
  }

  /**
   * Passing the test-only {@link EmployeeBatchSource} class to {@code withCdapPluginClass} must be
   * rejected with {@link UnsupportedOperationException}.
   */
  @Test
  public void testReadObjectCreationFailsIfCdapPluginClassIsNotSupported() {
    assertThrows(
        UnsupportedOperationException.class,
        () -> CdapIO.<String, String>read().withCdapPluginClass(EmployeeBatchSource.class));
  }

  /**
   * Runs the read transform on the test pipeline and asserts that its output consists of the pairs
   * {@code (i, EMPLOYEE_NAME_PREFIX + i)} for {@code i} from 1 up to, but not including, {@code
   * NUM_OF_TEST_EMPLOYEE_RECORDS}, in any order.
   */
  @Test
  public void testReadingData() {
    EmployeeConfig pluginConfig =
        new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();

    CdapIO.Read<String, String> read =
        CdapIO.<String, String>read()
            .withCdapPlugin(
                Plugin.create(
                    EmployeeBatchSource.class,
                    EmployeeInputFormat.class,
                    EmployeeInputFormatProvider.class))
            .withPluginConfig(pluginConfig)
            .withKeyClass(String.class)
            .withValueClass(String.class);

    // NOTE(review): the expected range starts at 1, presumably mirroring the keys produced by
    // EmployeeInputFormat's record reader - confirm against that class.
    List<KV<String, String>> expected = new ArrayList<>();
    for (int i = 1; i < EmployeeInputFormat.NUM_OF_TEST_EMPLOYEE_RECORDS; i++) {
      expected.add(KV.of(String.valueOf(i), EmployeeInputFormat.EMPLOYEE_NAME_PREFIX + i));
    }

    PCollection<KV<String, String>> actual = p.apply("ReadTest", read);
    PAssert.that(actual).containsInAnyOrder(expected);
    p.run();
  }

  /** Checks that every value supplied to the write builder is exposed unchanged by its getters. */
  @Test
  public void testWriteBuildsCorrectly() {
    EmployeeConfig pluginConfig =
        new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();

    CdapIO.Write<String, String> write =
        CdapIO.<String, String>write()
            .withCdapPlugin(
                Plugin.create(
                    EmployeeBatchSink.class,
                    EmployeeOutputFormat.class,
                    EmployeeOutputFormatProvider.class))
            .withPluginConfig(pluginConfig)
            .withKeyClass(String.class)
            .withValueClass(String.class)
            .withLocksDirPath(tmpFolder.getRoot().getAbsolutePath());

    Plugin cdapPlugin = write.getCdapPlugin();
    assertNotNull(cdapPlugin);
    assertNotNull(write.getLocksDirPath());
    assertEquals(EmployeeBatchSink.class, cdapPlugin.getPluginClass());
    assertEquals(EmployeeOutputFormat.class, cdapPlugin.getFormatClass());
    assertEquals(EmployeeOutputFormatProvider.class, cdapPlugin.getFormatProviderClass());
    assertNotNull(cdapPlugin.getContext());
    assertEquals(BatchSinkContextImpl.class, cdapPlugin.getContext().getClass());
    assertEquals(PluginConstants.PluginType.SINK, cdapPlugin.getPluginType());
    assertNotNull(cdapPlugin.getHadoopConfiguration());
    assertEquals(pluginConfig, write.getPluginConfig());
    assertEquals(String.class, write.getKeyClass());
    assertEquals(String.class, write.getValueClass());
  }

  /** A null plugin class must be rejected by the write builder. */
  @Test
  public void testWriteObjectCreationFailsIfCdapPluginClassIsNull() {
    assertThrows(
        IllegalArgumentException.class,
        () -> CdapIO.<String, String>write().withCdapPluginClass(null));
  }

  /** A null plugin config must be rejected by the write builder. */
  @Test
  public void testWriteObjectCreationFailsIfPluginConfigIsNull() {
    assertThrows(
        IllegalArgumentException.class,
        () -> CdapIO.<String, String>write().withPluginConfig(null));
  }

  /** A null key class must be rejected by the write builder. */
  @Test
  public void testWriteObjectCreationFailsIfKeyClassIsNull() {
    assertThrows(
        IllegalArgumentException.class, () -> CdapIO.<String, String>write().withKeyClass(null));
  }

  /** A null value class must be rejected by the write builder. */
  @Test
  public void testWriteObjectCreationFailsIfValueClassIsNull() {
    assertThrows(
        IllegalArgumentException.class, () -> CdapIO.<String, String>write().withValueClass(null));
  }

  /** A null locks directory path must be rejected by the write builder. */
  @Test
  public void testWriteObjectCreationFailsIfLockDirIsNull() {
    assertThrows(
        IllegalArgumentException.class,
        () -> CdapIO.<String, String>write().withLocksDirPath(null));
  }

  /** Validating a write transform on which nothing has been configured must fail. */
  @Test
  public void testWriteValidationFailsMissingCdapPluginClass() {
    CdapIO.Write<String, String> write = CdapIO.write();
    assertThrows(IllegalArgumentException.class, write::validateTransform);
  }

  /**
   * Passing the test-only {@link EmployeeBatchSink} class to {@code withCdapPluginClass} must be
   * rejected with {@link UnsupportedOperationException}.
   */
  @Test
  public void testWriteObjectCreationFailsIfCdapPluginClassIsNotSupported() {
    assertThrows(
        UnsupportedOperationException.class,
        () -> CdapIO.<String, String>write().withCdapPluginClass(EmployeeBatchSink.class));
  }

  /**
   * Writes {@code NUM_OF_TEST_EMPLOYEE_RECORDS} key/value pairs through the write transform, then
   * asserts that {@link EmployeeOutputFormat} recorded the same elements (same size, mutual
   * containment) and that {@code commitJob} was invoked on the output committer.
   *
   * @throws IOException declared because {@link OutputCommitter#commitJob} is a checked-exception
   *     method, even though it is only called here on a mock inside {@code Mockito.verify}
   */
  @Test
  public void testWritingData() throws IOException {
    List<KV<String, String>> data = new ArrayList<>();
    for (int i = 0; i < EmployeeInputFormat.NUM_OF_TEST_EMPLOYEE_RECORDS; i++) {
      data.add(KV.of(String.valueOf(i), EmployeeInputFormat.EMPLOYEE_NAME_PREFIX + i));
    }
    PCollection<KV<String, String>> input = p.apply(Create.of(data));

    EmployeeConfig pluginConfig =
        new ConfigWrapper<>(EmployeeConfig.class).withParams(TEST_EMPLOYEE_PARAMS_MAP).build();
    input.apply(
        "Write",
        CdapIO.<String, String>write()
            .withCdapPlugin(
                // The sink is paired with the *output* format provider, the same triple that
                // testWriteBuildsCorrectly builds and asserts on.
                Plugin.create(
                    EmployeeBatchSink.class,
                    EmployeeOutputFormat.class,
                    EmployeeOutputFormatProvider.class))
            .withPluginConfig(pluginConfig)
            .withKeyClass(String.class)
            .withValueClass(String.class)
            .withLocksDirPath(tmpFolder.getRoot().getAbsolutePath()));
    p.run();

    // Equal size plus containment in both directions: same elements, order not significant.
    List<KV<String, String>> writtenOutput = EmployeeOutputFormat.getWrittenOutput();
    assertEquals(data.size(), writtenOutput.size());
    assertTrue(data.containsAll(writtenOutput));
    assertTrue(writtenOutput.containsAll(data));

    Mockito.verify(EmployeeOutputFormat.getOutputCommitter()).commitJob(Mockito.any());
  }
}