blob: c22a6adc7d098a51e9d6947e61aea439122897a5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.stream;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Producer that can write to streams
*/
public class StreamProducer extends DefaultProducer {
private static final transient Logger LOG = LoggerFactory.getLogger(StreamProducer.class);
private static final String TYPES = "out,err,file,header,url";
private static final String INVALID_URI = "Invalid uri, valid form: 'stream:{" + TYPES + "}'";
private static final List<String> TYPES_LIST = Arrays.asList(TYPES.split(","));
private StreamEndpoint endpoint;
private String uri;
public StreamProducer(StreamEndpoint endpoint, String uri) throws Exception {
super(endpoint);
this.endpoint = endpoint;
validateUri(uri);
}
public void process(Exchange exchange) throws Exception {
delay(endpoint.getDelay());
OutputStream outputStream = System.out;
boolean isSystemStream = false;
if ("out".equals(uri)) {
isSystemStream = true;
outputStream = System.out;
} else if ("err".equals(uri)) {
isSystemStream = true;
outputStream = System.err;
} else if ("file".equals(uri)) {
outputStream = resolveStreamFromFile();
} else if ("header".equals(uri)) {
outputStream = resolveStreamFromHeader(exchange.getIn().getHeader("stream"), exchange);
} else if ("url".equals(uri)) {
outputStream = resolveStreamFromUrl();
}
writeToStream(outputStream, exchange);
closeStream(outputStream, isSystemStream);
}
private OutputStream resolveStreamFromUrl() throws IOException {
String u = endpoint.getUrl();
ObjectHelper.notEmpty(u, "url");
LOG.debug("About to write to url: {}", u);
URL url = new URL(u);
URLConnection c = url.openConnection();
return c.getOutputStream();
}
private OutputStream resolveStreamFromFile() throws IOException {
String fileName = endpoint.getFileName();
ObjectHelper.notEmpty(fileName, "fileName");
LOG.debug("About to write to file: {}", fileName);
File f = new File(fileName);
// will create a new file if missing or append to existing
f.createNewFile();
return new FileOutputStream(f);
}
private OutputStream resolveStreamFromHeader(Object o, Exchange exchange) throws CamelExchangeException {
return exchange.getContext().getTypeConverter().convertTo(OutputStream.class, o);
}
private void delay(long ms) throws InterruptedException {
if (ms == 0) {
return;
}
LOG.trace("Delaying {} millis", ms);
Thread.sleep(ms);
}
private void writeToStream(OutputStream outputStream, Exchange exchange) throws IOException, CamelExchangeException {
Object body = exchange.getIn().getBody();
// if not a string then try as byte array first
if (!(body instanceof String)) {
byte[] bytes = exchange.getIn().getBody(byte[].class);
if (bytes != null) {
LOG.debug("Writing as byte[]: {} to {}", bytes, outputStream);
outputStream.write(bytes);
return;
}
}
// okay now fallback to mandatory converterable to string
String s = exchange.getIn().getMandatoryBody(String.class);
Charset charset = endpoint.getCharset();
Writer writer = new OutputStreamWriter(outputStream, charset);
BufferedWriter bw = new BufferedWriter(writer);
if (LOG.isDebugEnabled()) {
LOG.debug("Writing as text: {} to {} using encoding: {}", new Object[]{body, outputStream, charset});
}
bw.write(s);
bw.write("\n");
bw.flush();
}
private void closeStream(OutputStream outputStream, boolean isSystemStream) throws Exception {
// important: do not close the writer on a standard system.out etc.
if (outputStream != null && !isSystemStream) {
outputStream.close();
}
}
private void validateUri(String uri) throws Exception {
String[] s = uri.split(":");
if (s.length < 2) {
throw new IllegalArgumentException(INVALID_URI);
}
String[] t = s[1].split("\\?");
if (t.length < 1) {
throw new IllegalArgumentException(INVALID_URI);
}
this.uri = t[0].trim();
if (this.uri.startsWith("//")) {
this.uri = this.uri.substring(2);
}
if (!TYPES_LIST.contains(this.uri)) {
throw new IllegalArgumentException(INVALID_URI);
}
}
}