| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.lucene.replicator.nrt; |
| |
| import java.io.IOException; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.Set; |
| import org.apache.lucene.util.IOUtils; |
| |
| /** Handles one set of files that need copying, either because we have a |
| * new NRT point, or we are pre-copying merged files for merge warming. */ |
| class SimpleCopyJob extends CopyJob { |
| final Connection c; |
| |
| final byte[] copyBuffer = new byte[65536]; |
| final CopyState copyState; |
| |
| private Iterator<Map.Entry<String,FileMetaData>> iter; |
| |
| public SimpleCopyJob(String reason, Connection c, CopyState copyState, SimpleReplicaNode dest, Map<String,FileMetaData> files, boolean highPriority, OnceDone onceDone) |
| throws IOException { |
| super(reason, files, dest, highPriority, onceDone); |
| dest.message("create SimpleCopyJob o" + ord); |
| this.c = c; |
| this.copyState = copyState; |
| } |
| |
| @Override |
| public synchronized void start() throws IOException { |
| if (iter == null) { |
| iter = toCopy.iterator(); |
| |
| // Send all file names / offsets up front to avoid ping-ping latency: |
| try { |
| |
| // This means we resumed an already in-progress copy; we do this one first: |
| if (current != null) { |
| c.out.writeByte((byte) 0); |
| c.out.writeString(current.name); |
| c.out.writeVLong(current.getBytesCopied()); |
| totBytes += current.metaData.length; |
| } |
| |
| for (Map.Entry<String,FileMetaData> ent : toCopy) { |
| String fileName = ent.getKey(); |
| FileMetaData metaData = ent.getValue(); |
| totBytes += metaData.length; |
| c.out.writeByte((byte) 0); |
| c.out.writeString(fileName); |
| c.out.writeVLong(0); |
| } |
| c.out.writeByte((byte) 1); |
| c.flush(); |
| c.s.shutdownOutput(); |
| |
| if (current != null) { |
| // Do this only at the end, after sending all requested files, so we don't deadlock due to socket buffering waiting for primary to |
| // send us this length: |
| long len = c.in.readVLong(); |
| if (len != current.metaData.length) { |
| throw new IllegalStateException("file " + current.name + ": meta data says length=" + current.metaData.length + " but c.in says " + len); |
| } |
| } |
| |
| dest.message("SimpleCopyJob.init: done start files count=" + toCopy.size() + " totBytes=" + totBytes); |
| |
| } catch (Throwable t) { |
| cancel("exc during start", t); |
| throw new NodeCommunicationException("exc during start", t); |
| } |
| } else { |
| throw new IllegalStateException("already started"); |
| } |
| } |
| |
| @Override |
| public long getTotalBytesCopied() { |
| return totBytesCopied; |
| } |
| |
| @Override |
| public Set<String> getFileNamesToCopy() { |
| Set<String> fileNames = new HashSet<>(); |
| for(Map.Entry<String,FileMetaData> ent : toCopy) { |
| fileNames.add(ent.getKey()); |
| } |
| return fileNames; |
| } |
| |
| @Override |
| public Set<String> getFileNames() { |
| return files.keySet(); |
| } |
| |
| /** Higher priority and then "first come first serve" order. */ |
| @Override |
| public int compareTo(CopyJob _other) { |
| SimpleCopyJob other = (SimpleCopyJob) _other; |
| if (highPriority != other.highPriority) { |
| return highPriority ? -1 : 1; |
| } else if (ord < other.ord) { |
| return -1; |
| } else if (ord > other.ord) { |
| return 1; |
| } else { |
| return 0; |
| } |
| } |
| |
| @Override |
| public void finish() throws IOException { |
| dest.message(String.format(Locale.ROOT, |
| "top: file copy done; took %.1f msec to copy %d bytes; now rename %d tmp files", |
| (System.nanoTime() - startNS)/1000000.0, |
| totBytesCopied, |
| copiedFiles.size())); |
| |
| // NOTE: if any of the files we copied overwrote a file in the current commit point, we (ReplicaNode) removed the commit point up |
| // front so that the commit is not corrupt. This way if we hit exc here, or if we crash here, we won't leave a corrupt commit in |
| // the index: |
| for(Map.Entry<String,String> ent : copiedFiles.entrySet()) { |
| String tmpFileName = ent.getValue(); |
| String fileName = ent.getKey(); |
| |
| if (Node.VERBOSE_FILES) { |
| dest.message("rename file " + tmpFileName + " to " + fileName); |
| } |
| |
| // NOTE: if this throws exception, then some files have been moved to their true names, and others are leftover .tmp files. I don't |
| // think heroic exception handling is necessary (no harm will come, except some leftover files), nor warranted here (would make the |
| // code more complex, for the exceptional cases when something is wrong w/ your IO system): |
| dest.dir.rename(tmpFileName, fileName); |
| } |
| |
| copiedFiles.clear(); |
| } |
| |
| /** Do an iota of work; returns true if all copying is done */ |
| synchronized boolean visit() throws IOException { |
| if (exc != null) { |
| // We were externally cancelled: |
| return true; |
| } |
| |
| if (current == null) { |
| if (iter.hasNext() == false) { |
| c.close(); |
| return true; |
| } |
| |
| Map.Entry<String,FileMetaData> next = iter.next(); |
| FileMetaData metaData = next.getValue(); |
| String fileName = next.getKey(); |
| long len = c.in.readVLong(); |
| if (len != metaData.length) { |
| throw new IllegalStateException("file " + fileName + ": meta data says length=" + metaData.length + " but c.in says " + len); |
| } |
| current = new CopyOneFile(c.in, dest, fileName, metaData, copyBuffer); |
| } |
| |
| if (current.visit()) { |
| // This file is done copying |
| copiedFiles.put(current.name, current.tmpName); |
| totBytesCopied += current.getBytesCopied(); |
| assert totBytesCopied <= totBytes: "totBytesCopied=" + totBytesCopied + " totBytes=" + totBytes; |
| current = null; |
| return false; |
| } |
| |
| return false; |
| } |
| |
| protected CopyOneFile newCopyOneFile(CopyOneFile prev) { |
| return new CopyOneFile(prev, c.in); |
| } |
| |
| @Override |
| public synchronized void transferAndCancel(CopyJob prevJob) throws IOException { |
| try { |
| super.transferAndCancel(prevJob); |
| } finally { |
| IOUtils.closeWhileHandlingException(((SimpleCopyJob) prevJob).c); |
| } |
| } |
| |
| public synchronized void cancel(String reason, Throwable exc) throws IOException { |
| try { |
| super.cancel(reason, exc); |
| } finally { |
| IOUtils.closeWhileHandlingException(c); |
| } |
| } |
| |
| @Override |
| public boolean getFailed() { |
| return exc != null; |
| } |
| |
| @Override |
| public String toString() { |
| return "SimpleCopyJob(ord=" + ord + " " + reason + " highPriority=" + highPriority + " files count=" + files.size() + " bytesCopied=" + totBytesCopied + " (of " + totBytes + ") filesCopied=" + copiedFiles.size() + ")"; |
| } |
| |
| @Override |
| public void runBlocking() throws IOException { |
| while (visit() == false); |
| |
| if (getFailed()) { |
| throw new RuntimeException("copy failed: " + cancelReason, exc); |
| } |
| } |
| |
| @Override |
| public CopyState getCopyState() { |
| return copyState; |
| } |
| |
| @Override |
| public synchronized boolean conflicts(CopyJob _other) { |
| Set<String> filesToCopy = new HashSet<>(); |
| for(Map.Entry<String,FileMetaData> ent : toCopy) { |
| filesToCopy.add(ent.getKey()); |
| } |
| |
| SimpleCopyJob other = (SimpleCopyJob) _other; |
| synchronized (other) { |
| for(Map.Entry<String,FileMetaData> ent : other.toCopy) { |
| if (filesToCopy.contains(ent.getKey())) { |
| return true; |
| } |
| } |
| } |
| |
| return false; |
| } |
| } |