blob: 2a2bc4ac96ba18275e70766cd1182c4f7ed005f1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.lang.Thread.State;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.CommonParams.ID;
/**
* @since 6.0.0
*/
public class DaemonStream extends TupleStream implements Expressible {
private TupleStream tupleStream;
private StreamRunner streamRunner;
private ArrayBlockingQueue<Tuple> queue;
private int queueSize;
private boolean eatTuples;
private AtomicLong iterations = new AtomicLong();
private long startTime;
private long stopTime;
private Exception exception;
private long runInterval;
private String id;
private Map<String, DaemonStream> daemons;
private boolean terminate;
private boolean closed = false;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public DaemonStream(StreamExpression expression, StreamFactory factory) throws IOException{
List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
TupleStream tupleStream = factory.constructStream(streamExpressions.get(0));
StreamExpressionNamedParameter idExpression = factory.getNamedOperand(expression, ID);
StreamExpressionNamedParameter runExpression = factory.getNamedOperand(expression, "runInterval");
StreamExpressionNamedParameter queueExpression = factory.getNamedOperand(expression, "queueSize");
StreamExpressionNamedParameter terminateExpression = factory.getNamedOperand(expression, "terminate");
String id = null;
long runInterval = 0L;
int queueSize = 0;
boolean terminate = false;
if(idExpression == null) {
throw new IOException("Invalid expression id parameter expected");
} else {
id = ((StreamExpressionValue) idExpression.getParameter()).getValue();
}
if(runExpression == null) {
runInterval = 2000;
} else {
runInterval = Long.parseLong(((StreamExpressionValue) runExpression.getParameter()).getValue());
}
if(queueExpression != null) {
queueSize= Integer.parseInt(((StreamExpressionValue) queueExpression.getParameter()).getValue());
}
if(terminateExpression != null) {
terminate = Boolean.parseBoolean(((StreamExpressionValue) terminateExpression.getParameter()).getValue());
}
if(1 != streamExpressions.size()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
}
init(tupleStream, id, runInterval, queueSize, terminate);
}
public DaemonStream(TupleStream tupleStream, String id, long runInterval, int queueSize, boolean terminate) {
init(tupleStream, id, runInterval, queueSize, terminate);
}
public DaemonStream(TupleStream tupleStream, String id, long runInterval, int queueSize) {
this(tupleStream, id, runInterval, queueSize, false);
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
}
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
if(includeStreams){
// streams
if(tupleStream instanceof Expressible){
expression.addParameter(((Expressible)tupleStream).toExpression(factory));
} else {
throw new IOException("This UniqueStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
}
else{
expression.addParameter("<stream>");
}
expression.addParameter(new StreamExpressionNamedParameter(ID, id));
expression.addParameter(new StreamExpressionNamedParameter("runInterval", Long.toString(runInterval)));
expression.addParameter(new StreamExpressionNamedParameter("queueSize", Integer.toString(queueSize)));
expression.addParameter(new StreamExpressionNamedParameter("terminate", Boolean.toString(terminate)));
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withChildren(new Explanation[] {
tupleStream.toExplanation(factory)
})
.withFunctionName(factory.getFunctionName(this.getClass()))
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression(toExpression(factory, false).toString());
}
public int remainingCapacity() {
return this.queue.remainingCapacity();
}
public void init(TupleStream tupleStream, String id, long runInterval, int queueSize) {
init(tupleStream, id, runInterval, queueSize, false);
}
@SuppressWarnings({"unchecked", "rawtypes"})
public void init(TupleStream tupleStream, String id, long runInterval, int queueSize, boolean terminate) {
this.tupleStream = tupleStream;
this.id = id;
this.runInterval = runInterval;
this.queueSize = queueSize;
this.terminate = terminate;
if(queueSize > 0) {
queue = new ArrayBlockingQueue(queueSize);
eatTuples = false;
} else {
eatTuples = true;
}
}
public int hashCode() {
return id.hashCode();
}
public boolean equals(Object o) {
if(o instanceof DaemonStream) {
return id.equals(((DaemonStream)o).id);
}
return false;
}
public String getId() {
return id;
}
public void open() throws IOException {
if (this.streamRunner != null && this.closed == false) {
log.error("There is already a running daemon named '{}', no action taken", id);
throw new IOException("There is already an open daemon named '" + id + "', no action taken.");
}
this.closed = false;
this.streamRunner = new StreamRunner(runInterval, id);
ExecutorService service = ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("DaemonStream-" + id));
try {
service.submit(this.streamRunner);
}
finally {
service.shutdown();
}
}
public Tuple read() throws IOException {
try {
return queue.take();
} catch (Exception e) {
throw new IOException(e);
}
}
public StreamComparator getStreamSort() {
return tupleStream.getStreamSort();
}
public void setStreamContext(StreamContext streamContext) {
this.tupleStream.setStreamContext(streamContext);
}
public void shutdown() {
streamRunner.setShutdown(true);
}
public void close() {
if(closed) {
return;
}
if (streamRunner != null) {
streamRunner.setShutdown(true);
}
this.closed = true;
}
public List<TupleStream> children() {
List<TupleStream> children = new ArrayList<>();
children.add(tupleStream);
return children;
}
public synchronized Tuple getInfo() {
Tuple tuple = new Tuple();
tuple.put(ID, id);
tuple.put("startTime", startTime);
tuple.put("stopTime", stopTime);
tuple.put("iterations", iterations.get());
tuple.put("state", streamRunner.getState().toString());
if (exception != null) {
tuple.put("exception", exception.getMessage());
}
return tuple;
}
public void setDaemons(Map<String, DaemonStream> daemons) {
this.daemons = daemons;
}
private synchronized void setStartTime(long startTime) {
this.startTime = startTime;
}
private synchronized void setStopTime(long stopTime) {
this.stopTime = stopTime;
}
private class StreamRunner implements Runnable {
private long sleepMillis = 1000;
private long runInterval;
private long lastRun;
private String id;
// a reference to the Thread that is executing the stream to track its state
private volatile Thread executingThread;
private boolean shutdown;
public StreamRunner(long runInterval, String id) {
this.runInterval = runInterval;
this.id = id;
}
public synchronized void setShutdown(boolean shutdown) {
this.shutdown = shutdown;
}
public synchronized boolean getShutdown() {
return shutdown;
}
public State getState() {
if (executingThread == null) {
if (shutdown) {
return Thread.State.TERMINATED;
} else {
return Thread.State.NEW;
}
} else {
return executingThread.getState();
}
}
public void run() {
executingThread = Thread.currentThread();
try {
stream();
} finally {
setShutdown(true);
executingThread = null;
}
}
private void stream() {
int errors = 0;
setStartTime(new Date().getTime());
OUTER:
while (!getShutdown()) {
long now = new Date().getTime();
if ((now - lastRun) > this.runInterval) {
lastRun = now;
try {
tupleStream.open();
INNER:
while (true) {
Tuple tuple = tupleStream.read();
if (tuple.EOF) {
errors = 0; // Reset errors on successful run.
if (tuple.getFields().containsKey("sleepMillis")) {
this.sleepMillis = tuple.getLong("sleepMillis");
if(terminate && sleepMillis > 0) {
//TopicStream provides sleepMillis > 0 if the last run had no Tuples.
//This means the topic queue is empty. Time to terminate.
//Remove ourselves from the daemons map.
if(daemons != null) {
daemons.remove(id);
}
//Break out of the thread loop and end the run.
break OUTER;
}
this.runInterval = -1;
}
break INNER;
} else if (!eatTuples) {
try {
queue.put(tuple);
} catch (InterruptedException e) {
break OUTER;
}
}
}
} catch (IOException e) {
exception = e;
log.error("Error in DaemonStream: {}", id, e);
++errors;
if (errors > 100) {
log.error("Too many consecutive errors. Stopping DaemonStream: {}", id);
break OUTER;
}
} catch (Throwable t) {
log.error("Fatal Error in DaemonStream: {}", id, t);
//For anything other then IOException break out of the loop and shutdown the thread.
break OUTER;
} finally {
try {
tupleStream.close();
} catch (IOException e1) {
if (exception == null) {
exception = e1;
log.error("Error in DaemonStream: {}", id, e1);
break OUTER;
}
}
}
}
iterations.incrementAndGet();
if (sleepMillis > 0) {
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
log.error("Error in DaemonStream:{}", id, e);
break OUTER;
}
}
}
if(!eatTuples) {
try {
queue.put(Tuple.EOF());
} catch (InterruptedException e) {
log.error("Error in DaemonStream:{}", id, e);
}
}
setStopTime(new Date().getTime());
}
}
}