/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.data;

import static org.apache.pig.PigConfiguration.PIG_SCHEMA_TUPLE_ENABLED;
import static org.apache.pig.PigConstants.GENERATED_CLASSES_KEY;
import static org.apache.pig.PigConstants.LOCAL_CODE_DIR;
import static org.apache.pig.PigConstants.SCHEMA_TUPLE_ON_BY_DEFAULT;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
import org.apache.pig.data.utils.StructuresHelper.Pair;
import org.apache.pig.data.utils.StructuresHelper.SchemaKey;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.schema.Schema;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
/**
* This class is to be used at job creation time. It provides the API that lets code
* register Schemas with Pig to be generated. It is necessary to register these Schemas
* on the frontend so that the generated code can be shipped to the cluster and used by
* the mappers and reducers.
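*
* A minimal usage sketch (illustrative only; the schema string, variable names, and the
* choice of GenContext below are assumptions, not taken from any particular caller):
* <pre>{@code
* // build a Schema (org.apache.pig.impl.util.Utils), register it, then ship the generated code
* Schema schema = Utils.getSchemaFromString("a:int, b:chararray");
* int id = SchemaTupleFrontend.registerToGenerateIfPossible(schema, false, GenContext.FORCE_LOAD);
* // at job submission time:
* SchemaTupleFrontend.copyAllGeneratedToDistributedCache(pigContext, conf);
* }</pre>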
*/
public class SchemaTupleFrontend {
    private static final Log LOG = LogFactory.getLog(SchemaTupleFrontend.class);

    private static SchemaTupleFrontend stf;

    /**
     * Schemas registered for generation are held here. The key pairs a Schema with whether the
     * generated tuple should be appendable; the value pairs the generated class identifier with
     * the set of contexts in which that class should be generated.
     */
    private static Map<Pair<SchemaKey, Boolean>, Pair<Integer, Set<GenContext>>> schemasToGenerate = Maps.newHashMap();
    private int internalRegisterToGenerateIfPossible(Schema udfSchema, boolean isAppendable, GenContext type) {
        Pair<SchemaKey, Boolean> key = Pair.make(new SchemaKey(udfSchema), isAppendable);
        Pair<Integer, Set<GenContext>> pr = schemasToGenerate.get(key);
        if (pr != null) {
            pr.getSecond().add(type);
            return pr.getFirst();
        }
        if (!SchemaTupleFactory.isGeneratable(udfSchema)) {
            LOG.debug("Given Schema is not generatable: " + udfSchema);
            return -1;
        }
        int id = SchemaTupleClassGenerator.getNextGlobalClassIdentifier();
        Set<GenContext> contexts = Sets.newHashSet();
        contexts.add(GenContext.FORCE_LOAD);
        contexts.add(type);
        schemasToGenerate.put(key, Pair.make(Integer.valueOf(id), contexts));
        LOG.debug("Registering " + (isAppendable ? "Appendable" : "") + "Schema for generation ["
                + udfSchema + "] with id [" + id + "] and context: " + type);
        return id;
    }

    private Map<Pair<SchemaKey, Boolean>, Pair<Integer, Set<GenContext>>> getSchemasToGenerate() {
        return schemasToGenerate;
    }
    private static class SchemaTupleFrontendGenHelper {
        private File codeDir;
        private PigContext pigContext;
        private Configuration conf;

        public SchemaTupleFrontendGenHelper(PigContext pigContext, Configuration conf) {
            codeDir = Files.createTempDir();
            codeDir.deleteOnExit();
            LOG.debug("Temporary directory for generated code created: "
                    + codeDir.getAbsolutePath());
            this.pigContext = pigContext;
            this.conf = conf;
        }

        /**
         * Copies all class files present in the local temp directory to the distributed cache.
         * Each copied file is registered under a symlink bearing its own name. No files are
         * copied if the current job is being run in local mode.
         */
        private void internalCopyAllGeneratedToDistributedCache() {
            LOG.info("Starting process to move generated code to distributed cache");
            if (pigContext.getExecType().isLocal()) {
                String codePath = codeDir.getAbsolutePath();
                LOG.info("Distributed cache not supported or needed in local mode. Setting key ["
                        + LOCAL_CODE_DIR + "] with code temp directory: " + codePath);
                conf.set(LOCAL_CODE_DIR, codePath);
                return;
            } else {
                // This lets us avoid an NPE in some of the non-traditional pipelines
                String codePath = codeDir.getAbsolutePath();
                conf.set(LOCAL_CODE_DIR, codePath);
            }
            DistributedCache.createSymlink(conf); // we will read using symlinks
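            // "serialized" accumulates a comma-separated list of the generated class file names;
            // each name doubles as the symlink under which the file is shipped, and the final
            // string is stored under GENERATED_CLASSES_KEY below.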
            StringBuilder serialized = new StringBuilder();
            boolean first = true;
            // We attempt to copy over every file in the generated code temp directory
            for (File f : codeDir.listFiles()) {
                if (first) {
                    first = false;
                } else {
                    serialized.append(",");
                }
                String symlink = f.getName(); //the class name will also be the symlink
                serialized.append(symlink);
                Path src = new Path(f.toURI());
                Path dst;
                try {
                    dst = FileLocalizer.getTemporaryPath(pigContext);
                } catch (IOException e) {
                    throw new RuntimeException("Error getting temporary path in HDFS", e);
                }
                FileSystem fs;
                try {
                    fs = dst.getFileSystem(conf);
                } catch (IOException e) {
                    throw new RuntimeException("Unable to get FileSystem", e);
                }
                try {
                    fs.copyFromLocalFile(src, dst);
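                    // Presumably raised to the job-submit file replication factor so the many
                    // tasks reading the shipped classes are not bottlenecked on a few replicas.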
                    fs.setReplication(dst, (short) conf.getInt(MRConfiguration.SUMIT_REPLICATION, 3));
                } catch (IOException e) {
                    throw new RuntimeException("Unable to copy from local filesystem to HDFS, src = "
                            + src + ", dst = " + dst, e);
                }
                String destination = dst.toString() + "#" + symlink;
                try {
                    DistributedCache.addCacheFile(new URI(destination), conf);
                } catch (URISyntaxException e) {
                    throw new RuntimeException("Unable to add file to distributed cache: " + destination, e);
                }
                LOG.info("File successfully added to the distributed cache: " + symlink);
            }
            String toSer = serialized.toString();
            LOG.info("Setting key [" + GENERATED_CLASSES_KEY + "] with classes to deserialize [" + toSer + "]");
            // we must set a key in the job conf so individual jobs know to resolve the shipped classes
            conf.set(GENERATED_CLASSES_KEY, toSer);
        }
        /**
         * This sets into motion the generation of all "registered" Schemas. All code will be generated
         * into the temporary directory.
         * @return whether there are any files to copy to the distributed cache
         */
        private boolean generateAll(Map<Pair<SchemaKey, Boolean>, Pair<Integer, Set<GenContext>>> schemasToGenerate) {
            boolean filesToShip = false;
            if (!conf.getBoolean(PIG_SCHEMA_TUPLE_ENABLED, SCHEMA_TUPLE_ON_BY_DEFAULT)) {
                LOG.info("Key [" + PIG_SCHEMA_TUPLE_ENABLED + "] is false, will not generate code.");
                return false;
            }
LOG.info("Generating all registered Schemas.");
for (Map.Entry<Pair<SchemaKey, Boolean>, Pair<Integer, Set<GenContext>>> entry : schemasToGenerate.entrySet()) {
Pair<SchemaKey, Boolean> keyPair = entry.getKey();
Schema s = keyPair.getFirst().get();
Pair<Integer, Set<GenContext>> valuePair = entry.getValue();
Set<GenContext> contextsToInclude = Sets.newHashSet();
boolean isShipping = false;
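                // A class is generated (and shipped) only if at least one of the contexts
                // registered for this Schema is enabled in the job configuration.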
                for (GenContext context : valuePair.getSecond()) {
                    if (!context.shouldGenerate(conf)) {
                        LOG.info("Skipping generation of Schema [" + s + "], as key value [" + context.key() + "] was false.");
                    } else {
                        isShipping = true;
                        contextsToInclude.add(context);
                    }
                }
                if (!isShipping) {
                    continue;
                }
                int id = valuePair.getFirst();
                boolean isAppendable = keyPair.getSecond();
                SchemaTupleClassGenerator.generateSchemaTuple(s, isAppendable, id, codeDir, contextsToInclude.toArray(new GenContext[0]));
                filesToShip = true;
            }
            return filesToShip;
        }
    }
    /**
     * This allows the frontend/backend process to be repeated in the same
     * JVM (as in testing).
     */
    public static void reset() {
        stf = null;
        schemasToGenerate.clear();
    }
    /**
     * This method "registers" a Schema to be generated. It allows portions of the code
     * to register a Schema for generation without knowing whether code generation is enabled.
     * A unique ID will be passed back that can be used internally to refer to generated SchemaTuples
     * (such as in the case of serialization and deserialization). The context is necessary to allow
     * the client to restrict where generated code can be used.
     * @param udfSchema the Schema of a Tuple for which we will potentially generate code
     * @param isAppendable whether or not we want the SchemaTuple to be appendable
     * @param context the context in which users should be able to access the SchemaTuple
     * @return a unique identifier for the registered Schema, or -1 if it cannot be generated
     */
    public static int registerToGenerateIfPossible(Schema udfSchema, boolean isAppendable, GenContext context) {
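        // First registration in this (possibly reused) JVM, or first registration after a reset:
        // clear out any state left over from a previous frontend/backend cycle.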
        if (stf == null) {
            if (pigContextToReset != null) {
                Properties prop = pigContextToReset.getProperties();
                prop.remove(GENERATED_CLASSES_KEY);
                prop.remove(LOCAL_CODE_DIR);
                pigContextToReset = null;
            }
            SchemaTupleBackend.reset();
            SchemaTupleClassGenerator.resetGlobalClassIdentifier();
            stf = new SchemaTupleFrontend();
        }
        if (udfSchema == null) {
            return -1;
        }
        try {
            udfSchema = udfSchema.clone();
        } catch (CloneNotSupportedException e) {
            throw new RuntimeException("Unable to clone Schema: " + udfSchema, e);
        }
        stripAliases(udfSchema);
        return stf.internalRegisterToGenerateIfPossible(udfSchema, isAppendable, context);
    }
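    /**
     * Recursively clears field aliases, presumably so that Schemas differing only in their
     * aliases map to the same registration (and thus the same generated class).
     */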
    private static void stripAliases(Schema s) {
        for (Schema.FieldSchema fs : s.getFields()) {
            fs.alias = null;
            if (fs.schema != null) {
                stripAliases(fs.schema);
            }
        }
    }
    /**
     * This must be called once the code has been generated and the generated code needs to be shipped
     * to the cluster, so that it may be used by the mappers and reducers.
     * @param pigContext the PigContext for the current job
     * @param conf the job Configuration into which the relevant keys will be set
     */
    public static void copyAllGeneratedToDistributedCache(PigContext pigContext, Configuration conf) {
        if (stf == null) {
            LOG.debug("Nothing registered to generate.");
            return;
        }
        SchemaTupleFrontendGenHelper stfgh = new SchemaTupleFrontendGenHelper(pigContext, conf);
        stfgh.generateAll(stf.getSchemasToGenerate());
        stfgh.internalCopyAllGeneratedToDistributedCache();
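        // Mirror the two keys into the PigContext properties as well, presumably so that code
        // holding only the PigContext (rather than the job Configuration) can locate the
        // generated classes; these are the same properties cleared on the next cycle via lazyReset().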
        Properties prop = pigContext.getProperties();
        String value = conf.get(GENERATED_CLASSES_KEY);
        if (value != null) {
            prop.setProperty(GENERATED_CLASSES_KEY, value);
        } else {
            prop.remove(GENERATED_CLASSES_KEY);
        }
        value = conf.get(LOCAL_CODE_DIR);
        if (value != null) {
            prop.setProperty(LOCAL_CODE_DIR, value);
        } else {
            prop.remove(LOCAL_CODE_DIR);
        }
    }
    private static PigContext pigContextToReset = null;

    /**
     * Caches a PigContext object that has had relevant key values set by
     * SchemaTupleBackend. This is necessary because multiple cycles of jobs
     * may run in the same JVM while the PigContext object is shared, so we
     * want to make sure to undo any changes we have made to it.
     */
    protected static void lazyReset(PigContext pigContext) {
        pigContextToReset = pigContext;
    }
}