blob: af2e58f99d144c8a0a6b821572d3fbfa4f87a5cb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.hdfs;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.camel.util.URISupport;
import org.apache.hadoop.io.SequenceFile;
/**
 * Configuration for an HDFS endpoint, populated either from the endpoint URI
 * (via {@link #parseURI(URI)}) or directly through the setters.
 * <p/>
 * Every URI query parameter maps onto one field below; parameters that are not
 * supplied keep their defaults from {@link HdfsConstants}.
 */
public class HdfsConfiguration {

    private URI uri;
    private String hostName;
    private int port = HdfsConstants.DEFAULT_PORT;
    private String path;
    private boolean overwrite = true;
    private boolean append;
    private int bufferSize = HdfsConstants.DEFAULT_BUFFERSIZE;
    private short replication = HdfsConstants.DEFAULT_REPLICATION;
    private long blockSize = HdfsConstants.DEFAULT_BLOCKSIZE;
    private SequenceFile.CompressionType compressionType = HdfsConstants.DEFAULT_COMPRESSIONTYPE;
    private HdfsCompressionCodec compressionCodec = HdfsConstants.DEFAULT_CODEC;
    private HdfsFileType fileType = HdfsFileType.NORMAL_FILE;
    private HdfsFileSystemType fileSystemType = HdfsFileSystemType.HDFS;
    private HdfsWritableFactories.WritableType keyType = HdfsWritableFactories.WritableType.NULL;
    private HdfsWritableFactories.WritableType valueType = HdfsWritableFactories.WritableType.BYTES;
    private String openedSuffix = HdfsConstants.DEFAULT_OPENED_SUFFIX;
    private String readSuffix = HdfsConstants.DEFAULT_READ_SUFFIX;
    private long initialDelay;
    private long delay = HdfsConstants.DEFAULT_DELAY;
    private String pattern = HdfsConstants.DEFAULT_PATTERN;
    private int chunkSize = HdfsConstants.DEFAULT_BUFFERSIZE;
    private int checkIdleInterval = HdfsConstants.DEFAULT_CHECK_IDLE_INTERVAL;
    // Initialized to an empty list so that checkProducerOptions() does not NPE
    // when the configuration is built through setters and parseURI() never ran.
    private List<HdfsProducer.SplitStrategy> splitStrategies = new ArrayList<HdfsProducer.SplitStrategy>();

    public HdfsConfiguration() {
    }

    // --- helpers: read a typed value from the parsed URI parameters, falling
    // --- back to the supplied default when the parameter is absent

    private Boolean getBoolean(Map<String, Object> hdfsSettings, String param, Boolean dflt) {
        if (hdfsSettings.containsKey(param)) {
            return Boolean.valueOf((String) hdfsSettings.get(param));
        } else {
            return dflt;
        }
    }

    private Integer getInteger(Map<String, Object> hdfsSettings, String param, Integer dflt) {
        if (hdfsSettings.containsKey(param)) {
            return Integer.valueOf((String) hdfsSettings.get(param));
        } else {
            return dflt;
        }
    }

    private Short getShort(Map<String, Object> hdfsSettings, String param, Short dflt) {
        if (hdfsSettings.containsKey(param)) {
            return Short.valueOf((String) hdfsSettings.get(param));
        } else {
            return dflt;
        }
    }

    private Long getLong(Map<String, Object> hdfsSettings, String param, Long dflt) {
        if (hdfsSettings.containsKey(param)) {
            return Long.valueOf((String) hdfsSettings.get(param));
        } else {
            return dflt;
        }
    }

    private HdfsFileType getFileType(Map<String, Object> hdfsSettings, String param, HdfsFileType dflt) {
        String eit = (String) hdfsSettings.get(param);
        if (eit != null) {
            return HdfsFileType.valueOf(eit);
        } else {
            return dflt;
        }
    }

    private HdfsFileSystemType getFileSystemType(Map<String, Object> hdfsSettings, String param, HdfsFileSystemType dflt) {
        String eit = (String) hdfsSettings.get(param);
        if (eit != null) {
            return HdfsFileSystemType.valueOf(eit);
        } else {
            return dflt;
        }
    }

    private HdfsWritableFactories.WritableType getWritableType(Map<String, Object> hdfsSettings, String param,
            HdfsWritableFactories.WritableType dflt) {
        String eit = (String) hdfsSettings.get(param);
        if (eit != null) {
            return HdfsWritableFactories.WritableType.valueOf(eit);
        } else {
            return dflt;
        }
    }

