blob: 4bcff88cc2a1e74ece3cd3fae9a778951a7bd500 [file] [log] [blame]
/*
* *
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.tez.runtime.library.conf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.junit.Test;
/**
 * Unit tests for {@link UnorderedPartitionedKVOutputConfig} and its builder.
 *
 * <p>The configuration tests all follow the same shape: populate a builder, build the config,
 * serialize it with {@code toUserPayload()}, load that payload into a fresh instance with
 * {@code fromUserPayload(...)}, and assert on the {@link Configuration} held by the fresh
 * instance.
 */
public class TestUnorderedPartitionedKVOutputConfig {

  /** Text the exception message must contain when a required builder argument is null. */
  private static final String NULL_ARGUMENT_MESSAGE = "cannot be null";

  /**
   * Codec name the tests expect to see when no codec is set through the builder.
   * NOTE(review): presumably supplied by a default configuration resource on the test
   * classpath; that resource is not visible from this file - confirm.
   */
  private static final String DEFAULT_CODEC = "TestCodec";

  /**
   * Checks that {@code newBuilder} rejects a null key class, value class, or partitioner class
   * with a {@link NullPointerException} whose message contains "cannot be null".
   */
  @Test(timeout = 5000)
  public void testNullParams() {
    assertNewBuilderRejectsNull(null, "VALUE", "PARTITIONER");
    assertNewBuilderRejectsNull("KEY", null, "PARTITIONER");
    assertNewBuilderRejectsNull("KEY", "VALUE", null);
  }

  /**
   * Checks that values set through the builder's dedicated setters, through
   * {@code setAdditionalConfiguration}, through {@code setFromConfiguration} and through
   * {@code setFromConfigurationUnfiltered} show up (or are absent) as expected after a
   * user-payload round trip.
   */
  @Test(timeout = 5000)
  public void testSetters() {
    Configuration fromConf = new Configuration(false);
    fromConf.set("test.conf.key.1", "confkey1");
    fromConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 1111);
    fromConf.set("io.shouldExist", "io");

    Map<String, String> additionalConf = new HashMap<String, String>();
    additionalConf.put("test.key.2", "key2");
    additionalConf.put(
        TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, "2222");
    additionalConf.put("file.shouldExist", "file");

    Configuration fromConfUnfiltered = new Configuration(false);
    fromConfUnfiltered.set("test.conf.unfiltered.1", "unfiltered1");

    UnorderedPartitionedKVOutputConfig.Builder builder =
        UnorderedPartitionedKVOutputConfig.newBuilder("KEY", "VALUE", "PARTITIONER", null)
            .setCompression(true, "CustomCodec", null)
            .setAvailableBufferSize(1111)
            .setAdditionalConfiguration("fs.shouldExist", "fs")
            .setAdditionalConfiguration("test.key.1", "key1")
            .setAdditionalConfiguration(
                TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
                "true")
            .setAdditionalConfiguration(
                TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE, "5120")
            .setAdditionalConfiguration(
                TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, "true")
            .setAdditionalConfiguration(
                TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, "false")
            .setAdditionalConfiguration(
                TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD, String.valueOf(false))
            .setAdditionalConfiguration(additionalConf)
            .setFromConfiguration(fromConf)
            .setFromConfigurationUnfiltered(fromConfUnfiltered);

    Configuration conf = roundTrip(builder);

