blob: 28f085a5d964830ff0a5b45bc3a21fc40976b3dc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.stream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultMessage;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Consumer that can read from streams
*/
public class StreamConsumer extends DefaultConsumer implements Runnable {
private static final transient Logger LOG = LoggerFactory.getLogger(StreamConsumer.class);
private static final String TYPES = "in,file,url";
private static final String INVALID_URI = "Invalid uri, valid form: 'stream:{" + TYPES + "}'";
private static final List<String> TYPES_LIST = Arrays.asList(TYPES.split(","));
private ExecutorService executor;
private InputStream inputStream = System.in;
private StreamEndpoint endpoint;
private String uri;
private boolean initialPromptDone;
private final List<Object> lines = new CopyOnWriteArrayList<Object>();
public StreamConsumer(StreamEndpoint endpoint, Processor processor, String uri) throws Exception {
super(endpoint, processor);
this.endpoint = endpoint;
this.uri = uri;
validateUri(uri);
}
@Override
protected void doStart() throws Exception {
super.doStart();
initializeStream();
executor = endpoint.getCamelContext().getExecutorServiceStrategy().newSingleThreadExecutor(this, endpoint.getEndpointUri());
executor.execute(this);
if (endpoint.getGroupLines() < 0) {
throw new IllegalArgumentException("Option groupLines must be 0 or positive number, was " + endpoint.getGroupLines());
}
}
@Override
public void doStop() throws Exception {
// important: do not close the stream as it will close the standard
// system.in etc.
if (executor != null) {
executor.shutdownNow();
executor = null;
}
lines.clear();
super.doStop();
}
public void run() {
try {
readFromStream();
} catch (Exception e) {
getExceptionHandler().handleException(e);
}
}
private BufferedReader initializeStream() throws Exception {
if ("in".equals(uri)) {
inputStream = System.in;
} else if ("file".equals(uri)) {
inputStream = resolveStreamFromFile();
} else if ("url".equals(uri)) {
inputStream = resolveStreamFromUrl();
}
Charset charset = endpoint.getCharset();
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, charset));
return br;
}
private void readFromStream() throws Exception {
String line;
BufferedReader br = initializeStream();
if (endpoint.isScanStream()) {
// repeat scanning from stream
while (isRunAllowed()) {
line = br.readLine();
if (LOG.isTraceEnabled()) {
LOG.trace("Read line: " + line);
}
boolean eos = line == null;
if (!eos && isRunAllowed()) {
processLine(line);
} else if (eos && isRunAllowed() && endpoint.isRetry()) {
//try and re-open stream
br = initializeStream();
}
try {
Thread.sleep(endpoint.getScanStreamDelay());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
} else {
// regular read stream once until end of stream
boolean eos = false;
while (!eos && isRunAllowed()) {
if (endpoint.getPromptMessage() != null) {
doPromptMessage();
}
line = br.readLine();
if (LOG.isTraceEnabled()) {
LOG.trace("Read line: " + line);
}
eos = line == null;
if (!eos && isRunAllowed()) {
processLine(line);
}
}
}
// important: do not close the reader as it will close the standard system.in etc.
}
/**
* Strategy method for processing the line
*/
protected synchronized void processLine(Object line) throws Exception {
if (endpoint.getGroupLines() > 0) {
// remember line
lines.add(line);
// should we flush lines?
if (lines.size() >= endpoint.getGroupLines()) {
// spit out lines
Exchange exchange = endpoint.createExchange();
// create message with the lines
Message msg = new DefaultMessage();
List<Object> copy = new ArrayList<Object>(lines);
msg.setBody(copy);
exchange.setIn(msg);
// clear lines
lines.clear();
getProcessor().process(exchange);
}
} else {
// single line
Exchange exchange = endpoint.createExchange();
Message msg = new DefaultMessage();
msg.setBody(line);
exchange.setIn(msg);
getProcessor().process(exchange);
}
}
/**
* Strategy method for prompting the prompt message
*/
protected void doPromptMessage() {
long delay = 0;
if (!initialPromptDone && endpoint.getInitialPromptDelay() > 0) {
initialPromptDone = true;
delay = endpoint.getInitialPromptDelay();
} else if (endpoint.getPromptDelay() > 0) {
delay = endpoint.getPromptDelay();
}
if (delay > 0) {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (inputStream == System.in) {
System.out.print(endpoint.getPromptMessage());
}
}
private InputStream resolveStreamFromUrl() throws IOException {
String u = endpoint.getUrl();
ObjectHelper.notEmpty(u, "url");
if (LOG.isDebugEnabled()) {
LOG.debug("About to read from url: " + u);
}
URL url = new URL(u);
URLConnection c = url.openConnection();
return c.getInputStream();
}
private InputStream resolveStreamFromFile() throws IOException {
String fileName = endpoint.getFileName();
ObjectHelper.notEmpty(fileName, "fileName");
FileInputStream fileStream;
File file = new File(fileName);
if (LOG.isDebugEnabled()) {
LOG.debug("File to be scanned : " + file.getName() + ", path : " + file.getAbsolutePath());
}
if (file.canRead()) {
fileStream = new FileInputStream(file);
} else {
throw new IllegalArgumentException(INVALID_URI);
}
return fileStream;
}
private void validateUri(String uri) throws IllegalArgumentException {
String[] s = uri.split(":");
if (s.length < 2) {
throw new IllegalArgumentException(INVALID_URI);
}
String[] t = s[1].split("\\?");
if (t.length < 1) {
throw new IllegalArgumentException(INVALID_URI);
}
this.uri = t[0].trim();
if (this.uri.startsWith("//")) {
this.uri = this.uri.substring(2);
}
if (!TYPES_LIST.contains(this.uri)) {
throw new IllegalArgumentException(INVALID_URI);
}
}
}