// blob: 5ad391a25384b368bbc54dbf6110536ec32aff30 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.worker.shuffle;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteSource;
import org.apache.commons.io.FileUtils;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import org.apache.druid.timeline.partition.BuildingShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.ShardSpecLookup;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
/**
 * Exercises {@link LocalIntermediaryDataManager} by calling {@code addSegment}, {@code findPartitionFile} and
 * {@code deletePartitions} directly against a single storage location that lives in a temporary folder.
 */
public class LocalIntermediaryDataManagerManualAddAndDeleteTest
{
  private static final String SUPERVISOR_TASK_ID = "supervisorTaskId";
  private static final String TRAVERSAL_ERROR_MESSAGE = "supervisorTaskId cannot start with the '.' character.";

  @Rule
  public TemporaryFolder tempDir = new TemporaryFolder();

  @Rule
  public ExpectedException expectedException = ExpectedException.none();

  private LocalIntermediaryDataManager intermediaryDataManager;

  // The only storage location handed to the manager under test.
  private File intermediarySegmentsLocation;

  // A folder created next to intermediarySegmentsLocation; used as the target of "../" traversal attempts.
  private File siblingLocation;

  /**
   * Creates the two temporary folders, builds a manager whose only storage location is
   * {@link #intermediarySegmentsLocation}, and starts it.
   */
  @Before
  public void setup() throws IOException
  {
    intermediarySegmentsLocation = tempDir.newFolder();
    siblingLocation = tempDir.newFolder();

    // NOTE(review): 600L is presumably the location's capacity in bytes -- testAddSegmentFailure expects four
    // segments (~138 bytes each per generateSegmentDir) to fit and a fifth to be rejected. Confirm against
    // StorageLocationConfig.
    final StorageLocationConfig locationConfig = new StorageLocationConfig(intermediarySegmentsLocation, 600L, null);
    final TaskConfig taskConfig = new TaskConfig(
        null,
        null,
        null,
        null,
        null,
        false,
        null,
        null,
        ImmutableList.of(locationConfig),
        false,
        false
    );
    final WorkerConfig workerConfig = new WorkerConfig();
    final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();

    intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
    intermediaryDataManager.start();
  }

  /**
   * Stops the manager started in {@link #setup()}.
   */
  @After
  public void teardown() throws InterruptedException
  {
    intermediaryDataManager.stop();
  }

  /**
   * Adds four segments successfully and expects the fifth add to be rejected with an
   * {@link IllegalStateException}.
   */
  @Test
  public void testAddSegmentFailure() throws IOException
  {
    final Interval interval = Intervals.of("2018/2019");
    final int segmentsThatFit = 4;

    for (int bucketId = 0; bucketId < segmentsThatFit; bucketId++) {
      final File segmentDir = generateSegmentDir("file_" + bucketId);
      final DataSegment segment = newSegment(interval, bucketId);
      intermediaryDataManager.addSegment(SUPERVISOR_TASK_ID, "subTaskId", segment, segmentDir);
    }

    // Arm the expectation only now, so the four adds above must succeed.
    expectedException.expect(IllegalStateException.class);
    expectedException.expectMessage("Can't find location to handle segment");

    final File overflowDir = generateSegmentDir("file_" + segmentsThatFit);
    final DataSegment overflowSegment = newSegment(interval, segmentsThatFit);
    intermediaryDataManager.addSegment(SUPERVISOR_TASK_ID, "subTaskId", overflowSegment, overflowDir);
  }

  /**
   * Adds the same partition from four different sub tasks and checks that each one can be looked up.
   */
  @Test
  public void testFindPartitionFiles() throws IOException
  {
    final Interval interval = Intervals.of("2018/2019");
    final int partitionId = 0;
    final int numSubTasks = 4;

    for (int subTask = 0; subTask < numSubTasks; subTask++) {
      final File segmentDir = generateSegmentDir("file_" + subTask);
      intermediaryDataManager.addSegment(
          SUPERVISOR_TASK_ID,
          "subTaskId_" + subTask,
          newSegment(interval, partitionId),
          segmentDir
      );
    }

    for (int subTask = 0; subTask < numSubTasks; subTask++) {
      final Optional<ByteSource> found = intermediaryDataManager.findPartitionFile(
          SUPERVISOR_TASK_ID,
          "subTaskId_" + subTask,
          interval,
          partitionId
      );
      Assert.assertTrue(found.isPresent());
    }
  }