    private SequenceFile.CompressionType getCompressionType(Map<String, Object> hdfsSettings, String param,
            SequenceFile.CompressionType ct) {
        String eit = (String) hdfsSettings.get(param);
        if (eit != null) {
            return SequenceFile.CompressionType.valueOf(eit);
        } else {
            return ct;
        }
    }

    private HdfsCompressionCodec getCompressionCodec(Map<String, Object> hdfsSettings, String param, HdfsCompressionCodec cd) {
        String eit = (String) hdfsSettings.get(param);
        if (eit != null) {
            return HdfsCompressionCodec.valueOf(eit);
        } else {
            return cd;
        }
    }

    private String getString(Map<String, Object> hdfsSettings, String param, String dflt) {
        if (hdfsSettings.containsKey(param)) {
            return (String) hdfsSettings.get(param);
        } else {
            return dflt;
        }
    }

    /**
     * Parses the <tt>splitStrategy</tt> URI parameter, a comma-separated list of
     * <tt>TYPE:value</tt> tokens, e.g. <tt>BYTES:5,IDLE:1000</tt>.
     *
     * @return the parsed strategies, or an empty list when the parameter is absent
     * @throws IllegalArgumentException if a token is not of the form TYPE:value
     */
    private List<HdfsProducer.SplitStrategy> getSplitStrategies(Map<String, Object> hdfsSettings) {
        List<HdfsProducer.SplitStrategy> strategies = new ArrayList<HdfsProducer.SplitStrategy>();
        String eit = (String) hdfsSettings.get("splitStrategy");
        if (eit != null) {
            for (String strstrategy : eit.split(",")) {
                String[] tokens = strstrategy.split(":");
                if (tokens.length != 2) {
                    throw new IllegalArgumentException("Wrong Split Strategy splitStrategy=" + eit);
                }
                HdfsProducer.SplitStrategyType sst = HdfsProducer.SplitStrategyType.valueOf(tokens[0]);
                long ssv = Long.parseLong(tokens[1]);
                strategies.add(new HdfsProducer.SplitStrategy(sst, ssv));
            }
        }
        return strategies;
    }

    /**
     * Validates consumer-side options. Currently there are no consumer-only
     * constraints; the method exists for symmetry with {@link #checkProducerOptions()}.
     */
    public void checkConsumerOptions() {
    }

    /**
     * Validates producer-side options: <tt>append=true</tt> is incompatible with
     * split strategies and only supported for {@link HdfsFileType#NORMAL_FILE}.
     *
     * @throws IllegalArgumentException when an incompatible combination is configured
     */
    public void checkProducerOptions() {
        if (isAppend()) {
            if (!getSplitStrategies().isEmpty()) {
                throw new IllegalArgumentException("Split Strategies incompatible with append=true");
            }
            if (getFileType() != HdfsFileType.NORMAL_FILE) {
                throw new IllegalArgumentException("append=true works only with NORMAL_FILEs");
            }
        }
    }

    /**
     * Populates this configuration from an <tt>hdfs://host[:port]/path?options</tt> URI.
     *
     * @param uri the endpoint URI; its scheme must be <tt>hdfs</tt>
     * @throws URISyntaxException       if the query parameters cannot be parsed
     * @throws IllegalArgumentException if the scheme is missing or not <tt>hdfs</tt>
     */
    public void parseURI(URI uri) throws URISyntaxException {
        String protocol = uri.getScheme();
        // constant-first comparison also guards against a null scheme
        if (!"hdfs".equalsIgnoreCase(protocol)) {
            throw new IllegalArgumentException("Unrecognized protocol: " + protocol + " for uri: " + uri);
        }
        hostName = uri.getHost();
        port = uri.getPort() == -1 ? HdfsConstants.DEFAULT_PORT : uri.getPort();
        path = uri.getPath();
        Map<String, Object> hdfsSettings = URISupport.parseParameters(uri);

        overwrite = getBoolean(hdfsSettings, "overwrite", overwrite);
        append = getBoolean(hdfsSettings, "append", append);
        bufferSize = getInteger(hdfsSettings, "bufferSize", bufferSize);
        replication = getShort(hdfsSettings, "replication", replication);
        blockSize = getLong(hdfsSettings, "blockSize", blockSize);
        compressionType = getCompressionType(hdfsSettings, "compressionType", compressionType);
        compressionCodec = getCompressionCodec(hdfsSettings, "compressionCodec", compressionCodec);
        fileType = getFileType(hdfsSettings, "fileType", fileType);
        fileSystemType = getFileSystemType(hdfsSettings, "fileSystemType", fileSystemType);
        keyType = getWritableType(hdfsSettings, "keyType", keyType);
        valueType = getWritableType(hdfsSettings, "valueType", valueType);
        openedSuffix = getString(hdfsSettings, "openedSuffix", openedSuffix);
        readSuffix = getString(hdfsSettings, "readSuffix", readSuffix);
        initialDelay = getLong(hdfsSettings, "initialDelay", initialDelay);
        delay = getLong(hdfsSettings, "delay", delay);
        pattern = getString(hdfsSettings, "pattern", pattern);
        chunkSize = getInteger(hdfsSettings, "chunkSize", chunkSize);
        splitStrategies = getSplitStrategies(hdfsSettings);
    }

