| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.indexer.path; |
| |
| import com.fasterxml.jackson.core.JsonProcessingException; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.base.Joiner; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Lists; |
| import org.apache.druid.indexer.HadoopDruidIndexerConfig; |
| import org.apache.druid.indexer.HadoopIOConfig; |
| import org.apache.druid.indexer.HadoopIngestionSpec; |
| import org.apache.druid.indexer.HadoopTuningConfig; |
| import org.apache.druid.jackson.DefaultObjectMapper; |
| import org.apache.druid.java.util.common.Intervals; |
| import org.apache.druid.java.util.common.StringUtils; |
| import org.apache.druid.java.util.common.granularity.Granularities; |
| import org.apache.druid.java.util.common.granularity.Granularity; |
| import org.apache.druid.java.util.common.granularity.PeriodGranularity; |
| import org.apache.druid.query.aggregation.AggregatorFactory; |
| import org.apache.druid.segment.indexing.DataSchema; |
| import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.joda.time.DateTimeZone; |
| import org.joda.time.Period; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.TemporaryFolder; |
| |
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
| |
| public class GranularityPathSpecTest |
| { |
| private static final HadoopTuningConfig DEFAULT_TUNING_CONFIG = new HadoopTuningConfig( |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| null, |
| false, |
| false, |
| false, |
| false, |
| null, |
| false, |
| false, |
| null, |
| null, |
| null, |
| false, |
| false, |
| null, |
| null, |
| null, |
| null |
| ); |
| |
| private GranularityPathSpec granularityPathSpec; |
| private final String TEST_STRING_PATH = "TEST"; |
| private final String TEST_STRING_PATTERN = "*.TEST"; |
| private final String TEST_STRING_FORMAT = "F_TEST"; |
| |
| private final ObjectMapper jsonMapper = new DefaultObjectMapper(); |
| |
| @Rule |
| public final TemporaryFolder testFolder = new TemporaryFolder(); |
| |
| @Before |
| public void setUp() |
| { |
| granularityPathSpec = new GranularityPathSpec(); |
| } |
| |
| @After |
| public void tearDown() |
| { |
| granularityPathSpec = null; |
| } |
| |
| @Test |
| public void testSetInputPath() |
| { |
| granularityPathSpec.setInputPath(TEST_STRING_PATH); |
| Assert.assertEquals(TEST_STRING_PATH, granularityPathSpec.getInputPath()); |
| } |
| |
| @Test |
| public void testSetFilePattern() |
| { |
| granularityPathSpec.setFilePattern(TEST_STRING_PATTERN); |
| Assert.assertEquals(TEST_STRING_PATTERN, granularityPathSpec.getFilePattern()); |
| } |
| |
| @Test |
| public void testSetPathFormat() |
| { |
| granularityPathSpec.setPathFormat(TEST_STRING_FORMAT); |
| Assert.assertEquals(TEST_STRING_FORMAT, granularityPathSpec.getPathFormat()); |
| } |
| |
| @Test |
| public void testSetDataGranularity() |
| { |
| Granularity granularity = Granularities.DAY; |
| granularityPathSpec.setDataGranularity(granularity); |
| Assert.assertEquals(granularity, granularityPathSpec.getDataGranularity()); |
| } |
| |
| @Test |
| public void testSerdeCustomInputFormat() throws Exception |
| { |
| testSerde("/test/path", "*.test", "pat_pat", Granularities.SECOND, TextInputFormat.class); |
| } |
| |
| @Test |
| public void testSerdeNoInputFormat() throws Exception |
| { |
| testSerde("/test/path", "*.test", "pat_pat", Granularities.SECOND, null); |
| } |
| |
| @Test |
| public void testAddInputPath() throws Exception |
| { |
| UserGroupInformation.setLoginUser(UserGroupInformation.createUserForTesting("test", new String[]{"testGroup"})); |
| HadoopIngestionSpec spec = new HadoopIngestionSpec( |
| new DataSchema( |
| "foo", |
| null, |
| new AggregatorFactory[0], |
| new UniformGranularitySpec( |
| Granularities.DAY, |
| Granularities.MINUTE, |
| ImmutableList.of(Intervals.of("2015-11-06T00:00Z/2015-11-07T00:00Z")) |
| ), |
| null, |
| jsonMapper |
| ), |
| new HadoopIOConfig(null, null, null), |
| DEFAULT_TUNING_CONFIG |
| ); |
| |
| granularityPathSpec.setDataGranularity(Granularities.HOUR); |
| granularityPathSpec.setFilePattern(".*"); |
| granularityPathSpec.setInputFormat(TextInputFormat.class); |
| |
| Job job = Job.getInstance(); |
| String formatStr = "file:%s/%s;org.apache.hadoop.mapreduce.lib.input.TextInputFormat"; |
| |
| testFolder.newFolder("test", "y=2015", "m=11", "d=06", "H=00"); |
| testFolder.newFolder("test", "y=2015", "m=11", "d=06", "H=02"); |
| testFolder.newFolder("test", "y=2015", "m=11", "d=06", "H=05"); |
| testFolder.newFile("test/y=2015/m=11/d=06/H=00/file1"); |
| testFolder.newFile("test/y=2015/m=11/d=06/H=02/file2"); |
| testFolder.newFile("test/y=2015/m=11/d=06/H=05/file3"); |
| testFolder.newFile("test/y=2015/m=11/d=06/H=05/file4"); |
| |
| granularityPathSpec.setInputPath(testFolder.getRoot().getPath() + "/test"); |
| |
| granularityPathSpec.addInputPaths(HadoopDruidIndexerConfig.fromSpec(spec), job); |
| |
| String actual = job.getConfiguration().get("mapreduce.input.multipleinputs.dir.formats"); |
| |
| String expected = Joiner.on(",").join(Lists.newArrayList( |
| StringUtils.format(formatStr, testFolder.getRoot(), "test/y=2015/m=11/d=06/H=00/file1"), |
| StringUtils.format(formatStr, testFolder.getRoot(), "test/y=2015/m=11/d=06/H=02/file2"), |
| StringUtils.format(formatStr, testFolder.getRoot(), "test/y=2015/m=11/d=06/H=05/file3"), |
| StringUtils.format(formatStr, testFolder.getRoot(), "test/y=2015/m=11/d=06/H=05/file4") |
| )); |
| |
| Assert.assertEquals("Did not find expected input paths", expected, actual); |
| } |
| |
| @Test |
| public void testIntervalTrimming() throws Exception |
| { |
| UserGroupInformation.setLoginUser(UserGroupInformation.createUserForTesting("test", new String[]{"testGroup"})); |
| HadoopIngestionSpec spec = new HadoopIngestionSpec( |
| new DataSchema( |
| "foo", |
| null, |
| new AggregatorFactory[0], |
| new UniformGranularitySpec( |
| Granularities.DAY, |
| Granularities.ALL, |
| ImmutableList.of(Intervals.of("2015-01-01T11Z/2015-01-02T05Z")) |
| ), |
| null, |
| jsonMapper |
| ), |
| new HadoopIOConfig(null, null, null), |
| DEFAULT_TUNING_CONFIG |
| ); |
| |
| granularityPathSpec.setDataGranularity(Granularities.HOUR); |
| granularityPathSpec.setPathFormat("yyyy/MM/dd/HH"); |
| granularityPathSpec.setFilePattern(".*"); |
| granularityPathSpec.setInputFormat(TextInputFormat.class); |
| |
| Job job = Job.getInstance(); |
| String formatStr = "file:%s/%s;org.apache.hadoop.mapreduce.lib.input.TextInputFormat"; |
| |
| createFile( |
| testFolder, |
| "test/2015/01/01/00/file1", "test/2015/01/01/10/file2", "test/2015/01/01/18/file3", "test/2015/01/02/00/file1", |
| "test/2015/01/02/03/file2", "test/2015/01/02/05/file3", "test/2015/01/02/07/file4", "test/2015/01/02/09/file5" |
| ); |
| |
| granularityPathSpec.setInputPath(testFolder.getRoot().getPath() + "/test"); |
| |
| granularityPathSpec.addInputPaths(HadoopDruidIndexerConfig.fromSpec(spec), job); |
| |
| String actual = job.getConfiguration().get("mapreduce.input.multipleinputs.dir.formats"); |
| |
| String expected = Joiner.on(",").join( |
| Lists.newArrayList( |
| StringUtils.format(formatStr, testFolder.getRoot(), "test/2015/01/01/18/file3"), |
| StringUtils.format(formatStr, testFolder.getRoot(), "test/2015/01/02/00/file1"), |
| StringUtils.format(formatStr, testFolder.getRoot(), "test/2015/01/02/03/file2") |
| ) |
| ); |
| |
| Assert.assertEquals("Did not find expected input paths", expected, actual); |
| } |
| |
| @Test |
| public void testBackwardCompatiblePeriodSegmentGranularitySerialization() throws JsonProcessingException |
| { |
| final PeriodGranularity pt2S = new PeriodGranularity(new Period("PT2S"), null, DateTimeZone.UTC); |
| Assert.assertNotEquals("\"SECOND\"", jsonMapper.writeValueAsString(pt2S)); |
| final Granularity pt1S = Granularities.SECOND; |
| Assert.assertEquals("\"SECOND\"", jsonMapper.writeValueAsString(pt1S)); |
| } |
| |
| private void createFile(TemporaryFolder folder, String... files) throws IOException |
| { |
| for (String file : files) { |
| String[] split = file.split("/"); |
| Assert.assertTrue(split.length > 1); |
| folder.newFolder(Arrays.copyOfRange(split, 0, split.length - 1)); |
| folder.newFile(file); |
| } |
| } |
| |
| private void testSerde( |
| String inputPath, |
| String filePattern, |
| String pathFormat, |
| Granularity granularity, |
| Class inputFormat) throws Exception |
| { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("{\"inputPath\" : \""); |
| sb.append(inputPath); |
| sb.append("\","); |
| sb.append("\"filePattern\" : \""); |
| sb.append(filePattern); |
| sb.append("\","); |
| sb.append("\"pathFormat\" : \""); |
| sb.append(pathFormat); |
| sb.append("\","); |
| sb.append("\"dataGranularity\" : "); |
| // Double-check Jackson's lower-case enum support |
| sb.append(jsonMapper.writeValueAsString(granularity)); |
| sb.append(","); |
| if (inputFormat != null) { |
| sb.append("\"inputFormat\" : \""); |
| sb.append(inputFormat.getName()); |
| sb.append("\","); |
| } |
| sb.append("\"type\" : \"granularity\"}"); |
| |
| GranularityPathSpec pathSpec = (GranularityPathSpec) StaticPathSpecTest.readWriteRead(sb.toString(), jsonMapper); |
| Assert.assertEquals(inputFormat, pathSpec.getInputFormat()); |
| Assert.assertEquals(inputPath, pathSpec.getInputPath()); |
| Assert.assertEquals(filePattern, pathSpec.getFilePattern()); |
| Assert.assertEquals(pathFormat, pathSpec.getPathFormat()); |
| Assert.assertEquals(granularity, pathSpec.getDataGranularity()); |
| } |
| } |