  /**
   * Adds two partitions from two sub tasks each, deletes everything for the supervisor task, and checks that
   * none of the four partition files can be found afterwards.
   */
  @Test
  public void deletePartitions() throws IOException
  {
    final Interval interval = Intervals.of("2018/2019");
    final int numPartitions = 2;
    final int numSubTasks = 2;

    for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
      for (int subTask = 0; subTask < numSubTasks; subTask++) {
        final File segmentDir = generateSegmentDir("file_" + partitionId + "_" + subTask);
        final DataSegment segment = newSegment(interval, partitionId);
        intermediaryDataManager.addSegment(SUPERVISOR_TASK_ID, "subTaskId_" + subTask, segment, segmentDir);
      }
    }

    intermediaryDataManager.deletePartitions(SUPERVISOR_TASK_ID);

    for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
      for (int subTask = 0; subTask < numSubTasks; subTask++) {
        final Optional<ByteSource> found = intermediaryDataManager.findPartitionFile(
            SUPERVISOR_TASK_ID,
            "subTaskId_" + subTask,
            interval,
            partitionId
        );
        Assert.assertFalse(found.isPresent());
      }
    }
  }

  /**
   * Adds four segments, deletes the supervisor task's partitions, then adds one more segment. The test passes
   * when the final add does not throw (compare {@link #testAddSegmentFailure()}, where a fifth add without a
   * preceding delete is rejected).
   */
  @Test
  public void testAddRemoveAdd() throws IOException
  {
    final Interval interval = Intervals.of("2018/2019");
    final int initialSegments = 4;

    for (int bucketId = 0; bucketId < initialSegments; bucketId++) {
      final File segmentDir = generateSegmentDir("file_" + bucketId);
      final DataSegment segment = newSegment(interval, bucketId);
      intermediaryDataManager.addSegment(SUPERVISOR_TASK_ID, "subTaskId", segment, segmentDir);
    }

    intermediaryDataManager.deletePartitions(SUPERVISOR_TASK_ID);

    final File segmentDir = generateSegmentDir("file_" + initialSegments);
    final DataSegment segment = newSegment(interval, initialSegments);
    intermediaryDataManager.addSegment(SUPERVISOR_TASK_ID, "subTaskId", segment, segmentDir);
  }

  /**
   * Passes a supervisor task id of the form {@code ../<sibling>} to {@code deletePartitions} and checks both that
   * the call is rejected with an {@link IllegalArgumentException} and that the sibling folder and its data file
   * are still present afterwards.
   */
  @Test
  public void testFailsWithCraftyFabricatedNamesForDelete() throws IOException
  {
    final String craftedTaskId = "../" + siblingLocation.getName();
    final File dataFile = new File(siblingLocation, "sneaky-snake.txt");
    FileUtils.write(dataFile, "test data", StandardCharsets.UTF_8);

    // Precondition: the crafted id really resolves to the sibling folder when appended to the storage location.
    final File traversedDir = new File(intermediarySegmentsLocation, craftedTaskId);
    Assert.assertTrue(traversedDir.exists());
    Assert.assertTrue(dataFile.exists());

    // The exception is caught explicitly (instead of via the ExpectedException rule) so that the post-condition
    // checks below actually execute; with the rule, the thrown exception would end the test method right here.
    try {
      intermediaryDataManager.deletePartitions(craftedTaskId);
      Assert.fail("Expected IllegalArgumentException for supervisorTaskId [" + craftedTaskId + "]");
    }
    catch (IllegalArgumentException e) {
      final String message = e.getMessage();
      Assert.assertTrue(
          "Unexpected exception message: " + message,
          message != null && message.contains(TRAVERSAL_ERROR_MESSAGE)
      );
    }

    // Post-condition: nothing outside the storage location was removed.
    Assert.assertTrue(traversedDir.exists());
    Assert.assertTrue(dataFile.exists());
  }

  /**
   * Passes a supervisor task id of the form {@code ../<sibling>} to {@code findPartitionFile} and expects the call
   * to be rejected with an {@link IllegalArgumentException}.
   */
  @Test
  public void testFailsWithCraftyFabricatedNamesForFind() throws IOException
  {
    final String craftedTaskId = "../" + siblingLocation.getName();
    final Interval interval = Intervals.of("2018/2019");
    final int partitionId = 0;
    final String someFile = "sneaky-snake.txt";

    // can only traverse to find files that are in a path of the form {start}/{end}/{partitionId}, so write a data
    // file in a location like that
    final String intervalAndPart =
        StringUtils.format("%s/%s/%s", interval.getStart().toString(), interval.getEnd().toString(), partitionId);
    final String someFilePath = intervalAndPart + "/" + someFile;
    final File dataFile = new File(siblingLocation, someFilePath);
    FileUtils.write(dataFile, "test data", StandardCharsets.UTF_8);

    // Precondition: both the sibling folder and the data file are reachable through the crafted id.
    Assert.assertTrue(new File(intermediarySegmentsLocation, craftedTaskId).exists());
    Assert.assertTrue(new File(intermediarySegmentsLocation, craftedTaskId + "/" + someFilePath).exists());

    // Arm the expectation right before the call under test so that only this call can satisfy it.
    expectedException.expect(IllegalArgumentException.class);
    expectedException.expectMessage(TRAVERSAL_ERROR_MESSAGE);

    final Optional<ByteSource> foundFile = intermediaryDataManager.findPartitionFile(
        craftedTaskId,
        someFile,
        interval,
        partitionId
    );
    // Only reached if the call above returns instead of throwing; the ExpectedException rule then fails the test.
    Assert.assertFalse(foundFile.isPresent());
  }

  /**
   * Creates a fresh temporary directory containing a single small text file with the given name.
   *
   * @param fileName name of the file to create inside the new directory
   * @return the newly created directory
   */
  private File generateSegmentDir(String fileName) throws IOException
  {
    // Each file size is 138 bytes after compression
    final File segmentDir = tempDir.newFolder();
    final File segmentFile = new File(segmentDir, fileName);
    FileUtils.write(segmentFile, "test data.", StandardCharsets.UTF_8);
    return segmentDir;
  }

  /**
   * Builds a {@link DataSegment} for the fixed data source and version used by these tests.
   *
   * @param interval segment interval
   * @param bucketId bucket id carried by the segment's {@link TestShardSpec}
   */
  private DataSegment newSegment(Interval interval, int bucketId)
  {
    final TestShardSpec shardSpec = new TestShardSpec(bucketId);
    return new DataSegment(
        "dataSource",
        interval,
        "version",
        null,
        null,
        null,
        shardSpec,
        9,
        10
    );
  }

  /**
   * Minimal shard spec that only carries a bucket id. {@link #convert(int)} and {@link #getLookup(List)} are not
   * supported and throw {@link UnsupportedOperationException}.
   */
  private static class TestShardSpec implements BucketNumberedShardSpec<BuildingShardSpec<ShardSpec>>
  {
    private final int bucketId;

    private TestShardSpec(int bucketId)
    {
      this.bucketId = bucketId;
    }

    @Override
    public int getBucketId()
    {
      return bucketId;
    }

    @Override
    public BuildingShardSpec<ShardSpec> convert(int partitionId)
    {
      throw new UnsupportedOperationException();
    }

    @Override
    public ShardSpecLookup getLookup(List<? extends ShardSpec> shardSpecs)
    {
      throw new UnsupportedOperationException();
    }
  }
}