/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
| |
| package org.apache.jackrabbit.oak.plugins.index.lucene; |
| |
| import java.io.Closeable; |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import javax.management.openmbean.CompositeDataSupport; |
| import javax.management.openmbean.CompositeType; |
| import javax.management.openmbean.OpenDataException; |
| import javax.management.openmbean.OpenType; |
| import javax.management.openmbean.SimpleType; |
| import javax.management.openmbean.TabularData; |
| import javax.management.openmbean.TabularDataSupport; |
| import javax.management.openmbean.TabularType; |
| |
| import com.google.common.base.Function; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Sets; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.jackrabbit.oak.plugins.index.lucene.directory.CopyOnReadDirectory; |
| import org.apache.jackrabbit.oak.plugins.index.lucene.directory.CopyOnWriteDirectory; |
| import org.apache.jackrabbit.oak.plugins.index.lucene.directory.DirectoryUtils; |
| import org.apache.jackrabbit.oak.plugins.index.lucene.directory.IndexRootDirectory; |
| import org.apache.jackrabbit.oak.plugins.index.lucene.directory.IndexSanityChecker; |
| import org.apache.jackrabbit.oak.plugins.index.lucene.directory.LocalIndexDir; |
| import org.apache.jackrabbit.oak.plugins.index.lucene.directory.LocalIndexFile; |
| import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition; |
| import org.apache.lucene.store.Directory; |
| import org.apache.lucene.store.FSDirectory; |
| import org.apache.lucene.store.FilterDirectory; |
| import org.apache.lucene.store.NoLockFactory; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static com.google.common.base.Preconditions.checkState; |
| import static com.google.common.collect.Iterables.toArray; |
| import static com.google.common.collect.Iterables.transform; |
| import static com.google.common.collect.Maps.newConcurrentMap; |
| import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount; |
| |
| /** |
| * Copies index files to/from the local disk and the datastore. |
| */ |
| public class IndexCopier implements CopyOnReadStatsMBean, Closeable { |
| public static final Set<String> REMOTE_ONLY = ImmutableSet.of("segments.gen"); |
| private static final int MAX_FAILURE_ENTRIES = 10000; |
| private static final String WORK_DIR_NAME = "indexWriterDir"; |
| |
| private static final Logger log = LoggerFactory.getLogger(IndexCopier.class); |
| private final Executor executor; |
| private final File indexWorkDir; |
| |
| private final AtomicInteger readerLocalReadCount = new AtomicInteger(); |
| private final AtomicInteger writerLocalReadCount = new AtomicInteger(); |
| private final AtomicInteger readerRemoteReadCount = new AtomicInteger(); |
| private final AtomicInteger writerRemoteReadCount = new AtomicInteger(); |
| private final AtomicInteger invalidFileCount = new AtomicInteger(); |
| private final AtomicInteger deletedFileCount = new AtomicInteger(); |
| private final AtomicInteger scheduledForCopyCount = new AtomicInteger(); |
| private final AtomicInteger copyInProgressCount = new AtomicInteger(); |
| private final AtomicInteger maxCopyInProgressCount = new AtomicInteger(); |
| private final AtomicInteger maxScheduledForCopyCount = new AtomicInteger(); |
| private final AtomicInteger uploadCount = new AtomicInteger(); |
| private final AtomicInteger downloadCount = new AtomicInteger(); |
| private final AtomicLong copyInProgressSize = new AtomicLong(); |
| private final AtomicLong downloadSize = new AtomicLong(); |
| private final AtomicLong uploadSize = new AtomicLong(); |
| private final AtomicLong garbageCollectedSize = new AtomicLong(); |
| private final AtomicLong skippedFromUploadSize = new AtomicLong(); |
| private final AtomicLong downloadTime = new AtomicLong(); |
| private final AtomicLong uploadTime = new AtomicLong(); |
| |
| |
| private final Map<String, String> indexPathVersionMapping = newConcurrentMap(); |
| private final ConcurrentMap<String, LocalIndexFile> failedToDeleteFiles = newConcurrentMap(); |
| private final Set<LocalIndexFile> copyInProgressFiles = Collections.newSetFromMap(new ConcurrentHashMap<LocalIndexFile, Boolean>()); |
| private final boolean prefetchEnabled; |
| private volatile boolean closed; |
| private final IndexRootDirectory indexRootDirectory; |
| private final Set<String> validatedIndexPaths = Sets.newConcurrentHashSet(); |
| |
| public IndexCopier(Executor executor, File indexRootDir) throws IOException { |
| this(executor, indexRootDir, false); |
| } |
| |
| public IndexCopier(Executor executor, File indexRootDir, boolean prefetchEnabled) throws IOException { |
| this.executor = executor; |
| this.prefetchEnabled = prefetchEnabled; |
| this.indexWorkDir = initializerWorkDir(indexRootDir); |
| this.indexRootDirectory = new IndexRootDirectory(indexRootDir); |
| } |
| |
| public Directory wrapForRead(String indexPath, LuceneIndexDefinition definition, |
| Directory remote, String dirName) throws IOException { |
| Directory local = createLocalDirForIndexReader(indexPath, definition, dirName); |
| checkIntegrity(indexPath, local, remote); |
| return new CopyOnReadDirectory(this, remote, local, prefetchEnabled, indexPath, executor); |
| } |
| |
| public Directory wrapForWrite(LuceneIndexDefinition definition, Directory remote, boolean reindexMode, String dirName) throws IOException { |
| Directory local = createLocalDirForIndexWriter(definition, dirName); |
| String indexPath = definition.getIndexPath(); |
| checkIntegrity(indexPath, local, remote); |
| return new CopyOnWriteDirectory(this, remote, local, reindexMode, indexPath, executor); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| this.closed = true; |
| } |
| |
| public boolean isClosed() { |
| return closed; |
| } |
| |
| File getIndexWorkDir() { |
| return indexWorkDir; |
| } |
| |
| IndexRootDirectory getIndexRootDirectory() { |
| return indexRootDirectory; |
| } |
| |
| protected Directory createLocalDirForIndexWriter(LuceneIndexDefinition definition, String dirName) throws IOException { |
| String indexPath = definition.getIndexPath(); |
| File indexWriterDir = getIndexDir(definition, indexPath, dirName); |
| |
| //By design indexing in Oak is single threaded so Lucene locking |
| //can be disabled |
| Directory dir = FSDirectory.open(indexWriterDir, NoLockFactory.getNoLockFactory()); |
| |
| log.debug("IndexWriter would use {}", indexWriterDir); |
| return dir; |
| } |
| |
| protected Directory createLocalDirForIndexReader(String indexPath, LuceneIndexDefinition definition, String dirName) throws IOException { |
| File indexDir = getIndexDir(definition, indexPath, dirName); |
| Directory result = FSDirectory.open(indexDir); |
| |
| String newPath = indexDir.getAbsolutePath(); |
| String oldPath = indexPathVersionMapping.put(createIndexPathKey(indexPath, dirName), newPath); |
| if (!newPath.equals(oldPath) && oldPath != null) { |
| result = new DeleteOldDirOnClose(result, new File(oldPath)); |
| } |
| return result; |
| } |
| |
| public File getIndexDir(IndexDefinition definition, String indexPath, String dirName) throws IOException { |
| return indexRootDirectory.getIndexDir(definition, indexPath, dirName); |
| } |
| |
| Map<String, LocalIndexFile> getFailedToDeleteFiles() { |
| return Collections.unmodifiableMap(failedToDeleteFiles); |
| } |
| |
| private void failedToDelete(LocalIndexFile file){ |
| //Limit the size on best effort basis |
| if (failedToDeleteFiles.size() < MAX_FAILURE_ENTRIES) { |
| LocalIndexFile failedToDeleteFile = failedToDeleteFiles.putIfAbsent(file.getKey(), file); |
| if (failedToDeleteFile == null){ |
| failedToDeleteFile = file; |
| } |
| failedToDeleteFile.incrementAttemptToDelete(); |
| } else { |
| log.warn("Not able to delete {}. Currently more than {} file with total size {} are pending delete.", |
| file.deleteLog(), failedToDeleteFiles.size(), getGarbageSize()); |
| } |
| } |
| |
| private void successfullyDeleted(LocalIndexFile file, boolean fileExisted){ |
| LocalIndexFile failedToDeleteFile = failedToDeleteFiles.remove(file.getKey()); |
| if (failedToDeleteFile != null){ |
| log.debug("Deleted : {}", failedToDeleteFile.deleteLog()); |
| } |
| |
| if (fileExisted){ |
| garbageCollectedSize.addAndGet(file.getSize()); |
| deletedFileCount.incrementAndGet(); |
| } |
| } |
| |
| private void checkIntegrity(String indexPath, Directory local, Directory remote) throws IOException { |
| if (validatedIndexPaths.contains(indexPath)){ |
| return; |
| } |
| |
| //The integrity check needs to be done for the very first time at startup when |
| //a directory gets created as at that time it can be ensured that there is no |
| //work in progress files, no memory mapping issue etc |
| //Also at this time its required that state in local dir should exactly same as |
| //one in remote dir |
| synchronized (validatedIndexPaths){ |
| new IndexSanityChecker(indexPath, local, remote).check(); |
| validatedIndexPaths.add(indexPath); |
| } |
| } |
| |
| /** |
| * Creates the workDir. If it exists then it is cleaned |
| * |
| * @param indexRootDir root directory under which all indexing related files are managed |
| * @return work directory. Always empty |
| */ |
| private static File initializerWorkDir(File indexRootDir) throws IOException { |
| File workDir = new File(indexRootDir, WORK_DIR_NAME); |
| FileUtils.deleteDirectory(workDir); |
| checkState(workDir.mkdirs(), "Cannot create directory %s", workDir); |
| return workDir; |
| } |
| |
| /** |
| * Create a unique key based on indexPath and dirName used under that path |
| */ |
| private static String createIndexPathKey(String indexPath, String dirName){ |
| return indexPath.concat(dirName); |
| } |
| |
| public boolean deleteFile(Directory dir, String fileName, boolean copiedFromRemote){ |
| LocalIndexFile file = new LocalIndexFile(dir, fileName, DirectoryUtils.getFileLength(dir, fileName), copiedFromRemote); |
| boolean successFullyDeleted = false; |
| try { |
| boolean fileExisted = false; |
| if (dir.fileExists(fileName)) { |
| fileExisted = true; |
| dir.deleteFile(fileName); |
| } |
| successfullyDeleted(file, fileExisted); |
| successFullyDeleted = true; |
| } catch (IOException e) { |
| failedToDelete(file); |
| log.debug("Error occurred while removing deleted file {} from Local {}. " + |
| "Attempt would be made to delete it on next run ", fileName, dir, e); |
| } |
| return successFullyDeleted; |
| } |
| |
| /** |
| * This method would return the latest modification timestamp from the set of file{@code names} |
| * on the file system. |
| * The parameter {@code localDir} is expected to be an instance of {@link FSDirectory} (or wrapped one in |
| * {@link FilterDirectory}. If this assumption doesn't hold, the method would return -1. |
| * Each of file names are expected to be existing in {@code localDir}. If this fails the method shall return -1. |
| * In case of any error while computing modified timestamps on the file system, the method shall return -1. |
| * @param names file names to evaluate on local FS |
| * @param localDir {@link Directory} implementation to be used to get the files |
| * @return latest timestamp or -1 (with logs) in case of any doubt |
| */ |
| public static long getNewestLocalFSTimestampFor(Set<String> names, Directory localDir) { |
| File localFSDir = LocalIndexFile.getFSDir(localDir); |
| |
| if (localFSDir == null) { |
| log.warn("Couldn't get FSDirectory instance for {}.", localDir); |
| return -1; |
| } |
| |
| long maxTS = 0L; |
| for (String name : names) { |
| File f = new File(localFSDir, name); |
| |
| if (!f.exists()) { |
| log.warn("File {} doesn't exist in {}", name, localFSDir); |
| return -1; |
| } |
| |
| long modTS = f.lastModified(); |
| if (modTS == 0L) { |
| log.warn("Couldn't get lastModification timestamp for {} in {}", name, localFSDir); |
| return -1; |
| } |
| |
| if (modTS > maxTS) { |
| maxTS = modTS; |
| } |
| } |
| |
| return maxTS; |
| } |
| |
| /** |
| * @param name file name to evaluate on local FS |
| * @param localDir {@link Directory} implementation to be used to get the file |
| * @param millis timestamp to compare file's modified timestamp against |
| * @return {@code true} if file referred to be {@code name} is modified before {@code millis}; false otherwise |
| */ |
| public static boolean isFileModifiedBefore(String name, Directory localDir, long millis) { |
| File localFSDir = LocalIndexFile.getFSDir(localDir); |
| |
| if (localFSDir == null) { |
| log.warn("Couldn't get FSDirectory instance for {}.", localDir); |
| return false; |
| } |
| |
| File f = new File(localFSDir, name); |
| if (!f.exists()) { |
| log.warn("File {} doesn't exist in {}", name, localFSDir); |
| return false; |
| } |
| |
| long modTS = f.lastModified(); |
| if (modTS == 0L) { |
| log.warn("Couldn't get lastModification timestamp for {} in {}", name, localFSDir); |
| return false; |
| } |
| |
| return modTS < millis; |
| } |
| |
| public long startCopy(LocalIndexFile file) { |
| updateMaxInProgress(copyInProgressCount.incrementAndGet()); |
| copyInProgressSize.addAndGet(file.getSize()); |
| copyInProgressFiles.add(file); |
| return System.currentTimeMillis(); |
| } |
| |
| public boolean isCopyInProgress(LocalIndexFile file) { |
| return copyInProgressFiles.contains(file); |
| } |
| |
| public void doneCopy(LocalIndexFile file, long start) { |
| copyInProgressFiles.remove(file); |
| copyInProgressCount.decrementAndGet(); |
| copyInProgressSize.addAndGet(-file.getSize()); |
| |
| if(file.isCopyFromRemote()) { |
| downloadTime.addAndGet(System.currentTimeMillis() - start); |
| downloadSize.addAndGet(file.getSize()); |
| downloadCount.incrementAndGet(); |
| } else { |
| uploadSize.addAndGet(file.getSize()); |
| uploadTime.addAndGet(System.currentTimeMillis() - start); |
| uploadCount.incrementAndGet(); |
| } |
| |
| } |
| |
| private void updateMaxScheduled(int val) { |
| synchronized (maxScheduledForCopyCount){ |
| int current = maxScheduledForCopyCount.get(); |
| if (val > current){ |
| maxScheduledForCopyCount.set(val); |
| } |
| } |
| } |
| |
| private void updateMaxInProgress(int val) { |
| synchronized (maxCopyInProgressCount){ |
| int current = maxCopyInProgressCount.get(); |
| if (val > current){ |
| maxCopyInProgressCount.set(val); |
| } |
| } |
| } |
| |
| |
| private class DeleteOldDirOnClose extends FilterDirectory { |
| private final File oldIndexDir; |
| |
| protected DeleteOldDirOnClose(Directory in, File oldIndexDir) { |
| super(in); |
| this.oldIndexDir = oldIndexDir; |
| } |
| |
| @Override |
| public void close() throws IOException { |
| try { |
| super.close(); |
| } finally { |
| //Clean out the local dir irrespective of any error occurring upon |
| //close in wrapped directory |
| try{ |
| long totalDeletedSize = FileUtils.sizeOf(oldIndexDir); |
| FileUtils.deleteDirectory(oldIndexDir); |
| totalDeletedSize += indexRootDirectory.gcEmptyDirs(oldIndexDir); |
| garbageCollectedSize.addAndGet(totalDeletedSize); |
| log.debug("Removed old index content from {} ", oldIndexDir); |
| } catch (IOException e){ |
| log.warn("Not able to remove old version of copied index at {}", oldIndexDir, e); |
| } |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "DeleteOldDirOnClose wrapper for " + getDelegate(); |
| } |
| } |
| |
| //~------------------------------------------< Stats Collection > |
| |
| public void skippedUpload(long skippedFilesSize) { |
| skippedFromUploadSize.addAndGet(skippedFilesSize); |
| } |
| |
| public void scheduledForCopy() { |
| updateMaxScheduled(scheduledForCopyCount.incrementAndGet()); |
| } |
| |
| public void copyDone(){ |
| scheduledForCopyCount.decrementAndGet(); |
| } |
| |
| public void readFromRemote(boolean reader) { |
| if (reader) { |
| readerRemoteReadCount.incrementAndGet(); |
| } else { |
| writerRemoteReadCount.incrementAndGet(); |
| } |
| } |
| |
| public void readFromLocal(boolean reader) { |
| if (reader) { |
| readerLocalReadCount.incrementAndGet(); |
| } else { |
| writerLocalReadCount.incrementAndGet(); |
| } |
| } |
| |
| public void foundInvalidFile(){ |
| invalidFileCount.incrementAndGet(); |
| } |
| |
| //~------------------------------------------< CopyOnReadStatsMBean > |
| |
| @Override |
| public TabularData getIndexPathMapping() { |
| TabularDataSupport tds; |
| try{ |
| TabularType tt = new TabularType(IndexMappingData.class.getName(), |
| "Lucene Index Stats", IndexMappingData.TYPE, new String[]{"jcrPath"}); |
| tds = new TabularDataSupport(tt); |
| for (LocalIndexDir indexDir : indexRootDirectory.getAllLocalIndexes()){ |
| String size = humanReadableByteCount(indexDir.size()); |
| tds.put(new CompositeDataSupport(IndexMappingData.TYPE, |
| IndexMappingData.FIELD_NAMES, |
| new String[]{indexDir.getJcrPath(), indexDir.getFSPath(), size})); |
| } |
| } catch (OpenDataException e){ |
| throw new IllegalStateException(e); |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| return tds; |
| } |
| |
| @Override |
| public boolean isPrefetchEnabled() { |
| return prefetchEnabled; |
| } |
| |
| @Override |
| public int getReaderLocalReadCount() { |
| return readerLocalReadCount.get(); |
| } |
| |
| @Override |
| public int getReaderRemoteReadCount() { |
| return readerRemoteReadCount.get(); |
| } |
| |
| @Override |
| public int getWriterLocalReadCount() { |
| return writerLocalReadCount.get(); |
| } |
| |
| @Override |
| public int getWriterRemoteReadCount() { |
| return writerRemoteReadCount.get(); |
| } |
| |
| public int getInvalidFileCount(){ |
| return invalidFileCount.get(); |
| } |
| |
| @Override |
| public String getDownloadSize() { |
| return humanReadableByteCount(downloadSize.get()); |
| } |
| |
| @Override |
| public long getDownloadTime() { |
| return downloadTime.get(); |
| } |
| |
| @Override |
| public int getDownloadCount() { |
| return downloadCount.get(); |
| } |
| |
| @Override |
| public int getUploadCount() { |
| return uploadCount.get(); |
| } |
| |
| @Override |
| public String getUploadSize() { |
| return humanReadableByteCount(uploadSize.get()); |
| } |
| |
| @Override |
| public long getUploadTime() { |
| return uploadTime.get(); |
| } |
| |
| @Override |
| public String getLocalIndexSize() { |
| return humanReadableByteCount(indexRootDirectory.getSize()); |
| } |
| |
| @Override |
| public String[] getGarbageDetails() { |
| return toArray(transform(failedToDeleteFiles.values(), |
| new Function<LocalIndexFile, String>() { |
| @Override |
| public String apply(LocalIndexFile input) { |
| return input.deleteLog(); |
| } |
| }), String.class); |
| } |
| |
| @Override |
| public String getGarbageSize() { |
| long garbageSize = 0; |
| for (LocalIndexFile failedToDeleteFile : failedToDeleteFiles.values()){ |
| garbageSize += failedToDeleteFile.getSize(); |
| } |
| return humanReadableByteCount(garbageSize); |
| } |
| |
| @Override |
| public int getScheduledForCopyCount() { |
| return scheduledForCopyCount.get(); |
| } |
| |
| @Override |
| public int getCopyInProgressCount() { |
| return copyInProgressCount.get(); |
| } |
| |
| @Override |
| public String getCopyInProgressSize() { |
| return humanReadableByteCount(copyInProgressSize.get()); |
| } |
| |
| @Override |
| public int getMaxCopyInProgressCount() { |
| return maxCopyInProgressCount.get(); |
| } |
| |
| @Override |
| public int getMaxScheduledForCopyCount() { |
| return maxScheduledForCopyCount.get(); |
| } |
| |
| public String getSkippedFromUploadSize() { |
| return humanReadableByteCount(skippedFromUploadSize.get()); |
| } |
| |
| @Override |
| public String[] getCopyInProgressDetails() { |
| return toArray(transform(copyInProgressFiles, |
| new Function<LocalIndexFile, String>() { |
| @Override |
| public String apply(LocalIndexFile input) { |
| return input.copyLog(); |
| } |
| }), String.class); |
| } |
| |
| @Override |
| public int getDeletedFilesCount() { |
| return deletedFileCount.get(); |
| } |
| |
| @Override |
| public String getGarbageCollectedSize() { |
| return humanReadableByteCount(garbageCollectedSize.get()); |
| } |
| |
| private static class IndexMappingData { |
| static final String[] FIELD_NAMES = new String[]{ |
| "jcrPath", |
| "fsPath", |
| "size", |
| }; |
| |
| static final String[] FIELD_DESCRIPTIONS = new String[]{ |
| "JCR Path", |
| "Filesystem Path", |
| "Size", |
| }; |
| |
| static final OpenType[] FIELD_TYPES = new OpenType[]{ |
| SimpleType.STRING, |
| SimpleType.STRING, |
| SimpleType.STRING, |
| }; |
| |
| static final CompositeType TYPE = createCompositeType(); |
| |
| static CompositeType createCompositeType() { |
| try { |
| return new CompositeType( |
| IndexMappingData.class.getName(), |
| "Composite data type for Index Mapping Data", |
| IndexMappingData.FIELD_NAMES, |
| IndexMappingData.FIELD_DESCRIPTIONS, |
| IndexMappingData.FIELD_TYPES); |
| } catch (OpenDataException e) { |
| throw new IllegalStateException(e); |
| } |
| } |
| } |
| } |