blob: 3022bb35a06b0f27ebf2b5a859c4bd7ba3865656 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.hdfs;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import javax.xml.ws.Holder;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.TypeConverter;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
public class HdfsWritableFactories {
/**
 * Enumerates the supported Hadoop writable kinds and maps each of them to the
 * {@link Writable} implementation class that represents it.
 */
public enum WritableType {
    NULL(NullWritable.class),
    BOOLEAN(BooleanWritable.class),
    BYTE(ByteWritable.class),
    INT(IntWritable.class),
    FLOAT(FloatWritable.class),
    LONG(LongWritable.class),
    DOUBLE(DoubleWritable.class),
    TEXT(Text.class),
    BYTES(BytesWritable.class);

    /** Writable implementation associated with this constant. */
    private final Class<? extends Writable> writableClass;

    WritableType(Class<? extends Writable> writableClass) {
        this.writableClass = writableClass;
    }

    /**
     * Returns the Hadoop writable class associated with this type.
     *
     * @return the writable implementation class, never {@code null}
     */
    public Class getWritableClass() {
        return writableClass;
    }
}
/**
 * Strategy for converting between plain Java values and Hadoop {@link Writable}
 * instances, reporting the payload size through a {@link Holder}.
 */
interface HdfsWritableFactory {
    /**
     * Builds a {@link Writable} from the given value.
     *
     * @param value         the value to wrap
     * @param typeConverter converter that implementations may use to coerce {@code value}
     * @param size          output parameter; implementations store the payload size in {@code size.value}
     * @return the writable representing {@code value}
     */
Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size);
    /**
     * Extracts the Java value held by the given writable.
     *
     * @param writable the writable to read from
     * @param size     output parameter; implementations store the payload size in {@code size.value}
     * @return the extracted value; some implementations in this file return {@code null}
     */
Object read(Writable writable, Holder<Integer> size);
}
/**
 * Factory for {@link NullWritable}, which carries no payload.
 */
public static final class HdfsNullWritableFactory implements HdfsWritableFactory {

    /** Payload size reported for a null writable. */
    private static final int SIZE = 0;

    /**
     * Ignores {@code value} and returns the instance provided by {@link NullWritable#get()}.
     *
     * @param value         ignored
     * @param typeConverter ignored
     * @param size          receives {@code 0}
     * @return the null writable
     */
    @Override
    public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
        size.value = SIZE;
        final Writable placeholder = NullWritable.get();
        return placeholder;
    }

    /**
     * Reports a size of {@code 0} and yields no value.
     *
     * @param writable ignored
     * @param size     receives {@code 0}
     * @return always {@code null}
     */
    @Override
    public Object read(Writable writable, Holder<Integer> size) {
        size.value = SIZE;
        return null;
    }
}
/**
 * Converts between Java values and Hadoop {@link ByteWritable} instances.
 */
public static final class HdfsByteWritableFactory implements HdfsWritableFactory {

    /** Size reported for every byte value. */
    private static final int SIZE = 1;

    /**
     * Converts {@code value} to a {@link Byte} and wraps it in a new {@link ByteWritable}.
     *
     * @param value         the value to wrap
     * @param typeConverter converter used to coerce {@code value} to {@link Byte}
     * @param size          receives the fixed size {@code 1}
     * @return the populated writable
     */
    @Override
    public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
        size.value = SIZE;
        final byte converted = typeConverter.convertTo(Byte.class, value);
        return new ByteWritable(converted);
    }

    /**
     * Unwraps the byte held by the given {@link ByteWritable}.
     *
     * @param writable the writable to read; must be a {@link ByteWritable}
     * @param size     receives the fixed size {@code 1}
     * @return the contained value as a {@link Byte}
     */
    @Override
    public Object read(Writable writable, Holder<Integer> size) {
        size.value = SIZE;
        final ByteWritable source = (ByteWritable) writable;
        return Byte.valueOf(source.get());
    }
}
/**
 * Converts between Java values and Hadoop {@link BooleanWritable} instances.
 */
public static final class HdfsBooleanWritableFactory implements HdfsWritableFactory {

    /** Size reported for every boolean value. */
    private static final int SIZE = 1;

    /**
     * Converts {@code value} to a {@link Boolean} and wraps it in a new {@link BooleanWritable}.
     *
     * @param value         the value to wrap
     * @param typeConverter converter used to coerce {@code value} to {@link Boolean}
     * @param size          receives the fixed size {@code 1}
     * @return the populated writable
     */
    @Override
    public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
        size.value = SIZE;
        final boolean converted = typeConverter.convertTo(Boolean.class, value);
        return new BooleanWritable(converted);
    }

    /**
     * Unwraps the flag held by the given {@link BooleanWritable}.
     *
     * @param writable the writable to read; must be a {@link BooleanWritable}
     * @param size     receives the fixed size {@code 1}
     * @return the contained value as a {@link Boolean}
     */
    @Override
    public Object read(Writable writable, Holder<Integer> size) {
        size.value = SIZE;
        final BooleanWritable source = (BooleanWritable) writable;
        return Boolean.valueOf(source.get());
    }
}
/**
 * Converts between {@link ByteBuffer} payloads and Hadoop {@link BytesWritable} instances.
 */
