blob: dd881ae84c587ea08d3988e40f8b73b8545c5204 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.pig.piggybank.storage.avro;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.pig.Expression;
import org.apache.pig.FileInputLoadFunc;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFunc;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.StoreResources;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.builtin.FuncUtils;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
/**
* AvroStorage is used to load/store Avro data <br/>
* Document can be found <a href='https://cwiki.apache.org/PIG/avrostorage.html'>here</a>
*/
public class AvroStorage extends FileInputLoadFunc implements StoreFuncInterface, LoadMetadata, StoreResources {
private static final Log LOG = LogFactory.getLog(AvroStorage.class);
/* storeFunc parameters */
private static final String NOTNULL = "NOTNULL";
private static final String AVRO_OUTPUT_SCHEMA_PROPERTY = "avro_output_schema";
private static final String AVRO_INPUT_SCHEMA_PROPERTY = "avro_input_schema";
private static final String AVRO_INPUT_PIG_SCHEMA_PROPERTY = "avro_input_pig_schema";
private static final String AVRO_MERGED_SCHEMA_PROPERTY = "avro_merged_schema_map";
private static final String SCHEMA_DELIM = "#";
private static final String SCHEMA_KEYVALUE_DELIM = "@";
private static final String NO_SCHEMA_CHECK = "no_schema_check";
private static final String IGNORE_BAD_FILES = "ignore_bad_files";
private static final String MULTIPLE_SCHEMAS = "multiple_schemas";
/* FIXME: we use this variable to distinguish schemas specified
* by different AvroStorage calls and will remove this once
* StoreFunc provides access to Pig output schema in backend.
* Default value is 0.
*/
private int storeFuncIndex = 0;
private PigAvroRecordWriter writer = null; /* avro record writer */
private Schema outputAvroSchema = null; /* output avro schema */
/* indicate whether data is nullable */
private boolean nullable = true;
/* loadFunc parameters */
private PigAvroRecordReader reader = null; /* avro record writer */
private Schema inputAvroSchema = null; /* input avro schema */
private Schema userSpecifiedAvroSchema = null; /* avro schema specified in constructor args */
/* if multiple avro record schemas are merged, this map associates each input
* record with a remapping of its fields relative to the merged schema. please
* see AvroStorageUtils.getSchemaToMergedSchemaMap() for more details.
*/
private Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap = null;
/* whether input avro files have the same schema or not. If input avro files
* do not have the same schema, we merge these schemas into a single schema
* and derive the pig schema from it.
*/
private boolean useMultipleSchemas = false;
private boolean checkSchema = true; /*whether check schema of input directories*/
private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */
private String contextSignature = null;
/**
* Empty constructor. Output schema is derived from pig schema.
*/
public AvroStorage() {
outputAvroSchema = null;
nullable = true;
AvroStorageLog.setDebugLevel(0);
checkSchema = true;
}
/**
* Constructor of quoted string list
*
* @param parts quoted string list
* @throws IOException
* @throws ParseException
*/
public AvroStorage(String[] parts) throws IOException, ParseException {
outputAvroSchema = null;
nullable = true;
checkSchema = true;
if (parts.length == 1
&& !parts[0].equalsIgnoreCase(NO_SCHEMA_CHECK)
&& !parts[0].equalsIgnoreCase(IGNORE_BAD_FILES)
&& !parts[0].equalsIgnoreCase(MULTIPLE_SCHEMAS)) {
/* If one parameter is given, and that is not 'no_schema_check',
* 'ignore_bad_files', or 'multiple_schemas', then it must be a
* json string.
*/
init(parseJsonString(parts[0]));
} else {
/* parse parameters */
init(parseStringList(parts));
}
}
/**
* Set input location and obtain input schema.
*/
@SuppressWarnings("unchecked")
@Override
public void setLocation(String location, Job job) throws IOException {
if (inputAvroSchema != null) {
return;
}
if (!UDFContext.getUDFContext().isFrontend()) {
Properties udfProps = getUDFProperties();
String mergedSchema = udfProps.getProperty(AVRO_MERGED_SCHEMA_PROPERTY);
if (mergedSchema != null) {
HashMap<URI, Map<Integer, Integer>> mergedSchemaMap =
(HashMap<URI, Map<Integer, Integer>>) ObjectSerializer.deserialize(mergedSchema);
schemaToMergedSchemaMap = new HashMap<Path, Map<Integer, Integer>>();
for (Entry<URI, Map<Integer, Integer>> entry : mergedSchemaMap.entrySet()) {
schemaToMergedSchemaMap.put(new Path(entry.getKey()), entry.getValue());
}
}
String schema = udfProps.getProperty(AVRO_INPUT_SCHEMA_PROPERTY);
if (schema != null) {
try {
inputAvroSchema = new Schema.Parser().parse(schema);
return;
} catch (Exception e) {
// Cases like testMultipleSchemas2 cause exception while deserializing
// symbols. In that case, we get it again.
LOG.warn("Exception while trying to deserialize schema in backend. " +
"Will construct again. schema= " + schema, e);
}
}
}
if (inputAvroSchema == null || UDFContext.getUDFContext().isFrontend()) {
Configuration conf = job.getConfiguration();
Set<Path> paths = getGlobPaths(location, conf, true);
if (!paths.isEmpty()) {
// Set top level directories in input format. Adding all files will
// bloat configuration size
FileInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
// Scan all directories including sub directories for schema
if (inputAvroSchema == null) {
setInputAvroSchema(paths, conf);
}
} else {
throw new IOException("Input path \'" + location + "\' is not found");
}
}
}
@Override
public void setUDFContextSignature(String signature) {
this.contextSignature = signature;
super.setUDFContextSignature(signature);
}
/**
* Set input avro schema. If the 'multiple_schemas' option is enabled, we merge multiple
* schemas and use that merged schema; otherwise, we simply use the schema from the first
* file in the paths set.
*
* @param paths set of input files
* @param conf configuration
* @return avro schema
* @throws IOException
*/
protected void setInputAvroSchema(Set<Path> paths, Configuration conf) throws IOException {
if(userSpecifiedAvroSchema != null) {
inputAvroSchema = userSpecifiedAvroSchema;
}
else {
inputAvroSchema = useMultipleSchemas ? getMergedSchema(paths, conf)
: getAvroSchema(paths, conf);
}
}
/**
* Get avro schema of first input file that matches the location pattern.
*
* @param paths set of input files
* @param conf configuration
* @return avro schema
* @throws IOException
*/
protected Schema getAvroSchema(Set<Path> paths, Configuration conf) throws IOException {
if (paths == null || paths.isEmpty()) {
return null;
}
Iterator<Path> iterator = paths.iterator();
Schema schema = null;
while (iterator.hasNext()) {
Path path = iterator.next();
FileSystem fs = FileSystem.get(path.toUri(), conf);
schema = getAvroSchema(path, fs);
if (schema != null) {
break;
}
}
return schema;
}
/**
* Get avro schema of input path. There are three cases:
* 1. if path is a file, then return its avro schema;
* 2. if path is a first-level directory (no sub-directories), then
* return the avro schema of one underlying file;
* 3. if path contains sub-directories, then recursively check
* whether all of them share the same schema and return it
* if so or throw an exception if not.
*
* @param path input path
* @param fs file system
* @return avro schema of data
* @throws IOException if underlying sub-directories do not share the same schema; or if input path is empty or does not exist
*/
@SuppressWarnings("deprecation")
protected Schema getAvroSchema(Path path, FileSystem fs) throws IOException {
if (!fs.exists(path) || !Utils.VISIBLE_FILES.accept(path))
return null;
/* if path is first level directory or is a file */
if (!fs.isDirectory(path)) {
return getSchema(path, fs);
}
FileStatus[] ss = fs.listStatus(path, Utils.VISIBLE_FILES);
Schema schema = null;
if (ss.length > 0) {
if (AvroStorageUtils.noDir(ss))
return getSchema(path, fs);
/*otherwise, check whether schemas of underlying directories are the same */
for (FileStatus s : ss) {
Schema newSchema = getAvroSchema(s.getPath(), fs);
if (schema == null) {
schema = newSchema;
if(!checkSchema) {
System.out.println("Do not check schema; use schema of " + s.getPath());
return schema;
}
} else if (newSchema != null && !schema.equals(newSchema)) {
throw new IOException( "Input path is " + path + ". Sub-direcotry " + s.getPath()
+ " contains different schema " + newSchema + " than " + schema);
}
}
}
if (schema == null)
System.err.println("Cannot get avro schema! Input path " + path + " might be empty.");
return schema;
}
/**
* Merge multiple input avro schemas into one. Note that we can't merge arbitrary schemas.
* Please see AvroStorageUtils.mergeSchema() for what's allowed and what's not allowed.
*
* @param basePaths set of input dir or files
* @param conf configuration
* @return avro schema
* @throws IOException
*/
protected Schema getMergedSchema(Set<Path> basePaths, Configuration conf) throws IOException {
Schema result = null;
Map<Path, Schema> mergedFiles = new HashMap<Path, Schema>();
Set<Path> paths = AvroStorageUtils.getAllFilesRecursively(basePaths, conf);
for (Path path : paths) {
FileSystem fs = FileSystem.get(path.toUri(), conf);
Schema schema = getSchema(path, fs);
if (schema != null) {
result = AvroStorageUtils.mergeSchema(result, schema);
mergedFiles.put(path, schema);
}
}
// schemaToMergedSchemaMap is only needed when merging multiple records.
if ((schemaToMergedSchemaMap == null || schemaToMergedSchemaMap.isEmpty()) &&
mergedFiles.size() > 1 && result.getType().equals(Schema.Type.RECORD)) {
schemaToMergedSchemaMap = AvroStorageUtils.getSchemaToMergedSchemaMap(result, mergedFiles);
}
return result;
}
/**
* This method is called by {@link #getAvroSchema}. The default implementation
* returns the schema of an avro file; or the schema of the last file in a first-level
* directory (it does not contain sub-directories).
*
* @param path path of a file or first level directory
* @param fs file system
* @return avro schema
* @throws IOException
*/
protected Schema getSchema(Path path, FileSystem fs) throws IOException {
return AvroStorageUtils.getSchema(path, fs);
}
/**
* This method is called to return the schema of an avro schema file. This
* method is different than {@link #getSchema}, which returns the schema
* from a data file.
*
* @param path path of a file or first level directory
* @param fs file system
* @return avro schema
* @throws IOException
*/
protected Schema getSchemaFromFile(Path path, FileSystem fs) throws IOException {
/* get path of the last file */
Path lastFile = AvroStorageUtils.getLast(path, fs);
if (lastFile == null) {
return null;
}
/* read in file and obtain schema */
GenericDatumReader<Object> avroReader = new GenericDatumReader<Object>();
InputStream hdfsInputStream = fs.open(lastFile);
Schema ret = Schema.parse(hdfsInputStream);
hdfsInputStream.close();
return ret;
}
@SuppressWarnings("rawtypes")
@Override
public InputFormat getInputFormat() throws IOException {
AvroStorageLog.funcCall("getInputFormat");
InputFormat result = null;
if(inputAvroSchema != null) {
result = new PigAvroInputFormat(
inputAvroSchema, ignoreBadFiles, schemaToMergedSchemaMap, useMultipleSchemas);
} else {
result = new TextInputFormat();
}
return result;
}
@SuppressWarnings("rawtypes")
@Override
public void prepareToRead(RecordReader reader, PigSplit split)
throws IOException {
AvroStorageLog.funcCall("prepareToRead");
this.reader = (PigAvroRecordReader) reader;
}
@Override
public Tuple getNext() throws IOException {
try {
if (!reader.nextKeyValue()) {
return null;
}
return (Tuple) this.reader.getCurrentValue();
} catch (Exception e) {
throw new IOException(e);
}
}
/* implement LoadMetadata */
/**
* Get avro schema from "location" and return the converted
* PigSchema.
*/
@Override
public ResourceSchema getSchema(String location, Job job) throws IOException {
/* get avro schema */
AvroStorageLog.funcCall("getSchema");
if (inputAvroSchema == null) {
Configuration conf = job.getConfiguration();
// If within a script, you store to one location and read from same
// location using AvroStorage getPaths will be empty. Since
// getSchema is called during script parsing we don't want to fail
// here if path not found
Set<Path> paths = getGlobPaths(location, conf, false);
if (!paths.isEmpty()) {
setInputAvroSchema(paths, conf);
}
}
if(inputAvroSchema != null) {
AvroStorageLog.details( "avro input schema:" + inputAvroSchema);
/* convert to pig schema */
ResourceSchema pigSchema = AvroSchema2Pig.convert(inputAvroSchema);
AvroStorageLog.details("pig input schema:" + pigSchema);
if (pigSchema.getFields().length == 1){
pigSchema = pigSchema.getFields()[0].getSchema();
}
Properties udfProps = getUDFProperties();
udfProps.put(AVRO_INPUT_SCHEMA_PROPERTY, inputAvroSchema.toString());
udfProps.put(AVRO_INPUT_PIG_SCHEMA_PROPERTY, pigSchema);
if (schemaToMergedSchemaMap != null) {
HashMap<URI, Map<Integer, Integer>> mergedSchemaMap = new HashMap<URI, Map<Integer, Integer>>();
for (Entry<Path, Map<Integer, Integer>> entry : schemaToMergedSchemaMap.entrySet()) {
//Path is not serializable
mergedSchemaMap.put(entry.getKey().toUri(), entry.getValue());
}
udfProps.put(AVRO_MERGED_SCHEMA_PROPERTY,
ObjectSerializer.serialize(mergedSchemaMap));
}
return pigSchema;
} else {
return null;
}
}
@Override
public ResourceStatistics getStatistics(String location, Job job) throws IOException {
return null; // Nothing to do
}
@Override
public String[] getPartitionKeys(String location, Job job) throws IOException {
return null; // Nothing to do
}
@Override
public void setPartitionFilter(Expression partitionFilter) throws IOException {
// Nothing to do
}
/**
* build a property map from a json object
*
* @param jsonString json object in string format
* @return a property map
* @throws ParseException
*/
@SuppressWarnings("unchecked")
protected Map<String, Object> parseJsonString(String jsonString) throws ParseException {
/*parse the json object */
JSONParser parser = new JSONParser();
JSONObject obj = (JSONObject) parser.parse(jsonString);
Set<Entry<String, Object>> entries = obj.entrySet();
for (Entry<String, Object> entry : entries) {
String key = entry.getKey();
Object value = entry.getValue();
if (key.equalsIgnoreCase("debug") || key.equalsIgnoreCase("index")) {
/* convert long values to integer */
int v = ((Long) value).intValue();
obj.put(key, v);
}
else if (key.equalsIgnoreCase("schema") || key.matches("field\\d+")) {
/* convert avro schema (as json object) to string */
obj.put(key, value.toString().trim());
}
}
return obj;
}
/**
* build a property map from a string list
*
* @param parts input string list
* @return a property map
* @throws IOException
* @throws ParseException
*/
protected Map<String, Object> parseStringList(String[] parts) throws IOException {
Map<String, Object> map = new HashMap<String, Object>();
for (int i = 0; i < parts.length; ) {
String name = parts[i].trim();
if (name.equalsIgnoreCase(NO_SCHEMA_CHECK)) {
checkSchema = false;
/* parameter only, so increase iteration counter by 1 */
i += 1;
} else if (name.equalsIgnoreCase(IGNORE_BAD_FILES)) {
ignoreBadFiles = true;
/* parameter only, so increase iteration counter by 1 */
i += 1;
} else if (name.equalsIgnoreCase(MULTIPLE_SCHEMAS)) {
useMultipleSchemas = true;
/* parameter only, so increase iteration counter by 1 */
i += 1;
} else {
String value = parts[i+1].trim();
if (name.equalsIgnoreCase("debug")
|| name.equalsIgnoreCase("index")) {
/* store value as integer */
map.put(name, Integer.parseInt(value));
} else if (name.equalsIgnoreCase("data")
|| name.equalsIgnoreCase("same")
|| name.equalsIgnoreCase("schema")
|| name.equalsIgnoreCase("schema_file")
|| name.equalsIgnoreCase("schema_uri")
|| name.matches("field\\d+")) {
/* store value as string */
map.put(name, value);
} else if (name.equalsIgnoreCase("nullable")) {
/* store value as boolean */
map.put(name, Boolean.getBoolean(value));
} else {
throw new IOException("Invalid parameter:" + name);
}
/* parameter/value pair, so increase iteration counter by 2 */
i += 2;
}
}
return map;
}
/**
* Initialize output avro schema using input property map
*/
protected void init(Map<String, Object> inputs) throws IOException {
/*used to store field schemas */
List<Field> fields = null;
/* set debug level */
if (inputs.containsKey("debug")) {
AvroStorageLog.setDebugLevel((Integer) inputs.get("debug"));
}
/* initialize schema manager, if any */
AvroSchemaManager schemaManager = null;
if (inputs.containsKey("data")) {
Path path = new Path((String) inputs.get("data"));
AvroStorageLog.details("data path=" + path.toUri().toString());
FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
Schema schema = getAvroSchema(path, fs);
schemaManager = new AvroSchemaManager(schema);
}
else if (inputs.containsKey("schema_file")) {
Path path = new Path((String) inputs.get("schema_file"));
AvroStorageLog.details("schemaFile path=" + path.toUri().toString());
FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
Schema schema = getSchemaFromFile(path, fs);
schemaManager = new AvroSchemaManager(schema);
}
/* iterate input property map */
for (Entry<String, Object> entry : inputs.entrySet()) {
String name = entry.getKey().trim();
Object value = entry.getValue();
if (name.equalsIgnoreCase("index")) {
/* set index of store function */
storeFuncIndex = (Integer) value;
} else if (name.equalsIgnoreCase("same")) {
/* use schema in the specified path as output schema */
Path path = new Path( ((String) value).trim());
AvroStorageLog.details("data path=" + path.toUri().toString());
FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
outputAvroSchema = getAvroSchema(path, fs);
} else if (name.equalsIgnoreCase("nullable")) {
nullable = (Boolean) value;
} else if (name.equalsIgnoreCase("schema")) {
outputAvroSchema = Schema.parse((String) value);
userSpecifiedAvroSchema = outputAvroSchema;
} else if (name.equalsIgnoreCase("schema_uri")) {
/* use the contents of the specified path as output schema */
Path path = new Path( ((String) value).trim());
AvroStorageLog.details("schema_uri path=" + path.toUri().toString());
FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
outputAvroSchema = getSchemaFromFile(path, fs);
userSpecifiedAvroSchema = outputAvroSchema;
} else if (name.matches("field\\d+")) {
/*set schema of dth field */
if (fields == null)
fields = new ArrayList<Field>();
int index = Integer.parseInt(name.substring("field".length()));
String content = ((String) value).trim();
Field field = null;
if (content.equalsIgnoreCase(NOTNULL)) {
/* null means deriving avro schema from pig schema but not null*/
field = AvroStorageUtils.createUDField(index, null);
} else if (content.startsWith("def:")) {
if (schemaManager == null)
throw new IOException("Please specify data parameter (using \"data\") before this one.");
String alias = content.substring("def:".length());
Schema s = schemaManager.getSchema(alias);
if (s == null)
throw new IOException("Cannot find matching schema for alias:" + alias);
/* use pre-defined schema*/
field = AvroStorageUtils.createUDField(index, s);
AvroStorageLog.details("Use pre-defined schema(" + alias + "): " + s + " for field " + index);
} else {
Schema schema = null;
try {
schema = Schema.parse(content);
} catch (RuntimeException e) {
/* might be primary schema like int or long */
schema = Schema.parse("\"" + content + "\"");
}
field = AvroStorageUtils.createUDField(index, schema);
}
fields.add(field);
} else if (!name.equalsIgnoreCase("data")
&& !name.equalsIgnoreCase("schema_file")
&& !name.equalsIgnoreCase("debug")) {
throw new IOException("Invalid parameter:" + name);
}
}
/* if schemas of some fields are set */
if (fields != null && outputAvroSchema == null) {
outputAvroSchema = AvroStorageUtils.createUDPartialRecordSchema();
outputAvroSchema.setFields(fields);
}
/* print warning if both output and nullable are specified;
* and nullable will be ignored.*/
if (outputAvroSchema != null) {
if (!nullable) {
AvroStorageLog.warn("Invalid parameter--nullable cannot be false while "
+ "output schema is not null. Will ignore nullable.\n\n");
nullable = true;
}
}
}
@Override
public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
return LoadFunc.getAbsolutePath(location, curDir);
}
@Override
public void setStoreLocation(String location, Job job) throws IOException {
AvroStorageLog.details("output location=" + location);
FileOutputFormat.setOutputPath(job, new Path(location));
}
/**
* Append newly specified schema
*/
@Override
public void checkSchema(ResourceSchema s) throws IOException {
AvroStorageLog.funcCall("Check schema");
Properties property = getUDFProperties();
String prevSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
AvroStorageLog.details("Previously defined schemas=" + prevSchemaStr);
String key = getSchemaKey();
Map<String, String> schemaMap = (prevSchemaStr != null)
? parseSchemaMap(prevSchemaStr)
: null;
if (schemaMap != null && schemaMap.containsKey(key)) {
AvroStorageLog.warn("Duplicate value for key-" + key + ". Will ignore the new schema.");
return;
}
/* validate and convert output schema */
Schema schema = outputAvroSchema != null
? (checkSchema
? PigSchema2Avro.validateAndConvert(outputAvroSchema, s)
: outputAvroSchema)
: PigSchema2Avro.convert(s, nullable);
AvroStorageLog.info("key=" + key + " outputSchema=" + schema);
String schemaStr = schema.toString();
String append = key + SCHEMA_KEYVALUE_DELIM + schemaStr;
String newSchemaStr = (schemaMap != null)
? prevSchemaStr + SCHEMA_DELIM + append
: append;
property.setProperty(AVRO_OUTPUT_SCHEMA_PROPERTY, newSchemaStr);
AvroStorageLog.details("New schemas=" + newSchemaStr);
}
/**
* Returns UDFProperties based on <code>contextSignature</code>.
*/
private Properties getUDFProperties() {
return UDFContext.getUDFContext()
.getUDFProperties(this.getClass(), new String[] {contextSignature});
}
private String getSchemaKey() {
return Integer.toString(storeFuncIndex);
}
private Map<String, String> parseSchemaMap(String input) throws IOException {
AvroStorageLog.details("Parse schema map from " + input);
String[] entries = input.split(SCHEMA_DELIM);
Map<String, String> map = new HashMap<String, String>();
for (String entry : entries) {
AvroStorageLog.details("Entry = " + entry);
if (entry.length() == 0)
continue;
String[] parts = entry.split(SCHEMA_KEYVALUE_DELIM);
if (parts.length != 2)
throw new IOException("Expect 2 fields in " + entry);
map.put(parts[0], parts[1]);
}
return map;
}
@SuppressWarnings("rawtypes")
@Override
public OutputFormat getOutputFormat() throws IOException {
AvroStorageLog.funcCall("getOutputFormat");
Properties property = getUDFProperties();
String allSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
Map<String, String> map = (allSchemaStr != null) ? parseSchemaMap(allSchemaStr) : null;
String key = getSchemaKey();
Schema schema = (map == null || !map.containsKey(key)) ? outputAvroSchema : Schema.parse(map.get(key));
if (schema == null)
throw new IOException("Output schema is null!");
AvroStorageLog.details("Output schema=" + schema);
return new PigAvroOutputFormat(schema);
}
@SuppressWarnings("rawtypes")
@Override
public void prepareToWrite(RecordWriter writer) throws IOException {
this.writer = (PigAvroRecordWriter) writer;
}
@Override
public void setStoreFuncUDFContextSignature(String signature) {
this.contextSignature = signature;
super.setUDFContextSignature(signature);
}
@Override
public void cleanupOnFailure(String location, Job job) throws IOException {
StoreFunc.cleanupOnFailureImpl(location, job);
}
@Override
public void cleanupOnSuccess(String location, Job job) throws IOException {
// Nothing to do
}
@Override
public void putNext(Tuple t) throws IOException {
try {
this.writer.write(NullWritable.get(), t.getAll().size() == 1 ? t.get(0) : t);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public List<String> getShipFiles() {
ArrayList<Class> classList = new ArrayList<Class>();
classList.add(JSONParser.class);
return FuncUtils.getShipFiles(classList);
}
}