blob: a28d0f99f2e965cfef1068474df509a6fbf2978d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connectors.hbase.table;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.connectors.hbase.util.HBaseTypeUtils;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* Helps to specify an HBase Table's schema.
*/
public class HBaseTableSchema implements Serializable {
// A Map with key as column family.
private final Map<String, Map<String, TypeInformation<?>>> familyMap = new LinkedHashMap<>();
// charset to parse HBase keys and strings. UTF-8 by default.
private String charset = "UTF-8";
// count total qualifier number
private int totalQualifiers = 0;
/**
* Adds a column defined by family, qualifier, and type to the table schema.
*
* @param family the family name
* @param qualifier the qualifier name
* @param clazz the data type of the qualifier
*/
@Deprecated
public void addColumn(String family, String qualifier, Class<?> clazz) {
if (!HBaseTypeUtils.isSupportedType(clazz)) {
// throw exception
throw new IllegalArgumentException("Unsupported class type found " + clazz + ". " +
"Better to use byte[].class and deserialize using user defined scalar functions");
}
// redirect to new method
addColumn(family, qualifier, TypeExtractor.getForClass(clazz));
}
/**
* Adds a column defined by family, qualifier, and type to the table schema.
*
* @param family the family name
* @param qualifier the qualifier name
* @param type the data type info of the qualifier
*/
public void addColumn(String family, String qualifier, TypeInformation<?> type) {
Preconditions.checkNotNull(family, "family name");
Preconditions.checkNotNull(qualifier, "qualifier name");
Preconditions.checkNotNull(type, "qualifier type");
Map<String, TypeInformation<?>> qualifierMap = this.familyMap.get(family);
if (!HBaseTypeUtils.isSupportedType(type)) {
// throw exception
throw new IllegalArgumentException("Unsupported type found " + type + ". " +
"Better to use byte[].class and deserialize using user defined scalar functions");
}
if (qualifierMap == null) {
qualifierMap = new LinkedHashMap<>();
}
// validate qualifier name's uniqueness
if (qualifierMap.containsKey(qualifier)) {
throw new IllegalArgumentException("qualifier[" + qualifier + "] in column[" + family + "] already exists!");
}
qualifierMap.put(qualifier, type);
familyMap.put(family, qualifierMap);
totalQualifiers++;
}
/**
* Sets the charset for value strings and HBase identifiers.
*
* @param charset the charset for value strings and HBase identifiers.
*/
public void setCharset(String charset) {
this.charset = charset;
}
/**
* Returns the names of all registered column families.
*
* @return The names of all registered column families.
*/
public String[] getFamilyNames() {
return this.familyMap.keySet().toArray(new String[this.familyMap.size()]);
}
/**
* Returns the HBase identifiers of all registered column families.
*
* @return The HBase identifiers of all registered column families.
*/
public byte[][] getFamilyKeys() {
Charset c = Charset.forName(charset);
byte[][] familyKeys = new byte[this.familyMap.size()][];
int i = 0;
for (String name : this.familyMap.keySet()) {
familyKeys[i++] = name.getBytes(c);
}
return familyKeys;
}
/**
* Returns the names of all registered column qualifiers of a specific column family.
*
* @param family The name of the column family for which the column qualifier names are returned.
* @return The names of all registered column qualifiers of a specific column family.
*/
public String[] getQualifierNames(String family) {
Map<String, TypeInformation<?>> qualifierMap = familyMap.get(family);
if (qualifierMap == null) {
throw new IllegalArgumentException("Family " + family + " does not exist in schema.");
}
String[] qualifierNames = new String[qualifierMap.size()];
int i = 0;
for (String qualifier: qualifierMap.keySet()) {
qualifierNames[i] = qualifier;
i++;
}
return qualifierNames;
}
/**
* Returns the HBase identifiers of all registered column qualifiers for a specific column family.
*
* @param family The name of the column family for which the column qualifier identifiers are returned.
* @return The HBase identifiers of all registered column qualifiers for a specific column family.
*/
public byte[][] getQualifierKeys(String family) {
Map<String, TypeInformation<?>> qualifierMap = familyMap.get(family);
if (qualifierMap == null) {
throw new IllegalArgumentException("Family " + family + " does not exist in schema.");
}
Charset c = Charset.forName(charset);
byte[][] qualifierKeys = new byte[qualifierMap.size()][];
int i = 0;
for (String name : qualifierMap.keySet()) {
qualifierKeys[i++] = name.getBytes(c);
}
return qualifierKeys;
}
/**
* Returns the types of all registered column qualifiers of a specific column family.
*
* @param family The name of the column family for which the column qualifier types are returned.
* @return The types of all registered column qualifiers of a specific column family.
*/
public TypeInformation<?>[] getQualifierTypes(String family) {
Map<String, TypeInformation<?>> qualifierMap = familyMap.get(family);
if (qualifierMap == null) {
throw new IllegalArgumentException("Family " + family + " does not exist in schema.");
}
TypeInformation<?>[] typeInformation = new TypeInformation[qualifierMap.size()];
int i = 0;
for (TypeInformation<?> typeInfo : qualifierMap.values()) {
typeInformation[i] = typeInfo;
i++;
}
return typeInformation;
}
/**
* Returns the names and types of all registered column qualifiers of a specific column family.
*
* @param family The name of the column family for which the column qualifier names and types are returned.
* @return The names and types of all registered column qualifiers of a specific column family.
*/
public Map<String, TypeInformation<?>> getFamilyInfo(String family) {
return familyMap.get(family);
}
/**
* Returns the charset for value strings and HBase identifiers.
*
* @return The charset for value strings and HBase identifiers.
*/
public String getStringCharset() {
return this.charset;
}
public int getTotalQualifiers() {
return totalQualifiers;
}
/**
* A helper method returns a flat qualifiers list in defining order.
* Qualifier info is a Tuple3 representation:
* (byte[] columnFamily, byte[] qualifier, TypeInformation typeInfo)
*/
public List<Tuple3<byte[], byte[], TypeInformation<?>>> getFlatByteQualifiers() {
List<Tuple3<byte[], byte[], TypeInformation<?>>> qualifierList = new ArrayList<>();
for (String family : getFamilyNames()) {
byte[] columnBytes = Bytes.toBytes(family);
String[] qualifierNames = getQualifierNames(family);
TypeInformation[] qualifierTypes = getQualifierTypes(family);
for (int idx = 0; idx < qualifierNames.length; idx++) {
qualifierList.add(new Tuple3(columnBytes, Bytes.toBytes(qualifierNames[idx]), qualifierTypes[idx]));
}
}
return qualifierList;
}
/**
* A helper method returns a flat qualifiers list in defining order.
* Qualifier info is a Tuple3 representation:
* (String columnFamily, String qualifier, TypeInformation typeInfo)
*/
public List<Tuple3<String, String, TypeInformation<?>>> getFlatStringQualifiers() {
List<Tuple3<String, String, TypeInformation<?>>> qualifierList = new ArrayList<>();
for (String family : getFamilyNames()) {
String[] qualifierNames = getQualifierNames(family);
TypeInformation[] qualifierTypes = getQualifierTypes(family);
for (int idx = 0; idx < qualifierNames.length; idx++) {
qualifierList.add(new Tuple3(family, qualifierNames[idx], qualifierTypes[idx]));
}
}
return qualifierList;
}
}