blob: a91f238763bf923d6fd7626e1e49bb492dc63c82 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.stream;
import java.nio.charset.Charset;
import java.util.Map;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.apache.camel.support.DefaultEndpoint;
/**
* The stream: component provides access to the system-in, system-out and system-err streams as well as allowing streaming of file and URL.
*/
@UriEndpoint(firstVersion = "1.3.0", scheme = "stream", title = "Stream", syntax = "stream:kind", label = "file,system")
public class StreamEndpoint extends DefaultEndpoint {
private transient Charset charset;
@UriPath(enums = "in,out,err,header,file,url") @Metadata(required = true)
private String kind;
@UriParam
private String url;
@UriParam
private String fileName;
@UriParam(label = "consumer")
private boolean scanStream;
@UriParam(label = "consumer")
private boolean retry;
@UriParam(label = "consumer")
private boolean fileWatcher;
@UriParam(label = "producer")
private boolean closeOnDone;
@UriParam(label = "consumer")
private long scanStreamDelay;
@UriParam(label = "producer")
private long delay;
@UriParam
private String encoding;
@UriParam(label = "consumer")
private String promptMessage;
@UriParam(label = "consumer")
private long promptDelay;
@UriParam(label = "consumer", defaultValue = "2000")
private long initialPromptDelay = 2000;
@UriParam(label = "consumer")
private int groupLines;
@UriParam(label = "producer")
private int autoCloseCount;
@UriParam(label = "consumer")
private GroupStrategy groupStrategy = new DefaultGroupStrategy();
@UriParam(label = "advanced", prefix = "httpHeaders.", multiValue = true)
private Map<String, Object> httpHeaders;
@UriParam(label = "advanced")
private int connectTimeout;
@UriParam(label = "advanced")
private int readTimeout;
public StreamEndpoint(String endpointUri, Component component) throws Exception {
super(endpointUri, component);
}
public Consumer createConsumer(Processor processor) throws Exception {
StreamConsumer answer = new StreamConsumer(this, processor, getEndpointUri());
if (isFileWatcher() && !"file".equals(getKind())) {
throw new IllegalArgumentException("File watcher is only possible if reading streams from files");
}
configureConsumer(answer);
return answer;
}
public Producer createProducer() throws Exception {
return new StreamProducer(this, getEndpointUri());
}
protected Exchange createExchange(Object body, long index, boolean last) {
Exchange exchange = createExchange();
exchange.getIn().setBody(body);
exchange.getIn().setHeader(StreamConstants.STREAM_INDEX, index);
exchange.getIn().setHeader(StreamConstants.STREAM_COMPLETE, last);
return exchange;
}
// Properties
//-------------------------------------------------------------------------
public String getKind() {
return kind;
}
/**
* Kind of stream to use such as System.in or System.out.
*/
public void setKind(String kind) {
this.kind = kind;
}
public String getFileName() {
return fileName;
}
/**
* When using the stream:file URI format, this option specifies the filename to stream to/from.
*/
public void setFileName(String fileName) {
this.fileName = fileName;
}
public String getUrl() {
return url;
}
/**
* When using the stream:url URI format, this option specifies the URL to stream to/from.
* The input/output stream will be opened using the JDK URLConnection facility.
*/
public void setUrl(String url) {
this.url = url;
}
public long getDelay() {
return delay;
}
/**
* Initial delay in milliseconds before producing the stream.
*/
public void setDelay(long delay) {
this.delay = delay;
}
public String getEncoding() {
return encoding;
}
/**
* You can configure the encoding (is a charset name) to use text-based streams (for example, message body is a String object).
* If not provided, Camel uses the JVM default Charset.
*/
public void setEncoding(String encoding) {
this.encoding = encoding;
}
public String getPromptMessage() {
return promptMessage;
}
/**
* Message prompt to use when reading from stream:in; for example, you could set this to Enter a command:
*/
public void setPromptMessage(String promptMessage) {
this.promptMessage = promptMessage;
}
public long getPromptDelay() {
return promptDelay;
}
/**
* Optional delay in milliseconds before showing the message prompt.
*/
public void setPromptDelay(long promptDelay) {
this.promptDelay = promptDelay;
}
public long getInitialPromptDelay() {
return initialPromptDelay;
}
/**
* Initial delay in milliseconds before showing the message prompt. This delay occurs only once.
* Can be used during system startup to avoid message prompts being written while other logging is done to the system out.
*/
public void setInitialPromptDelay(long initialPromptDelay) {
this.initialPromptDelay = initialPromptDelay;
}
public boolean isScanStream() {
return scanStream;
}
/**
* To be used for continuously reading a stream such as the unix tail command.
*/
public void setScanStream(boolean scanStream) {
this.scanStream = scanStream;
}
public GroupStrategy getGroupStrategy() {
return groupStrategy;
}
/**
* Allows to use a custom GroupStrategy to control how to group lines.
*/
public void setGroupStrategy(GroupStrategy strategy) {
this.groupStrategy = strategy;
}
public boolean isRetry() {
return retry;
}
/**
* Will retry opening the stream if it's overwritten, somewhat like tail --retry
* <p/>
* If reading from files then you should also enable the fileWatcher option, to make it work reliable.
*/
public void setRetry(boolean retry) {
this.retry = retry;
}
public boolean isFileWatcher() {
return fileWatcher;
}
/**
* To use JVM file watcher to listen for file change events to support re-loading files that may be overwritten, somewhat like tail --retry
*/
public void setFileWatcher(boolean fileWatcher) {
this.fileWatcher = fileWatcher;
}
public boolean isCloseOnDone() {
return closeOnDone;
}
/**
* This option is used in combination with Splitter and streaming to the same file.
* The idea is to keep the stream open and only close when the Splitter is done, to improve performance.
* Mind this requires that you only stream to the same file, and not 2 or more files.
*/
public void setCloseOnDone(boolean closeOnDone) {
this.closeOnDone = closeOnDone;
}
public long getScanStreamDelay() {
return scanStreamDelay;
}
/**
* Delay in milliseconds between read attempts when using scanStream.
*/
public void setScanStreamDelay(long scanStreamDelay) {
this.scanStreamDelay = scanStreamDelay;
}
public int getGroupLines() {
return groupLines;
}
/**
* To group X number of lines in the consumer.
* For example to group 10 lines and therefore only spit out an Exchange with 10 lines, instead of 1 Exchange per line.
*/
public void setGroupLines(int groupLines) {
this.groupLines = groupLines;
}
public int getAutoCloseCount() {
return autoCloseCount;
}
/**
* Number of messages to process before closing stream on Producer side.
* Never close stream by default (only when Producer is stopped). If more messages are sent, the stream is reopened for another autoCloseCount batch.
*/
public void setAutoCloseCount(int autoCloseCount) {
this.autoCloseCount = autoCloseCount;
}
public Charset getCharset() {
return charset;
}
public Map<String, Object> getHttpHeaders() {
return httpHeaders;
}
/**
* Optional http headers to use in request when using HTTP URL.
*/
public void setHttpHeaders(Map<String, Object> httpHeaders) {
this.httpHeaders = httpHeaders;
}
public int getConnectTimeout() {
return connectTimeout;
}
/**
* Sets a specified timeout value, in milliseconds, to be used
* when opening a communications link to the resource referenced
* by this URLConnection. If the timeout expires before the
* connection can be established, a
* java.net.SocketTimeoutException is raised. A timeout of zero is
* interpreted as an infinite timeout.
*/
public void setConnectTimeout(int connectTimeout) {
this.connectTimeout = connectTimeout;
}
public int getReadTimeout() {
return readTimeout;
}
/**
* Sets the read timeout to a specified timeout, in
* milliseconds. A non-zero value specifies the timeout when
* reading from Input stream when a connection is established to a
* resource. If the timeout expires before there is data available
* for read, a java.net.SocketTimeoutException is raised. A
* timeout of zero is interpreted as an infinite timeout.
*/
public void setReadTimeout(int readTimeout) {
this.readTimeout = readTimeout;
}
// Implementations
//-------------------------------------------------------------------------
protected void doStart() throws Exception {
charset = loadCharset();
}
Charset loadCharset() {
if (encoding == null) {
encoding = Charset.defaultCharset().name();
log.debug("No encoding parameter using default charset: {}", encoding);
}
if (!Charset.isSupported(encoding)) {
throw new IllegalArgumentException("The encoding: " + encoding + " is not supported");
}
return Charset.forName(encoding);
}
}