blob: 3f7dfbe571cd1e003157b420b78d01f5a0fb59e6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.utils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.Utils;
/**
* This UDF can be used to check that a tuple presented by HCatLoader has the
* right types for the fields
*
* Usage is :
*
* register testudf.jar;
* a = load 'numbers' using HCatLoader(...);
* b = foreach a generate HCatTypeCheck('intnum1000:int,id:int,intnum5:int,intnum100:int,intnum:int,longnum:long,floatnum:float,doublenum:double', *);
* store b into 'output';
*
* The schema string (the first argument to the UDF) is of the form one would provide in a
* pig load statement.
*
* The output should only contain the value '1' in all rows. (This UDF returns
* the integer value 1 if all fields have the right type, else throws IOException)
*
*/
public class HCatTypeCheck extends EvalFunc<Integer> {
static HashMap<Byte, Class<?>> typeMap = new HashMap<Byte, Class<?>>();
@Override
public Integer exec(Tuple input) throws IOException {
String schemaStr = (String) input.get(0);
Schema s = null;
try {
s = getSchemaFromString(schemaStr);
} catch (Exception e) {
throw new IOException(e);
}
for (int i = 0; i < s.size(); i++) {
check(s.getField(i).type, input.get(i + 1)); // input.get(i+1) since input.get(0) is the schema;
}
return 1;
}
static {
typeMap.put(DataType.INTEGER, Integer.class);
typeMap.put(DataType.LONG, Long.class);
typeMap.put(DataType.FLOAT, Float.class);
typeMap.put(DataType.DOUBLE, Double.class);
typeMap.put(DataType.CHARARRAY, String.class);
typeMap.put(DataType.TUPLE, Tuple.class);
typeMap.put(DataType.MAP, Map.class);
typeMap.put(DataType.BAG, DataBag.class);
}
private void die(String expectedType, Object o) throws IOException {
throw new IOException("Expected " + expectedType + ", got " +
o.getClass().getName());
}
private String check(Byte type, Object o) throws IOException {
if (o == null) {
return "";
}
if (check(typeMap.get(type), o)) {
if (type.equals(DataType.MAP)) {
Map<String, String> m = (Map<String, String>) o;
check(m);
} else if (type.equals(DataType.BAG)) {
DataBag bg = (DataBag) o;
for (Tuple tuple : bg) {
Map<String, String> m = (Map<String, String>) tuple.get(0);
check(m);
}
} else if (type.equals(DataType.TUPLE)) {
Tuple t = (Tuple) o;
if (!check(Integer.class, t.get(0)) ||
!check(String.class, t.get(1)) ||
!check(Double.class, t.get(2))) {
die("t:tuple(num:int,str:string,dbl:double)", t);
}
}
} else {
die(typeMap.get(type).getName(), o);
}
return o.toString();
}
/**
* @param m
* @throws IOException
*/
private void check(Map<String, String> m) throws IOException {
for (Entry<String, String> e : m.entrySet()) {
// just access key and value to ensure they are correct
if (!check(String.class, e.getKey())) {
die("String", e.getKey());
}
if (!check(String.class, e.getValue())) {
die("String", e.getValue());
}
}
}
private boolean check(Class<?> expected, Object actual) {
if (actual == null) {
return true;
}
return expected.isAssignableFrom(actual.getClass());
}
Schema getSchemaFromString(String schemaString) throws Exception {
/** ByteArrayInputStream stream = new ByteArrayInputStream(schemaString.getBytes()) ;
QueryParser queryParser = new QueryParser(stream) ;
Schema schema = queryParser.TupleSchema() ;
Schema.setSchemaDefaultType(schema, org.apache.pig.data.DataType.BYTEARRAY);
return schema;
*/
return Utils.getSchemaFromString(schemaString);
}
}