blob: 87feb33119ae69403c23329b1b0176e0963b2038 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.replicator.nrt;
import java.io.Closeable;
import java.io.IOException;
import java.util.PriorityQueue;
import org.apache.lucene.store.AlreadyClosedException;
/** Runs CopyJob(s) in background thread; each ReplicaNode has an instance of this
* running. At a given there could be one NRT copy job running, and multiple
* pre-warm merged segments jobs. */
class Jobs extends Thread implements Closeable {
private final PriorityQueue<CopyJob> queue = new PriorityQueue<>();
private final Node node;
public Jobs(Node node) {
this.node = node;
}
private boolean finish;
/** Returns null if we are closing, else, returns the top job or waits for one to arrive if the queue is empty. */
private synchronized SimpleCopyJob getNextJob() {
while (true) {
if (finish) {
return null;
} else if (queue.isEmpty()) {
try {
wait();
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
} else {
return (SimpleCopyJob) queue.poll();
}
}
}
@Override
public void run() {
while (true) {
SimpleCopyJob topJob = getNextJob();
if (topJob == null) {
assert finish;
break;
}
this.setName("jobs o" + topJob.ord);
assert topJob != null;
boolean result;
try {
result = topJob.visit();
} catch (Throwable t) {
if ((t instanceof AlreadyClosedException) == false) {
node.message("exception during job.visit job=" + topJob + "; now cancel");
t.printStackTrace(System.out);
} else {
node.message("AlreadyClosedException during job.visit job=" + topJob + "; now cancel");
}
try {
topJob.cancel("unexpected exception in visit", t);
} catch (Throwable t2) {
node.message("ignore exception calling cancel: " + t2);
t2.printStackTrace(System.out);
}
try {
topJob.onceDone.run(topJob);
} catch (Throwable t2) {
node.message("ignore exception calling OnceDone: " + t2);
t2.printStackTrace(System.out);
}
continue;
}
if (result == false) {
// Job isn't done yet; put it back:
synchronized (this) {
queue.offer(topJob);
}
} else {
// Job finished, now notify caller:
try {
topJob.onceDone.run(topJob);
} catch (Throwable t) {
node.message("ignore exception calling OnceDone: " + t);
t.printStackTrace(System.out);
}
}
}
node.message("top: jobs now exit run thread");
synchronized(this) {
// Gracefully cancel any jobs we didn't finish:
while (queue.isEmpty() == false) {
SimpleCopyJob job = (SimpleCopyJob) queue.poll();
node.message("top: Jobs: now cancel job=" + job);
try {
job.cancel("jobs closing", null);
} catch (Throwable t) {
node.message("ignore exception calling cancel");
t.printStackTrace(System.out);
}
try {
job.onceDone.run(job);
} catch (Throwable t) {
node.message("ignore exception calling OnceDone");
t.printStackTrace(System.out);
}
}
}
}
public synchronized void launch(CopyJob job) {
if (finish == false) {
queue.offer(job);
notify();
} else {
throw new AlreadyClosedException("closed");
}
}
/** Cancels any existing jobs that are copying the same file names as this one */
public synchronized void cancelConflictingJobs(CopyJob newJob) throws IOException {
for (CopyJob job : queue) {
if (job.conflicts(newJob)) {
node.message("top: now cancel existing conflicting job=" + job + " due to newJob=" + newJob);
job.cancel("conflicts with new job", null);
}
}
}
@Override
public synchronized void close() {
finish = true;
notify();
try {
join();
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
}
}