    // --- plain accessors

    public URI getUri() {
        return uri;
    }

    public void setUri(URI uri) {
        this.uri = uri;
    }

    public String getHostName() {
        return hostName;
    }

    public void setHostName(String hostName) {
        this.hostName = hostName;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getPath() {
        return path;
    }

    public void setPath(String path) {
        this.path = path;
    }

    public boolean isOverwrite() {
        return overwrite;
    }

    public void setOverwrite(boolean overwrite) {
        this.overwrite = overwrite;
    }

    public boolean isAppend() {
        return append;
    }

    public void setAppend(boolean append) {
        this.append = append;
    }

    public int getBufferSize() {
        return bufferSize;
    }

    public void setBufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    public short getReplication() {
        return replication;
    }

    public void setReplication(short replication) {
        this.replication = replication;
    }

    public long getBlockSize() {
        return blockSize;
    }

    public void setBlockSize(long blockSize) {
        this.blockSize = blockSize;
    }

    public HdfsFileType getFileType() {
        return fileType;
    }

    public void setFileType(HdfsFileType fileType) {
        this.fileType = fileType;
    }

    public SequenceFile.CompressionType getCompressionType() {
        return compressionType;
    }

    public void setCompressionType(SequenceFile.CompressionType compressionType) {
        this.compressionType = compressionType;
    }

    public HdfsCompressionCodec getCompressionCodec() {
        return compressionCodec;
    }

    public void setCompressionCodec(HdfsCompressionCodec compressionCodec) {
        this.compressionCodec = compressionCodec;
    }

    public void setFileSystemType(HdfsFileSystemType fileSystemType) {
        this.fileSystemType = fileSystemType;
    }

    public HdfsFileSystemType getFileSystemType() {
        return fileSystemType;
    }

    public HdfsWritableFactories.WritableType getKeyType() {
        return keyType;
    }

    public void setKeyType(HdfsWritableFactories.WritableType keyType) {
        this.keyType = keyType;
    }

    public HdfsWritableFactories.WritableType getValueType() {
        return valueType;
    }

    public void setValueType(HdfsWritableFactories.WritableType valueType) {
        this.valueType = valueType;
    }

    public void setOpenedSuffix(String openedSuffix) {
        this.openedSuffix = openedSuffix;
    }

    public String getOpenedSuffix() {
        return openedSuffix;
    }

    public void setReadSuffix(String readSuffix) {
        this.readSuffix = readSuffix;
    }

    public String getReadSuffix() {
        return readSuffix;
    }

    public void setInitialDelay(long initialDelay) {
        this.initialDelay = initialDelay;
    }

    public long getInitialDelay() {
        return initialDelay;
    }

    public void setDelay(long delay) {
        this.delay = delay;
    }

    public long getDelay() {
        return delay;
    }

    public void setPattern(String pattern) {
        this.pattern = pattern;
    }

    public String getPattern() {
        return pattern;
    }

    public void setChunkSize(int chunkSize) {
        this.chunkSize = chunkSize;
    }

    public int getChunkSize() {
        return chunkSize;
    }

    public void setCheckIdleInterval(int checkIdleInterval) {
        this.checkIdleInterval = checkIdleInterval;
    }

    public int getCheckIdleInterval() {
        return checkIdleInterval;
    }

    public List<HdfsProducer.SplitStrategy> getSplitStrategies() {
        return splitStrategies;
    }

    /**
     * No-op setter: exists only so the <tt>splitStrategy</tt> URI parameter is
     * recognized by Camel's parameter binding; the value is parsed in
     * {@link #parseURI(URI)} instead.
     */
    public void setSplitStrategy(String splitStrategy) {
        // noop
    }
}