/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.metamx.druid.indexer.granularity.UniformGranularitySpec;
import com.metamx.druid.indexer.partitions.PartitionsSpec;
import com.metamx.druid.indexer.updater.DbUpdaterJobSpec;
import com.metamx.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;

public class HadoopDruidIndexerConfigTest
{
  private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
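
  // A "uniform" granularitySpec should survive a JSON write/read round trip
  // with its granularity and intervals intact.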
  @Test
  public void testGranularitySpec() {
    final HadoopDruidIndexerConfig cfg;

    try {
      cfg = jsonReadWriteRead(
          "{"
          + " \"granularitySpec\":{"
          + " \"type\":\"uniform\","
          + " \"gran\":\"hour\","
          + " \"intervals\":[\"2012-01-01/P1D\"]"
          + " }"
          + "}",
          HadoopDruidIndexerConfig.class
      );
    } catch (Exception e) {
      throw Throwables.propagate(e);
    }

    final UniformGranularitySpec granularitySpec = (UniformGranularitySpec) cfg.getGranularitySpec();

    Assert.assertEquals(
        "getIntervals",
        Lists.newArrayList(new Interval("2012-01-01/P1D")),
        granularitySpec.getIntervals()
    );

    Assert.assertEquals(
        "getGranularity",
        "HOUR",
        granularitySpec.getGranularity().toString()
    );
  }

  @Test
  public void testGranularitySpecLegacy() {
    // Deprecated and replaced by granularitySpec, but still supported
    final HadoopDruidIndexerConfig cfg;

    try {
      cfg = jsonReadWriteRead(
          "{"
          + "\"segmentGranularity\":\"day\","
          + "\"intervals\":[\"2012-02-01/P1D\"]"
          + "}",
          HadoopDruidIndexerConfig.class
      );
    } catch (Exception e) {
      throw Throwables.propagate(e);
    }

    final UniformGranularitySpec granularitySpec = (UniformGranularitySpec) cfg.getGranularitySpec();

    Assert.assertEquals(
        "getIntervals",
        Lists.newArrayList(new Interval("2012-02-01/P1D")),
        granularitySpec.getIntervals()
    );

    Assert.assertEquals(
        "getGranularity",
        "DAY",
        granularitySpec.getGranularity().toString()
    );
  }

  @Test
  public void testGranularitySpecPostConstructorIntervals() {
    // Deprecated and replaced by granularitySpec, but still supported;
    // intervals may also be supplied after construction via setIntervals
    final HadoopDruidIndexerConfig cfg;

    try {
      cfg = jsonMapper.readValue(
          "{"
          + "\"segmentGranularity\":\"day\""
          + "}",
          HadoopDruidIndexerConfig.class
      );
    } catch (Exception e) {
      throw Throwables.propagate(e);
    }

    cfg.setIntervals(Lists.newArrayList(new Interval("2012-03-01/P1D")));

    final UniformGranularitySpec granularitySpec = (UniformGranularitySpec) cfg.getGranularitySpec();

    Assert.assertEquals(
        "getIntervals",
        Lists.newArrayList(new Interval("2012-03-01/P1D")),
        granularitySpec.getIntervals()
    );

    Assert.assertEquals(
        "getGranularity",
        "DAY",
        granularitySpec.getGranularity().toString()
    );
  }
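
  // Supplying both the legacy top-level granularity fields and a granularitySpec
  // is ambiguous and should be rejected at deserialization time.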
  @Test
  public void testInvalidGranularityCombination() {
    boolean thrown = false;

    try {
      jsonReadWriteRead(
          "{"
          + "\"segmentGranularity\":\"day\","
          + "\"intervals\":[\"2012-02-01/P1D\"],"
          + "\"granularitySpec\":{"
          + " \"type\":\"uniform\","
          + " \"gran\":\"hour\","
          + " \"intervals\":[\"2012-01-01/P1D\"]"
          + " }"
          + "}",
          HadoopDruidIndexerConfig.class
      );
    } catch (Exception e) {
      thrown = true;
    }

    Assert.assertTrue("Exception thrown", thrown);
  }
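
  // With no partitionDimension given, partition determination should still be
  // enabled and the dimension left null for the indexer to choose automatically.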
  @Test
  public void testPartitionsSpecAutoDimension() {
    final HadoopDruidIndexerConfig cfg;

    try {
      cfg = jsonReadWriteRead(
          "{"
          + "\"partitionsSpec\":{"
          + " \"targetPartitionSize\":100"
          + " }"
          + "}",
          HadoopDruidIndexerConfig.class
      );
    } catch (Exception e) {
      throw Throwables.propagate(e);
    }

    final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();

    Assert.assertEquals(
        "isDeterminingPartitions",
        true,
        partitionsSpec.isDeterminingPartitions()
    );

    Assert.assertEquals(
        "getTargetPartitionSize",
        100,
        partitionsSpec.getTargetPartitionSize()
    );

    Assert.assertNull(
        "getPartitionDimension",
        partitionsSpec.getPartitionDimension()
    );
  }
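
  // An explicit partitionDimension should be honored; maxPartitionSize is not
  // specified here, so it should default to 150% of targetPartitionSize.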
  @Test
  public void testPartitionsSpecSpecificDimension() {
    final HadoopDruidIndexerConfig cfg;

    try {
      cfg = jsonReadWriteRead(
          "{"
          + "\"partitionsSpec\":{"
          + " \"targetPartitionSize\":100,"
          + " \"partitionDimension\":\"foo\""
          + " }"
          + "}",
          HadoopDruidIndexerConfig.class
      );
    } catch (Exception e) {
      throw Throwables.propagate(e);
    }

    final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();

    Assert.assertEquals(
        "isDeterminingPartitions",
        true,
        partitionsSpec.isDeterminingPartitions()
    );

    Assert.assertEquals(
        "getTargetPartitionSize",
        100,
        partitionsSpec.getTargetPartitionSize()
    );

    Assert.assertEquals(
        "getMaxPartitionSize",
        150,
        partitionsSpec.getMaxPartitionSize()
    );

    Assert.assertEquals(
        "getPartitionDimension",
        "foo",
        partitionsSpec.getPartitionDimension()
    );
  }
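
  // Legacy top-level targetPartitionSize/partitionDimension fields should map
  // onto an equivalent partitionsSpec.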
  @Test
  public void testPartitionsSpecLegacy() {
    final HadoopDruidIndexerConfig cfg;

    try {
      cfg = jsonReadWriteRead(
          "{"
          + "\"targetPartitionSize\":100,"
          + "\"partitionDimension\":\"foo\""
          + "}",
          HadoopDruidIndexerConfig.class
      );
    } catch (Exception e) {
      throw Throwables.propagate(e);
    }

    final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();

    Assert.assertEquals(
        "isDeterminingPartitions",
        true,
        partitionsSpec.isDeterminingPartitions()
    );

    Assert.assertEquals(
        "getTargetPartitionSize",
        100,
        partitionsSpec.getTargetPartitionSize()
    );

    Assert.assertEquals(
        "getMaxPartitionSize",
        150,
        partitionsSpec.getMaxPartitionSize()
    );

    Assert.assertEquals(
        "getPartitionDimension",
        "foo",
        partitionsSpec.getPartitionDimension()
    );
  }
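
  // An explicit maxPartitionSize should override the 150% default.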
  @Test
  public void testPartitionsSpecMaxPartitionSize() {
    final HadoopDruidIndexerConfig cfg;

    try {
      cfg = jsonReadWriteRead(
          "{"
          + "\"partitionsSpec\":{"
          + " \"targetPartitionSize\":100,"
          + " \"maxPartitionSize\":200,"
          + " \"partitionDimension\":\"foo\""
          + " }"
          + "}",
          HadoopDruidIndexerConfig.class
      );
    } catch (Exception e) {
      throw Throwables.propagate(e);
    }

    final PartitionsSpec partitionsSpec = cfg.getPartitionsSpec();

    Assert.assertEquals(
        "isDeterminingPartitions",
        true,
        partitionsSpec.isDeterminingPartitions()
    );

    Assert.assertEquals(
        "getTargetPartitionSize",
        100,
        partitionsSpec.getTargetPartitionSize()
    );

    Assert.assertEquals(
        "getMaxPartitionSize",
        200,
        partitionsSpec.getMaxPartitionSize()
    );

    Assert.assertEquals(
        "getPartitionDimension",
        "foo",
        partitionsSpec.getPartitionDimension()
    );
  }
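
  // Supplying both the legacy top-level targetPartitionSize and a partitionsSpec
  // is ambiguous and should be rejected at deserialization time.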
  @Test
  public void testInvalidPartitionsCombination() {
    boolean thrown = false;

    try {
      jsonReadWriteRead(
          "{"
          + "\"targetPartitionSize\":100,"
          + "\"partitionsSpec\":{"
          + " \"targetPartitionSize\":100"
          + " }"
          + "}",
          HadoopDruidIndexerConfig.class
      );
    } catch (Exception e) {
      thrown = true;
    }

    Assert.assertTrue("Exception thrown", thrown);
  }
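
  // A "db" updaterJobSpec should deserialize with its connection settings
  // intact, and validation queries should be off unless requested.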
  @Test
  public void testDbUpdaterJobSpec() throws Exception
  {
    final HadoopDruidIndexerConfig cfg;

    cfg = jsonReadWriteRead(
        "{"
        + "\"updaterJobSpec\":{\n"
        + " \"type\" : \"db\",\n"
        + " \"connectURI\" : \"jdbc:mysql://localhost/druid\",\n"
        + " \"user\" : \"rofl\",\n"
        + " \"password\" : \"p4ssw0rd\",\n"
        + " \"segmentTable\" : \"segments\"\n"
        + " }"
        + "}",
        HadoopDruidIndexerConfig.class
    );

    final DbUpdaterJobSpec spec = (DbUpdaterJobSpec) cfg.getUpdaterJobSpec();
    Assert.assertEquals("segments", spec.getSegmentTable());
    Assert.assertEquals("jdbc:mysql://localhost/druid", spec.getDatabaseConnectURI());
    Assert.assertEquals("rofl", spec.getDatabaseUser());
    Assert.assertEquals("p4ssw0rd", spec.getDatabasePassword());
    Assert.assertEquals(false, spec.useValidationQuery());
  }
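
  // An empty config should fall back to the defaults: clean up on failure,
  // do not overwrite files, do not determine partitions.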
  @Test
  public void testDefaultSettings() {
    final HadoopDruidIndexerConfig cfg;

    try {
      cfg = jsonReadWriteRead(
          "{}",
          HadoopDruidIndexerConfig.class
      );
    } catch (Exception e) {
      throw Throwables.propagate(e);
    }

    Assert.assertEquals(
        "cleanupOnFailure",
        true,
        cfg.isCleanupOnFailure()
    );

    Assert.assertEquals(
        "overwriteFiles",
        false,
        cfg.isOverwriteFiles()
    );

    Assert.assertEquals(
        "isDeterminingPartitions",
        false,
        cfg.getPartitionsSpec().isDeterminingPartitions()
    );
  }
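
  // An explicit cleanupOnFailure=false should override the default.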
  @Test
  public void testNoCleanupOnFailure() {
    final HadoopDruidIndexerConfig cfg;

    try {
      cfg = jsonReadWriteRead(
          "{\"cleanupOnFailure\":false}",
          HadoopDruidIndexerConfig.class
      );
    } catch (Exception e) {
      throw Throwables.propagate(e);
    }

    Assert.assertEquals(
        "cleanupOnFailure",
        false,
        cfg.isCleanupOnFailure()
    );
  }
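
  // Colons are not allowed in HDFS paths, so the version's colons should become
  // underscores and the interval timestamps should be rendered without colons.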
  @Test
  public void shouldMakeHDFSCompliantSegmentOutputPath() {
    final HadoopDruidIndexerConfig cfg;

    try {
      cfg = jsonReadWriteRead(
          "{"
          + "\"dataSource\": \"source\","
          + " \"granularitySpec\":{"
          + " \"type\":\"uniform\","
          + " \"gran\":\"hour\","
          + " \"intervals\":[\"2012-07-10/P1D\"]"
          + " },"
          + "\"segmentOutputPath\": \"hdfs://server:9100/tmp/druid/datatest\""
          + "}",
          HadoopDruidIndexerConfig.class
      );
    } catch (Exception e) {
      throw Throwables.propagate(e);
    }

    cfg.setVersion("some:brand:new:version");

    Bucket bucket = new Bucket(4711, new DateTime(2012, 7, 10, 5, 30), 4712);
    Path path = cfg.makeSegmentOutputPath(new DistributedFileSystem(), bucket);
    Assert.assertEquals(
        "hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version/4712",
        path.toString()
    );
  }
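
  // Other filesystems allow colons, so the segment output path should keep the
  // dataSource, interval, and version verbatim.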
  @Test
  public void shouldMakeDefaultSegmentOutputPathIfNotHDFS() {
    final HadoopDruidIndexerConfig cfg;

    try {
      cfg = jsonReadWriteRead(
          "{"
          + "\"dataSource\": \"the:data:source\","
          + " \"granularitySpec\":{"
          + " \"type\":\"uniform\","
          + " \"gran\":\"hour\","
          + " \"intervals\":[\"2012-07-10/P1D\"]"
          + " },"
          + "\"segmentOutputPath\": \"/tmp/dru:id/data:test\""
          + "}",
          HadoopDruidIndexerConfig.class
      );
    } catch (Exception e) {
      throw Throwables.propagate(e);
    }

    cfg.setVersion("some:brand:new:version");

    Bucket bucket = new Bucket(4711, new DateTime(2012, 7, 10, 5, 30), 4712);
    Path path = cfg.makeSegmentOutputPath(new LocalFileSystem(), bucket);
    Assert.assertEquals(
        "/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:version/4712",
        path.toString()
    );
  }
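
  // Deserializes the given JSON, writes it back out, and reads it again,
  // exercising both directions of (de)serialization.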
  private <T> T jsonReadWriteRead(String s, Class<T> klass)
  {
    try {
      return jsonMapper.readValue(jsonMapper.writeValueAsBytes(jsonMapper.readValue(s, klass)), klass);
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }
  }
}