blob: 56d0b233385956ce996ca1b6c7e33100a3a7919b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.dataformat.beanio;
import java.io.File;
import java.io.InputStream;
import java.io.Reader;
import java.nio.charset.Charset;
import java.util.Properties;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Message;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.WrappedFile;
import org.apache.camel.support.ResourceHelper;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
import org.beanio.BeanReader;
import org.beanio.BeanReaderErrorHandler;
import org.beanio.StreamFactory;
import static org.apache.camel.dataformat.beanio.BeanIOHelper.getOrCreateBeanReaderErrorHandler;
/**
* You can use {@link BeanIOSplitter} with the Camel Splitter EIP to split big payloads
* using a stream mode to avoid reading the entire content into memory.
*/
public class BeanIOSplitter implements Expression {
private BeanIOConfiguration configuration = new BeanIOConfiguration();
private StreamFactory factory;
public BeanIOSplitter() throws Exception {
}
public BeanIOSplitter(BeanIOConfiguration configuration) {
this.configuration = configuration;
}
public BeanIOSplitter(String mapping, String streamName) {
setMapping(mapping);
setStreamName(streamName);
}
protected StreamFactory createStreamFactory(CamelContext camelContext) throws Exception {
ObjectHelper.notNull(getStreamName(), "Stream name not configured.");
// Create the stream factory that will be used to read/write objects.
StreamFactory answer = StreamFactory.newInstance();
// Load the mapping file using the resource helper to ensure it can be loaded in OSGi and other environments
InputStream is = ResourceHelper.resolveMandatoryResourceAsInputStream(camelContext, getMapping());
try {
if (getProperties() != null) {
answer.load(is, getProperties());
} else {
answer.load(is);
}
} finally {
IOHelper.close(is);
}
return answer;
}
public Object evaluate(Exchange exchange) throws Exception {
Message msg = exchange.getIn();
Object body = msg.getBody();
if (factory == null) {
factory = createStreamFactory(exchange.getContext());
}
BeanReader beanReader = null;
if (body instanceof WrappedFile) {
body = ((WrappedFile) body).getFile();
}
if (body instanceof File) {
File file = (File) body;
beanReader = factory.createReader(getStreamName(), file);
}
if (beanReader == null) {
Reader reader = msg.getMandatoryBody(Reader.class);
beanReader = factory.createReader(getStreamName(), reader);
}
BeanIOIterator iterator = new BeanIOIterator(beanReader);
BeanReaderErrorHandler errorHandler = getOrCreateBeanReaderErrorHandler(configuration, exchange, null, iterator);
beanReader.setErrorHandler(errorHandler);
return iterator;
}
@Override
public <T> T evaluate(Exchange exchange, Class<T> type) {
try {
Object result = evaluate(exchange);
return exchange.getContext().getTypeConverter().convertTo(type, exchange, result);
} catch (Exception e) {
throw RuntimeCamelException.wrapRuntimeCamelException(e);
}
}
public BeanIOConfiguration getConfiguration() {
return configuration;
}
public void setConfiguration(BeanIOConfiguration configuration) {
this.configuration = configuration;
}
public StreamFactory getFactory() {
return factory;
}
public void setFactory(StreamFactory factory) {
this.factory = factory;
}
public String getMapping() {
return configuration.getMapping();
}
public void setIgnoreUnexpectedRecords(boolean ignoreUnexpectedRecords) {
configuration.setIgnoreUnexpectedRecords(ignoreUnexpectedRecords);
}
public void setProperties(Properties properties) {
configuration.setProperties(properties);
}
public void setStreamName(String streamName) {
configuration.setStreamName(streamName);
}
public boolean isIgnoreUnidentifiedRecords() {
return configuration.isIgnoreUnidentifiedRecords();
}
public boolean isIgnoreInvalidRecords() {
return configuration.isIgnoreInvalidRecords();
}
public void setIgnoreInvalidRecords(boolean ignoreInvalidRecords) {
configuration.setIgnoreInvalidRecords(ignoreInvalidRecords);
}
public void setEncoding(Charset encoding) {
configuration.setEncoding(encoding);
}
public boolean isIgnoreUnexpectedRecords() {
return configuration.isIgnoreUnexpectedRecords();
}
public Properties getProperties() {
return configuration.getProperties();
}
public String getStreamName() {
return configuration.getStreamName();
}
public void setMapping(String mapping) {
configuration.setMapping(mapping);
}
public void setIgnoreUnidentifiedRecords(boolean ignoreUnidentifiedRecords) {
configuration.setIgnoreUnidentifiedRecords(ignoreUnidentifiedRecords);
}
public Charset getEncoding() {
return configuration.getEncoding();
}
public BeanReaderErrorHandler getBeanReaderErrorHandler() {
return configuration.getBeanReaderErrorHandler();
}
public void setBeanReaderErrorHandler(BeanReaderErrorHandler beanReaderErrorHandler) {
configuration.setBeanReaderErrorHandler(beanReaderErrorHandler);
}
public String getBeanReaderErrorHandlerType() {
return configuration.getBeanReaderErrorHandlerType();
}
public void setBeanReaderErrorHandlerType(String beanReaderErrorHandlerType) {
configuration.setBeanReaderErrorHandlerType(beanReaderErrorHandlerType);
}
public void setBeanReaderErrorHandlerType(Class<?> beanReaderErrorHandlerType) {
configuration.setBeanReaderErrorHandlerType(beanReaderErrorHandlerType);
}
}