blob: 25c9cae4554ff436d2a95ba0556b1f7ff34d616c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.loading;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/**
*
*/
public class SegmentLocalCacheManager implements SegmentCacheManager
{
@VisibleForTesting
static final String DOWNLOAD_START_MARKER_FILE_NAME = "downloadStartMarker";
private static final EmittingLogger log = new EmittingLogger(SegmentLocalCacheManager.class);
private final SegmentLoaderConfig config;
private final ObjectMapper jsonMapper;
private final List<StorageLocation> locations;
// This directoryWriteRemoveLock is used when creating or removing a directory
private final Object directoryWriteRemoveLock = new Object();
/**
* A map between segment and referenceCountingLocks.
*
* These locks should be acquired whenever getting or deleting files for a segment.
* If different threads try to get or delete files simultaneously, one of them creates a lock first using
* {@link #createOrGetLock}. And then, all threads compete with each other to get the lock.
* Finally, the lock should be released using {@link #unlock}.
*
* An example usage is:
*
* final ReferenceCountingLock lock = createOrGetLock(segment);
* synchronized (lock) {
* try {
* doSomething();
* }
* finally {
* unlock(lock);
* }
* }
*/
private final ConcurrentHashMap<DataSegment, ReferenceCountingLock> segmentLocks = new ConcurrentHashMap<>();
private final StorageLocationSelectorStrategy strategy;
// Note that we only create this via injection in historical and realtime nodes. Peons create these
// objects via SegmentCacheManagerFactory objects, so that they can store segments in task-specific
// directories rather than statically configured directories.
@Inject
public SegmentLocalCacheManager(
List<StorageLocation> locations,
SegmentLoaderConfig config,
@Nonnull StorageLocationSelectorStrategy strategy,
@Json ObjectMapper mapper
)
{
this.config = config;
this.jsonMapper = mapper;
this.locations = locations;
this.strategy = strategy;
log.info("Using storage location strategy: [%s]", this.strategy.getClass().getSimpleName());
}
@VisibleForTesting
SegmentLocalCacheManager(
SegmentLoaderConfig config,
@Nonnull StorageLocationSelectorStrategy strategy,
@Json ObjectMapper mapper
)
{
this(config.toStorageLocations(), config, strategy, mapper);
}
/**
* creates instance with default storage location selector strategy
*
* This ctor is mainly for test cases, including test cases in other modules
*/
@VisibleForTesting
public SegmentLocalCacheManager(
SegmentLoaderConfig config,
@Json ObjectMapper mapper
)
{
this.config = config;
this.jsonMapper = mapper;
this.locations = config.toStorageLocations();
this.strategy = new LeastBytesUsedStorageLocationSelectorStrategy(locations);
log.info("Using storage location strategy: [%s]", this.strategy.getClass().getSimpleName());
}
static String getSegmentDir(DataSegment segment)
{
return DataSegmentPusher.getDefaultStorageDir(segment, false);
}
@Override
public boolean isSegmentCached(final DataSegment segment)
{
return findStoragePathIfCached(segment) != null;
}
/**
* This method will try to find if the segment is already downloaded on any location. If so, the segment path
* is returned. Along with that, location state is also updated with the segment location. Refer to
* {@link StorageLocation#maybeReserve(String, DataSegment)} for more details.
* If the segment files are damaged in any location, they are removed from the location.
* @param segment - Segment to check
* @return - Path corresponding to segment directory if found, null otherwise.
*/
@Nullable
private File findStoragePathIfCached(final DataSegment segment)
{
for (StorageLocation location : locations) {
String storageDir = getSegmentDir(segment);
File localStorageDir = location.segmentDirectoryAsFile(storageDir);
if (localStorageDir.exists()) {
if (checkSegmentFilesIntact(localStorageDir)) {
log.warn(
"[%s] may be damaged. Delete all the segment files and pull from DeepStorage again.",
localStorageDir.getAbsolutePath()
);
cleanupCacheFiles(location.getPath(), localStorageDir);
location.removeSegmentDir(localStorageDir, segment);
break;
} else {
// Before returning, we also reserve the space. Refer to the StorageLocation#maybeReserve documentation for details.
location.maybeReserve(storageDir, segment);
return localStorageDir;
}
}
}
return null;
}
/**
* check data intact.
* @param dir segments cache dir
* @return true means segment files may be damaged.
*/
private boolean checkSegmentFilesIntact(File dir)
{
return checkSegmentFilesIntactWithStartMarker(dir);
}
/**
* If there is 'downloadStartMarker' existed in localStorageDir, the segments files might be damaged.
* Because each time, Druid will delete the 'downloadStartMarker' file after pulling and unzip the segments from DeepStorage.
* downloadStartMarker existed here may mean something error during download segments and the segment files may be damaged.
*/
private boolean checkSegmentFilesIntactWithStartMarker(File localStorageDir)
{
final File downloadStartMarker = new File(localStorageDir.getPath(), DOWNLOAD_START_MARKER_FILE_NAME);
return downloadStartMarker.exists();
}
/**
* Make sure segments files in loc is intact, otherwise function like loadSegments will failed because of segment files is damaged.
* @param segment
* @return
* @throws SegmentLoadingException
*/
@Override
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
{
final ReferenceCountingLock lock = createOrGetLock(segment);
synchronized (lock) {
try {
File segmentDir = findStoragePathIfCached(segment);
if (segmentDir != null) {
return segmentDir;
}
return loadSegmentWithRetry(segment);
}
finally {
unlock(segment, lock);
}
}
}
/**
* If we have already reserved a location before, probably via {@link #reserve(DataSegment)}, then only that location
* should be tried. Otherwise, we would fetch locations using {@link StorageLocationSelectorStrategy} and try all
* of them one by one till there is success.
* Location may fail because of IO failure, most likely in two cases:<p>
* 1. druid don't have the write access to this location, most likely the administrator doesn't config it correctly<p>
* 2. disk failure, druid can't read/write to this disk anymore
* <p>
* Locations are fetched using {@link StorageLocationSelectorStrategy}.
*/
private File loadSegmentWithRetry(DataSegment segment) throws SegmentLoadingException
{
String segmentDir = getSegmentDir(segment);
// Try the already reserved location. If location has been reserved outside, then we do not release the location
// here and simply delete any downloaded files. That is, we revert anything we do in this function and nothing else.
for (StorageLocation loc : locations) {
if (loc.isReserved(segmentDir)) {
File storageDir = loc.segmentDirectoryAsFile(segmentDir);
boolean success = loadInLocationWithStartMarkerQuietly(loc, segment, storageDir, false);
if (!success) {
throw new SegmentLoadingException("Failed to load segment %s in reserved location [%s]", segment.getId(), loc.getPath().getAbsolutePath());
}
return storageDir;
}
}
// No location was reserved so we try all the locations
Iterator<StorageLocation> locationsIterator = strategy.getLocations();
while (locationsIterator.hasNext()) {
StorageLocation loc = locationsIterator.next();
// storageDir is the file path corresponding to segment dir
File storageDir = loc.reserve(segmentDir, segment);
if (storageDir != null) {
boolean success = loadInLocationWithStartMarkerQuietly(loc, segment, storageDir, true);
if (success) {
return storageDir;
}
}
}
throw new SegmentLoadingException("Failed to load segment %s in all locations.", segment.getId());
}
/**
* A helper method over {@link #loadInLocationWithStartMarker(DataSegment, File)} that catches the {@link SegmentLoadingException}
* and emits alerts.
* @param loc - {@link StorageLocation} where segment is to be downloaded in.
* @param segment - {@link DataSegment} to download
* @param storageDir - {@link File} pointing to segment directory
* @param releaseLocation - Whether to release the location in case of failures
* @return - True if segment was downloaded successfully, false otherwise.
*/
private boolean loadInLocationWithStartMarkerQuietly(StorageLocation loc, DataSegment segment, File storageDir, boolean releaseLocation)
{
try {
loadInLocationWithStartMarker(segment, storageDir);
return true;
}
catch (SegmentLoadingException e) {
try {
log.makeAlert(
e,
"Failed to load segment in current location [%s], try next location if any",
loc.getPath().getAbsolutePath()
).addData("location", loc.getPath().getAbsolutePath()).emit();
}
finally {
if (releaseLocation) {
loc.removeSegmentDir(storageDir, segment);
}
cleanupCacheFiles(loc.getPath(), storageDir);
}
}
return false;
}
private void loadInLocationWithStartMarker(DataSegment segment, File storageDir) throws SegmentLoadingException
{
// We use a marker to prevent the case where a segment is downloaded, but before the download completes,
// the parent directories of the segment are removed
final File downloadStartMarker = new File(storageDir, DOWNLOAD_START_MARKER_FILE_NAME);
synchronized (directoryWriteRemoveLock) {
if (!storageDir.mkdirs()) {
log.debug("Unable to make parent file[%s]", storageDir);
}
try {
if (!downloadStartMarker.createNewFile()) {
throw new SegmentLoadingException("Was not able to create new download marker for [%s]", storageDir);
}
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Unable to create marker file for [%s]", storageDir);
}
}
loadInLocation(segment, storageDir);
if (!downloadStartMarker.delete()) {
throw new SegmentLoadingException("Unable to remove marker file for [%s]", storageDir);
}
}
private void loadInLocation(DataSegment segment, File storageDir) throws SegmentLoadingException
{
// LoadSpec isn't materialized until here so that any system can interpret Segment without having to have all the
// LoadSpec dependencies.
final LoadSpec loadSpec = jsonMapper.convertValue(segment.getLoadSpec(), LoadSpec.class);
final LoadSpec.LoadSpecResult result = loadSpec.loadSegment(storageDir);
if (result.getSize() != segment.getSize()) {
log.warn(
"Segment [%s] is different than expected size. Expected [%d] found [%d]",
segment.getId(),
segment.getSize(),
result.getSize()
);
}
}
@Override
public boolean reserve(final DataSegment segment)
{
final ReferenceCountingLock lock = createOrGetLock(segment);
synchronized (lock) {
try {
// May be the segment was already loaded [This check is required to account for restart scenarios]
if (null != findStoragePathIfCached(segment)) {
return true;
}
String storageDirStr = getSegmentDir(segment);
// check if we already reserved the segment
for (StorageLocation location : locations) {
if (location.isReserved(storageDirStr)) {
return true;
}
}
// Not found in any location, reserve now
for (Iterator<StorageLocation> it = strategy.getLocations(); it.hasNext(); ) {
StorageLocation location = it.next();
if (null != location.reserve(storageDirStr, segment)) {
return true;
}
}
}
finally {
unlock(segment, lock);
}
}
return false;
}
@Override
public boolean release(final DataSegment segment)
{
final ReferenceCountingLock lock = createOrGetLock(segment);
synchronized (lock) {
try {
String storageDir = getSegmentDir(segment);
// Release the first location encountered
for (StorageLocation location : locations) {
if (location.isReserved(storageDir)) {
File localStorageDir = location.segmentDirectoryAsFile(storageDir);
if (localStorageDir.exists()) {
throw new ISE(
"Asking to release a location '%s' while the segment directory '%s' is present on disk. Any state on disk must be deleted before releasing",
location.getPath().getAbsolutePath(),
localStorageDir.getAbsolutePath()
);
}
return location.release(storageDir, segment.getSize());
}
}
}
finally {
unlock(segment, lock);
}
}
return false;
}
@Override
public void cleanup(DataSegment segment)
{
if (!config.isDeleteOnRemove()) {
return;
}
final ReferenceCountingLock lock = createOrGetLock(segment);
synchronized (lock) {
try {
File loc = findStoragePathIfCached(segment);
if (loc == null) {
log.warn("Asked to cleanup something[%s] that didn't exist. Skipping.", segment.getId());
return;
}
// If storageDir.mkdirs() success, but downloadStartMarker.createNewFile() failed,
// in this case, findStorageLocationIfLoaded() will think segment is located in the failed storageDir which is actually not.
// So we should always clean all possible locations here
for (StorageLocation location : locations) {
File localStorageDir = new File(location.getPath(), getSegmentDir(segment));
if (localStorageDir.exists()) {
// Druid creates folders of the form dataSource/interval/version/partitionNum.
// We need to clean up all these directories if they are all empty.
cleanupCacheFiles(location.getPath(), localStorageDir);
location.removeSegmentDir(localStorageDir, segment);
}
}
}
finally {
unlock(segment, lock);
}
}
}
private void cleanupCacheFiles(File baseFile, File cacheFile)
{
if (cacheFile.equals(baseFile)) {
return;
}
synchronized (directoryWriteRemoveLock) {
log.info("Deleting directory[%s]", cacheFile);
try {
FileUtils.deleteDirectory(cacheFile);
}
catch (Exception e) {
log.error(e, "Unable to remove directory[%s]", cacheFile);
}
}
File parent = cacheFile.getParentFile();
if (parent != null) {
File[] children = parent.listFiles();
if (children == null || children.length == 0) {
cleanupCacheFiles(baseFile, parent);
}
}
}
private ReferenceCountingLock createOrGetLock(DataSegment dataSegment)
{
return segmentLocks.compute(
dataSegment,
(segment, lock) -> {
final ReferenceCountingLock nonNullLock;
if (lock == null) {
nonNullLock = new ReferenceCountingLock();
} else {
nonNullLock = lock;
}
nonNullLock.increment();
return nonNullLock;
}
);
}
@SuppressWarnings("ObjectEquality")
private void unlock(DataSegment dataSegment, ReferenceCountingLock lock)
{
segmentLocks.compute(
dataSegment,
(segment, existingLock) -> {
if (existingLock == null) {
throw new ISE("Lock has already been removed");
} else if (existingLock != lock) {
throw new ISE("Different lock instance");
} else {
if (existingLock.numReferences == 1) {
return null;
} else {
existingLock.decrement();
return existingLock;
}
}
}
);
}
@VisibleForTesting
private static class ReferenceCountingLock
{
private int numReferences;
private void increment()
{
++numReferences;
}
private void decrement()
{
--numReferences;
}
}
@VisibleForTesting
public ConcurrentHashMap<DataSegment, ReferenceCountingLock> getSegmentLocks()
{
return segmentLocks;
}
@VisibleForTesting
public List<StorageLocation> getLocations()
{
return locations;
}
}