blob: e812ff7ac71eb1e28b865e4b92cc510dd139201d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.loading;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.hamcrest.CoreMatchers;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
public class SegmentLocalCacheManagerConcurrencyTest
{
@Rule
public final TemporaryFolder tmpFolder = new TemporaryFolder();
@Rule
public final ExpectedException expectedException = ExpectedException.none();
private final ObjectMapper jsonMapper;
private final String dataSource = "test_ds";
private final String segmentVersion;
private File localSegmentCacheFolder;
private SegmentLocalCacheManager manager;
private ExecutorService executorService;
public SegmentLocalCacheManagerConcurrencyTest()
{
jsonMapper = new DefaultObjectMapper();
jsonMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local"));
jsonMapper.setInjectableValues(
new InjectableValues.Std().addValue(
LocalDataSegmentPuller.class,
new LocalDataSegmentPuller()
)
);
segmentVersion = DateTimes.nowUtc().toString();
}
@Before
public void setUp() throws Exception
{
EmittingLogger.registerEmitter(new NoopServiceEmitter());
localSegmentCacheFolder = tmpFolder.newFolder("segment_cache_folder");
final List<StorageLocationConfig> locations = new ArrayList<>();
// Each segment has the size of 1000 bytes. This deep storage is capable of storing up to 2 segments.
final StorageLocationConfig locationConfig = new StorageLocationConfig(localSegmentCacheFolder, 2000L, null);
locations.add(locationConfig);
manager = new SegmentLocalCacheManager(
new SegmentLoaderConfig().withLocations(locations),
jsonMapper
);
executorService = Execs.multiThreaded(4, "segment-loader-local-cache-manager-concurrency-test-%d");
}
@After
public void tearDown()
{
executorService.shutdownNow();
}
@Test
public void testGetSegment() throws IOException, ExecutionException, InterruptedException
{
final File localStorageFolder = tmpFolder.newFolder("local_storage_folder");
final List<DataSegment> segmentsToLoad = new ArrayList<>(4);
final Interval interval = Intervals.of("2019-01-01/P1D");
for (int partitionId = 0; partitionId < 4; partitionId++) {
final String segmentPath = Paths.get(
localStorageFolder.getCanonicalPath(),
dataSource,
StringUtils.format("%s_%s", interval.getStart().toString(), interval.getEnd().toString()),
segmentVersion,
String.valueOf(partitionId)
).toString();
// manually create a local segment under localStorageFolder
final File localSegmentFile = new File(
localStorageFolder,
segmentPath
);
localSegmentFile.mkdirs();
final File indexZip = new File(localSegmentFile, "index.zip");
indexZip.createNewFile();
final DataSegment segment = newSegment(interval, partitionId).withLoadSpec(
ImmutableMap.of(
"type",
"local",
"path",
localSegmentFile.getAbsolutePath()
)
);
segmentsToLoad.add(segment);
}
final List<Future> futures = segmentsToLoad
.stream()
.map(segment -> executorService.submit(() -> manager.getSegmentFiles(segment)))
.collect(Collectors.toList());
expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.instanceOf(SegmentLoadingException.class));
expectedException.expectMessage("Failed to load segment");
for (Future future : futures) {
future.get();
}
}
private DataSegment newSegment(Interval interval, int partitionId)
{
return DataSegment.builder()
.dataSource(dataSource)
.interval(interval)
.loadSpec(
ImmutableMap.of(
"type",
"local",
"path",
"somewhere"
)
)
.version(segmentVersion)
.dimensions(ImmutableList.of())
.metrics(ImmutableList.of())
.shardSpec(new NumberedShardSpec(partitionId, 0))
.binaryVersion(9)
.size(1000L)
.build();
}
}