public static final class HdfsBytesWritableFactory implements HdfsWritableFactory {

    /**
     * Copies the complete contents of a {@link ByteBuffer} into a new {@link BytesWritable}.
     * <p>
     * The buffer's position and limit are ignored and its whole capacity is written. This keeps
     * round-trips working with {@link #read(Writable, Holder)}, which returns a buffer whose
     * position is left at the end of the data. Buffers that do not expose an accessible backing
     * array (direct or read-only buffers) are copied through a duplicate, so the caller's buffer
     * state is never modified. For sliced buffers only the bytes belonging to the slice are written.
     *
     * @param value         the payload; must be a {@link ByteBuffer}
     * @param typeConverter not used by this factory
     * @param size          receives the number of bytes stored in the writable
     * @return the populated writable
     * @throws ClassCastException if {@code value} is not a {@link ByteBuffer}
     */
    @Override
    public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
        ByteBuffer bb = (ByteBuffer) value;
        BytesWritable writable = new BytesWritable();
        int length = bb.capacity();
        if (bb.hasArray()) {
            // For an unsliced heap buffer this is exactly (array, 0, array.length).
            writable.set(bb.array(), bb.arrayOffset(), length);
        } else {
            // array() would throw here; copy via a duplicate so the caller's position/limit stay untouched.
            byte[] copy = new byte[length];
            ByteBuffer view = bb.duplicate();
            view.clear();
            view.get(copy);
            writable.set(copy, 0, length);
        }
        size.value = length;
        return writable;
    }

    /**
     * Copies the valid bytes of a {@link BytesWritable} into a newly allocated {@link ByteBuffer}.
     *
     * @param writable the writable to read; must be a {@link BytesWritable}
     * @param size     receives the number of valid bytes in the writable
     * @return a buffer whose capacity equals the number of valid bytes
     */
    @Override
    public Object read(Writable writable, Holder<Integer> size) {
        size.value = ((BytesWritable) writable).getLength();
        ByteBuffer bb = ByteBuffer.allocate(size.value);
        // NOTE(review): put() leaves the position at the end of the data and the buffer is not
        // flipped; consumers need to rewind/flip it or use array(). Left unchanged because
        // existing callers may rely on this state.
        bb.put(((BytesWritable) writable).getBytes(), 0, size.value);
        return bb;
    }
}
/**
 * Converts between Java values and Hadoop {@link DoubleWritable} instances.
 */
public static final class HdfsDoubleWritableFactory implements HdfsWritableFactory {

    /** Size reported for every double value. */
    private static final int SIZE = 8;

    /**
     * Converts {@code value} to a {@link Double} and wraps it in a new {@link DoubleWritable}.
     *
     * @param value         the value to wrap
     * @param typeConverter converter used to coerce {@code value} to {@link Double}
     * @param size          receives the fixed size {@code 8}
     * @return the populated writable
     */
    @Override
    public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
        size.value = SIZE;
        final double converted = typeConverter.convertTo(Double.class, value);
        return new DoubleWritable(converted);
    }

    /**
     * Unwraps the number held by the given {@link DoubleWritable}.
     *
     * @param writable the writable to read; must be a {@link DoubleWritable}
     * @param size     receives the fixed size {@code 8}
     * @return the contained value as a {@link Double}
     */
    @Override
    public Object read(Writable writable, Holder<Integer> size) {
        size.value = SIZE;
        final DoubleWritable source = (DoubleWritable) writable;
        return Double.valueOf(source.get());
    }
}
/**
 * Converts between Java values and Hadoop {@link FloatWritable} instances.
 */
public static final class HdfsFloatWritableFactory implements HdfsWritableFactory {

    /** Size reported for every float value. */
    private static final int SIZE = 4;

    /**
     * Converts {@code value} to a {@link Float} and wraps it in a new {@link FloatWritable}.
     *
     * @param value         the value to wrap
     * @param typeConverter converter used to coerce {@code value} to {@link Float}
     * @param size          receives the fixed size {@code 4}
     * @return the populated writable
     */
    @Override
    public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
        size.value = SIZE;
        final float converted = typeConverter.convertTo(Float.class, value);
        return new FloatWritable(converted);
    }

    /**
     * Unwraps the number held by the given {@link FloatWritable}.
     *
     * @param writable the writable to read; must be a {@link FloatWritable}
     * @param size     receives the fixed size {@code 4}
     * @return the contained value as a {@link Float}
     */
    @Override
    public Object read(Writable writable, Holder<Integer> size) {
        size.value = SIZE;
        final FloatWritable source = (FloatWritable) writable;
        return Float.valueOf(source.get());
    }
}
/**
 * Converts between Java values and Hadoop {@link IntWritable} instances.
 */
public static final class HdfsIntWritableFactory implements HdfsWritableFactory {

    /** Size reported for every int value. */
    private static final int SIZE = 4;

