// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.catalog;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.impala.thrift.THdfsPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import com.google.errorprone.annotations.Immutable;

/**
 * Represents the file format metadata for files stored in a table or partition.
 *
 * This class is immutable, and instances are stored in a global interner, since
 * the number of distinct combinations of file formats and other parameters
 * is typically quite low relative to the total number of partitions (e.g. almost
 * all partitions will use the default quoting).
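 *
 * Instances are obtained through the factory methods below:
 * fromStorageDescriptor() for metadata loaded from the Hive Metastore, and
 * fromThriftPartition() for metadata received over Thrift.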
 */
@Immutable
public class HdfsStorageDescriptor {
  public static final char DEFAULT_LINE_DELIM = '\n';
  // Hive by default uses ctrl-a as the field delimiter.
  public static final char DEFAULT_FIELD_DELIM = '\u0001';
  // Hive by default has no escape character.
  public static final char DEFAULT_ESCAPE_CHAR = '\u0000';

  // Serde parameters that are recognized by table writers.
  private static final String BLOCK_SIZE = "blocksize";

  // Important: don't change the ordering of these keys - keys without an explicit
  // default (e.g. COLLECTION_DELIM) fall back to the value already resolved for
  // FIELD_DELIM, so FIELD_DELIM must be resolved first (see extractDelimiters()).
  // Package visible for testing.
  final static ImmutableList<String> DELIMITER_KEYS = ImmutableList.of(
      serdeConstants.LINE_DELIM, serdeConstants.FIELD_DELIM,
      serdeConstants.COLLECTION_DELIM, serdeConstants.MAPKEY_DELIM,
      serdeConstants.ESCAPE_CHAR, serdeConstants.QUOTE_CHAR);

  // The Parquet serde shows up multiple times as the location of the implementation
  // has changed between Impala versions.
  final static ImmutableList<String> COMPATIBLE_SERDES = ImmutableList.of(
      "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", // (seq / text / parquet)
      "org.apache.hadoop.hive.serde2.avro.AvroSerDe", // (avro)
      "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe", // (rc)
      "org.apache.hadoop.hive.ql.io.orc.OrcSerde", // (orc)
      "parquet.hive.serde.ParquetHiveSerDe", // (parquet - legacy)
      // TODO: Verify the following Parquet SerDe works with Impala and add
      // support for the new input/output format classes. See IMPALA-4214.
      "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"); // (parquet)

  private final static Logger LOG = LoggerFactory.getLogger(HdfsStorageDescriptor.class);
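
  // Interner used to deduplicate instances; weak references let descriptors that
  // are no longer referenced by any partition be garbage collected.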
  private final static Interner<HdfsStorageDescriptor> INTERNER =
      Interners.newWeakInterner();
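
  // All delimiters are stored as single bytes (see parseDelim()).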
  private final HdfsFileFormat fileFormat_;
  private final byte lineDelim_;
  private final byte fieldDelim_;
  private final byte collectionDelim_;
  private final byte mapKeyDelim_;
  private final byte escapeChar_;
  private final byte quoteChar_;
  private final int blockSize_;

  /**
   * Returns a map from delimiter key to a single delimiter character,
   * filling in defaults if explicit values are not found in the supplied
   * serde descriptor.
   *
   * @throws InvalidStorageDescriptorException - if an invalid delimiter is found
   */
  private static Map<String, Byte> extractDelimiters(SerDeInfo serdeInfo)
      throws InvalidStorageDescriptorException {
    // The metastore may return null for delimiter parameters, which means we need
    // to use a default instead. We tried long and hard to find the default values
    // for delimiters in Hive, but could not find them.
    Map<String, Byte> delimMap = new HashMap<>();
    for (String delimKey: DELIMITER_KEYS) {
      String delimValue = serdeInfo.getParameters().get(delimKey);
      if (delimValue == null) {
        if (delimKey.equals(serdeConstants.FIELD_DELIM)) {
          delimMap.put(delimKey, (byte) DEFAULT_FIELD_DELIM);
        } else if (delimKey.equals(serdeConstants.ESCAPE_CHAR)) {
          delimMap.put(delimKey, (byte) DEFAULT_ESCAPE_CHAR);
        } else if (delimKey.equals(serdeConstants.LINE_DELIM)) {
          delimMap.put(delimKey, (byte) DEFAULT_LINE_DELIM);
        } else {
          // The remaining keys (collection delim, map-key delim, quote char) default
          // to the value resolved for FIELD_DELIM earlier in this loop.
          delimMap.put(delimKey, delimMap.get(serdeConstants.FIELD_DELIM));
        }
      } else {
        Byte delimByteValue = parseDelim(delimValue);
        if (delimByteValue == null) {
          throw new InvalidStorageDescriptorException("Invalid delimiter: '" +
              delimValue + "'. Delimiter must be specified as a single character or " +
              "as a decimal value in the range [-128:127]");
        }
        delimMap.put(delimKey, delimByteValue);
      }
    }
    return delimMap;
  }

  /**
   * Parses a delimiter in a similar way as Hive, with some additional error checking.
   * Hive does not document the accepted formats; empirically, a delimiter must fit
   * in a single byte and can be specified in the following formats:
   * - A single ASCII or unicode character (ex. '|')
   * - An escape character in octal format (ex. \001. Stored in the metastore as a
   *   unicode character: \u0001).
   * - A signed decimal integer in the range [-128:127]. Used to support delimiters
   *   for ASCII character values between 128-255 (-2 maps to ASCII 254).
   *
   * The delimiter is first parsed as a decimal number. If the parsing succeeds AND
   * the resulting value fits in a signed byte, the byte value of the parsed int is
   * returned. Otherwise, if the string has a single char, the byte value of this
   * char is returned. If the delimiter is invalid, null is returned.
   *
   * For example: parseDelim("|") returns (byte) '|', parseDelim("-2") returns
   * (byte) -2 (i.e. ASCII 254), and parseDelim("ab") returns null.
   */
  public static Byte parseDelim(String delimVal) {
    Preconditions.checkNotNull(delimVal);
    try {
      // In the future we could support delimiters specified in hex format, but we
      // would need support from the Hive side.
      return Byte.parseByte(delimVal);
    } catch (NumberFormatException e) {
      if (delimVal.length() == 1) {
        // Check the code point because Java chars are two bytes wide: e.g. "\u1111"
        // has length 1 but does not fit in a single byte.
        int cp = Character.codePointAt(delimVal, 0);
        if (cp >= 0 && cp <= 255) return (byte) cp;
      }
    }
    return null;
  }

  private HdfsStorageDescriptor(String tblName, HdfsFileFormat fileFormat, byte lineDelim,
      byte fieldDelim, byte collectionDelim, byte mapKeyDelim, byte escapeChar,
      byte quoteChar, int blockSize) {
    this.fileFormat_ = fileFormat;
    this.lineDelim_ = lineDelim;
    this.fieldDelim_ = fieldDelim;
    this.collectionDelim_ = collectionDelim;
    this.mapKeyDelim_ = mapKeyDelim;
    this.quoteChar_ = quoteChar;
    this.blockSize_ = blockSize;
    // The escape character can be set to the same character as a field or row
    // delimiter. Empirically, Hive ignores the escape character in that case, so
    // we do the same.
    if (escapeChar == fieldDelim ||
        escapeChar == lineDelim ||
        escapeChar == collectionDelim) {
      this.escapeChar_ = DEFAULT_ESCAPE_CHAR;
      LOG.warn("Escape character for table " + tblName + " is set to "
          + "the same character as one of the delimiters. Ignoring escape character.");
    } else {
      this.escapeChar_ = escapeChar;
    }
  }

  /**
   * Thrown when constructing an HdfsStorageDescriptor from an invalid/unsupported
   * metastore storage descriptor.
   * TODO: Get rid of this class.
   */
  public static class InvalidStorageDescriptorException extends CatalogException {
    // Mandatory since Exception implements Serializable
    private static final long serialVersionUID = -555234913768134760L;

    public InvalidStorageDescriptorException(String s) { super(s); }

    public InvalidStorageDescriptorException(Exception ex) {
      super(ex.getMessage(), ex);
    }
  }

  /**
   * Constructs a new HdfsStorageDescriptor from a StorageDescriptor retrieved from the
   * metastore.
   *
   * @throws InvalidStorageDescriptorException - if the storage descriptor has invalid
   *     delimiters, an invalid block size, an unsupported SerDe, or an unknown file
   *     format.
   */
  public static HdfsStorageDescriptor fromStorageDescriptor(String tblName,
      StorageDescriptor sd)
      throws InvalidStorageDescriptorException {
    Map<String, Byte> delimMap = extractDelimiters(sd.getSerdeInfo());
    if (!COMPATIBLE_SERDES.contains(sd.getSerdeInfo().getSerializationLib())) {
      throw new InvalidStorageDescriptorException(String.format("Impala does not " +
          "support tables of this type. REASON: SerDe library '%s' is not " +
          "supported.", sd.getSerdeInfo().getSerializationLib()));
    }

    // Extract the block size from the SerDe parameters, if present.
    Map<String, String> parameters = sd.getSerdeInfo().getParameters();
    int blockSize = 0;
    String blockValue = parameters.get(BLOCK_SIZE);
    if (blockValue != null) {
      try {
        blockSize = Integer.parseInt(blockValue);
      } catch (NumberFormatException e) {
        throw new InvalidStorageDescriptorException(
            "Invalid block size: '" + blockValue + "'");
      }
    }

    try {
      return INTERNER.intern(new HdfsStorageDescriptor(tblName,
          HdfsFileFormat.fromJavaClassName(sd.getInputFormat()),
          delimMap.get(serdeConstants.LINE_DELIM),
          delimMap.get(serdeConstants.FIELD_DELIM),
          delimMap.get(serdeConstants.COLLECTION_DELIM),
          delimMap.get(serdeConstants.MAPKEY_DELIM),
          delimMap.get(serdeConstants.ESCAPE_CHAR),
          delimMap.get(serdeConstants.QUOTE_CHAR),
          blockSize));
    } catch (IllegalArgumentException ex) {
      // Thrown by fromJavaClassName() when the input format is not recognized.
      throw new InvalidStorageDescriptorException(ex);
    }
  }
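
  /**
   * Constructs an interned HdfsStorageDescriptor from a THdfsPartition. Note that
   * THdfsPartition does not currently carry a quote character, so the default '"'
   * is used (see the TODO below).
   */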
  public static HdfsStorageDescriptor fromThriftPartition(THdfsPartition thriftPartition,
      String tableName) {
    return INTERNER.intern(new HdfsStorageDescriptor(tableName,
        HdfsFileFormat.fromThrift(thriftPartition.getFileFormat()),
        thriftPartition.lineDelim, thriftPartition.fieldDelim,
        thriftPartition.collectionDelim, thriftPartition.mapKeyDelim,
        thriftPartition.escapeChar,
        (byte) '"', // TODO: We should probably add quoteChar to THdfsPartition.
        thriftPartition.blockSize));
  }
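
  /**
   * Returns an interned copy of this descriptor with the file format replaced by
   * 'newFormat'. All other parameters are preserved; the table name is only used
   * for logging in the constructor, so "<unknown>" is passed here.
   */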
  public HdfsStorageDescriptor cloneWithChangedFileFormat(HdfsFileFormat newFormat) {
    return INTERNER.intern(new HdfsStorageDescriptor(
        "<unknown>", newFormat, lineDelim_, fieldDelim_, collectionDelim_, mapKeyDelim_,
        escapeChar_, quoteChar_, blockSize_));
  }

  public byte getLineDelim() { return lineDelim_; }
  public byte getFieldDelim() { return fieldDelim_; }
  public byte getCollectionDelim() { return collectionDelim_; }
  public byte getMapKeyDelim() { return mapKeyDelim_; }
  public byte getEscapeChar() { return escapeChar_; }
  public HdfsFileFormat getFileFormat() { return fileFormat_; }
  public int getBlockSize() { return blockSize_; }
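
  // hashCode() and equals() must cover the same fields. The table name passed to the
  // constructor is only used for logging and is deliberately not part of identity.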
  @Override
  public int hashCode() {
    return Objects.hash(blockSize_, collectionDelim_, escapeChar_, fieldDelim_,
        fileFormat_, lineDelim_, mapKeyDelim_, quoteChar_);
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj) return true;
    if (obj == null) return false;
    if (getClass() != obj.getClass()) return false;
    HdfsStorageDescriptor other = (HdfsStorageDescriptor) obj;
    if (blockSize_ != other.blockSize_) return false;
    if (collectionDelim_ != other.collectionDelim_) return false;
    if (escapeChar_ != other.escapeChar_) return false;
    if (fieldDelim_ != other.fieldDelim_) return false;
    if (fileFormat_ != other.fileFormat_) return false;
    if (lineDelim_ != other.lineDelim_) return false;
    if (mapKeyDelim_ != other.mapKeyDelim_) return false;
    if (quoteChar_ != other.quoteChar_) return false;
    return true;
  }
}