| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.impala.util; |
| |
| import static java.lang.String.format; |
| import static org.apache.impala.service.KuduCatalogOpExecutor.GOT_KUDU_CLIENT; |
| |
| import java.math.BigDecimal; |
| import java.math.BigInteger; |
| import java.sql.Date; |
| import java.time.LocalDate; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| import org.apache.impala.analysis.Analyzer; |
| import org.apache.impala.analysis.DescriptorTable; |
| import org.apache.impala.analysis.Expr; |
| import org.apache.impala.analysis.InsertStmt; |
| import org.apache.impala.analysis.KuduPartitionExpr; |
| import org.apache.impala.analysis.LiteralExpr; |
| import org.apache.impala.catalog.FeKuduTable; |
| import org.apache.impala.catalog.ScalarType; |
| import org.apache.impala.catalog.Type; |
| import org.apache.impala.common.AnalysisException; |
| import org.apache.impala.common.ImpalaRuntimeException; |
| import org.apache.impala.common.Pair; |
| import org.apache.impala.service.BackendConfig; |
| import org.apache.impala.thrift.TColumn; |
| import org.apache.impala.thrift.TColumnEncoding; |
| import org.apache.impala.thrift.TExpr; |
| import org.apache.impala.thrift.TExprNode; |
| import org.apache.impala.thrift.TExprNodeType; |
| import org.apache.impala.thrift.THdfsCompression; |
| import org.apache.kudu.ColumnSchema; |
| import org.apache.kudu.ColumnSchema.CompressionAlgorithm; |
| import org.apache.kudu.ColumnSchema.Encoding; |
| import org.apache.kudu.ColumnTypeAttributes; |
| import org.apache.kudu.Schema; |
| import org.apache.kudu.client.KuduClient; |
| import org.apache.kudu.client.KuduClient.KuduClientBuilder; |
| import org.apache.kudu.client.PartialRow; |
| import org.apache.kudu.client.RangePartitionBound; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Lists; |
| |
| public class KuduUtil { |
| |
| private static final String KUDU_TABLE_NAME_PREFIX = "impala::"; |
| |
| // Number of worker threads created by each KuduClient, regardless of whether or not |
| // they're needed. KuduClients are shared across the frontend (see kuduClients_ |
| // below), and keeping this count small bounds the number of threads created under |
| // concurrent workloads. This number should be sufficient for Frontend/Catalog use, |
| // and has been tested in stress tests. |
| private static final int KUDU_CLIENT_WORKER_THREAD_COUNT = 5; |
| |
| // Maps lists of master addresses to KuduClients, for sharing clients across the FE. |
| private static final Map<String, KuduClient> kuduClients_ = |
| new ConcurrentHashMap<String, KuduClient>(); |
| |
| /** |
| * Gets a KuduClient for the specified Kudu master addresses (as a comma-separated |
| * list of host:port pairs). It will look up and share an existing KuduClient, if |
| * possible, or it will create a new one to return. |
| * The 'admin operation timeout' and the 'operation timeout' are set to |
| * BackendConfig.getKuduClientTimeoutMs(). The 'admin operation timeout' is used for |
| * operations like creating/deleting tables. The 'operation timeout' is used when |
| * fetching tablet metadata. |
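| * <p>Example (a minimal sketch; the master addresses are hypothetical): |
| * <pre>{@code |
| * KuduClient client = KuduUtil.getKuduClient("master-1:7051,master-2:7051"); |
| * // A subsequent call with the same master list returns the same shared client. |
| * }</pre> |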
| */ |
| public static KuduClient getKuduClient(String kuduMasters) { |
| return kuduClients_.computeIfAbsent(kuduMasters, k -> { |
| KuduClientBuilder b = new KuduClientBuilder(k); |
| b.defaultAdminOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs()); |
| b.defaultOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs()); |
| b.workerCount(KUDU_CLIENT_WORKER_THREAD_COUNT); |
| b.saslProtocolName(BackendConfig.INSTANCE.getKuduSaslProtocolName()); |
| return b.build(); |
| }); |
| } |
| |
| /** |
| * Wrapper that gets a Kudu client and marks the given 'catalogTimeline' once the |
| * client has been obtained. |
| */ |
| public static KuduClient getKuduClient(String masterHosts, |
| EventSequence catalogTimeline) { |
| KuduClient kudu = KuduUtil.getKuduClient(masterHosts); |
| catalogTimeline.markEvent(GOT_KUDU_CLIENT); |
| return kudu; |
| } |
| |
| /** |
| * Creates a PartialRow from a list of range partition boundary values. |
| * 'rangePartitionColumns' must be specified in Kudu case. |
| */ |
| private static PartialRow parseRangePartitionBoundaryValues(Schema schema, |
| List<String> rangePartitionColumns, List<TExpr> boundaryValues) |
| throws ImpalaRuntimeException { |
| Preconditions.checkState(rangePartitionColumns.size() == boundaryValues.size()); |
| PartialRow bound = new PartialRow(schema); |
| for (int i = 0; i < boundaryValues.size(); ++i) { |
| String colName = rangePartitionColumns.get(i); |
| ColumnSchema col = schema.getColumn(colName); |
| Preconditions.checkNotNull(col); |
| setKey(col, boundaryValues.get(i), schema.getColumnIndex(colName), bound); |
| } |
| return bound; |
| } |
| |
| /** |
| * Builds and returns a range-partition bound used in the creation of a Kudu |
| * table. The range-partition bound consists of a PartialRow with the boundary |
| * values and a RangePartitionBound indicating if the bound is inclusive or exclusive. |
| * Throws an ImpalaRuntimeException if an error occurs while parsing the boundary |
| * values. 'rangePartitionColumns' must be specified in Kudu case. |
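| * <p>Example use when creating a table (a sketch; 'schema', 'cols', 'lowerVals' and |
| * 'upperVals' are assumed to exist): |
| * <pre>{@code |
| * Pair<PartialRow, RangePartitionBound> lower = |
| *     buildRangePartitionBound(schema, cols, lowerVals, true); |
| * Pair<PartialRow, RangePartitionBound> upper = |
| *     buildRangePartitionBound(schema, cols, upperVals, false); |
| * new org.apache.kudu.client.CreateTableOptions().addRangePartition( |
| *     lower.first, upper.first, lower.second, upper.second); |
| * }</pre> |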
| */ |
| public static Pair<PartialRow, RangePartitionBound> buildRangePartitionBound( |
| Schema schema, List<String> rangePartitionColumns, List<TExpr> boundaryValues, |
| boolean isInclusiveBound) throws ImpalaRuntimeException { |
| if (boundaryValues == null || boundaryValues.isEmpty()) { |
| // TODO: Do we need to set the bound type? |
| return new Pair<PartialRow, RangePartitionBound>(new PartialRow(schema), |
| RangePartitionBound.INCLUSIVE_BOUND); |
| } |
| PartialRow bound = |
| parseRangePartitionBoundaryValues(schema, rangePartitionColumns, boundaryValues); |
| RangePartitionBound boundType = isInclusiveBound ? |
| RangePartitionBound.INCLUSIVE_BOUND : RangePartitionBound.EXCLUSIVE_BOUND; |
| return new Pair<PartialRow, RangePartitionBound>(bound, boundType); |
| } |
| |
| /** |
| * Sets the value 'boundaryVal' in 'key' at 'pos' (the column's index in the Kudu |
| * schema). Throws an ImpalaRuntimeException if 'boundaryVal' does not have the |
| * expected data type. |
| */ |
| private static void setKey(ColumnSchema col, TExpr boundaryVal, int pos, PartialRow key) |
| throws ImpalaRuntimeException { |
| Preconditions.checkState(boundaryVal.getNodes().size() == 1); |
| TExprNode literal = boundaryVal.getNodes().get(0); |
| String colName = col.getName(); |
| org.apache.kudu.Type type = col.getType(); |
| switch (type) { |
| case INT8: |
| checkCorrectType(literal.isSetInt_literal(), type, colName, literal); |
| key.addByte(pos, (byte) literal.getInt_literal().getValue()); |
| break; |
| case INT16: |
| checkCorrectType(literal.isSetInt_literal(), type, colName, literal); |
| key.addShort(pos, (short) literal.getInt_literal().getValue()); |
| break; |
| case INT32: |
| checkCorrectType(literal.isSetInt_literal(), type, colName, literal); |
| key.addInt(pos, (int) literal.getInt_literal().getValue()); |
| break; |
| case INT64: |
| checkCorrectType(literal.isSetInt_literal(), type, colName, literal); |
| key.addLong(pos, literal.getInt_literal().getValue()); |
| break; |
| case VARCHAR: |
| checkCorrectType(literal.isSetString_literal(), type, colName, literal); |
| key.addVarchar(pos, literal.getString_literal().getValue()); |
| break; |
| case STRING: |
| checkCorrectType(literal.isSetString_literal(), type, colName, literal); |
| key.addString(pos, literal.getString_literal().getValue()); |
| break; |
| case UNIXTIME_MICROS: |
| checkCorrectType(literal.isSetInt_literal(), type, colName, literal); |
| key.addLong(pos, literal.getInt_literal().getValue()); |
| break; |
| case DATE: |
| checkCorrectType(literal.isSetDate_literal(), type, colName, literal); |
| key.addDate(pos, Date.valueOf(LocalDate.ofEpochDay( |
| literal.getDate_literal().getDays_since_epoch()))); |
| break; |
| case DECIMAL: |
| checkCorrectType(literal.isSetDecimal_literal(), type, colName, literal); |
| BigInteger unscaledVal = new BigInteger(literal.getDecimal_literal().getValue()); |
| int scale = col.getTypeAttributes().getScale(); |
| key.addDecimal(pos, new BigDecimal(unscaledVal, scale)); |
| break; |
| default: |
| throw new ImpalaRuntimeException("Key columns not supported for type: " |
| + type.toString()); |
| } |
| } |
| |
| /** |
| * Returns the actual value of the specified 'defaultValue' literal. The returned |
| * type is the value type stored by Kudu for the column. For example, 'impalaType' |
| * is translated to a Kudu type 'type'; if 'type' is 'INT8' the returned value is a |
| * Java byte, and if 'type' is 'UNIXTIME_MICROS' the returned value is a Java long. |
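| * <p>A short sketch ('defaultVal' is a hypothetical thrift-serialized literal): |
| * <pre>{@code |
| * Object v = KuduUtil.getKuduDefaultValue(defaultVal, Type.TINYINT, "c1"); |
| * // 'v' is a java.lang.Byte, since TINYINT maps to Kudu's INT8. |
| * }</pre> |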
| */ |
| public static Object getKuduDefaultValue( |
| TExpr defaultValue, Type impalaType, String colName) throws ImpalaRuntimeException { |
| Preconditions.checkState(defaultValue.getNodes().size() == 1); |
| TExprNode literal = defaultValue.getNodes().get(0); |
| if (literal.getNode_type() == TExprNodeType.NULL_LITERAL) return null; |
| org.apache.kudu.Type type = KuduUtil.fromImpalaType(impalaType); |
| switch (type) { |
| case INT8: |
| checkCorrectType(literal.isSetInt_literal(), type, colName, literal); |
| return (byte) literal.getInt_literal().getValue(); |
| case INT16: |
| checkCorrectType(literal.isSetInt_literal(), type, colName, literal); |
| return (short) literal.getInt_literal().getValue(); |
| case INT32: |
| checkCorrectType(literal.isSetInt_literal(), type, colName, literal); |
| return (int) literal.getInt_literal().getValue(); |
| case INT64: |
| checkCorrectType(literal.isSetInt_literal(), type, colName, literal); |
| return (long) literal.getInt_literal().getValue(); |
| case FLOAT: |
| checkCorrectType(literal.isSetFloat_literal(), type, colName, literal); |
| return (float) literal.getFloat_literal().getValue(); |
| case DOUBLE: |
| checkCorrectType(literal.isSetFloat_literal(), type, colName, literal); |
| return (double) literal.getFloat_literal().getValue(); |
| case VARCHAR: |
| case STRING: |
| checkCorrectType(literal.isSetString_literal(), type, colName, literal); |
| return literal.getString_literal().getValue(); |
| case BOOL: |
| checkCorrectType(literal.isSetBool_literal(), type, colName, literal); |
| return literal.getBool_literal().isValue(); |
| case UNIXTIME_MICROS: |
| checkCorrectType(literal.isSetInt_literal(), type, colName, literal); |
| return literal.getInt_literal().getValue(); |
| case DATE: |
| checkCorrectType(literal.isSetDate_literal(), type, colName, literal); |
| return Date.valueOf(LocalDate.ofEpochDay( |
| literal.getDate_literal().getDays_since_epoch())); |
| case DECIMAL: |
| checkCorrectType(literal.isSetDecimal_literal(), type, colName, literal); |
| BigInteger unscaledVal = new BigInteger(literal.getDecimal_literal().getValue()); |
| return new BigDecimal(unscaledVal, impalaType.getDecimalDigits()); |
| default: |
| throw new ImpalaRuntimeException("Unsupported value for column type: " + |
| type.toString()); |
| } |
| } |
| |
| public static Encoding fromThrift(TColumnEncoding encoding) |
| throws ImpalaRuntimeException { |
| switch (encoding) { |
| case AUTO: |
| return Encoding.AUTO_ENCODING; |
| case PLAIN: |
| return Encoding.PLAIN_ENCODING; |
| case PREFIX: |
| return Encoding.PREFIX_ENCODING; |
| case GROUP_VARINT: |
| return Encoding.GROUP_VARINT; |
| case RLE: |
| return Encoding.RLE; |
| case DICTIONARY: |
| return Encoding.DICT_ENCODING; |
| case BIT_SHUFFLE: |
| return Encoding.BIT_SHUFFLE; |
| default: |
| throw new ImpalaRuntimeException("Unsupported encoding: " + |
| encoding.toString()); |
| } |
| } |
| |
| public static TColumnEncoding toThrift(Encoding encoding) |
| throws ImpalaRuntimeException { |
| switch (encoding) { |
| case AUTO_ENCODING: |
| return TColumnEncoding.AUTO; |
| case PLAIN_ENCODING: |
| return TColumnEncoding.PLAIN; |
| case PREFIX_ENCODING: |
| return TColumnEncoding.PREFIX; |
| case GROUP_VARINT: |
| return TColumnEncoding.GROUP_VARINT; |
| case RLE: |
| return TColumnEncoding.RLE; |
| case DICT_ENCODING: |
| return TColumnEncoding.DICTIONARY; |
| case BIT_SHUFFLE: |
| return TColumnEncoding.BIT_SHUFFLE; |
| default: |
| throw new ImpalaRuntimeException("Unsupported encoding: " + |
| encoding.toString()); |
| } |
| } |
| |
| public static CompressionAlgorithm fromThrift(THdfsCompression compression) |
| throws ImpalaRuntimeException { |
| switch (compression) { |
| case DEFAULT: |
| return CompressionAlgorithm.DEFAULT_COMPRESSION; |
| case NONE: |
| return CompressionAlgorithm.NO_COMPRESSION; |
| case SNAPPY: |
| return CompressionAlgorithm.SNAPPY; |
| case LZ4: |
| return CompressionAlgorithm.LZ4; |
| case ZLIB: |
| return CompressionAlgorithm.ZLIB; |
| default: |
| throw new ImpalaRuntimeException("Unsupported compression algorithm: " + |
| compression.toString()); |
| } |
| } |
| |
| public static THdfsCompression toThrift(CompressionAlgorithm compression) |
| throws ImpalaRuntimeException { |
| switch (compression) { |
| case NO_COMPRESSION: |
| return THdfsCompression.NONE; |
| case DEFAULT_COMPRESSION: |
| return THdfsCompression.DEFAULT; |
| case SNAPPY: |
| return THdfsCompression.SNAPPY; |
| case LZ4: |
| return THdfsCompression.LZ4; |
| case ZLIB: |
| return THdfsCompression.ZLIB; |
| default: |
| throw new ImpalaRuntimeException("Unsupported compression algorithm: " + |
| compression.toString()); |
| } |
| } |
| |
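| /** |
| * Populates the Kudu-specific options on 'column' and returns it. 'isNullable', |
| * 'encoding', 'compression', 'defaultValue' and 'blockSize' may be null, in which |
| * case the corresponding option is left unset; 'kuduName' must be non-null. |
| */ |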
| public static TColumn setColumnOptions(TColumn column, boolean isKey, |
| boolean isPrimaryKeyUnique, Boolean isNullable, boolean isAutoIncrementing, |
| Encoding encoding, CompressionAlgorithm compression, Expr defaultValue, |
| Integer blockSize, String kuduName) { |
| column.setIs_key(isKey); |
| column.setIs_primary_key_unique(isPrimaryKeyUnique); |
| if (isNullable != null) column.setIs_nullable(isNullable); |
| column.setIs_auto_incrementing(isAutoIncrementing); |
| try { |
| if (encoding != null) column.setEncoding(toThrift(encoding)); |
| if (compression != null) column.setCompression(toThrift(compression)); |
| } catch (ImpalaRuntimeException e) { |
| // This shouldn't happen |
| throw new IllegalStateException(String.format("Error parsing " + |
| "encoding/compression values for Kudu column '%s': %s", column.getColumnName(), |
| e.getMessage())); |
| } |
| |
| if (defaultValue != null) { |
| Preconditions.checkState(defaultValue instanceof LiteralExpr); |
| column.setDefault_value(defaultValue.treeToThrift()); |
| } |
| if (blockSize != null) column.setBlock_size(blockSize); |
| Preconditions.checkNotNull(kuduName); |
| column.setKudu_column_name(kuduName); |
| return column; |
| } |
| |
| /** |
| * If 'correctType' is true, returns. Otherwise throws an ImpalaRuntimeException with |
| * a formatted message indicating that the literal's type does not match the column's |
| * expected type. |
| */ |
| private static void checkCorrectType(boolean correctType, org.apache.kudu.Type t, |
| String colName, TExprNode boundaryVal) throws ImpalaRuntimeException { |
| if (correctType) return; |
| throw new ImpalaRuntimeException( |
| format("Expected '%s' literal for column '%s' but got '%s'", t.getName(), colName, |
| Type.fromThrift(boundaryVal.getType()).toSql())); |
| } |
| |
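| // Returns true if Impala supports 'type' as a Kudu key column type. |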
| public static boolean isSupportedKeyType(org.apache.impala.catalog.Type type) { |
| return type.isIntegerType() || type.isStringType() || type.isTimestamp() |
| || type.isDate(); |
| } |
| |
| /** |
| * When Kudu's integration with the Hive Metastore is enabled, returns the Kudu |
| * table name in the format 'metastoreDbName.metastoreTableName'. Otherwise, returns |
| * it in the format 'impala::metastoreDbName.metastoreTableName'. This should only |
| * be used for managed tables. |
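| * <p>For example, getDefaultKuduTableName("db1", "tbl1", true) returns "db1.tbl1", |
| * while getDefaultKuduTableName("db1", "tbl1", false) returns "impala::db1.tbl1". |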
| */ |
| public static String getDefaultKuduTableName(String metastoreDbName, |
| String metastoreTableName, boolean isHMSIntegrationEnabled) { |
| return isHMSIntegrationEnabled ? metastoreDbName + "." + metastoreTableName : |
| KUDU_TABLE_NAME_PREFIX + metastoreDbName + "." + metastoreTableName; |
| } |
| |
| /** |
| * Checks if the given name is the default Kudu table name for a managed table, |
| * regardless of whether Kudu's integration with the Hive Metastore is enabled. |
| */ |
| public static boolean isDefaultKuduTableName(String name, |
| String metastoreDbName, String metastoreTableName) { |
| return getDefaultKuduTableName(metastoreDbName, |
| metastoreTableName, true).equals(name) || |
| getDefaultKuduTableName(metastoreDbName, |
| metastoreTableName, false).equals(name); |
| } |
| |
| /** |
| * Converts a given Impala catalog type to the Kudu type. Throws an exception if the |
| * type cannot be converted. |
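| * <p>For example, fromImpalaType(Type.TIMESTAMP) returns |
| * org.apache.kudu.Type.UNIXTIME_MICROS. |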
| */ |
| public static org.apache.kudu.Type fromImpalaType(Type t) |
| throws ImpalaRuntimeException { |
| if (!t.isScalarType()) { |
| throw new ImpalaRuntimeException(format( |
| "Type %s is not supported in Kudu", t.toSql())); |
| } |
| ScalarType s = (ScalarType) t; |
| switch (s.getPrimitiveType()) { |
| case TINYINT: return org.apache.kudu.Type.INT8; |
| case SMALLINT: return org.apache.kudu.Type.INT16; |
| case INT: return org.apache.kudu.Type.INT32; |
| case BIGINT: return org.apache.kudu.Type.INT64; |
| case BOOLEAN: return org.apache.kudu.Type.BOOL; |
| case STRING: return org.apache.kudu.Type.STRING; |
| case DOUBLE: return org.apache.kudu.Type.DOUBLE; |
| case FLOAT: return org.apache.kudu.Type.FLOAT; |
| case TIMESTAMP: return org.apache.kudu.Type.UNIXTIME_MICROS; |
| case DECIMAL: return org.apache.kudu.Type.DECIMAL; |
| case DATE: return org.apache.kudu.Type.DATE; |
| case VARCHAR: return org.apache.kudu.Type.VARCHAR; |
| case BINARY: return org.apache.kudu.Type.BINARY; |
| /* Fall through below */ |
| case INVALID_TYPE: |
| case NULL_TYPE: |
| case DATETIME: |
| case CHAR: |
| default: |
| throw new ImpalaRuntimeException(format( |
| "Type %s is not supported in Kudu", s.toSql())); |
| } |
| } |
| |
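| /** |
| * Converts a given Kudu type to the corresponding Impala catalog type, using |
| * 'typeAttributes' for DECIMAL precision/scale and VARCHAR length. Throws an |
| * exception if the type cannot be converted. |
| */ |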
| public static Type toImpalaType(org.apache.kudu.Type t, |
| ColumnTypeAttributes typeAttributes) throws ImpalaRuntimeException { |
| switch (t) { |
| case BOOL: return Type.BOOLEAN; |
| case DOUBLE: return Type.DOUBLE; |
| case FLOAT: return Type.FLOAT; |
| case INT8: return Type.TINYINT; |
| case INT16: return Type.SMALLINT; |
| case INT32: return Type.INT; |
| case INT64: return Type.BIGINT; |
| case STRING: return Type.STRING; |
| case UNIXTIME_MICROS: return Type.TIMESTAMP; |
| case DATE: return Type.DATE; |
| case DECIMAL: |
| return ScalarType.createDecimalType( |
| typeAttributes.getPrecision(), typeAttributes.getScale()); |
| case VARCHAR: return ScalarType.createVarcharType(typeAttributes.getLength()); |
| case BINARY: return Type.BINARY; |
| default: |
| throw new ImpalaRuntimeException(String.format( |
| "Kudu type '%s' is not supported in Impala", t.getName())); |
| } |
| } |
| |
| /** |
| * Creates and returns an Expr that takes rows being inserted by 'insertStmt' and |
| * returns the partition number for each row. |
| */ |
| public static Expr createPartitionExpr(InsertStmt insertStmt, Analyzer analyzer) |
| throws AnalysisException { |
| Preconditions.checkState(insertStmt.getTargetTable() instanceof FeKuduTable); |
| Expr kuduPartitionExpr = new KuduPartitionExpr(DescriptorTable.TABLE_SINK_ID, |
| (FeKuduTable) insertStmt.getTargetTable(), |
| Lists.newArrayList(insertStmt.getPartitionKeyExprs()), |
| insertStmt.getPartitionColPos()); |
| kuduPartitionExpr.analyze(analyzer); |
| return kuduPartitionExpr; |
| } |
| |
| // Returns the number of cached KuduClients. Used for test assertions. |
| public static int getkuduClientsSize() { |
| return kuduClients_.size(); |
| } |
| |
| // Used for generating log messages |
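| // e.g. getPrimaryKeyString(true) returns "PRIMARY KEY" and |
| // getPrimaryKeyString(false) returns "NON UNIQUE PRIMARY KEY". |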
| public static String getPrimaryKeyString(boolean isPrimaryKeyUnique) { |
| StringBuilder sb = new StringBuilder(); |
| if (!isPrimaryKeyUnique) sb.append("NON UNIQUE "); |
| sb.append("PRIMARY KEY"); |
| return sb.toString(); |
| } |
| |
| // Returns the name of the auto-incrementing column of a Kudu table. |
| public static String getAutoIncrementingColumnName() { |
| return Schema.getAutoIncrementingColumnName(); |
| } |
| } |