    // Verify programmatic API usage
    assertTrue(
        conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, false));
    assertFalse(
        conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true));
    assertEquals(1111,
        conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 0));
    assertEquals("KEY", conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, ""));
    assertEquals("VALUE", conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, ""));
    assertEquals("PARTITIONER",
        conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, ""));
    assertEquals("CustomCodec",
        conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, ""));
    assertTrue(conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false));

    // Verify additional configs
    assertFalse(conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
    assertEquals(1111, conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT));
    assertEquals(2222, conf.getInt(
        TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, 0));
    assertTrue(conf.getBoolean(
        TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED, false));
    assertEquals(5120, conf.getInt(
        TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE, 512));

    // Keys with the io./file./fs. prefixes are expected to survive the round trip.
    assertEquals("io", conf.get("io.shouldExist"));
    assertEquals("file", conf.get("file.shouldExist"));
    assertEquals("fs", conf.get("fs.shouldExist"));

    // Arbitrary test.* keys are expected to be dropped unless they were supplied through
    // setFromConfigurationUnfiltered.
    assertNull(conf.get("test.conf.key.1"));
    assertNull(conf.get("test.key.1"));
    assertNull(conf.get("test.key.2"));
    assertEquals("unfiltered1", conf.get("test.conf.unfiltered.1"));
  }

  /**
   * Checks the configuration produced when only the mandatory builder arguments and the
   * key/value serialization classes are supplied.
   */
  @Test(timeout = 5000)
  public void testDefaultConfigsUsed() {
    UnorderedPartitionedKVOutputConfig.Builder builder =
        UnorderedPartitionedKVOutputConfig
            .newBuilder("KEY", "VALUE", "PARTITIONER", null)
            .setKeySerializationClass("SerClass1", null)
            .setValueSerializationClass("SerClass2", null);

    Configuration conf = roundTrip(builder);

    assertTrue(conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
        TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));

    // Default property present.
    assertEquals(DEFAULT_CODEC,
        conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, ""));

    // Verify whatever was configured
    assertEquals("KEY", conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, ""));
    assertEquals("VALUE", conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, ""));
    assertEquals("PARTITIONER",
        conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, ""));

    // Guard against a missing key so the test fails with a readable message instead of
    // erroring out with a NullPointerException.
    String serializations = conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY);
    assertTrue("Unexpected value for " + CommonConfigurationKeys.IO_SERIALIZATIONS_KEY
            + ": " + serializations,
        serializations != null && serializations.startsWith("SerClass2,SerClass1"));

    // For unordered partitioned KV output, the comparator is not populated.
    assertNull(conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS));
  }

  /**
   * Checks that entries of the partitioner configuration map reach the final configuration,
   * except that a runtime key in that map does not replace the expected default codec.
   */
  @Test(timeout = 5000)
  public void testPartitionerConfigs() {
    Map<String, String> partitionerConf = Maps.newHashMap();
    partitionerConf.put("partitioner.test.key", "PARTITIONERKEY");
    partitionerConf.put(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, "InvalidKeyOverride");

    UnorderedPartitionedKVOutputConfig.Builder builder =
        UnorderedPartitionedKVOutputConfig
            .newBuilder("KEY", "VALUE", "PARTITIONER", partitionerConf);

    Configuration conf = roundTrip(builder);

    // Default Output property should not be overridden based on partitioner config
    assertEquals(DEFAULT_CODEC,
        conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, ""));
    assertEquals("PARTITIONERKEY", conf.get("partitioner.test.key"));
  }

  /**
   * Calls {@code newBuilder} with the given arguments (and a null partitioner configuration)
   * and fails the test unless a {@link NullPointerException} mentioning "cannot be null" is
   * thrown.
   *
   * @param keyClass key class name to pass, possibly null
   * @param valueClass value class name to pass, possibly null
   * @param partitionerClass partitioner class name to pass, possibly null
   */
  private static void assertNewBuilderRejectsNull(String keyClass, String valueClass,
      String partitionerClass) {
    try {
      UnorderedPartitionedKVOutputConfig.newBuilder(keyClass, valueClass, partitionerClass, null);
    } catch (NullPointerException npe) {
      // A message-less exception must fail the assertion, not raise a second
      // NullPointerException from inside the catch block.
      String message = npe.getMessage();
      assertTrue("Unexpected NullPointerException message: " + message,
          message != null && message.contains(NULL_ARGUMENT_MESSAGE));
      return;
    }
    fail("Expecting a null parameter list to fail");
  }

  /**
   * Builds the config, serializes it to a user payload, loads the payload into a new
   * {@link UnorderedPartitionedKVOutputConfig}, and returns that instance's configuration.
   *
   * @param builder fully populated builder
   * @return the {@code conf} field of the instance rebuilt from the payload
   */
  private static Configuration roundTrip(UnorderedPartitionedKVOutputConfig.Builder builder) {
    UnorderedPartitionedKVOutputConfig configuration = builder.build();
    UnorderedPartitionedKVOutputConfig rebuilt = new UnorderedPartitionedKVOutputConfig();
    rebuilt.fromUserPayload(configuration.toUserPayload());
    return rebuilt.conf;
  }
}