| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.segment.loading; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.inject.Inject; |
| import org.apache.druid.guice.annotations.Json; |
| import org.apache.druid.java.util.common.FileUtils; |
| import org.apache.druid.java.util.common.ISE; |
| import org.apache.druid.java.util.emitter.EmittingLogger; |
| import org.apache.druid.timeline.DataSegment; |
| |
| import javax.annotation.Nonnull; |
| import javax.annotation.Nullable; |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
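| /** |
| * A {@link SegmentCacheManager} that keeps segment files on local disk, spread across a list of |
| * {@link StorageLocation}s. Fetching and deleting the files of a given segment is guarded by a per-segment, |
| * reference-counted lock, see {@link #segmentLocks}. |
| */ |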
| public class SegmentLocalCacheManager implements SegmentCacheManager |
| { |
| @VisibleForTesting |
| static final String DOWNLOAD_START_MARKER_FILE_NAME = "downloadStartMarker"; |
| |
| private static final EmittingLogger log = new EmittingLogger(SegmentLocalCacheManager.class); |
| |
| private final SegmentLoaderConfig config; |
| private final ObjectMapper jsonMapper; |
| |
| private final List<StorageLocation> locations; |
| |
| // This directoryWriteRemoveLock is used when creating or removing a directory |
| private final Object directoryWriteRemoveLock = new Object(); |
| |
| /** |
| * A map between segment and referenceCountingLocks. |
| * |
| * These locks should be acquired whenever getting or deleting files for a segment. |
| * If different threads try to get or delete files simultaneously, one of them creates a lock first using |
| * {@link #createOrGetLock}. And then, all threads compete with each other to get the lock. |
| * Finally, the lock should be released using {@link #unlock}. |
| * |
| * An example usage is: |
| * |
| * final ReferenceCountingLock lock = createOrGetLock(segment); |
| * synchronized (lock) { |
| * try { |
| * doSomething(); |
| * } |
| * finally { |
| * unlock(segment, lock); |
| * } |
| * } |
| */ |
| private final ConcurrentHashMap<DataSegment, ReferenceCountingLock> segmentLocks = new ConcurrentHashMap<>(); |
| |
| private final StorageLocationSelectorStrategy strategy; |
| |
/**
 * Creates a cache manager over an explicit list of storage locations.
 *
 * Note that we only create this via injection in historical and realtime nodes. Peons create these
 * objects via SegmentCacheManagerFactory objects, so that they can store segments in task-specific
 * directories rather than statically configured directories.
 *
 * @param locations storage locations that segment files are looked up in and cleaned up from
 * @param config    segment loader configuration
 * @param strategy  strategy that supplies the order in which locations are tried when loading a segment
 * @param mapper    JSON mapper used to materialize the load spec of a segment
 */
@Inject
public SegmentLocalCacheManager(
    List<StorageLocation> locations,
    SegmentLoaderConfig config,
    @Nonnull StorageLocationSelectorStrategy strategy,
    @Json ObjectMapper mapper
)
{
  this.locations = locations;
  this.strategy = strategy;
  this.config = config;
  this.jsonMapper = mapper;
  log.info("Using storage location strategy: [%s]", strategy.getClass().getSimpleName());
}
| |
/**
 * Test-only convenience constructor: the storage locations are derived from
 * {@link SegmentLoaderConfig#toStorageLocations()} instead of being passed in explicitly.
 *
 * @param config   segment loader configuration, also the source of the storage locations
 * @param strategy strategy that supplies the order in which locations are tried when loading a segment
 * @param mapper   JSON mapper used to materialize the load spec of a segment
 */
@VisibleForTesting
SegmentLocalCacheManager(
    SegmentLoaderConfig config,
    @Nonnull StorageLocationSelectorStrategy strategy,
    @Json ObjectMapper mapper
)
{
  this(config.toStorageLocations(), config, strategy, mapper);
}
| |
| /** |
| * creates instance with default storage location selector strategy |
| * |
| * This ctor is mainly for test cases, including test cases in other modules |
| */ |
| public SegmentLocalCacheManager( |
| SegmentLoaderConfig config, |
| @Json ObjectMapper mapper |
| ) |
| { |
| this.config = config; |
| this.jsonMapper = mapper; |
| this.locations = config.toStorageLocations(); |
| this.strategy = new LeastBytesUsedStorageLocationSelectorStrategy(locations); |
| log.info("Using storage location strategy: [%s]", this.strategy.getClass().getSimpleName()); |
| } |
| |
| @Override |
| public boolean isSegmentCached(final DataSegment segment) |
| { |
| return findStorageLocationIfLoaded(segment) != null; |
| } |
| |
| @Nullable |
| private StorageLocation findStorageLocationIfLoaded(final DataSegment segment) |
| { |
| for (StorageLocation location : locations) { |
| File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false)); |
| if (localStorageDir.exists()) { |
| if (checkSegmentFilesIntact(localStorageDir)) { |
| log.warn("[%s] may be damaged. Delete all the segment files and pull from DeepStorage again.", localStorageDir.getAbsolutePath()); |
| cleanupCacheFiles(location.getPath(), localStorageDir); |
| location.removeSegmentDir(localStorageDir, segment); |
| break; |
| } else { |
| return location; |
| } |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * check data intact. |
| * @param dir segments cache dir |
| * @return true means segment files may be damaged. |
| */ |
| private boolean checkSegmentFilesIntact(File dir) |
| { |
| return checkSegmentFilesIntactWithStartMarker(dir); |
| } |
| |
| /** |
| * If there is 'downloadStartMarker' existed in localStorageDir, the segments files might be damaged. |
| * Because each time, Druid will delete the 'downloadStartMarker' file after pulling and unzip the segments from DeepStorage. |
| * downloadStartMarker existed here may mean something error during download segments and the segment files may be damaged. |
| */ |
| private boolean checkSegmentFilesIntactWithStartMarker(File localStorageDir) |
| { |
| final File downloadStartMarker = new File(localStorageDir.getPath(), DOWNLOAD_START_MARKER_FILE_NAME); |
| return downloadStartMarker.exists(); |
| } |
| |
| /** |
| * Make sure segments files in loc is intact, otherwise function like loadSegments will failed because of segment files is damaged. |
| * @param segment |
| * @return |
| * @throws SegmentLoadingException |
| */ |
| @Override |
| public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException |
| { |
| final ReferenceCountingLock lock = createOrGetLock(segment); |
| synchronized (lock) { |
| try { |
| StorageLocation loc = findStorageLocationIfLoaded(segment); |
| String storageDir = DataSegmentPusher.getDefaultStorageDir(segment, false); |
| |
| if (loc == null) { |
| loc = loadSegmentWithRetry(segment, storageDir); |
| } else { |
| // If the segment is already downloaded on disk, we just update the current usage |
| loc.maybeReserve(storageDir, segment); |
| } |
| return new File(loc.getPath(), storageDir); |
| } |
| finally { |
| unlock(segment, lock); |
| } |
| } |
| } |
| |
| /** |
| * location may fail because of IO failure, most likely in two cases:<p> |
| * 1. druid don't have the write access to this location, most likely the administrator doesn't config it correctly<p> |
| * 2. disk failure, druid can't read/write to this disk anymore |
| * |
| * Locations are fetched using {@link StorageLocationSelectorStrategy}. |
| */ |
| private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException |
| { |
| Iterator<StorageLocation> locationsIterator = strategy.getLocations(); |
| |
| while (locationsIterator.hasNext()) { |
| |
| StorageLocation loc = locationsIterator.next(); |
| |
| File storageDir = loc.reserve(storageDirStr, segment); |
| if (storageDir != null) { |
| try { |
| loadInLocationWithStartMarker(segment, storageDir); |
| return loc; |
| } |
| catch (SegmentLoadingException e) { |
| try { |
| log.makeAlert( |
| e, |
| "Failed to load segment in current location [%s], try next location if any", |
| loc.getPath().getAbsolutePath() |
| ).addData("location", loc.getPath().getAbsolutePath()).emit(); |
| } |
| finally { |
| loc.removeSegmentDir(storageDir, segment); |
| cleanupCacheFiles(loc.getPath(), storageDir); |
| } |
| } |
| } |
| } |
| throw new SegmentLoadingException("Failed to load segment %s in all locations.", segment.getId()); |
| } |
| |
| private void loadInLocationWithStartMarker(DataSegment segment, File storageDir) throws SegmentLoadingException |
| { |
| // We use a marker to prevent the case where a segment is downloaded, but before the download completes, |
| // the parent directories of the segment are removed |
| final File downloadStartMarker = new File(storageDir, DOWNLOAD_START_MARKER_FILE_NAME); |
| synchronized (directoryWriteRemoveLock) { |
| if (!storageDir.mkdirs()) { |
| log.debug("Unable to make parent file[%s]", storageDir); |
| } |
| try { |
| if (!downloadStartMarker.createNewFile()) { |
| throw new SegmentLoadingException("Was not able to create new download marker for [%s]", storageDir); |
| } |
| } |
| catch (IOException e) { |
| throw new SegmentLoadingException(e, "Unable to create marker file for [%s]", storageDir); |
| } |
| } |
| loadInLocation(segment, storageDir); |
| |
| if (!downloadStartMarker.delete()) { |
| throw new SegmentLoadingException("Unable to remove marker file for [%s]", storageDir); |
| } |
| } |
| |
| private void loadInLocation(DataSegment segment, File storageDir) throws SegmentLoadingException |
| { |
| // LoadSpec isn't materialized until here so that any system can interpret Segment without having to have all the |
| // LoadSpec dependencies. |
| final LoadSpec loadSpec = jsonMapper.convertValue(segment.getLoadSpec(), LoadSpec.class); |
| final LoadSpec.LoadSpecResult result = loadSpec.loadSegment(storageDir); |
| if (result.getSize() != segment.getSize()) { |
| log.warn( |
| "Segment [%s] is different than expected size. Expected [%d] found [%d]", |
| segment.getId(), |
| segment.getSize(), |
| result.getSize() |
| ); |
| } |
| } |
| |
| @Override |
| public void cleanup(DataSegment segment) |
| { |
| if (!config.isDeleteOnRemove()) { |
| return; |
| } |
| |
| final ReferenceCountingLock lock = createOrGetLock(segment); |
| synchronized (lock) { |
| try { |
| StorageLocation loc = findStorageLocationIfLoaded(segment); |
| |
| if (loc == null) { |
| log.warn("Asked to cleanup something[%s] that didn't exist. Skipping.", segment.getId()); |
| return; |
| } |
| |
| // If storageDir.mkdirs() success, but downloadStartMarker.createNewFile() failed, |
| // in this case, findStorageLocationIfLoaded() will think segment is located in the failed storageDir which is actually not. |
| // So we should always clean all possible locations here |
| for (StorageLocation location : locations) { |
| File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false)); |
| if (localStorageDir.exists()) { |
| // Druid creates folders of the form dataSource/interval/version/partitionNum. |
| // We need to clean up all these directories if they are all empty. |
| cleanupCacheFiles(location.getPath(), localStorageDir); |
| location.removeSegmentDir(localStorageDir, segment); |
| } |
| } |
| } |
| finally { |
| unlock(segment, lock); |
| } |
| } |
| } |
| |
| private void cleanupCacheFiles(File baseFile, File cacheFile) |
| { |
| if (cacheFile.equals(baseFile)) { |
| return; |
| } |
| |
| synchronized (directoryWriteRemoveLock) { |
| log.info("Deleting directory[%s]", cacheFile); |
| try { |
| FileUtils.deleteDirectory(cacheFile); |
| } |
| catch (Exception e) { |
| log.error(e, "Unable to remove directory[%s]", cacheFile); |
| } |
| } |
| |
| File parent = cacheFile.getParentFile(); |
| if (parent != null) { |
| File[] children = parent.listFiles(); |
| if (children == null || children.length == 0) { |
| cleanupCacheFiles(baseFile, parent); |
| } |
| } |
| } |
| |
| private ReferenceCountingLock createOrGetLock(DataSegment dataSegment) |
| { |
| return segmentLocks.compute( |
| dataSegment, |
| (segment, lock) -> { |
| final ReferenceCountingLock nonNullLock; |
| if (lock == null) { |
| nonNullLock = new ReferenceCountingLock(); |
| } else { |
| nonNullLock = lock; |
| } |
| nonNullLock.increment(); |
| return nonNullLock; |
| } |
| ); |
| } |
| |
| @SuppressWarnings("ObjectEquality") |
| private void unlock(DataSegment dataSegment, ReferenceCountingLock lock) |
| { |
| segmentLocks.compute( |
| dataSegment, |
| (segment, existingLock) -> { |
| if (existingLock == null) { |
| throw new ISE("Lock has already been removed"); |
| } else if (existingLock != lock) { |
| throw new ISE("Different lock instance"); |
| } else { |
| if (existingLock.numReferences == 1) { |
| return null; |
| } else { |
| existingLock.decrement(); |
| return existingLock; |
| } |
| } |
| } |
| ); |
| } |
| |
| @VisibleForTesting |
| private static class ReferenceCountingLock |
| { |
| private int numReferences; |
| |
| private void increment() |
| { |
| ++numReferences; |
| } |
| |
| private void decrement() |
| { |
| --numReferences; |
| } |
| } |
| |
| @VisibleForTesting |
| public ConcurrentHashMap<DataSegment, ReferenceCountingLock> getSegmentLocks() |
| { |
| return segmentLocks; |
| } |
| |
| @VisibleForTesting |
| public List<StorageLocation> getLocations() |
| { |
| return locations; |
| } |
| } |