// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.catalog;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.codec.binary.Base64;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.JniUtil;
import org.apache.impala.thrift.TPartitionStats;
import org.apache.impala.util.MetaStoreUtil;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

/**
* Handles serialising and deserialising intermediate statistics to and from the Hive
* MetaStore via the parameters map attached to every Hive partition object.
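*
* For example (illustrative sketch; {@code stats} and {@code partition} stand for a
* caller's own objects):
* <pre>
*   PartitionStatsUtil.partStatsToParameters(stats, partition);
*   TPartitionStats restored =
*       PartitionStatsUtil.partStatsFromParameters(partition.getParameters());
* </pre>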
*/
public class PartitionStatsUtil {
public static final String INCREMENTAL_STATS_NUM_CHUNKS =
"impala_intermediate_stats_num_chunks";
public static final String INCREMENTAL_STATS_CHUNK_PREFIX =
"impala_intermediate_stats_chunk";
private static final Logger LOG = LoggerFactory.getLogger(PartitionStatsUtil.class);
/**
* Reconstructs a TPartitionStats object from its serialised form in the given parameter
* map. Returns null if no stats are serialised, and throws an exception if there was an
* error during deserialisation.
*/
public static TPartitionStats partStatsFromParameters(
Map<String, String> hmsParameters) throws ImpalaException {
if (hmsParameters == null) return null;
String numChunksStr = hmsParameters.get(INCREMENTAL_STATS_NUM_CHUNKS);
if (numChunksStr == null) return null;
int numChunks = Integer.parseInt(numChunksStr);
if (numChunks == 0) return null;
Preconditions.checkState(numChunks >= 0);
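// Reassemble the base-64 encoded stats string from its numbered chunk parameters.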
StringBuilder encodedStats = new StringBuilder();
for (int i = 0; i < numChunks; ++i) {
String chunk = hmsParameters.get(INCREMENTAL_STATS_CHUNK_PREFIX + i);
if (chunk == null) {
throw new ImpalaRuntimeException("Missing stats chunk: " + i);
}
encodedStats.append(chunk);
}
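// Decode the reassembled string and deserialise it into a TPartitionStats object.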
byte[] decodedStats = Base64.decodeBase64(encodedStats.toString());
TCompactProtocol.Factory protocolFactory = new TCompactProtocol.Factory();
TPartitionStats ret = new TPartitionStats();
JniUtil.deserializeThrift(protocolFactory, ret, decodedStats);
return ret;
}
/**
* Serialises a TPartitionStats object and stores it in the given partition's parameter
* map. A null partStats logically deletes any stats already attached to the partition.
*/
public static void partStatsToParameters(TPartitionStats partStats,
HdfsPartition partition) {
// null stats means logically delete the stats from this partition
if (partStats == null) {
deletePartStats(partition);
return;
}
// The HMS has a 4k limit (as of Hive 0.13, Impala 2.0) on the length of any parameter
// string. The serialised version of the partition stats is often larger than this.
// Therefore, we naively 'chunk' the encoded string into 4k pieces, and store the
// number of pieces in a separate parameter field.
//
// The object itself is first serialised by Thrift, and then base-64 encoded to be a
// valid string. This inflates its length by roughly a third; we may want to consider
// a different scheme, or at least understand why this scheme doesn't seem much more
// effective than an ASCII representation.
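// For example, assuming a 4,000-character limit, a 10,000-character encoded string
// would be stored as three chunk parameters of 4,000, 4,000 and 2,000 characters,
// plus a parameter recording the chunk count (3).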
try {
TCompactProtocol.Factory protocolFactory = new TCompactProtocol.Factory();
TSerializer serializer = new TSerializer(protocolFactory);
byte[] serialized = serializer.serialize(partStats);
String base64 = new String(Base64.encodeBase64(serialized));
List<String> chunks =
chunkStringForHms(base64, MetaStoreUtil.MAX_PROPERTY_VALUE_LENGTH);
partition.putToParameters(
INCREMENTAL_STATS_NUM_CHUNKS, Integer.toString(chunks.size()));
for (int i = 0; i < chunks.size(); ++i) {
partition.putToParameters(INCREMENTAL_STATS_CHUNK_PREFIX + i, chunks.get(i));
}
} catch (TException e) {
LOG.error("Error saving partition stats: ", e);
// TODO: What to throw here?
}
}
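/**
* Removes all serialised stats from a partition's parameter map by setting the chunk
* count to 0 and deleting every chunk parameter.
*/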
public static void deletePartStats(HdfsPartition partition) {
partition.putToParameters(INCREMENTAL_STATS_NUM_CHUNKS, "0");
for (Iterator<String> it = partition.getParameters().keySet().iterator();
it.hasNext(); ) {
if (it.next().startsWith(INCREMENTAL_STATS_CHUNK_PREFIX)) {
it.remove();
}
}
}
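/**
* Splits 'data' into consecutive substrings of at most 'chunkLen' characters, suitable
* for storage as individual HMS parameter values.
*/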
private static List<String> chunkStringForHms(String data, int chunkLen) {
int idx = 0;
List<String> ret = Lists.newArrayList();
while (idx < data.length()) {
int remaining = data.length() - idx;
int chunkSize = Math.min(chunkLen, remaining);
ret.add(data.substring(idx, idx + chunkSize));
idx += chunkSize;
}
return ret;
}
}