| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.jackrabbit.oak.plugins.index.lucene.directory; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Set; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Sets; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.jackrabbit.oak.commons.IOUtils; |
| import org.apache.jackrabbit.oak.commons.PerfLogger; |
| import org.apache.jackrabbit.oak.commons.concurrent.NotifyingFutureTask; |
| import org.apache.jackrabbit.oak.plugins.index.lucene.IndexCopier; |
| import org.apache.lucene.store.Directory; |
| import org.apache.lucene.store.FilterDirectory; |
| import org.apache.lucene.store.IOContext; |
| import org.apache.lucene.store.IndexInput; |
| import org.apache.lucene.store.IndexOutput; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.collect.Maps.newConcurrentMap; |
| import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount; |
| |
| public class CopyOnWriteDirectory extends FilterDirectory { |
| /** Logger used for the regular messages of this directory. */ |
| private static final Logger log = LoggerFactory.getLogger(CopyOnWriteDirectory.class); |
| /** Performance logger registered under this class' logger name plus a ".perf" suffix. */ |
| private static final PerfLogger PERF_LOGGER = new PerfLogger(LoggerFactory.getLogger(log.getName() + ".perf")); |
| |
| // ---- collaborators and settings handed in through the constructor ---- |
| private final IndexCopier indexCopier; |
| private final Directory remote; |
| private final Directory local; |
| private final Executor executor; |
| private final boolean reindexMode; |
| private final String indexPath; |
| |
| /** |
|  * Marker task: when the background task takes it from the queue it stops |
|  * processing and signals {@link #copyDone}. Its own call() does nothing. |
|  */ |
| private final Callable<Void> STOP = new Callable<Void>() { |
|     @Override |
|     public Void call() throws Exception { |
|         return null; |
|     } |
| }; |
| |
| // ---- bookkeeping of the files visible through this directory ---- |
| /** File name to file reference; backs listAll(), fileExists() and the per file operations. */ |
| private final ConcurrentMap<String, COWFileReference> fileMap = newConcurrentMap(); |
| /** Names of locally created files that got deleted; they are removed from local in close(). */ |
| private final Set<String> deletedFilesLocal = Sets.newConcurrentHashSet(); |
| /** Names whose copy task found the file already recorded as deleted and therefore did not copy it. */ |
| private final Set<String> skippedFiles = Sets.newConcurrentHashSet(); |
| |
| // ---- state of the background processing ---- |
| /** Pending copy and delete tasks, taken one at a time by the background task. */ |
| private final BlockingQueue<Callable<Void>> queue = new LinkedBlockingQueue<Callable<Void>>(); |
| /** Failure recorded by the background task or when the IndexCopier is found closed. */ |
| private final AtomicReference<Throwable> errorInCopy = new AtomicReference<Throwable>(); |
| /** Counted down once STOP has been taken from the queue or the IndexCopier is found closed. */ |
| private final CountDownLatch copyDone = new CountDownLatch(1); |
| |
| /** |
|  * Current background task |
|  */ |
| private volatile NotifyingFutureTask currentTask = NotifyingFutureTask.completed(); |
| |
| /** |
|  * Completion handler: wraps the queue processing step in a fresh |
|  * NotifyingFutureTask, makes it the current task and submits it to the executor. |
|  * Each run of that step takes at most one entry from the queue. |
|  */ |
| private final Runnable completionHandler = new Runnable() { |
|     Callable<Void> pollAndRun = new Callable<Void>() { |
|         @SuppressWarnings("ThrowableResultOfMethodCallIgnored") |
|         @Override |
|         public Void call() throws Exception { |
|             try { |
|                 final Callable<Void> next = queue.poll(); |
|                 if (next == STOP) { |
|                     //Signal that all tasks completed |
|                     copyDone.countDown(); |
|                 } else if (next != null) { |
|                     if (errorInCopy.get() == null) { |
|                         next.call(); |
|                     } else { |
|                         log.trace("[COW][{}] Skipping task {} as some exception occurred in previous run", |
|                                 indexPath, next); |
|                     } |
|                     //Keep the chain going so the following queue entry gets picked up |
|                     currentTask.onComplete(completionHandler); |
|                 } |
|             } catch (Throwable failure) { |
|                 //Remember the failure; later entries are then skipped and close() reports it |
|                 errorInCopy.set(failure); |
|                 log.debug("[COW][{}] Error occurred while copying files. Further processing would " + |
|                         "be skipped", indexPath, failure); |
|                 currentTask.onComplete(completionHandler); |
|             } |
|             return null; |
|         } |
|     }; |
| |
|     @Override |
|     public void run() { |
|         currentTask = new NotifyingFutureTask(pollAndRun); |
|         try { |
|             executor.execute(currentTask); |
|         } catch (RejectedExecutionException rejected) { |
|             //Records the closed state (if any) without throwing, then rethrows the rejection |
|             checkIfClosed(false); |
|             throw rejected; |
|         } |
|     } |
| }; |
| |
| /** |
|  * Creates the directory on top of {@code local} and registers every file |
|  * currently listed by {@code remote}. |
|  * |
|  * @param indexCopier copier consulted for its closed state and notified about copy activity |
|  * @param remote      directory whose files are listed at start and which receives the copies |
|  * @param local       directory all new files are written to first |
|  * @param reindexMode affects the log level of the statistics written in close() |
|  * @param indexPath   index path used as prefix in log messages |
|  * @param executor    executor running the background tasks |
|  * @throws IOException if listing the remote directory or reading a remote file length fails |
|  */ |
| public CopyOnWriteDirectory(IndexCopier indexCopier, Directory remote, Directory local, boolean reindexMode, |
|         String indexPath, Executor executor) throws IOException { |
|     super(local); |
|     this.local = local; |
|     this.remote = remote; |
|     this.indexCopier = indexCopier; |
|     this.executor = executor; |
|     this.reindexMode = reindexMode; |
|     this.indexPath = indexPath; |
|     //All fields must be assigned before the remote listing is read |
|     initialize(); |
| } |
| |
| /** |
|  * Returns the names of all files currently tracked in the file map. |
|  */ |
| @Override |
| public String[] listAll() throws IOException { |
|     final Set<String> trackedNames = fileMap.keySet(); |
|     return Iterables.toArray(trackedNames, String.class); |
| } |
| |
| /** |
|  * Tells whether a file of the given name is tracked in the file map. |
|  */ |
| @Override |
| public boolean fileExists(String name) throws IOException { |
|     final boolean tracked = fileMap.containsKey(name); |
|     return tracked; |
| } |
| |
| /** |
|  * Removes the file from the file map and, if it was tracked, lets its |
|  * reference handle the deletion. Unknown names are ignored. |
|  */ |
| @Override |
| public void deleteFile(String name) throws IOException { |
|     log.trace("[COW][{}] Deleted file {}", indexPath, name); |
|     final COWFileReference removed = fileMap.remove(name); |
|     if (removed == null) { |
|         return; |
|     } |
|     removed.delete(); |
| } |
| |
| /** |
|  * Returns the length reported by the reference tracked for the given name. |
|  * |
|  * @throws FileNotFoundException if no file of that name is tracked |
|  */ |
| @Override |
| public long fileLength(String name) throws IOException { |
|     final COWFileReference tracked = fileMap.get(name); |
|     if (tracked != null) { |
|         return tracked.fileLength(); |
|     } |
|     throw new FileNotFoundException(name); |
| } |
| |
| /** |
|  * Creates a new local file of the given name. A reference previously tracked |
|  * under the same name is removed and asked to delete itself first. |
|  */ |
| @Override |
| public IndexOutput createOutput(String name, IOContext context) throws IOException { |
|     final COWFileReference previous = fileMap.remove(name); |
|     if (previous != null) { |
|         previous.delete(); |
|     } |
|     final COWFileReference created = new COWLocalFileReference(name); |
|     fileMap.put(name, created); |
|     return created.createOutput(context); |
| } |
| |
| /** |
|  * Syncs every named file that is tracked; names without a reference are skipped. |
|  */ |
| @Override |
| public void sync(Collection<String> names) throws IOException { |
|     for (String fileName : names) { |
|         final COWFileReference tracked = fileMap.get(fileName); |
|         if (tracked == null) { |
|             continue; |
|         } |
|         tracked.sync(); |
|     } |
| } |
| |
| /** |
|  * Opens the file through the reference tracked for the given name. |
|  * |
|  * @throws FileNotFoundException if no file of that name is tracked |
|  */ |
| @Override |
| public IndexInput openInput(String name, IOContext context) throws IOException { |
|     final COWFileReference tracked = fileMap.get(name); |
|     if (tracked != null) { |
|         return tracked.openInput(context); |
|     } |
|     throw new FileNotFoundException(name); |
| } |
| |
| /** |
|  * Queues the STOP marker, waits until the background processing signals |
|  * completion, removes the locally deleted files, reports the skipped upload |
|  * statistics and finally closes the local and the remote directory. |
|  * |
|  * @throws IOException if the wait is interrupted, a background task failed |
|  *         or closing one of the directories fails |
|  * @throws IndexCopierClosedException if the IndexCopier is found closed |
|  *         while queuing STOP or while waiting |
|  */ |
| @Override |
| public void close() throws IOException { |
|     int pendingCopies = queue.size(); |
|     addTask(STOP); |
| |
|     //Wait for all pending copy task to finish |
|     try { |
|         long start = PERF_LOGGER.start(); |
| |
|         //Loop until queue finished or IndexCopier |
|         //found to be closed. Doing it with timeout to |
|         //prevent any bug causing the thread to wait indefinitely |
|         while (!copyDone.await(10, TimeUnit.SECONDS)) { |
|             if (indexCopier.isClosed()) { |
|                 throw new IndexCopierClosedException("IndexCopier found to be closed " + |
|                         "while processing copy task for " + remote.toString()); |
|             } |
|         } |
|         PERF_LOGGER.end(start, -1, "[COW][{}] Completed pending copying task {}", indexPath, pendingCopies); |
|     } catch (InterruptedException e) { |
|         //Restore the interrupt flag for the caller before converting the exception |
|         Thread.currentThread().interrupt(); |
|         throw new IOException(e); |
|     } |
| |
|     Throwable t = errorInCopy.get(); |
|     if (t != null){ |
|         throw new IOException("Error occurred while copying files for " + indexPath, t); |
|     } |
| |
|     //Sanity check. Guava's Preconditions only substitute %s placeholders |
|     checkArgument(queue.isEmpty(), "Copy queue still " + |
|             "has pending task left [%s]. %s", queue.size(), queue); |
| |
|     //Sizes have to be read before the files get removed from local |
|     long skippedFilesSize = getSkippedFilesSize(); |
| |
|     for (String fileName : deletedFilesLocal){ |
|         deleteLocalFile(fileName); |
|     } |
| |
|     indexCopier.skippedUpload(skippedFilesSize); |
| |
|     String msg = "[COW][{}] CopyOnWrite stats : Skipped copying {} files with total size {}"; |
|     if ((reindexMode && skippedFilesSize > 0) || skippedFilesSize > 10 * FileUtils.ONE_MB){ |
|         log.info(msg, indexPath, skippedFiles.size(), humanReadableByteCount(skippedFilesSize)); |
|     } else { |
|         log.debug(msg, indexPath, skippedFiles.size(), humanReadableByteCount(skippedFilesSize)); |
|     } |
| |
|     if (log.isTraceEnabled()){ |
|         log.trace("[COW][{}] File listing - Upon completion {}", indexPath, Arrays.toString(remote.listAll())); |
|     } |
| |
|     local.close(); |
|     remote.close(); |
| } |
| |
| /** |
|  * Describes this directory by index path, local directory and remote directory. |
|  */ |
| @Override |
| public String toString() { |
|     final String description = String.format("[COW][%s] Local %s, Remote %s", indexPath, local, remote); |
|     return description; |
| } |
| |
| /** |
|  * Sums up the local length of all skipped files that still exist locally. |
|  * This is best effort: a file whose existence or length cannot be determined |
|  * does not contribute to the sum and never fails the caller. |
|  * |
|  * @return total size in bytes of the skipped files that could be measured |
|  */ |
| private long getSkippedFilesSize() { |
|     long size = 0; |
|     for (String name : skippedFiles){ |
|         try{ |
|             if (local.fileExists(name)){ |
|                 size += local.fileLength(name); |
|             } |
|         } catch (Exception e){ |
|             //Only statistics are affected, so note the problem and carry on |
|             log.debug("[COW][{}] Unable to determine size of skipped file {}", indexPath, name, e); |
|         } |
|     } |
|     return size; |
| } |
| |
| private void deleteLocalFile(String fileName) { |
| indexCopier.deleteFile(local, fileName, false); |
| } |
| |
| private void initialize() throws IOException { |
| for (String name : remote.listAll()) { |
| fileMap.put(name, new COWRemoteFileReference(name)); |
| } |
| |
| if (log.isTraceEnabled()){ |
| log.trace("[COW][{}] File listing - At start {}", indexPath, Arrays.toString(remote.listAll())); |
| } |
| } |
| |
| private void addCopyTask(final String name){ |
| indexCopier.scheduledForCopy(); |
| addTask(new Callable<Void>() { |
| @Override |
| public Void call() throws Exception { |
| indexCopier.copyDone(); |
| if (deletedFilesLocal.contains(name)){ |
| skippedFiles.add(name); |
| log.trace("[COW][{}] Skip copying of deleted file {}", indexPath, name); |
| return null; |
| } |
| long fileSize = local.fileLength(name); |
| LocalIndexFile file = new LocalIndexFile(local, name, fileSize, false); |
| long perfStart = PERF_LOGGER.start(); |
| long start = indexCopier.startCopy(file); |
| |
| local.copy(remote, name, name, IOContext.DEFAULT); |
| |
| indexCopier.doneCopy(file, start); |
| PERF_LOGGER.end(perfStart, 0, "[COW][{}] Copied to remote {} -- size: {}", |
| indexPath, name, IOUtils.humanReadableByteCount(fileSize)); |
| return null; |
| } |
| |
| @Override |
| public String toString() { |
| return "Copy: " + name; |
| } |
| }); |
| } |
| |
| private void addDeleteTask(final String name){ |
| addTask(new Callable<Void>() { |
| @Override |
| public Void call() throws Exception { |
| if (!skippedFiles.contains(name)) { |
| log.trace("[COW][{}] Marking as deleted {}", indexPath, name); |
| remote.deleteFile(name); |
| } |
| return null; |
| } |
| |
| @Override |
| public String toString() { |
| return "Delete : " + name; |
| } |
| }); |
| } |
| |
| private void addTask(Callable<Void> task){ |
| checkIfClosed(true); |
| queue.add(task); |
| currentTask.onComplete(completionHandler); |
| } |
| |
| /** |
|  * Checks whether the IndexCopier has been closed. If so, the condition is |
|  * recorded as the copy error and the completion latch is released, so that a |
|  * thread waiting in close() does not keep waiting. |
|  * |
|  * @param throwException whether to also throw the IndexCopierClosedException |
|  */ |
| private void checkIfClosed(boolean throwException) { |
|     if (!indexCopier.isClosed()) { |
|         return; |
|     } |
| |
|     IndexCopierClosedException e = new IndexCopierClosedException("IndexCopier found to be closed " + |
|             "while processing " + remote.toString()); |
|     errorInCopy.set(e); |
|     copyDone.countDown(); |
| |
|     if (throwException) { |
|         throw e; |
|     } |
| } |
| |
| /** |
|  * Reference to a single file tracked by this directory. Subclasses decide |
|  * how the file is read, written and deleted. |
|  */ |
| private abstract class COWFileReference { |
|     /** Name of the referenced file. */ |
|     protected final String name; |
| |
|     public COWFileReference(String fileName) { |
|         this.name = fileName; |
|     } |
| |
|     /** Opens the file for reading. */ |
|     public abstract IndexInput openInput(IOContext context) throws IOException; |
| |
|     /** Creates the file for writing. */ |
|     public abstract IndexOutput createOutput(IOContext context) throws IOException; |
| |
|     /** Returns the length of the file. */ |
|     public abstract long fileLength() throws IOException; |
| |
|     /** Handles the deletion of the file. */ |
|     public abstract void delete() throws IOException; |
| |
|     /** Does nothing by default; subclasses override it where a sync is needed. */ |
|     public void sync() throws IOException { |
|         //no-op |
|     } |
| } |
| |
| /** |
|  * Reference to a file that was already listed by the remote directory when |
|  * this directory was created. Its length is read from remote once, at |
|  * construction time. |
|  */ |
| private class COWRemoteFileReference extends COWFileReference { |
|     /** Remote length captured when the reference was created. */ |
|     private final long length; |
| |
|     public COWRemoteFileReference(String name) throws IOException { |
|         super(name); |
|         this.length = remote.fileLength(name); |
|     } |
| |
|     @Override |
|     public long fileLength() throws IOException { |
|         return length; |
|     } |
| |
|     /** |
|      * Reads from the local directory when it holds a copy of matching length |
|      * and the file is not one of IndexCopier.REMOTE_ONLY; otherwise reads |
|      * from remote. |
|      */ |
|     @Override |
|     public IndexInput openInput(IOContext context) throws IOException { |
|         if (checkIfLocalValid() && !IndexCopier.REMOTE_ONLY.contains(name)) { |
|             indexCopier.readFromLocal(false); |
|             return local.openInput(name, context); |
|         } |
|         indexCopier.readFromRemote(false); |
|         return remote.openInput(name, context); |
|     } |
| |
|     @Override |
|     public IndexOutput createOutput(IOContext context) throws IOException { |
|         throw new UnsupportedOperationException("Cannot create output for existing remote file " + name); |
|     } |
| |
|     @Override |
|     public void delete() throws IOException { |
|         //Remote file should not be deleted locally as it might be |
|         //in use by existing opened IndexSearcher. It would anyway |
|         //get deleted by CopyOnRead later |
|         //For now just record that these need to be deleted to avoid |
|         //potential concurrent access of the NodeBuilder |
|         addDeleteTask(name); |
|     } |
| |
|     /** |
|      * Checks whether the local directory has a file of this name whose |
|      * length equals the current remote length. Logs a warning when the |
|      * lengths differ, or when the local file is missing although the file |
|      * is not one of IndexCopier.REMOTE_ONLY. |
|      */ |
|     private boolean checkIfLocalValid() throws IOException { |
|         boolean validLocalCopyPresent = local.fileExists(name); |
| |
|         if (validLocalCopyPresent) { |
|             long localFileLength = local.fileLength(name); |
|             long remoteFileLength = remote.fileLength(name); |
|             validLocalCopyPresent = localFileLength == remoteFileLength; |
| |
|             if (!validLocalCopyPresent) { |
|                 //One argument per placeholder: file name first, then the three lengths |
|                 log.warn("COWRemoteFileReference::file ({}) differs in length. local: {}; remote: {}, " + |
|                         "init-remote-length: {}", name, localFileLength, remoteFileLength, length); |
|             } |
|         } else if (!IndexCopier.REMOTE_ONLY.contains(name)) { |
|             log.warn("COWRemoteFileReference::local file ({}) doesn't exist", name); |
|         } |
| |
|         return validLocalCopyPresent; |
|     } |
| } |
| |
| /** |
|  * Reference to a file created through this directory. All reads and writes |
|  * go to the local directory; closing the output queues the copy to remote. |
|  */ |
| private class COWLocalFileReference extends COWFileReference { |
|     public COWLocalFileReference(String fileName) { |
|         super(fileName); |
|     } |
| |
|     @Override |
|     public IndexOutput createOutput(IOContext context) throws IOException { |
|         log.debug("[COW][{}] Creating output {}", indexPath, name); |
|         final IndexOutput localOutput = local.createOutput(name, context); |
|         return new CopyOnCloseIndexOutput(localOutput); |
|     } |
| |
|     @Override |
|     public IndexInput openInput(IOContext context) throws IOException { |
|         return local.openInput(name, context); |
|     } |
| |
|     @Override |
|     public long fileLength() throws IOException { |
|         return local.fileLength(name); |
|     } |
| |
|     @Override |
|     public void sync() throws IOException { |
|         local.sync(Collections.singleton(name)); |
|     } |
| |
|     /** |
|      * Queues the delete task first and then records the name among the |
|      * locally deleted files. |
|      */ |
|     @Override |
|     public void delete() throws IOException { |
|         addDeleteTask(name); |
|         deletedFilesLocal.add(name); |
|     } |
| |
|     /** |
|      * Output wrapper that forwards every call to the wrapped output and |
|      * queues the copy task once the output has been closed. |
|      * |
|      * Implementation note - As we are decorating existing implementation |
|      * we would need to ensure that we also override methods (non abstract) |
|      * which might be implemented in say FSIndexInput like setLength |
|      */ |
|     private class CopyOnCloseIndexOutput extends IndexOutput { |
|         private final IndexOutput delegate; |
| |
|         public CopyOnCloseIndexOutput(IndexOutput wrapped) { |
|             this.delegate = wrapped; |
|         } |
| |
|         // ---- writing ---- |
| |
|         @Override |
|         public void writeByte(byte b) throws IOException { |
|             delegate.writeByte(b); |
|         } |
| |
|         @Override |
|         public void writeBytes(byte[] b, int offset, int length) throws IOException { |
|             delegate.writeBytes(b, offset, length); |
|         } |
| |
|         @Override |
|         public void flush() throws IOException { |
|             delegate.flush(); |
|         } |
| |
|         // ---- position and length ---- |
| |
|         @Override |
|         public long getFilePointer() { |
|             return delegate.getFilePointer(); |
|         } |
| |
|         @SuppressWarnings("deprecation") |
|         @Override |
|         public void seek(long pos) throws IOException { |
|             delegate.seek(pos); |
|         } |
| |
|         @Override |
|         public long length() throws IOException { |
|             return delegate.length(); |
|         } |
| |
|         @Override |
|         public void setLength(long length) throws IOException { |
|             delegate.setLength(length); |
|         } |
| |
|         // ---- lifecycle ---- |
| |
|         @Override |
|         public void close() throws IOException { |
|             delegate.close(); |
|             //Schedule this file to be copied in background |
|             addCopyTask(name); |
|         } |
|     } |
| } |
| } |