    /**
     * Converts {@code value} to an {@link Integer} and wraps it in a new {@link IntWritable}.
     *
     * @param value         the value to wrap
     * @param typeConverter converter used to coerce {@code value} to {@link Integer}
     * @param size          receives the fixed size {@code 4}
     * @return the populated writable
     */
    @Override
    public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
        size.value = SIZE;
        final int converted = typeConverter.convertTo(Integer.class, value);
        return new IntWritable(converted);
    }

    /**
     * Unwraps the number held by the given {@link IntWritable}.
     *
     * @param writable the writable to read; must be an {@link IntWritable}
     * @param size     receives the fixed size {@code 4}
     * @return the contained value as an {@link Integer}
     */
    @Override
    public Object read(Writable writable, Holder<Integer> size) {
        size.value = SIZE;
        final IntWritable source = (IntWritable) writable;
        return Integer.valueOf(source.get());
    }
}
/**
 * Converts between Java values and Hadoop {@link LongWritable} instances.
 */
public static final class HdfsLongWritableFactory implements HdfsWritableFactory {

    /** Size reported for every long value. */
    private static final int SIZE = 8;

    /**
     * Converts {@code value} to a {@link Long} and wraps it in a new {@link LongWritable}.
     *
     * @param value         the value to wrap
     * @param typeConverter converter used to coerce {@code value} to {@link Long}
     * @param size          receives the fixed size {@code 8}
     * @return the populated writable
     */
    @Override
    public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
        size.value = SIZE;
        final long converted = typeConverter.convertTo(Long.class, value);
        return new LongWritable(converted);
    }

    /**
     * Unwraps the number held by the given {@link LongWritable}.
     *
     * @param writable the writable to read; must be a {@link LongWritable}
     * @param size     receives the fixed size {@code 8}
     * @return the contained value as a {@link Long}
     */
    @Override
    public Object read(Writable writable, Holder<Integer> size) {
        size.value = SIZE;
        final LongWritable source = (LongWritable) writable;
        return Long.valueOf(source.get());
    }
}
/**
 * Converts between Java values and Hadoop {@link Text} instances.
 */
public static final class HdfsTextWritableFactory implements HdfsWritableFactory {

    /**
     * Converts {@code value} to a {@link String} and stores it in a new {@link Text}.
     *
     * @param value         the value to wrap
     * @param typeConverter converter used to coerce {@code value} to {@link String}
     * @param size          receives the number of valid encoded bytes held by the text
     * @return the populated writable
     */
    @Override
    public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
        Text writable = new Text();
        writable.set(typeConverter.convertTo(String.class, value));
        // getBytes() exposes the backing array, whose capacity can exceed the encoded text;
        // getLength() is the count of valid bytes and matches what read() reports.
        size.value = writable.getLength();
        return writable;
    }

    /**
     * Returns the string form of the given {@link Text}.
     *
     * @param writable the writable to read; must be a {@link Text}
     * @param size     receives the number of valid encoded bytes held by the text
     * @return the result of {@code writable.toString()}
     */
    @Override
    public Object read(Writable writable, Holder<Integer> size) {
        size.value = ((Text) writable).getLength();
        return writable.toString();
    }
}
/**
 * Fallback factory that serializes arbitrary values as raw bytes by converting them
 * to an {@link InputStream} and storing the stream content in a {@link BytesWritable}.
 */
public static final class HdfsObjectWritableFactory implements HdfsWritableFactory {

    /**
     * Converts {@code value} to an {@link InputStream}, drains it and wraps the bytes
     * in a new {@link BytesWritable}. The stream is always closed before returning.
     *
     * @param value         the value to serialize
     * @param typeConverter converter used to obtain an {@link InputStream} from {@code value}
     * @param size          receives the number of bytes read from the stream
     * @return the populated writable
     * @throws RuntimeCamelException if reading the stream fails
     * @throws RuntimeException      if closing the stream fails
     */
    @Override
    public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
        InputStream is = null;
        try {
            is = typeConverter.convertTo(InputStream.class, value);
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            // 'false': copyBytes must not close the streams; the finally block below closes 'is'.
            IOUtils.copyBytes(is, bos, HdfsConstants.DEFAULT_BUFFERSIZE, false);
            // toByteArray() makes a full copy on every call, so take a single snapshot.
            byte[] data = bos.toByteArray();
            BytesWritable writable = new BytesWritable();
            writable.set(data, 0, data.length);
            size.value = data.length;
            return writable;
        } catch (IOException ex) {
            throw new RuntimeCamelException(ex);
        } finally {
            if (is != null) {
                try {
                    is.close();
                } catch (IOException e) {
                    throw new RuntimeException("Error closing stream", e);
                }
            }
        }
    }

    /**
     * Reports a size of {@code 0} and yields no value; this factory does not
     * reconstruct anything from the writable.
     *
     * @param writable ignored
     * @param size     receives {@code 0}
     * @return always {@code null}
     */
    @Override
    public Object read(Writable writable, Holder<Integer> size) {
        size.value = 0;
        return null;
    }
}
}