blob: 939bcf63a049ce12843ab6d8f4adb23ef512279a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog;
import java.util.List;
import org.apache.impala.common.Pair;
import org.apache.impala.thrift.THdfsPartitionLocation;
import org.apache.impala.util.ListMap;
import com.google.common.base.Preconditions;
/**
* Utility class for storing HdfsPartition locations in a compressed format. Each
* instance of this class is owned by a single HdfsTable instance.
*
* This class is not thread-safe by itself since it is only modified when the lock on an
* HdfsTable object is held.
*
* TODO: Generalize this to compress other sets of Strings that are likely to share common
* prefixes, like table locations.
*
*/
class HdfsPartitionLocationCompressor {
  // The number of clustering columns. Location.decompose() strips this many trailing
  // directories off a location to split it into a prefix and a suffix.
  int numClusteringColumns_;

  // A bi-directional map between partition location prefixes and their compressed
  // representation, an int.
  private final ListMap<String> prefixMap_ = new ListMap<String>();

  // Construct an HdfsPartitionLocationCompressor with an empty prefix map.
  public HdfsPartitionLocationCompressor(int numClusteringColumns) {
    numClusteringColumns_ = numClusteringColumns;
  }

  // Construct an HdfsPartitionLocationCompressor whose bidirectional map 'prefixMap_'
  // is pre-filled from 'prefixes'.
  public HdfsPartitionLocationCompressor(
      int numClusteringColumns, List<String> prefixes) {
    numClusteringColumns_ = numClusteringColumns;
    prefixMap_.populate(prefixes);
  }

  // Sets the number of trailing directories that are treated as the suffix when a
  // Location is subsequently constructed from a String. Locations that already exist
  // are not re-split.
  public void setClusteringColumns(int numClusteringColumns) {
    numClusteringColumns_ = numClusteringColumns;
  }

  // Returns the list of prefixes held by 'prefixMap_'.
  public List<String> getPrefixes() {
    return prefixMap_.getList();
  }

  // One direction of the map: returns the prefix associated with an index, or "" if the
  // index is -1. Indexes less than -1 or greater than prefixMap_.size()-1 are invalid
  // and cause an IndexOutOfBoundsException to be thrown (by
  // Preconditions.checkElementIndex()).
  private String indexToPrefix(int i) {
    // Uncompressed locations are represented by -1:
    if (i == -1) return "";
    Preconditions.checkElementIndex(i, prefixMap_.size());
    return prefixMap_.getEntry(i);
  }

  // The other direction of the map: compress a location prefix, adding it to the
  // bidirectional map 'prefixMap_' if it is not already present.
  private int prefixToIndex(String s) {
    return prefixMap_.getIndex(s);
  }

  // A surrogate for THdfsPartitionLocation, which represents a partition's location
  // relative to its parent table's list of partition prefixes.
  public class Location {
    // 'prefix_index_' represents the portion of the partition's location that comes
    // before the last N directories, where N is the number of partitioning columns.
    // 'prefix_index_' is an index into HdfsPartitionLocationCompressor.this.prefixMap_.
    // 'suffix_' is the rest of the partition location.
    //
    // TODO: Since each partition stores the literal values for the partitioning columns,
    // we could also elide the column names and values from suffix_ when a partition is
    // in the canonical location "/partitioning_column_name_1=value_1/..."
    private final int prefix_index_;
    private final String suffix_;

    // Builds a compressed Location from a full location string. The string is split by
    // decompose(); the prefix is stored as an index obtained from the enclosing
    // compressor and the suffix is stored verbatim. 'location' must not be null.
    public Location(String location) {
      Preconditions.checkNotNull(location);
      Pair<String,String> locationParts = decompose(location);
      prefix_index_ =
          HdfsPartitionLocationCompressor.this.prefixToIndex(locationParts.first);
      suffix_ = locationParts.second;
    }

    // Builds a Location from its thrift representation. The prefix index is copied
    // as-is and is only validated when it is resolved in toString(). 'thrift' must not
    // be null.
    public Location(THdfsPartitionLocation thrift) {
      Preconditions.checkNotNull(thrift);
      prefix_index_ = thrift.prefix_index;
      suffix_ = thrift.getSuffix();
    }

    // Returns the thrift representation (prefix index and suffix) of this Location.
    public THdfsPartitionLocation toThrift() {
      return new THdfsPartitionLocation(prefix_index_, suffix_);
    }

    // Returns the full, uncompressed location: the prefix looked up in the enclosing
    // compressor followed by the suffix.
    @Override
    public String toString() {
      return HdfsPartitionLocationCompressor.this.indexToPrefix(prefix_index_) + suffix_;
    }

    // Hashes the uncompressed location so that hashCode() is consistent with equals().
    @Override
    public int hashCode() { return toString().hashCode(); }

    // Two Locations are equal iff their uncompressed location strings have the same
    // content.
    @Override
    public boolean equals(Object obj) {
      // The strings must be compared with equals(): toString() concatenates at runtime,
      // so a reference comparison (==) would not reflect whether the contents match and
      // would be inconsistent with hashCode().
      return (obj instanceof Location) && toString().equals(obj.toString());
    }

    // Decompose a location string by removing its last N directories, where N is the
    // number of clustering columns. The result is a Pair<String,String> where the first
    // String is the prefix and the second is the suffix. (In other words, their
    // concatenation equals the input.) If the input does not have at least N '/'
    // characters, the prefix is empty and the suffix is the entire input. If N is 0,
    // the prefix is the entire input and the suffix is empty.
    private Pair<String,String> decompose(String s) {
      Preconditions.checkNotNull(s);
      int numClusteringColumns =
          HdfsPartitionLocationCompressor.this.numClusteringColumns_;
      if (numClusteringColumns == 0) return new Pair<String,String>(s, "");
      // Iterate backwards over the input until we have passed 'numClusteringColumns'
      // directories. What is left is the prefix.
      int i = s.length() - 1;
      // If the string ends in '/', iterating past it does not pass a clustering column.
      if (i >= 0 && s.charAt(i) == '/') --i;
      for (; numClusteringColumns > 0 && i >= 0; --i) {
        if (s.charAt(i) == '/') --numClusteringColumns;
      }
      // If we successfully removed all the partition directories, s.charAt(i+1) is '/'
      // and we can include it in the prefix.
      if (0 == numClusteringColumns) ++i;
      return new Pair<String,String>(s.substring(0, i + 1), s.substring(i + 1));
    }
  }
}