/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.loading;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.Segment;
import org.apache.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/**
 * A {@link SegmentLoader} that manages a local on-disk cache of segments. Segments are pulled from
 * deep storage into one of the configured {@link StorageLocation}s (picked via the configured
 * {@link StorageLocationSelectorStrategy}) and served from local disk from then on.
 */
public class SegmentLoaderLocalCacheManager implements SegmentLoader
{
private static final EmittingLogger log = new EmittingLogger(SegmentLoaderLocalCacheManager.class);
private final IndexIO indexIO;
private final SegmentLoaderConfig config;
private final ObjectMapper jsonMapper;
private final List<StorageLocation> locations;
// This directoryWriteRemoveLock is used when creating or removing a directory
private final Object directoryWriteRemoveLock = new Object();
  /**
   * A map from segments to their reference-counting locks.
   *
   * These locks should be acquired whenever getting or deleting files for a segment.
   * If different threads try to get or delete files simultaneously, one of them creates the lock first using
   * {@link #createOrGetLock}. Then, all threads compete with each other to acquire the lock.
   * Finally, the lock should be released using {@link #unlock}.
   *
   * An example usage is:
   *
   * <pre>{@code
   * final ReferenceCountingLock lock = createOrGetLock(segment);
   * synchronized (lock) {
   *   try {
   *     doSomething();
   *   }
   *   finally {
   *     unlock(segment, lock);
   *   }
   * }
   * }</pre>
   */
private final ConcurrentHashMap<DataSegment, ReferenceCountingLock> segmentLocks = new ConcurrentHashMap<>();
private final StorageLocationSelectorStrategy strategy;
// Note that we only create this via injection in historical and realtime nodes. Peons create these
// objects via SegmentLoaderFactory objects, so that they can store segments in task-specific
// directories rather than statically configured directories.
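  //
  // A rough sketch of that task-specific path (hypothetical: the exact SegmentLoaderFactory and
  // StorageLocationConfig signatures may differ from what is shown here):
  //
  //   SegmentLoaderConfig taskConfig =
  //       config.withLocations(ImmutableList.of(new StorageLocationConfig(taskSegmentDir, null, null)));
  //   SegmentLoader loader = new SegmentLoaderLocalCacheManager(indexIO, taskConfig, jsonMapper);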
@Inject
public SegmentLoaderLocalCacheManager(
IndexIO indexIO,
SegmentLoaderConfig config,
@Json ObjectMapper mapper
)
{
this.indexIO = indexIO;
this.config = config;
this.jsonMapper = mapper;
this.locations = new ArrayList<>();
for (StorageLocationConfig locationConfig : config.getLocations()) {
locations.add(
new StorageLocation(
locationConfig.getPath(),
locationConfig.getMaxSize(),
locationConfig.getFreeSpacePercent()
)
);
}
this.strategy = config.getStorageLocationSelectorStrategy(locations);
}
@Override
public boolean isSegmentLoaded(final DataSegment segment)
{
return findStorageLocationIfLoaded(segment) != null;
}
private StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
{
for (StorageLocation location : locations) {
File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
if (localStorageDir.exists()) {
return location;
}
}
return null;
}
@Override
public Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadingException
{
final ReferenceCountingLock lock = createOrGetLock(segment);
final File segmentFiles;
synchronized (lock) {
try {
segmentFiles = getSegmentFiles(segment);
}
finally {
unlock(segment, lock);
}
}
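    // If the segment was pushed with a custom SegmentizerFactory, its directory contains a
    // "factory.json" describing how to materialize it, e.g. (assuming the default mmap factory's
    // registered type name):
    //
    //   {"type": "mMapSegmentFactory"}
    //
    // If the file is absent, fall back to memory-mapping via MMappedQueryableSegmentizerFactory.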
File factoryJson = new File(segmentFiles, "factory.json");
final SegmentizerFactory factory;
if (factoryJson.exists()) {
try {
factory = jsonMapper.readValue(factoryJson, SegmentizerFactory.class);
}
catch (IOException e) {
throw new SegmentLoadingException(e, "%s", e.getMessage());
}
} else {
factory = new MMappedQueryableSegmentizerFactory(indexIO);
}
return factory.factorize(segment, segmentFiles, lazy);
}
@Override
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
{
final ReferenceCountingLock lock = createOrGetLock(segment);
synchronized (lock) {
try {
StorageLocation loc = findStorageLocationIfLoaded(segment);
String storageDir = DataSegmentPusher.getDefaultStorageDir(segment, false);
if (loc == null) {
loc = loadSegmentWithRetry(segment, storageDir);
}
return new File(loc.getPath(), storageDir);
}
finally {
unlock(segment, lock);
}
}
}
  /**
   * Loading a segment into a location may fail because of an IO error, most likely in two cases:<p>
   * 1. Druid doesn't have write access to the location, typically because the administrator hasn't configured it correctly<p>
   * 2. a disk failure, after which Druid can no longer read from or write to the disk
*
* Locations are fetched using {@link StorageLocationSelectorStrategy}.
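   *
   * For example, assuming the standard property name, an operator can select the strategy with:
   * <pre>
   * druid.segmentCache.locationSelector.strategy=roundRobin
   * </pre>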
*/
private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException
{
Iterator<StorageLocation> locationsIterator = strategy.getLocations();
while (locationsIterator.hasNext()) {
StorageLocation loc = locationsIterator.next();
File storageDir = loc.reserve(storageDirStr, segment);
if (storageDir != null) {
try {
loadInLocationWithStartMarker(segment, storageDir);
return loc;
}
catch (SegmentLoadingException e) {
try {
log.makeAlert(
e,
"Failed to load segment in current location [%s], try next location if any",
loc.getPath().getAbsolutePath()
).addData("location", loc.getPath().getAbsolutePath()).emit();
}
finally {
loc.removeSegmentDir(storageDir, segment);
cleanupCacheFiles(loc.getPath(), storageDir);
}
}
}
}
throw new SegmentLoadingException("Failed to load segment %s in all locations.", segment.getId());
}
private void loadInLocationWithStartMarker(DataSegment segment, File storageDir) throws SegmentLoadingException
{
// We use a marker to prevent the case where a segment is downloaded, but before the download completes,
// the parent directories of the segment are removed
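    //
    // The marker makes partially-downloaded directories detectable: a startup sweep (outside this
    // class) can treat any cached segment directory that still contains the marker as an
    // incomplete download and remove it. A hypothetical sketch of such a sweep:
    //
    //   for (File segmentDir : location.getPath().listFiles()) {
    //     if (new File(segmentDir, "downloadStartMarker").exists()) {
    //       FileUtils.deleteDirectory(segmentDir); // partial download; discard it
    //     }
    //   }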
final File downloadStartMarker = new File(storageDir, "downloadStartMarker");
synchronized (directoryWriteRemoveLock) {
if (!storageDir.mkdirs()) {
log.debug("Unable to make parent file[%s]", storageDir);
}
try {
if (!downloadStartMarker.createNewFile()) {
throw new SegmentLoadingException("Was not able to create new download marker for [%s]", storageDir);
}
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Unable to create marker file for [%s]", storageDir);
}
}
loadInLocation(segment, storageDir);
if (!downloadStartMarker.delete()) {
throw new SegmentLoadingException("Unable to remove marker file for [%s]", storageDir);
}
}
private void loadInLocation(DataSegment segment, File storageDir) throws SegmentLoadingException
{
    // The LoadSpec isn't deserialized until here, so that any system can interpret the Segment
    // metadata without needing all of the LoadSpec's dependencies on its classpath.
final LoadSpec loadSpec = jsonMapper.convertValue(segment.getLoadSpec(), LoadSpec.class);
final LoadSpec.LoadSpecResult result = loadSpec.loadSegment(storageDir);
if (result.getSize() != segment.getSize()) {
log.warn(
"Segment [%s] is different than expected size. Expected [%d] found [%d]",
segment.getId(),
segment.getSize(),
result.getSize()
);
}
}
@Override
public void cleanup(DataSegment segment)
{
if (!config.isDeleteOnRemove()) {
return;
}
final ReferenceCountingLock lock = createOrGetLock(segment);
synchronized (lock) {
try {
StorageLocation loc = findStorageLocationIfLoaded(segment);
if (loc == null) {
log.warn("Asked to cleanup something[%s] that didn't exist. Skipping.", segment.getId());
return;
}
        // If storageDir.mkdirs() succeeded but downloadStartMarker.createNewFile() failed,
        // findStorageLocationIfLoaded() would report the segment as loaded in that storageDir
        // even though the load actually failed, so we should always clean up all possible locations here.
for (StorageLocation location : locations) {
File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
if (localStorageDir.exists()) {
// Druid creates folders of the form dataSource/interval/version/partitionNum.
// We need to clean up all these directories if they are all empty.
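          // e.g. wikipedia/2015-09-12T00:00:00.000Z_2015-09-13T00:00:00.000Z/2015-09-12T01:02:33.156Z/0
          // (a representative example; the actual names come from the segment's identifier).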
cleanupCacheFiles(location.getPath(), localStorageDir);
location.removeSegmentDir(localStorageDir, segment);
}
}
}
finally {
unlock(segment, lock);
}
}
}
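
  /**
   * Deletes {@code cacheFile} and then walks up its parent directories, removing each one that is
   * left empty, stopping before {@code baseFile} (the storage location root) so the root itself is
   * never deleted.
   */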
private void cleanupCacheFiles(File baseFile, File cacheFile)
{
if (cacheFile.equals(baseFile)) {
return;
}
synchronized (directoryWriteRemoveLock) {
log.info("Deleting directory[%s]", cacheFile);
try {
FileUtils.deleteDirectory(cacheFile);
}
catch (Exception e) {
log.error(e, "Unable to remove directory[%s]", cacheFile);
}
}
File parent = cacheFile.getParentFile();
if (parent != null) {
File[] children = parent.listFiles();
if (children == null || children.length == 0) {
cleanupCacheFiles(baseFile, parent);
}
}
}
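
  /**
   * Atomically creates (or reuses) the per-segment lock and increments its reference count. The
   * increment happens inside {@link ConcurrentHashMap#compute}, which serializes all updates for
   * the same segment, so {@link ReferenceCountingLock#numReferences} needs no extra synchronization.
   */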
private ReferenceCountingLock createOrGetLock(DataSegment dataSegment)
{
return segmentLocks.compute(
dataSegment,
(segment, lock) -> {
final ReferenceCountingLock nonNullLock;
if (lock == null) {
nonNullLock = new ReferenceCountingLock();
} else {
nonNullLock = lock;
}
nonNullLock.increment();
return nonNullLock;
}
);
}
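
  /**
   * Decrements the reference count of the given lock, removing the map entry entirely when the
   * count drops to zero so that {@link #segmentLocks} does not grow without bound.
   */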
@SuppressWarnings("ObjectEquality")
private void unlock(DataSegment dataSegment, ReferenceCountingLock lock)
{
segmentLocks.compute(
dataSegment,
(segment, existingLock) -> {
if (existingLock == null) {
throw new ISE("Lock has already been removed");
} else if (existingLock != lock) {
throw new ISE("Different lock instance");
} else {
if (existingLock.numReferences == 1) {
return null;
} else {
existingLock.decrement();
return existingLock;
}
}
}
);
}
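
  /**
   * A lock object whose reference count tracks how many threads are currently using it. The count
   * is mutated only inside {@link #segmentLocks}' compute(), so it needs no synchronization of its own.
   */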
@VisibleForTesting
private static class ReferenceCountingLock
{
private int numReferences;
private void increment()
{
++numReferences;
}
private void decrement()
{
--numReferences;
}
}
@VisibleForTesting
public ConcurrentHashMap<DataSegment, ReferenceCountingLock> getSegmentLocks()
{
return segmentLocks;
}
@VisibleForTesting
public List<StorageLocation> getLocations()
{
return locations;
}
}