// blob: 9b4c5fc40de4b4193a3a516bd6d5801a6ce46d8b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.store.blob.process;
import java.io.Closeable;
import java.lang.invoke.MethodHandles;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.lucene.util.NamedThreadFactory;
import org.apache.solr.store.shared.SharedStoreManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A {@link Runnable} that starts a fixed number of syncer threads (see {@link CorePullerThread}) and then feeds
 * them work by calling {@link #feedTheMonsters()} on the thread that executes {@link #run()}.
 * <p>
 * When {@link #run()} ends, whether because {@link #feedTheMonsters()} returned, because it threw, or because
 * {@link #close()} interrupted the execution thread, every syncer thread created by that invocation is interrupted.
 */
public abstract class CoreSyncFeeder implements Runnable, Closeable {

  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  protected final SharedStoreManager storeManager;

  /**
   * Maximum number of elements in the queue, NOT counting re-inserts after failures. Total queue size might therefore
   * exceed this value by the number of syncThreads which is around 5 or 10...
   * <p>
   * Note that this queue sits behind another tracking queue (see {@link CorePullTracker}).
   * The other queue has to be large, this one does not.
   */
  protected static final int ALMOST_MAX_WORKER_QUEUE_SIZE = 2000;

  /**
   * When a transient error occurs, the number of attempts to sync with Blob before giving up. Attempts are spaced by
   * at least 10 seconds (see {@link CorePullTask#MIN_RETRY_DELAY_MS}) which means we'll retry for at least 90 seconds
   * before giving up. This is something to adjust as the implementation of the delay between retries is cleaned up,
   * see {@link CorePullTask}.
   */
  protected static final int MAX_ATTEMPTS = 10;

  /** Number of syncer threads created and started by each invocation of {@link #run()}. */
  private final int numSyncThreads;

  /**
   * The thread currently executing {@link #run()}, or {@code null} when {@link #run()} is not in progress.
   * {@link #close()} reads this field to know which thread to interrupt.
   */
  private volatile Thread executionThread;

  /** Set to {@code true} by the first call to {@link #close()} and never reset. */
  private volatile boolean closed = false;

  /**
   * @param storeManager   manager giving access to the core container (see {@link #shouldContinueRunning()})
   * @param numSyncThreads number of syncer threads that {@link #run()} creates and starts
   */
  protected CoreSyncFeeder(SharedStoreManager storeManager, int numSyncThreads) {
    this.numSyncThreads = numSyncThreads;
    this.storeManager = storeManager;
  }

  /**
   * Creates and starts the syncer threads, then calls {@link #feedTheMonsters()} on the current thread. Whatever way
   * that call ends, all syncer threads are interrupted before this method returns.
   * <p>
   * Any {@link Throwable} escaping {@link #feedTheMonsters()} is caught here; it is logged as an error unless
   * {@link #close()} was called. If the throwable is an {@link InterruptedException}, the interrupt status of the
   * current thread is restored so that code further up the stack can still observe the interruption.
   */
  @Override
  public void run() {
    // Record where we run so we can be interrupted from close(). If we get interrupted before this point we will
    // not close anything and not log anything, but that's ok: we haven't started anything either.
    this.executionThread = Thread.currentThread();

    // If close() executed before executionThread was set, it had no thread to interrupt, so we need to exit here.
    if (closed) {
      return;
    }

    try {
      // We'll be submitting tasks to a queue from which threads are going to pick them up and execute them.
      // PeerSyncer uses a ThreadPoolExecutor for this. Here instead we explicitly start the threads to
      // simplify our life (and the code), because by not using an executor we're not forced to use a
      // BlockingQueue<Runnable> as the work queue and instead use a DeduplicatingList.
      NamedThreadFactory threadFactory = new NamedThreadFactory(getMonsterThreadName());
      // TODO set daemon?
      Set<Thread> syncerThreads = new HashSet<>();
      for (int i = 0; i < numSyncThreads; i++) {
        syncerThreads.add(threadFactory.newThread(this.getSyncer()));
      }

      try {
        // Starting the threads only after having created all of them, so that the finally below can interrupt
        // every one of them.
        for (Thread t : syncerThreads) {
          t.start();
        }

        feedTheMonsters();
      } catch (Throwable e) {
        if (e instanceof InterruptedException) {
          // Catching InterruptedException clears the interrupt status; put it back rather than swallowing it.
          Thread.currentThread().interrupt();
        }
        if (!closed) {
          // Hand the throwable to the logger so the stack trace is rendered by the logging framework.
          log.error("CoreSyncFeeder thread encountered an error and is exiting while close() was not called.", e);
        }
      } finally {
        // If we stop, have our syncer "thread pool" stop as well since there's not much they can do anyway then...
        for (Thread t : syncerThreads) {
          t.interrupt();
        }
      }
    } finally {
      this.executionThread = null;
    }
  }

  /**
   * @return {@code true} as long as the core container obtained from the store manager has not been shut down
   */
  boolean shouldContinueRunning() {
    return !this.storeManager.getCoreContainer().isShutDown();
  }

  /**
   * Marks this feeder as closed and, if {@link #run()} is currently executing, interrupts the thread running it.
   * Only the first call has any effect; subsequent calls return immediately.
   */
  @Override
  public void close() {
    if (closed) {
      return;
    }
    closed = true;

    Thread thread = this.executionThread;
    if (thread != null) {
      this.executionThread = null; // race to set to null but ok to try to interrupt twice
      log.info("Closing CoreSyncFeeder; interrupting execution thread {}.", thread.getName());
      thread.interrupt();
    } else {
      // Reached when run() has not recorded its thread yet, and also when run() has already exited (its finally
      // block resets executionThread to null).
      log.warn("Closing CoreSyncFeeder before any syncer thread was started. Weird.");
    }
  }

  /**
   * @return the name passed to the {@link NamedThreadFactory} that creates the syncer threads
   */
  abstract String getMonsterThreadName();

  /**
   * Called by {@link #run()}, on the execution thread, once all syncer threads have been started. When this method
   * returns or throws, {@link #run()} interrupts the syncer threads and exits.
   *
   * @throws InterruptedException if the execution thread is interrupted, for example by {@link #close()}
   */
  abstract void feedTheMonsters() throws InterruptedException;

  /**
   * Called by {@link #run()} once per syncer thread to obtain the {@link Runnable} that thread will execute.
   *
   * @return the task to be run by one syncer thread
   */
  abstract Runnable getSyncer();
}