| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.impala.catalog; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| |
| import javax.xml.bind.DatatypeConverter; |
| |
| import org.apache.impala.analysis.KuduPartitionParam; |
| import org.apache.impala.common.ImpalaRuntimeException; |
| import org.apache.impala.service.BackendConfig; |
| import org.apache.impala.thrift.TColumn; |
| import org.apache.impala.thrift.TResultSet; |
| import org.apache.impala.thrift.TResultSetMetadata; |
| import org.apache.impala.util.KuduUtil; |
| import org.apache.impala.util.TResultRowBuilder; |
| import org.apache.kudu.ColumnSchema; |
| import org.apache.kudu.Schema; |
| import org.apache.kudu.client.KuduClient; |
| import org.apache.kudu.client.LocatedTablet; |
| import org.apache.kudu.client.PartitionSchema; |
| import org.apache.kudu.client.PartitionSchema.HashBucketSchema; |
| import org.apache.kudu.client.PartitionSchema.RangeSchema; |
| |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Preconditions; |
| |
| /** |
| * Frontend interface for interacting with a Kudu-backed table. |
| */ |
| public interface FeKuduTable extends FeTable { |
| /** |
| * Return the comma-separated list of masters for the Kudu cluster |
| * backing this table. |
| */ |
| String getKuduMasterHosts(); |
| |
| /** |
| * Return the name of the Kudu table backing this table. |
| */ |
| String getKuduTableName(); |
| |
| /** |
| * Return the names of the columns that make up the primary key |
| * of this table. |
| */ |
| List<String> getPrimaryKeyColumnNames(); |
| |
| /** |
| * Return the Kudu partitioning clause information. |
| */ |
| List<KuduPartitionParam> getPartitionBy(); |
| |
| /** |
| * Utility functions for acting on FeKuduTable. |
| * |
| * When we fully move to Java 8, these can become default methods of the |
| * interface. |
| */ |
| public static abstract class Utils { |
| /** |
| * Returns the range-based partitioning of the given table if it exists, |
| * null otherwise. |
| */ |
| private static KuduPartitionParam getRangePartitioning(FeKuduTable table) { |
| for (KuduPartitionParam partitionParam: table.getPartitionBy()) { |
| if (partitionParam.getType() == KuduPartitionParam.Type.RANGE) { |
| return partitionParam; |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Returns the column names of the table's range-based partitioning or an empty |
| * list if the table doesn't have a range-based partitioning. |
| */ |
| public static List<String> getRangePartitioningColNames(FeKuduTable table) { |
| KuduPartitionParam rangePartitioning = getRangePartitioning(table); |
| if (rangePartitioning == null) return Collections.<String>emptyList(); |
| return rangePartitioning.getColumnNames(); |
| } |
| |
| public static List<KuduPartitionParam> loadPartitionByParams( |
| org.apache.kudu.client.KuduTable kuduTable) { |
| List<KuduPartitionParam> ret = new ArrayList<>(); |
| |
| Preconditions.checkNotNull(kuduTable); |
| Schema tableSchema = kuduTable.getSchema(); |
| PartitionSchema partitionSchema = kuduTable.getPartitionSchema(); |
| for (HashBucketSchema hashBucketSchema: partitionSchema.getHashBucketSchemas()) { |
| List<String> columnNames = new ArrayList<>(); |
| for (int colId: hashBucketSchema.getColumnIds()) { |
| columnNames.add(getColumnNameById(tableSchema, colId)); |
| } |
| ret.add(KuduPartitionParam.createHashParam(columnNames, |
| hashBucketSchema.getNumBuckets())); |
| } |
| RangeSchema rangeSchema = partitionSchema.getRangeSchema(); |
| List<Integer> columnIds = rangeSchema.getColumns(); |
| if (columnIds.isEmpty()) return ret; |
| List<String> columnNames = new ArrayList<>(); |
| for (int colId: columnIds) columnNames.add(getColumnNameById(tableSchema, colId)); |
| // We don't populate the split values because Kudu's API doesn't currently support |
| // retrieving the split values for range partitions. |
| // TODO: File a Kudu JIRA. |
| ret.add(KuduPartitionParam.createRangeParam(columnNames, null)); |
| |
| return ret; |
| } |
| |
| public static TResultSet getTableStats(FeKuduTable table) |
| throws ImpalaRuntimeException { |
| TResultSet result = new TResultSet(); |
| TResultSetMetadata resultSchema = new TResultSetMetadata(); |
| result.setSchema(resultSchema); |
| |
| resultSchema.addToColumns(new TColumn("# Rows", Type.INT.toThrift())); |
| resultSchema.addToColumns(new TColumn("Start Key", Type.STRING.toThrift())); |
| resultSchema.addToColumns(new TColumn("Stop Key", Type.STRING.toThrift())); |
| resultSchema.addToColumns(new TColumn("Leader Replica", Type.STRING.toThrift())); |
| resultSchema.addToColumns(new TColumn("# Replicas", Type.INT.toThrift())); |
| |
| KuduClient client = KuduUtil.getKuduClient(table.getKuduMasterHosts()); |
| try { |
| org.apache.kudu.client.KuduTable kuduTable = client.openTable( |
| table.getKuduTableName()); |
| List<LocatedTablet> tablets = |
| kuduTable.getTabletsLocations(BackendConfig.INSTANCE.getKuduClientTimeoutMs()); |
| if (tablets.isEmpty()) { |
| TResultRowBuilder builder = new TResultRowBuilder(); |
| result.addToRows( |
| builder.add("-1").add("N/A").add("N/A").add("N/A").add("-1").get()); |
| return result; |
| } |
| for (LocatedTablet tab: tablets) { |
| TResultRowBuilder builder = new TResultRowBuilder(); |
| builder.add("-1"); // The Kudu client API doesn't expose tablet row counts. |
| builder.add(DatatypeConverter.printHexBinary( |
| tab.getPartition().getPartitionKeyStart())); |
| builder.add(DatatypeConverter.printHexBinary( |
| tab.getPartition().getPartitionKeyEnd())); |
| LocatedTablet.Replica leader = tab.getLeaderReplica(); |
| if (leader == null) { |
| // Leader might be null, if it is not yet available (e.g. during |
| // leader election in Kudu) |
| builder.add("Leader n/a"); |
| } else { |
| builder.add(leader.getRpcHost() + ":" + leader.getRpcPort().toString()); |
| } |
| builder.add(tab.getReplicas().size()); |
| result.addToRows(builder.get()); |
| } |
| |
| } catch (Exception e) { |
| throw new ImpalaRuntimeException("Error accessing Kudu for table stats.", e); |
| } |
| return result; |
| } |
| |
| public static TResultSet getRangePartitions(FeKuduTable table) |
| throws ImpalaRuntimeException { |
| TResultSet result = new TResultSet(); |
| TResultSetMetadata resultSchema = new TResultSetMetadata(); |
| result.setSchema(resultSchema); |
| |
| // Build column header |
| String header = "RANGE (" + Joiner.on(',').join( |
| Utils.getRangePartitioningColNames(table)) + ")"; |
| resultSchema.addToColumns(new TColumn(header, Type.STRING.toThrift())); |
| KuduClient client = KuduUtil.getKuduClient(table.getKuduMasterHosts()); |
| try { |
| org.apache.kudu.client.KuduTable kuduTable = client.openTable( |
| table.getKuduTableName()); |
| // The Kudu table API will return the partitions in sorted order by value. |
| List<String> partitions = kuduTable.getFormattedRangePartitions( |
| BackendConfig.INSTANCE.getKuduClientTimeoutMs()); |
| if (partitions.isEmpty()) { |
| TResultRowBuilder builder = new TResultRowBuilder(); |
| result.addToRows(builder.add("").get()); |
| return result; |
| } |
| for (String partition: partitions) { |
| TResultRowBuilder builder = new TResultRowBuilder(); |
| builder.add(partition); |
| result.addToRows(builder.get()); |
| } |
| } catch (Exception e) { |
| throw new ImpalaRuntimeException("Error accessing Kudu for table partitions.", e); |
| } |
| return result; |
| } |
| |
| /** |
| * Returns the name of a Kudu column with id 'colId'. |
| */ |
| private static String getColumnNameById(Schema tableSchema, int colId) { |
| Preconditions.checkNotNull(tableSchema); |
| ColumnSchema col = tableSchema.getColumnByIndex(tableSchema.getColumnIndex(colId)); |
| Preconditions.checkNotNull(col); |
| return col.getName(); |
| } |
| } |
| } |