| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig.data; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.lang.annotation.ElementType; |
| import java.lang.annotation.Retention; |
| import java.lang.annotation.RetentionPolicy; |
| import java.lang.annotation.Target; |
| import java.util.List; |
| import java.util.Queue; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.pig.PigConfiguration; |
| import org.apache.pig.classification.InterfaceAudience; |
| import org.apache.pig.classification.InterfaceStability; |
| import org.apache.pig.impl.logicalLayer.schema.Schema; |
| import org.apache.pig.impl.util.JavaCompilerHelper; |
| import org.apache.pig.impl.util.ObjectSerializer; |
| |
| import com.google.common.collect.Lists; |
| |
| /** |
| * This class encapsulates the generation of SchemaTuples, as well as some logic |
| * around shipping code to the distributed cache. |
| */ |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| public class SchemaTupleClassGenerator { |
/** Logger used for the code-generation and compilation messages emitted by this class. */
private static final Log LOG = LogFactory.getLog(SchemaTupleClassGenerator.class);

/** Private constructor: this class cannot be instantiated from outside. */
private SchemaTupleClassGenerator() {
    // intentionally empty
}
| |
| /** |
| * The GenContext mechanism provides a level of control in where SchemaTupleFactories |
| * are used. By attaching a GenContext enum type to the registration of a Schema, |
| * the code can express the intent of where a SchemaTupleFactory is intended to be used. |
| * In this way, if a load func and a join both involve Tuples of the same Schema, it's |
| * possible to use SchemaTupleFactories in one but not in the other. |
| */ |
| public static enum GenContext { |
| /** |
| * This context is used in UDF code. Currently, this is only used for |
| * the inputs to UDF's. |
| */ |
| UDF (PigConfiguration.PIG_SCHEMA_TUPLE_USE_IN_UDF, true, GenerateUdf.class), |
| /** |
| * This context is for POForEach. This will use the expected output of a ForEach |
| * to return a typed Tuple. |
| */ |
| FOREACH (PigConfiguration.PIG_SCHEMA_TUPLE_USE_IN_FOREACH, true, GenerateForeach.class), |
| /** |
| * This context controls whether or not SchemaTuples will be used in FR joins. |
| * Currently, they will be used in the HashMap that FR Joins construct. |
| */ |
| FR_JOIN (PigConfiguration.PIG_SCHEMA_TUPLE_USE_IN_FRJOIN, true, GenerateFrJoin.class), |
| /** |
| * This context controls whether or not SchemaTuples will be used in merge joins. |
| */ |
| MERGE_JOIN (PigConfiguration.PIG_SCHEMA_TUPLE_USE_IN_MERGEJOIN, true, GenerateMergeJoin.class), |
| /** |
| * All registered Schemas will also be registered in one additional context. |
| * This context will allow users to "force" the load of a SchemaTupleFactory |
| * if one is present in any context. |
| */ |
| FORCE_LOAD (PigConfiguration.PIG_SCHEMA_TUPLE_ALLOW_FORCE, true, GenerateForceLoad.class); |
| |
| /** |
| * These annotations are used to mark a given SchemaTuple with |
| * the context in which is was intended to be generated. |
| */ |
| |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.TYPE) |
| public @interface GenerateUdf {} |
| |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.TYPE) |
| public @interface GenerateForeach {} |
| |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.TYPE) |
| public @interface GenerateFrJoin {} |
| |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.TYPE) |
| public @interface GenerateMergeJoin {} |
| |
| @Retention(RetentionPolicy.RUNTIME) |
| @Target(ElementType.TYPE) |
| public @interface GenerateForceLoad {} |
| |
| private String key; |
| private boolean defaultValue; |
| private Class<?> annotation; |
| |
| GenContext(String key, boolean defaultValue, Class<?> annotation) { |
| this.key = key; |
| this.defaultValue = defaultValue; |
| this.annotation = annotation; |
| } |
| |
| public String key() { |
| return key; |
| } |
| |
| public String getAnnotationCanonicalName() { |
| return annotation.getCanonicalName(); |
| } |
| |
| /** |
| * Checks the generated class to see if the annotation |
| * associated with this enum is present. |
| * @param clazz |
| * @return boolean type value |
| */ |
| @SuppressWarnings({ "unchecked", "rawtypes" }) |
| public boolean shouldGenerate(Class clazz) { |
| return clazz.getAnnotation(annotation) != null; |
| } |
| |
| /** |
| * Given a job configuration file, this checks to see |
| * if the default value has been overriden. |
| * @param conf |
| * @return boolean type value |
| */ |
| public boolean shouldGenerate(Configuration conf) { |
| String shouldString = conf.get(key); |
| if (shouldString == null) { |
| return defaultValue; |
| } |
| return Boolean.parseBoolean(shouldString); |
| } |
| } |
| |
| /** |
| * This value is used to distinguish all of the generated code. |
| * The general naming scheme used is SchemaTupe_identifier. Note that |
| * identifiers are incremented before code is actually generated. |
| */ |
| private static int nextGlobalClassIdentifier = 0; |
| |
| protected static void resetGlobalClassIdentifier() { |
| nextGlobalClassIdentifier = 0; |
| } |
| |
| /** |
| * This class actually generates the code for a given Schema. |
| * @param s as Schema |
| * @param appendable as boolean, true or false depending on whether it should be appendable |
| * @param id as int, id means identifier |
| * @param contexts which are a list of contexts in which the SchemaTuple is intended to be instantiated |
| */ |
| protected static void generateSchemaTuple(Schema s, boolean appendable, int id, File codeDir, GenContext... contexts) { |
| StringBuilder contextAnnotations = new StringBuilder(); |
| for (GenContext context : contexts) { |
| LOG.info("Including context: " + context); |
| contextAnnotations.append("@").append(context.getAnnotationCanonicalName()).append("\n"); |
| } |
| |
| String codeString = produceCodeString(s, appendable, id, contextAnnotations.toString(), codeDir); |
| |
| String name = "SchemaTuple_" + id; |
| |
| LOG.info("Compiling class " + name + " for Schema: " + s + ", and appendability: " + appendable); |
| |
| compileCodeString(name, codeString, codeDir); |
| } |
| |
| private static int generateSchemaTuple(Schema s, boolean appendable, File codeDir, GenContext... contexts) { |
| int id = SchemaTupleClassGenerator.getNextGlobalClassIdentifier(); |
| |
| generateSchemaTuple(s, appendable, id, codeDir, contexts); |
| |
| return id; |
| } |
| |
| /** |
| * This method generates the actual SchemaTuple for the given Schema. |
| * @param schema |
| * @param whether the class should be appendable |
| * @param identifier |
| * @return the generated class's implementation |
| */ |
| private static String produceCodeString(Schema s, boolean appendable, int id, String contextAnnotations, File codeDir) { |
| TypeInFunctionStringOutFactory f = new TypeInFunctionStringOutFactory(s, id, appendable, contextAnnotations, codeDir); |
| |
| for (Schema.FieldSchema fs : s.getFields()) { |
| f.process(fs); |
| } |
| |
| return f.end(); |
| } |
| |
| protected static int getNextGlobalClassIdentifier() { |
| return nextGlobalClassIdentifier++; |
| } |
| |
| /** |
| * This method takes generated code, and compiles it down to a class file. It will output |
| * the generated class file to the static temporary directory for generated code. Note |
| * that the compiler will use the classpath that Pig is instantiated with, as well as the |
| * generated directory. |
| * |
| * @param String of generated code |
| * @param name of class |
| */ |
| //TODO in the future, we can use ASM to generate the bytecode directly. |
| private static void compileCodeString(String className, String generatedCodeString, File codeDir) { |
| JavaCompilerHelper compiler = new JavaCompilerHelper(); |
| String tempDir = codeDir.getAbsolutePath(); |
| compiler.addToClassPath(tempDir); |
| LOG.debug("Compiling SchemaTuple code with classpath: " + compiler.getClassPath()); |
| compiler.compile(tempDir, new JavaCompilerHelper.JavaSourceFromString(className, generatedCodeString)); |
| LOG.info("Successfully compiled class: " + className); |
| } |
| |
| static class CompareToSpecificString extends TypeInFunctionStringOut { |
| private int id; |
| |
| public CompareToSpecificString(int id, boolean appendable) { |
| super(appendable); |
| this.id = id; |
| } |
| |
| @Override |
| public void prepare() { |
| add("@Override"); |
| add("protected int generatedCodeCompareToSpecific(SchemaTuple_"+id+" t) {"); |
| add(" int i = 0;"); |
| } |
| |
| @Override |
| public void process(int fieldNum, Schema.FieldSchema fs) { |
| add(" i = compare(checkIfNull_" + fieldNum + "(), getPos_" |
| + fieldNum + "(), t.checkIfNull_" + fieldNum + "(), t.getPos_" |
| + fieldNum + "());"); |
| add(" if (i != 0) {"); |
| add(" return i;"); |
| add(" }"); |
| } |
| |
| @Override |
| public void end() { |
| add(" return i;"); |
| add("}"); |
| } |
| } |
| |
| //TODO clear up how it deals with nulls etc. IE is the logic correct |
| static class CompareToString extends TypeInFunctionStringOut { |
| private int id; |
| |
| public CompareToString(int id) { |
| this.id = id; |
| } |
| |
| @Override |
| public void prepare() { |
| add("@Override"); |
| add("protected int generatedCodeCompareTo(SchemaTuple t, boolean checkType) {"); |
| add(" int i;"); |
| } |
| |
| boolean compTup = false; |
| boolean compStr = false; |
| boolean compIsNull = false; |
| boolean compByte = false; |
| |
| @Override |
| public void process(int fieldNum, Schema.FieldSchema fs) { |
| add(" i = compareWithElementAtPos(checkIfNull_" + fieldNum + "(), getPos_" + fieldNum + "(), t, " + fieldNum + ");"); |
| add(" if (i != 0) {"); |
| add(" return i;"); |
| add(" }"); |
| } |
| |
| @Override |
| public void end() { |
| add(" return 0;"); |
| add("}"); |
| } |
| } |
| |
| static class HashCode extends TypeInFunctionStringOut { |
| @Override |
| public void prepare() { |
| add("@Override"); |
| add("public int generatedCodeHashCode() {"); |
| add(" int h = 17;"); |
| } |
| |
| @Override |
| public void process(int fieldPos, Schema.FieldSchema fs) { |
| add(" h = hashCodePiece(h, getPos_" + fieldPos + "(), checkIfNull_" + fieldPos + "());"); |
| } |
| |
| @Override |
| public void end() { |
| add(" return h;"); |
| add("}"); |
| } |
| } |
| |
| static class FieldString extends TypeInFunctionStringOut { |
| private List<Queue<Integer>> listOfQueuesForIds; |
| private Schema schema; |
| |
| private int primitives = 0; |
| private int isNulls = 0; |
| |
| private int booleanBytes = 0; |
| private int booleans = 0; |
| private File codeDir; |
| |
| @Override |
| public void prepare() { |
| String s; |
| try { |
| s = ObjectSerializer.serialize(schema); |
| } catch (IOException e) { |
| throw new RuntimeException("Unable to serialize schema: " + schema, e); |
| } |
| add("private static Schema schema = staticSchemaGen(\"" + s + "\");"); |
| } |
| |
| @Override |
| public void process(int fieldPos, Schema.FieldSchema fs) { |
| if (!isTuple()) { |
| if (isPrimitive() && (primitives++ % 8 == 0)) { |
| add("private byte isNull_"+ isNulls++ +" = (byte)0xFF;"); |
| } |
| |
| if (isBoolean()) { |
| if (booleans++ % 8 == 0) { |
| add("private byte booleanByte_"+ booleanBytes++ +";"); |
| } |
| } else { |
| add("private "+typeName()+" pos_"+fieldPos+";"); |
| } |
| } else { |
| int id = SchemaTupleClassGenerator.generateSchemaTuple(fs.schema, isAppendable(), codeDir()); |
| |
| for (Queue<Integer> q : listOfQueuesForIds) { |
| q.add(id); |
| } |
| |
| add("private SchemaTuple_"+id+" pos_"+fieldPos+";"); |
| } |
| } |
| |
| @Override |
| public void end() { |
| addBreak(); |
| add("@Override"); |
| add("public Schema getSchema() {"); |
| add(" return schema;"); |
| add("}"); |
| addBreak(); |
| } |
| |
| public FieldString(File codeDir, List<Queue<Integer>> listOfQueuesForIds, Schema schema, boolean appendable) { |
| super(appendable); |
| this.codeDir = codeDir; |
| this.listOfQueuesForIds = listOfQueuesForIds; |
| this.schema = schema; |
| } |
| |
| public File codeDir() { |
| return codeDir; |
| } |
| } |
| |
| static class SetPosString extends TypeInFunctionStringOut { |
| private Queue<Integer> idQueue; |
| |
| private int byteField = 0; //this is for setting booleans |
| private int byteIncr = 0; //this is for counting the booleans we've encountered |
| |
| @Override |
| public void process(int fieldPos, Schema.FieldSchema fs) { |
| if (!isTuple()) { |
| add("public void setPos_"+fieldPos+"("+typeName()+" v) {"); |
| if (isPrimitive()) { |
| add(" setNull_"+fieldPos+"(false);"); |
| } |
| |
| if (!isBoolean()) { |
| add(" pos_"+fieldPos+" = v;"); |
| } else { |
| add(" booleanByte_" + byteField + " = BytesHelper.setBitByPos(booleanByte_" + byteField + ", v, " + byteIncr++ + ");"); |
| |
| if (byteIncr % 8 == 0) { |
| byteIncr = 0; |
| byteField++; |
| } |
| } |
| |
| add("}"); |
| } else { |
| int nestedSchemaTupleId = idQueue.remove(); |
| add("public void setPos_"+fieldPos+"(SchemaTuple_"+nestedSchemaTupleId+" t) {"); |
| add(" pos_" + fieldPos + " = t;"); |
| add("}"); |
| addBreak(); |
| add("public void setPos_"+fieldPos+"(SchemaTuple t) {"); |
| add(" if (pos_"+fieldPos+" == null) {"); |
| add(" pos_"+fieldPos+" = new SchemaTuple_"+nestedSchemaTupleId+"();"); |
| add(" }"); |
| add(" pos_" + fieldPos + ".setAndCatch(t);"); |
| add("}"); |
| addBreak(); |
| add("public void setPos_"+fieldPos+"(Tuple t) {"); |
| add(" if (pos_"+fieldPos+" == null) {"); |
| add(" pos_"+fieldPos+" = new SchemaTuple_"+nestedSchemaTupleId+"();"); |
| add(" }"); |
| add(" pos_" + fieldPos + ".setAndCatch(t);"); |
| add("}"); |
| } |
| addBreak(); |
| } |
| |
| public SetPosString(Queue<Integer> idQueue) { |
| this.idQueue = idQueue; |
| } |
| } |
| |
| static class ListSetString extends TypeInFunctionStringOut { |
| @Override |
| public void prepare() { |
| add("@Override"); |
| add("public void generatedCodeSetIterator(Iterator<Object> it) throws ExecException {"); |
| } |
| |
| @Override |
| public void process(int fieldPos, Schema.FieldSchema fs) { |
| add(" setPos_"+fieldPos+"(unbox(it.next(), getDummy_"+fieldPos+"()));"); |
| } |
| |
| @Override |
| public void end() { |
| add("}"); |
| } |
| } |
| |
| static class GenericSetString extends TypeInFunctionStringOut { |
| @Override |
| public void prepare() { |
| add("@Override"); |
| add("public void generatedCodeSetField(int fieldNum, Object val) throws ExecException {"); |
| add(" switch (fieldNum) {"); |
| } |
| |
| @Override |
| public void process(int fieldPos, Schema.FieldSchema fs) { |
| add(" case ("+fieldPos+"):"); |
| add(" if (val == null) {"); |
| add(" setNull_" + fieldPos + "(true);"); |
| add(" return;"); |
| add(" }"); |
| add(" setPos_"+fieldPos+"(unbox(val, getDummy_"+fieldPos+"()));"); |
| add(" break;"); |
| } |
| |
| @Override |
| public void end() { |
| add(" default:"); |
| add(" throw new ExecException(\"Invalid index given to set: \" + fieldNum);"); |
| add(" }"); |
| add("}"); |
| } |
| } |
| |
| static class GenericGetString extends TypeInFunctionStringOut { |
| @Override |
| public void prepare() { |
| add("@Override"); |
| add("public Object generatedCodeGetField(int fieldNum) throws ExecException {"); |
| add(" switch (fieldNum) {"); |
| } |
| |
| @Override |
| public void process(int fieldPos, Schema.FieldSchema fs) { |
| add(" case ("+fieldPos+"): return checkIfNull_"+fieldPos+"() ? null : box(getPos_"+fieldPos+"());"); |
| } |
| |
| @Override |
| public void end() { |
| add(" default: throw new ExecException(\"Invalid index given to get: \" + fieldNum);"); |
| add(" }"); |
| add("}"); |
| } |
| } |
| |
| static class GeneralIsNullString extends TypeInFunctionStringOut { |
| @Override |
| public void prepare() { |
| add("@Override"); |
| add("public boolean isGeneratedCodeFieldNull(int fieldNum) throws ExecException {"); |
| add(" switch (fieldNum) {"); |
| } |
| |
| @Override |
| public void process(int fieldPos, Schema.FieldSchema fs) { |
| add(" case ("+fieldPos+"): return checkIfNull_"+fieldPos+"();"); |
| } |
| |
| @Override |
| public void end() { |
| add(" default: throw new ExecException(\"Invalid index given: \" + fieldNum);"); |
| add(" }"); |
| add("}"); |
| } |
| } |
| |
| static class CheckIfNullString extends TypeInFunctionStringOut { |
| private int nullByte = 0; //the byte_ val |
| private int byteIncr = 0; //the mask we're on |
| |
| @Override |
| public void process(int fieldPos, Schema.FieldSchema fs) { |
| add("public boolean checkIfNull_" + fieldPos + "() {"); |
| if (isPrimitive()) { |
| add(" return BytesHelper.getBitByPos(isNull_" + nullByte + ", " + byteIncr++ +");"); |
| if (byteIncr % 8 == 0) { |
| byteIncr = 0; |
| nullByte++; |
| } |
| } else { |
| add(" return pos_" + fieldPos + " == null;"); |
| } |
| add("}"); |
| addBreak(); |
| } |
| } |
| |
| static class SetNullString extends TypeInFunctionStringOut { |
| private int nullByte = 0; //the byte_ val |
| private int byteIncr = 0; //the mask we're on |
| |
| @Override |
| public void process(int fieldPos, Schema.FieldSchema fs) { |
| add("public void setNull_"+fieldPos+"(boolean b) {"); |
| if (isPrimitive()) { |
| add(" isNull_" + nullByte + " = BytesHelper.setBitByPos(isNull_" + nullByte + ", b, " + byteIncr++ + ");"); |
| if (byteIncr % 8 == 0) { |
| byteIncr = 0; |
| nullByte++; |
| } |
| } else { |
| add(" if (b) {"); |
| add(" pos_" + fieldPos + " = null;"); |
| add(" }"); |
| } |
| add("}"); |
| addBreak(); |
| } |
| } |
| |
| //TODO should this do something different if t is null? |
| static class SetEqualToSchemaTupleSpecificString extends TypeInFunctionStringOut { |
| private int id; |
| |
| @Override |
| public void prepare() { |
| add("@Override"); |
| add("protected SchemaTuple generatedCodeSetSpecific(SchemaTuple_"+id+" t) {"); |
| } |
| |
| @Override |
| public void process(int fieldPos, Schema.FieldSchema fs) { |
| add(" if (t.checkIfNull_" + fieldPos + "()) {"); |
| add(" setNull_" + fieldPos + "(true);"); |
| add(" } else {"); |
| add(" setPos_"+fieldPos+"(t.getPos_"+fieldPos+"());"); |
| add(" }"); |
| addBreak(); |
| } |
| |
| @Override |
| public void end() { |
| add(" return this;"); |
| add("}"); |
| addBreak(); |
| } |
| |
| public SetEqualToSchemaTupleSpecificString(int id) { |
| this.id = id; |
| } |
| } |
| |
| static class IsSpecificSchemaTuple extends TypeInFunctionStringOut { |
| private int id; |
| |
| public IsSpecificSchemaTuple(int id) { |
| this.id = id; |
| } |
| |
| @Override |
| public void prepare() { |
| add("@Override"); |
| add("public boolean isSpecificSchemaTuple(Object o) {"); |
| add(" return o instanceof SchemaTuple_" + id + ";"); |
| add("}"); |
| } |
| } |
| |
| //this has to write the null state of all the fields, not just the null bytes, though those |
| //will have to be reconstructed |
| static class WriteNullsString extends TypeInFunctionStringOut { |
| String s = " boolean[] b = {\n"; |
| |
| @Override |
| public void prepare() { |
| add("@Override"); |
| add("protected boolean[] generatedCodeNullsArray() throws IOException {"); |
| } |
| |
| @Override |
| public void process(int fieldPos, Schema.FieldSchema fs) { |
| s += " checkIfNull_"+fieldPos+"(),\n"; |
| } |
| |
| @Override |
| public void end() { |
| s = s.substring(0, s.length() - 2) + "\n };"; |
| add(s); |
| add(" return b;"); |
| add("}"); |
| addBreak(); |
| } |
| |
| public WriteNullsString(boolean appendable) { |
| super(appendable); |
| } |
| } |
| |
| static class ReadString extends TypeInFunctionStringOut { |
| private Queue<Integer> idQueue; |
| |
| private int booleans = 0; |
| |
| @Override |
| public void prepare() { |
| add("@Override"); |
| add("protected void generatedCodeReadFields(DataInput in, boolean[] b) throws IOException {"); |
| } |
| |
| @Override |
| public void process(int fieldPos, Schema.FieldSchema fs) { |
| if (isBoolean()) { |
| booleans++; |
| add(" if (b["+fieldPos+"]) {"); |
| add(" setNull_"+fieldPos+"(true);"); |
| add(" } else {"); |
| add(" setNull_"+fieldPos+"(false);"); |
| add(" }"); |
| } else if (!isTuple()) { |
| add(" if (b["+fieldPos+"]) {"); |
| add(" setNull_"+fieldPos+"(true);"); |
| add(" } else {"); |
| add(" setPos_"+fieldPos+"(read(in, pos_"+fieldPos+"));"); |
| add(" }"); |
| addBreak(); |
| } else { |
| int nestedSchemaTupleId = idQueue.remove(); |
| add(" if (b["+fieldPos+"]) {"); |
| add(" setNull_"+fieldPos+"(true);"); |
| add(" } else {"); |
| add(" SchemaTuple_"+nestedSchemaTupleId+" st = new SchemaTuple_"+nestedSchemaTupleId+"();"); |
| add(" st.readFields(in);"); |
| add(" setPos_"+fieldPos+"(st);"); |
| add(" }"); |
| addBreak(); |
| } |
| } |
| |
| @Override |
| public void end() { |
| if (booleans > 0) { |
| int i = 0; |
| while (booleans > 0) { |
| add(" booleanByte_"+(i++)+" = in.readByte();"); |
| booleans -= 8; |
| } |
| } |
| add("}"); |
| addBreak(); |
| } |
| |
| public ReadString(Queue<Integer> idQueue, boolean appendable) { |
| super(appendable); |
| this.idQueue = idQueue; |
| } |
| } |
| |
| |
| static class WriteString extends TypeInFunctionStringOut { |
| @Override |
| public void prepare() { |
| add("@Override"); |
| add("protected void generatedCodeWriteElements(DataOutput out) throws IOException {"); |
| } |
| |
| private int booleans = 0; |
| |
| @Override |
| public void process(int fieldPos, Schema.FieldSchema fs) { |
| if (isBoolean()) { |
| booleans++; |
| } else { |
| add(" if (!checkIfNull_"+fieldPos+"()) {"); |
| add(" write(out, pos_"+fieldPos+");"); |
| add(" }"); |
| addBreak(); |
| } |
| } |
| |
| @Override |
| public void end() { |
| if (booleans > 0) { |
| int i = 0; |
| while (booleans > 0) { |
| add(" out.writeByte(booleanByte_"+(i++)+");"); |
| booleans -= 8; |
| } |
| } |
| add("}"); |
| addBreak(); |
| } |
| } |
| |
| //TODO need to include all of the objects from Schema (have it implement it's own getMemorySize()? |
| static class MemorySizeString extends TypeInFunctionStringOut { |
| private int size = 0; |
| |
| String s = " return SizeUtil.roundToEight("; |
| |
| @Override |
| public void prepare() { |
| add("@Override"); |
| add("public long getGeneratedCodeMemorySize() {"); |
| } |
| |
| private int booleans = 0; |
| private int primitives = 0; |
| |
| //TODO a null array or object variable still takes up space for the pointer, yes? |
| @Override |
| public void process(int fieldPos, Schema.FieldSchema fs) { |
| if (isInt() || isFloat()) { |
| size += 4; |
| } else if (isLong() || isDouble()) { |
| size += 8; |
| } else if (isBytearray()) { |
| size += 8; //the ptr |
| s += "(pos_"+fieldPos+" == null ? 0 : SizeUtil.roundToEight(12 + pos_"+fieldPos+".length) * 8) + "; |
| } else if (isBoolean()) { |
| if (booleans++ % 8 == 0) { |
| size++; //accounts for the byte used to store boolean values |
| } |
| } else if (isDateTime()) { |
| size += 10; // 8 for long and 2 for short |
| } else if (isBag()) { |
| size += 8; //the ptr |
| s += "(pos_"+fieldPos+" == null ? 0 : pos_"+fieldPos+".getMemorySize()) + "; |
| } else if (isMap() || isString() || isBigDecimal() || isBigInteger()) { |
| size += 8; //the ptr |
| s += "(pos_"+fieldPos+" == null ? 0 : SizeUtil.getPigObjMemSize(pos_"+fieldPos+")) + "; |
| } else if (isTuple()) { |
| size += 8; //the ptr |
| s += "(pos_"+fieldPos+" == null ? 8 : pos_"+fieldPos+".getMemorySize()) + "; |
| } else { |
| throw new RuntimeException("Unsupported type found: " + fs); |
| } |
| |
| if (isPrimitive() && primitives++ % 8 == 0) { |
| size++; //accounts for the null byte |
| } |
| } |
| |
| @Override |
| public void end() { |
| s += size + ");"; |
| add(s); |
| add("}"); |
| addBreak(); |
| } |
| } |
| |
| static class GetDummyString extends TypeInFunctionStringOut { |
| @Override |
| public void process(int fieldPos, Schema.FieldSchema fs) { |
| add("public "+typeName()+" getDummy_"+fieldPos+"() {"); |
| switch (fs.type) { |
| case (DataType.INTEGER): add(" return 0;"); break; |
| case (DataType.LONG): add(" return 0L;"); break; |
| case (DataType.FLOAT): add(" return 0.0f;"); break; |
| case (DataType.DOUBLE): add(" return 0.0;"); break; |
| case (DataType.BOOLEAN): add(" return true;"); break; |
| case (DataType.DATETIME): add(" return new DateTime();"); break; |
| case (DataType.BIGDECIMAL): add(" return (BigDecimal)null;"); break; |
| case (DataType.BIGINTEGER): add(" return (BigInteger)null;"); break; |
| case (DataType.BYTEARRAY): add(" return (byte[])null;"); break; |
| case (DataType.CHARARRAY): add(" return (String)null;"); break; |
| case (DataType.TUPLE): add(" return (Tuple)null;"); break; |
| case (DataType.BAG): add(" return (DataBag)null;"); break; |
| case (DataType.MAP): add(" return (Map<String,Object>)null;"); break; |
| default: throw new RuntimeException("Unsupported type"); |
| } |
| add("}"); |
| addBreak(); |
| } |
| } |
| |
| static class GetPosString extends TypeInFunctionStringOut { |
| private Queue<Integer> idQueue; |
| |
| private int booleanByte = 0; |
| private int booleans; |
| |
| @Override |
| public void process(int fieldPos, Schema.FieldSchema fs) { |
| if (!isTuple()) { |
| add("public "+typeName()+" getPos_"+fieldPos+"() {"); |
| } else { |
| int nestedSchemaTupleId = idQueue.remove(); |
| add("public SchemaTuple_" + nestedSchemaTupleId + " getPos_"+fieldPos+"() {"); |
| } |
| if (isBoolean()) { |
| add(" return BytesHelper.getBitByPos(booleanByte_" + booleanByte + ", " + booleans++ + ");"); |
| if (booleans % 8 == 0) { |
| booleanByte++; |
| booleans = 0; |
| } |
| } else { |
| add(" return pos_"+fieldPos+";"); |
| } |
| add("}"); |
| addBreak(); |
| } |
| |
| public GetPosString(Queue<Integer> idQueue) { |
| this.idQueue = idQueue; |
| } |
| } |
| |
| static class GetSchemaTupleIdentifierString extends TypeInFunctionStringOut { |
| private int id; |
| |
| @Override |
| public void end() { |
| add("@Override"); |
| add("public int getSchemaTupleIdentifier() {"); |
| add(" return "+id+";"); |
| add("}"); |
| addBreak(); |
| } |
| |
| public GetSchemaTupleIdentifierString(int id) { |
| this.id = id; |
| } |
| } |
| |
| static class SchemaSizeString extends TypeInFunctionStringOut { |
| int i = 0; |
| |
| @Override |
| public void process(int fieldNum, Schema.FieldSchema fS) { |
| i++; |
| } |
| |
| @Override |
| public void end() { |
| add("@Override"); |
| add("protected int schemaSize() {"); |
| add(" return " + i + ";"); |
| add("}"); |
| addBreak(); |
| } |
| } |
| |
| static class SizeString extends TypeInFunctionStringOut { |
| int i = 0; |
| |
| @Override |
| public void process(int fieldNum, Schema.FieldSchema fS) { |
| i++; |
| } |
| |
| @Override |
| public void end() { |
| add("@Override"); |
| add("protected int generatedCodeSize() {"); |
| add(" return " + i + ";"); |
| add("}"); |
| addBreak(); |
| } |
| |
| public SizeString(boolean appendable) { |
| super(appendable); |
| } |
| } |
| |
| static class GetTypeString extends TypeInFunctionStringOut { |
| @Override |
| public void prepare() { |
| add("@Override"); |
| add("public byte getGeneratedCodeFieldType(int fieldNum) throws ExecException {"); |
| add(" switch (fieldNum) {"); |
| } |
| |
| @Override |
| public void process(int fieldNum, Schema.FieldSchema fs) { |
| add(" case ("+fieldNum+"): return "+fs.type+";"); |
| } |
| |
| @Override |
| public void end() { |
| add(" default: throw new ExecException(\"Invalid index given: \" + fieldNum);"); |
| add(" }"); |
| add("}"); |
| addBreak(); |
| } |
| } |
| |
| static class SetEqualToSchemaTupleString extends TypeInFunctionStringOut { |
| int id; |
| |
| public SetEqualToSchemaTupleString(int id) { |
| this.id = id; |
| } |
| |
| @Override |
| public void prepare() { |
| add("@Override"); |
| add("protected SchemaTuple generatedCodeSet(SchemaTuple t, boolean checkClass) throws ExecException {"); |
| add(" if (checkClass && t instanceof SchemaTuple_"+id+") {"); |
| add(" return setSpecific((SchemaTuple_"+id+")t);"); |
| add(" }"); |
| addBreak(); |
| add(" if (t.size() < schemaSize()) {"); |
| add(" throw new ExecException(\"Given SchemaTuple does not have as many fields as \"+getClass()+\" (\"+t.size()+\" vs \"+schemaSize()+\")\");"); |
| add(" }"); |
| addBreak(); |
| add(" List<Schema.FieldSchema> theirFS = t.getSchema().getFields();"); |
| addBreak(); |
| } |
| |
| @Override |
| public void process(int fieldNum, Schema.FieldSchema fs) { |
| add(" if ("+fs.type+" != theirFS.get("+fieldNum+").type) {"); |
| add(" throw new ExecException(\"Given SchemaTuple does not match current in field " + fieldNum + ". Expected type: " + fs.type + ", found: \" + theirFS.get("+fieldNum+").type);"); |
| add(" }"); |
| add(" if (t.isNull("+fieldNum+")) {"); |
| add(" setNull_"+fieldNum+"(true);"); |
| add(" } else {"); |
| if (!isTuple()) { |
| add(" setPos_"+fieldNum+"(t.get" + proper(fs.type) + "("+fieldNum+"));"); |
| } else { |
| add(" setPos_"+fieldNum+"((Tuple)t.get("+fieldNum+"));"); |
| } |
| add(" }"); |
| addBreak(); |
| } |
| |
| @Override |
| public void end() { |
| add(" return this;"); |
| add("}"); |
| } |
| } |
| |
| static class TypeAwareGetString extends TypeAwareSetString { |
| public TypeAwareGetString(byte type) { |
| super(type); |
| } |
| |
| @Override |
| public void prepare() { |
| add("@Override"); |
| add("protected "+name()+" generatedCodeGet"+properName()+"(int fieldNum) throws ExecException {"); |
| add(" switch(fieldNum) {"); |
| } |
| |
| @Override |
| public void process(int fieldNum, Schema.FieldSchema fs) { |
| if (fs.type==thisType()) { |
| add(" case ("+fieldNum+"): return returnUnlessNull(checkIfNull_"+fieldNum+"(), getPos_"+fieldNum+"());"); |
| } |
| } |
| |
| @Override |
| public void end() { |
| add(" default:"); |
| add(" return unbox"+properName()+"(getTypeAwareBase(fieldNum, \""+name()+"\"));"); |
| add(" }"); |
| add("}"); |
| } |
| } |
| |
| static class TypeAwareSetString extends TypeInFunctionStringOut { |
| private byte type; |
| |
| public TypeAwareSetString(byte type) { |
| this.type = type; |
| } |
| |
| public byte thisType() { |
| return type; |
| } |
| |
| public String name() { |
| return typeName(type); |
| } |
| |
| public String properName() { |
| return proper(thisType()); |
| } |
| |
| @Override |
| public void prepare() { |
| add("@Override"); |
| add("protected void generatedCodeSet"+properName()+"(int fieldNum, "+name()+" val) throws ExecException {"); |
| add(" switch(fieldNum) {"); |
| } |
| |
| @Override |
| public void process(int fieldNum, Schema.FieldSchema fs) { |
| if (fs.type==thisType()) |
| add(" case ("+fieldNum+"): setPos_"+fieldNum+"(val); break;"); |
| } |
| |
| @Override |
| public void end() { |
| add(" default: setTypeAwareBase(fieldNum, val, \""+name()+"\");"); |
| add(" }"); |
| add("}"); |
| } |
| } |
| |
| //TODO need to use StringBuilder for all concatenation, not + |
| static class TypeInFunctionStringOutFactory { |
| private List<TypeInFunctionStringOut> listOfFutureMethods = Lists.newArrayList(); |
| private int id; |
| private boolean appendable; |
| private String contextAnnotations; |
| |
| public TypeInFunctionStringOutFactory(Schema s, int id, boolean appendable, String contextAnnotations, File codeDir) { |
| this.id = id; |
| this.appendable = appendable; |
| this.contextAnnotations = contextAnnotations; |
| |
| Queue<Integer> nextNestedSchemaIdForSetPos = Lists.newLinkedList(); |
| Queue<Integer> nextNestedSchemaIdForGetPos = Lists.newLinkedList(); |
| Queue<Integer> nextNestedSchemaIdForReadField = Lists.newLinkedList(); |
| |
| List<Queue<Integer>> listOfQueuesForIds = Lists.newArrayList(nextNestedSchemaIdForSetPos, nextNestedSchemaIdForGetPos, nextNestedSchemaIdForReadField); |
| |
| listOfFutureMethods.add(new FieldString(codeDir, listOfQueuesForIds, s, appendable)); //has to be run first |
| listOfFutureMethods.add(new SetPosString(nextNestedSchemaIdForSetPos)); |
| listOfFutureMethods.add(new GetPosString(nextNestedSchemaIdForGetPos)); |
| listOfFutureMethods.add(new GetDummyString()); |
| listOfFutureMethods.add(new GenericSetString()); |
| listOfFutureMethods.add(new GenericGetString()); |
| listOfFutureMethods.add(new GeneralIsNullString()); |
| listOfFutureMethods.add(new CheckIfNullString()); |
| listOfFutureMethods.add(new SetNullString()); |
| listOfFutureMethods.add(new SetEqualToSchemaTupleSpecificString(id)); |
| listOfFutureMethods.add(new WriteNullsString(appendable)); |
| listOfFutureMethods.add(new ReadString(nextNestedSchemaIdForReadField, appendable)); |
| listOfFutureMethods.add(new WriteString()); |
| listOfFutureMethods.add(new SizeString(appendable)); |
| listOfFutureMethods.add(new MemorySizeString()); |
| listOfFutureMethods.add(new GetSchemaTupleIdentifierString(id)); |
| listOfFutureMethods.add(new HashCode()); |
| listOfFutureMethods.add(new SchemaSizeString()); |
| listOfFutureMethods.add(new GetTypeString()); |
| listOfFutureMethods.add(new CompareToString(id)); |
| listOfFutureMethods.add(new CompareToSpecificString(id, appendable)); |
| listOfFutureMethods.add(new SetEqualToSchemaTupleString(id)); |
| listOfFutureMethods.add(new IsSpecificSchemaTuple(id)); |
| listOfFutureMethods.add(new TypeAwareSetString(DataType.INTEGER)); |
| listOfFutureMethods.add(new TypeAwareSetString(DataType.LONG)); |
| listOfFutureMethods.add(new TypeAwareSetString(DataType.FLOAT)); |
| listOfFutureMethods.add(new TypeAwareSetString(DataType.DOUBLE)); |
| listOfFutureMethods.add(new TypeAwareSetString(DataType.BYTEARRAY)); |
| listOfFutureMethods.add(new TypeAwareSetString(DataType.CHARARRAY)); |
| listOfFutureMethods.add(new TypeAwareSetString(DataType.BOOLEAN)); |
| listOfFutureMethods.add(new TypeAwareSetString(DataType.DATETIME)); |
| listOfFutureMethods.add(new TypeAwareSetString(DataType.BIGDECIMAL)); |
| listOfFutureMethods.add(new TypeAwareSetString(DataType.BIGINTEGER)); |
| listOfFutureMethods.add(new TypeAwareSetString(DataType.TUPLE)); |
| listOfFutureMethods.add(new TypeAwareSetString(DataType.BAG)); |
| listOfFutureMethods.add(new TypeAwareSetString(DataType.MAP)); |
| listOfFutureMethods.add(new TypeAwareGetString(DataType.INTEGER)); |
| listOfFutureMethods.add(new TypeAwareGetString(DataType.LONG)); |
| listOfFutureMethods.add(new TypeAwareGetString(DataType.FLOAT)); |
| listOfFutureMethods.add(new TypeAwareGetString(DataType.DOUBLE)); |
| listOfFutureMethods.add(new TypeAwareGetString(DataType.BYTEARRAY)); |
| listOfFutureMethods.add(new TypeAwareGetString(DataType.CHARARRAY)); |
| listOfFutureMethods.add(new TypeAwareGetString(DataType.BOOLEAN)); |
| listOfFutureMethods.add(new TypeAwareGetString(DataType.DATETIME)); |
| listOfFutureMethods.add(new TypeAwareGetString(DataType.BIGDECIMAL)); |
| listOfFutureMethods.add(new TypeAwareGetString(DataType.BIGINTEGER)); |
| listOfFutureMethods.add(new TypeAwareGetString(DataType.TUPLE)); |
| listOfFutureMethods.add(new TypeAwareGetString(DataType.BAG)); |
| listOfFutureMethods.add(new TypeAwareGetString(DataType.MAP)); |
| listOfFutureMethods.add(new ListSetString()); |
| |
| for (TypeInFunctionStringOut t : listOfFutureMethods) { |
| t.prepare(); |
| } |
| } |
| |
| public void process(Schema.FieldSchema fs) { |
| for (TypeInFunctionStringOut t : listOfFutureMethods) |
| t.prepareProcess(fs); |
| } |
| |
| public String end() { |
| StringBuilder head = |
| new StringBuilder() |
| .append("import java.util.List;\n") |
| .append("import java.util.Map;\n") |
| .append("import java.util.Iterator;\n") |
| .append("import java.io.DataOutput;\n") |
| .append("import java.io.DataInput;\n") |
| .append("import java.io.IOException;\n") |
| .append("import java.math.BigDecimal;\n") |
| .append("import java.math.BigInteger;\n") |
| .append("\n") |
| .append("import com.google.common.collect.Lists;\n") |
| .append("\n") |
| .append("import org.joda.time.DateTime;") |
| .append("\n") |
| .append("import org.apache.pig.data.DataType;\n") |
| .append("import org.apache.pig.data.DataBag;\n") |
| .append("import org.apache.pig.data.Tuple;\n") |
| .append("import org.apache.pig.data.SchemaTuple;\n") |
| .append("import org.apache.pig.data.AppendableSchemaTuple;\n") |
| .append("import org.apache.pig.data.utils.SedesHelper;\n") |
| .append("import org.apache.pig.data.utils.BytesHelper;\n") |
| .append("import org.apache.pig.data.DataByteArray;\n") |
| .append("import org.apache.pig.data.BinInterSedes;\n") |
| .append("import org.apache.pig.impl.util.Utils;\n") |
| .append("import org.apache.pig.impl.logicalLayer.schema.Schema;\n") |
| .append("import org.apache.pig.impl.logicalLayer.FrontendException;\n") |
| .append("import org.apache.pig.backend.executionengine.ExecException;\n") |
| .append("import org.apache.pig.data.SizeUtil;\n") |
| .append("import org.apache.pig.data.SchemaTuple.SchemaTupleQuickGenerator;\n") |
| .append("\n") |
| .append(contextAnnotations); |
| |
| if (appendable) { |
| head.append("public class SchemaTuple_"+id+" extends AppendableSchemaTuple<SchemaTuple_"+id+"> {\n"); |
| } else { |
| head.append("public class SchemaTuple_"+id+" extends SchemaTuple<SchemaTuple_"+id+"> {\n"); |
| } |
| |
| for (TypeInFunctionStringOut t : listOfFutureMethods) { |
| t.end(); |
| head.append(t.getContent()); |
| } |
| |
| head.append("\n") |
| .append(" @Override\n") |
| .append(" public SchemaTupleQuickGenerator<SchemaTuple_" + id + "> getQuickGenerator() {\n") |
| .append(" return new SchemaTupleQuickGenerator<SchemaTuple_" + id + ">() {\n") |
| .append(" @Override\n") |
| .append(" public SchemaTuple_" + id + " make() {\n") |
| .append(" return new SchemaTuple_" + id + "();\n") |
| .append(" }\n") |
| .append(" };\n") |
| .append(" }\n"); |
| |
| return head.append("}").toString(); |
| } |
| } |
| |
| static class TypeInFunctionStringOut { |
| private int fieldPos = 0; |
| private StringBuilder content = new StringBuilder(); |
| private byte type; |
| |
| public void prepare() {} |
| public void process(int fieldPos, Schema.FieldSchema fs) {} |
| public void end() {} |
| |
| public int appendable = -1; |
| |
| public StringBuilder getContent() { |
| return content; |
| } |
| |
| public TypeInFunctionStringOut() { |
| add("// this code generated by " + getClass()); |
| addBreak(); |
| } |
| |
| public boolean isAppendable() { |
| if (appendable == -1) { |
| throw new RuntimeException("Need to be given appendable status in " + getClass()); |
| } |
| return appendable == 1; |
| } |
| |
| public TypeInFunctionStringOut(boolean appendable) { |
| this(); |
| this.appendable = appendable ? 1 : 0; |
| } |
| |
| public StringBuilder spaces(int indent) { |
| StringBuilder out = new StringBuilder(); |
| String space = " "; |
| for (int i = 0; i < indent; i++) { |
| out.append(space); |
| } |
| return out; |
| } |
| |
| public void add(String s) { |
| for (String str : s.split("\\n")) { |
| content.append(spaces(1).append(str).append("\n")); |
| } |
| } |
| |
| public void addBreak() { |
| content.append("\n"); |
| } |
| |
| public void prepareProcess(Schema.FieldSchema fs) { |
| type = fs.type; |
| |
| process(fieldPos, fs); |
| fieldPos++; |
| } |
| |
| public boolean isInt() { |
| return type == DataType.INTEGER; |
| } |
| |
| public boolean isLong() { |
| return type == DataType.LONG; |
| } |
| |
| public boolean isFloat() { |
| return type == DataType.FLOAT; |
| } |
| |
| public boolean isDouble() { |
| return type == DataType.DOUBLE; |
| } |
| |
| public boolean isDateTime() { |
| return type == DataType.DATETIME; |
| } |
| |
| public boolean isBigDecimal() { |
| return type == DataType.BIGDECIMAL; |
| } |
| |
| public boolean isBigInteger() { |
| return type == DataType.BIGINTEGER; |
| } |
| |
| public boolean isPrimitive() { |
| return isInt() || isLong() || isFloat() || isDouble() || isBoolean(); |
| } |
| |
| public boolean isBoolean() { |
| return type == DataType.BOOLEAN; |
| } |
| |
| public boolean isString() { |
| return type == DataType.CHARARRAY; |
| } |
| |
| public boolean isBytearray() { |
| return type == DataType.BYTEARRAY; |
| } |
| |
| public boolean isTuple() { |
| return type == DataType.TUPLE; |
| } |
| |
| public boolean isBag() { |
| return type == DataType.BAG; |
| } |
| |
| public boolean isMap() { |
| return type == DataType.MAP; |
| } |
| |
| public boolean isObject() { |
| return !isPrimitive(); |
| } |
| |
| public String typeName() { |
| return typeName(type); |
| } |
| |
| public String typeName(byte type) { |
| switch(type) { |
| case (DataType.INTEGER): return "int"; |
| case (DataType.LONG): return "long"; |
| case (DataType.FLOAT): return "float"; |
| case (DataType.DOUBLE): return "double"; |
| case (DataType.BYTEARRAY): return "byte[]"; |
| case (DataType.CHARARRAY): return "String"; |
| case (DataType.BOOLEAN): return "boolean"; |
| case (DataType.DATETIME): return "DateTime"; |
| case (DataType.BIGDECIMAL): return "BigDecimal"; |
| case (DataType.BIGINTEGER): return "BigInteger"; |
| case (DataType.TUPLE): return "Tuple"; |
| case (DataType.BAG): return "DataBag"; |
| case (DataType.MAP): return "Map"; |
| default: throw new RuntimeException("Can't return String for given type " + DataType.findTypeName(type)); |
| } |
| } |
| |
| public String proper(byte type) { |
| String s = typeName(type); |
| switch (type) { |
| case DataType.BYTEARRAY: return "Bytes"; |
| default: return s.substring(0,1).toUpperCase() + s.substring(1); |
| } |
| } |
| } |
| } |