| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig.data; |
| |
| import static org.apache.pig.PigConfiguration.PIG_SCHEMA_TUPLE_ENABLED; |
| import static org.apache.pig.PigConstants.SCHEMA_TUPLE_ON_BY_DEFAULT; |
| |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.net.MalformedURLException; |
| import java.net.URL; |
| import java.net.URLClassLoader; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.pig.PigConstants; |
| import org.apache.pig.data.SchemaTupleClassGenerator.GenContext; |
| import org.apache.pig.data.utils.StructuresHelper.SchemaKey; |
| import org.apache.pig.data.utils.StructuresHelper.Triple; |
| import org.apache.pig.impl.PigContext; |
| import org.apache.pig.impl.logicalLayer.schema.Schema; |
| import org.apache.pig.impl.util.Utils; |
| |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| import com.google.common.io.Files; |
| |
| public class SchemaTupleBackend { |
| private static final Log LOG = LogFactory.getLog(SchemaTupleBackend.class); |
| |
| private Set<String> filesToResolve = Sets.newHashSet(); |
| |
| /** |
| * We use the URLClassLoader to resolve the generated classes because we can |
| * simply give it the directory we put all of the compiled class files into, |
| * and it will handle the dynamic loading. |
| */ |
| private URLClassLoader classLoader; |
| private Map<Triple<SchemaKey, Boolean, GenContext>, SchemaTupleFactory> schemaTupleFactoriesByTriple = Maps.newHashMap(); |
| private Map<Integer, SchemaTupleFactory> schemaTupleFactoriesById = Maps.newHashMap(); |
| private Configuration jConf; |
| |
| private File codeDir; |
| |
| private boolean isLocal; |
| private boolean abort = false; |
| |
| /** |
| * The only information this class needs is a directory of generated code to resolve |
| * classes in. |
| * @param jConf |
| * @param directory of generated code |
| */ |
| private SchemaTupleBackend(Configuration jConf, boolean isLocal) { |
| if (isLocal) { |
| String localCodeDir = jConf.get(PigConstants.LOCAL_CODE_DIR); |
| if (localCodeDir == null) { |
| LOG.debug("No local code dir set in local mode. Aborting code gen resolution."); |
| abort = true; |
| return; |
| } |
| codeDir = new File(jConf.get(PigConstants.LOCAL_CODE_DIR)); |
| } else { |
| codeDir = Files.createTempDir(); |
| codeDir.deleteOnExit(); |
| } |
| |
| try { |
| classLoader = new URLClassLoader(new URL[] { codeDir.toURI().toURL() }); |
| } catch (MalformedURLException e) { |
| throw new RuntimeException("Unable to make URLClassLoader for tempDir: " |
| + codeDir.getAbsolutePath()); |
| } |
| this.jConf = jConf; |
| this.isLocal = isLocal; |
| } |
| |
| /** |
| * This method fetches the SchemaTupleFactory that can create Tuples of the given |
| * Schema (ignoring aliases) and appendability. IMPORTANT: if no such SchemaTupleFactory |
| * is available, this returns null. |
| * @param schema |
| * @param true if it should be appendable |
| * @param the context in which this SchemaTupleFactory is being requested |
| * @return generating SchemaTupleFactory, null otherwise |
| */ |
| private SchemaTupleFactory internalNewSchemaTupleFactory(Schema s, boolean isAppendable, GenContext context) { |
| return newSchemaTupleFactory(Triple.make(new SchemaKey(s), isAppendable, context)); |
| } |
| |
| /** |
| * This method fetches the SchemaTupleFactory that generates the SchemaTuple |
| * registered with the given identifier. IMPORTANT: if no such SchemaTupleFactory |
| * is available, this returns null. |
| * @param identifier |
| * @return generating schemaTupleFactory, null otherwise |
| */ |
| private SchemaTupleFactory internalNewSchemaTupleFactory(int id) { |
| SchemaTupleFactory stf = schemaTupleFactoriesById.get(id); |
| if (stf == null) { |
| LOG.debug("No SchemaTupleFactory present for given identifier: " + id); |
| } |
| return stf; |
| } |
| |
| /** |
| * This method fetches the SchemaTupleFactory that can create Tuples of the given |
| * Schema and appendability. IMPORTANT: if no such SchemaTupleFactory is available, |
| * this returns null. |
| * @param SchemaKey/appendability pair |
| * @return generating SchemaTupleFactory, null otherwise |
| */ |
| private SchemaTupleFactory newSchemaTupleFactory(Triple<SchemaKey, Boolean, GenContext> trip) { |
| SchemaTupleFactory stf = schemaTupleFactoriesByTriple.get(trip); |
| if (stf == null) { |
| LOG.debug("No SchemaTupleFactory present for given SchemaKey/Boolean/Context combination " + trip); |
| } |
| return stf; |
| } |
| |
| /** |
| * This method copies all of the generated classes from the distributed cache to a local directory, |
| * and then seeks to resolve them and cache their respective SchemaTupleFactories. |
| * @param configuration |
| * @param true if the job is local |
| * @throws IOException |
| */ |
| private void copyAndResolve() throws IOException { |
| if (abort) { |
| LOG.debug("Nothing to resolve on the backend."); |
| return; |
| } |
| // Step one is to see if there are any classes in the distributed cache |
| if (!jConf.getBoolean(PIG_SCHEMA_TUPLE_ENABLED, SCHEMA_TUPLE_ON_BY_DEFAULT)) { |
| LOG.info("Key [" + PIG_SCHEMA_TUPLE_ENABLED +"] was not set... will not generate code."); |
| return; |
| } |
| // Step two is to copy everything from the distributed cache if we are in distributed mode |
| if (!isLocal) { |
| copyAllFromDistributedCache(); |
| } |
| // Step three is to see if the file needs to be resolved |
| // If there is a "$" in the name, we know that it is an inner |
| // class and thus doesn't need to be instantiated directly. |
| for (File f : codeDir.listFiles()) { |
| String name = f.getName().split("\\.")[0]; |
| if (!name.contains("$")) { |
| filesToResolve.add(name); |
| LOG.info("Added class to list of class to resolve: " + name); |
| } |
| } |
| // Step four is to actually try and resolve the classes |
| resolveClasses(); |
| } |
| |
| private void copyAllFromDistributedCache() throws IOException { |
| String toDeserialize = jConf.get(PigConstants.GENERATED_CLASSES_KEY); |
| if (toDeserialize == null) { |
| LOG.info("No classes in in key [" + PigConstants.GENERATED_CLASSES_KEY + "] to copy from distributed cache."); |
| return; |
| } |
| LOG.info("Copying files in key ["+PigConstants.GENERATED_CLASSES_KEY+"] from distributed cache: " + toDeserialize); |
| for (String s : toDeserialize.split(",")) { |
| LOG.info("Attempting to read file: " + s); |
| // The string is the symlink into the distributed cache |
| File src = new File(s); |
| FileInputStream fin = null; |
| FileOutputStream fos = null; |
| try { |
| fin = new FileInputStream(src); |
| fos = new FileOutputStream(new File(codeDir, s)); |
| |
| fin.getChannel().transferTo(0, src.length(), fos.getChannel()); |
| LOG.info("Successfully copied file to local directory."); |
| } finally { |
| if (fin != null) { |
| fin.close(); |
| } |
| if (fos != null) { |
| fos.close(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Once all of the files are copied from the distributed cache to the local |
| * temp directory, this will attempt to resolve those files and add their information. |
| */ |
| @SuppressWarnings("unchecked") |
| private void resolveClasses() { |
| for (String s : filesToResolve) { |
| SchemaTupleFactory.LOG.info("Attempting to resolve class: " + s); |
| // Step one is to simply attempt to get the class object from the classloader |
| // that includes the generated code. |
| Class<?> clazz; |
| try { |
| clazz = classLoader.loadClass(s); |
| } catch (ClassNotFoundException e) { |
| throw new RuntimeException("Unable to find class: " + s, e); |
| } |
| |
| // Step three is to check if the class is a SchemaTuple. If it isn't, |
| // we do not attempt to resolve it, because it is support code, such |
| // as anonymous classes. |
| if (!SchemaTuple.class.isAssignableFrom(clazz)) { |
| return; |
| } |
| |
| Class<SchemaTuple<?>> stClass = (Class<SchemaTuple<?>>)clazz; |
| |
| // Step four is to actually try to create the SchemaTuple instance. |
| SchemaTuple<?> st; |
| try { |
| st = stClass.newInstance(); |
| } catch (InstantiationException e) { |
| throw new RuntimeException("Error instantiating file: " + s, e); |
| } catch (IllegalAccessException e) { |
| throw new RuntimeException("Error accessing file: " + s, e); |
| } |
| |
| // Step five is to get information about the class. |
| boolean isAppendable = st instanceof AppendableSchemaTuple<?>; |
| int id = st.getSchemaTupleIdentifier(); |
| Schema schema = st.getSchema(); |
| |
| SchemaTupleFactory stf = new SchemaTupleFactory(stClass, st.getQuickGenerator()); |
| |
| for (GenContext context : GenContext.values()) { |
| if (context != GenContext.FORCE_LOAD && !context.shouldGenerate(stClass)) { |
| SchemaTupleFactory.LOG.debug("Context ["+context+"] not present for class, skipping."); |
| continue; |
| } |
| |
| // the SchemaKey (Schema sans alias) and appendability are how we will |
| // uniquely identify a SchemaTupleFactory |
| Triple<SchemaKey, Boolean, GenContext> trip = |
| Triple.make(new SchemaKey(schema), isAppendable, context); |
| |
| schemaTupleFactoriesByTriple.put(trip, stf); |
| |
| SchemaTupleFactory.LOG.info("Successfully resolved class for schema ["+schema+"] and appendability ["+isAppendable+"]" |
| + " in context: " + context); |
| } |
| schemaTupleFactoriesById.put(id, stf); |
| } |
| } |
| |
| public static void reset() { |
| stb = null; |
| } |
| |
| private static SchemaTupleBackend stb; |
| |
| public static void initialize(Configuration jConf, PigContext pigContext) throws IOException { |
| if (stb != null) { |
| SchemaTupleFrontend.lazyReset(pigContext); |
| } |
| initialize(jConf, pigContext.getExecType().isLocal()); |
| } |
| |
| public static void initialize(Configuration jConf) throws IOException { |
| initialize(jConf, Utils.isLocal(jConf)); |
| } |
| |
| public static void initialize(Configuration jConf, boolean isLocal) throws IOException { |
| if (stb != null) { |
| LOG.warn("SchemaTupleBackend has already been initialized"); |
| } else { |
| SchemaTupleFrontend.reset(); |
| SchemaTupleBackend stbInstance = new SchemaTupleBackend(jConf, isLocal); |
| stbInstance.copyAndResolve(); |
| stb = stbInstance; |
| } |
| } |
| |
| public static SchemaTupleFactory newSchemaTupleFactory(Schema s, boolean isAppendable, GenContext context) { |
| if (stb == null) { |
| // It is possible (though ideally should be avoided) for this to be called on the frontend if |
| // the Tuple processing path of the POPlan is invoked (perhaps for optimization purposes) |
| throw new RuntimeException("initialize was not called! Even when SchemaTuple feature is not set, it should be called."); |
| } |
| return stb.internalNewSchemaTupleFactory(s, isAppendable, context); |
| } |
| |
| protected static SchemaTupleFactory newSchemaTupleFactory(int id) { |
| if (stb == null) { |
| // It is possible (though ideally should be avoided) for this to be called on the frontend if |
| // the Tuple processing path of the POPlan is invoked (perhaps for optimization purposes) |
| throw new RuntimeException("initialize was not called! Even when SchemaTuple feature is not set, it should be called."); |
| } |
| return stb.internalNewSchemaTupleFactory(id); |
| } |
| } |