| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig; |
| |
| import java.io.IOException; |
| |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.OutputFormat; |
| import org.apache.hadoop.mapreduce.RecordWriter; |
| import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger; |
| import org.apache.pig.classification.InterfaceAudience; |
| import org.apache.pig.classification.InterfaceStability; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.impl.util.UDFContext; |
| |
| |
| /** |
| * StoreFuncs take records from Pig's processing and store them into a data store. Most frequently |
| * this is an HDFS file, but it could also be an HBase instance, RDBMS, etc. |
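 * <p>
 * As a rough sketch (the class name {@code SimpleTextStorage} and its details
 * are illustrative, not part of Pig), an implementation typically wires Pig's
 * calls through to a Hadoop OutputFormat. Here Text and WritableComparable
 * come from org.apache.hadoop.io, and TextOutputFormat and FileOutputFormat
 * from org.apache.hadoop.mapreduce.lib.output:
 * <pre>{@code
 * public class SimpleTextStorage extends StoreFunc {
 *     private RecordWriter<WritableComparable, Text> writer;
 *
 *     public OutputFormat getOutputFormat() throws IOException {
 *         // Delegate the actual writing of bytes to Hadoop's TextOutputFormat.
 *         return new TextOutputFormat<WritableComparable, Text>();
 *     }
 *
 *     public void setStoreLocation(String location, Job job) throws IOException {
 *         FileOutputFormat.setOutputPath(job, new Path(location));
 *     }
 *
 *     public void prepareToWrite(RecordWriter writer) throws IOException {
 *         this.writer = writer;
 *     }
 *
 *     public void putNext(Tuple t) throws IOException {
 *         try {
 *             // Write the tuple as a tab-delimited line with a null key.
 *             writer.write(null, new Text(t.toDelimitedString("\t")));
 *         } catch (InterruptedException e) {
 *             throw new IOException(e);
 *         }
 *     }
 * }
 * }</pre>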
| */ |
| @InterfaceAudience.Public |
| @InterfaceStability.Stable |
| public abstract class StoreFunc implements StoreFuncInterface { |
| |
| /** |
| * This method is called by the Pig runtime in the front end to convert the |
| * output location to an absolute path if the location is relative. The |
| * StoreFunc implementation is free to choose how it converts a relative |
     * location to an absolute location, since this may depend on what the location
     * string represents (an HDFS path or some other data source).
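     *
     * For illustration, the default implementation (which delegates to
     * {@link LoadFunc#getAbsolutePath(String, Path)}) resolves a relative
     * location against the current directory; the paths below are illustrative:
     * <pre>{@code
     * // "out" resolved against /user/alice yields (roughly) "/user/alice/out"
     * storeFunc.relToAbsPathForStoreLocation("out", new Path("/user/alice"));
     * }</pre>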
| * |
| * |
| * @param location location as provided in the "store" statement of the script |
     * @param curDir the current working directory based on any "cd" statements
     * in the script before the "store" statement. If there are no "cd" statements
     * in the script, this would be the home directory -
     * <pre>/user/&lt;username&gt;</pre>
| * @return the absolute location based on the arguments passed |
| * @throws IOException if the conversion is not possible |
| */ |
| @Override |
| public String relToAbsPathForStoreLocation(String location, Path curDir) |
| throws IOException { |
| return LoadFunc.getAbsolutePath(location, curDir); |
| } |
| |
| /** |
| * Return the OutputFormat associated with StoreFunc. This will be called |
     * on the front end during planning and on the back end during
     * execution.
| * @return the {@link OutputFormat} associated with StoreFunc |
| * @throws IOException if an exception occurs while constructing the |
| * OutputFormat |
| * |
| */ |
| @Override |
| public abstract OutputFormat getOutputFormat() throws IOException; |
| |
| /** |
| * Communicate to the storer the location where the data needs to be stored. |
     * The location string passed to the {@link StoreFunc} here is the
     * return value of {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}.
     * This method will be called multiple times, in both the front end and the
     * back end, so implementations should ensure that the repeated calls do not
     * produce inconsistent side effects.
     * {@link #checkSchema(ResourceSchema)} will be called before any call to
     * {@link #setStoreLocation(String, Job)}.
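     * <p>
     * A minimal sketch of a file-based implementation (assuming the storer's
     * OutputFormat extends FileOutputFormat):
     * <pre>{@code
     * public void setStoreLocation(String location, Job job) throws IOException {
     *     // Point the underlying FileOutputFormat at the resolved location.
     *     FileOutputFormat.setOutputPath(job, new Path(location));
     * }
     * }</pre>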
| * |
| |
| * @param location Location returned by |
| * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)} |
| * @param job The {@link Job} object |
| * @throws IOException if the location is not valid. |
| */ |
| @Override |
| public abstract void setStoreLocation(String location, Job job) throws IOException; |
| |
| /** |
| * Set the schema for data to be stored. This will be called on the |
| * front end during planning if the store is associated with a schema. |
     * A store function should implement this method to
     * check that a given schema is acceptable to it. For example, it
     * can check that the correct partition keys are included;
     * a store function that writes directly to an OutputFormat can
     * make sure the schema will translate to it in a well-defined way. The
     * default implementation is a no-op.
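     * <p>
     * A sketch of the kind of validation an implementation might perform (the
     * two-field requirement here is purely illustrative):
     * <pre>{@code
     * public void checkSchema(ResourceSchema s) throws IOException {
     *     ResourceSchema.ResourceFieldSchema[] fields = s.getFields();
     *     if (fields == null || fields.length != 2) {
     *         throw new IOException("Expected exactly two fields, found "
     *                 + (fields == null ? 0 : fields.length));
     *     }
     * }
     * }</pre>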
     * @param s the schema to be checked
| * @throws IOException if this schema is not acceptable. It should include |
| * a detailed error message indicating what is wrong with the schema. |
| */ |
| @Override |
| public void checkSchema(ResourceSchema s) throws IOException { |
| // default implementation is a no-op |
| } |
| |
| /** |
     * Initialize the StoreFunc to write data. This will be called during
     * execution on the back end before any call to {@link #putNext(Tuple)}.
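     * <p>
     * Implementations typically just cache the writer for later use; a sketch:
     * <pre>{@code
     * public void prepareToWrite(RecordWriter writer) throws IOException {
     *     // Keep a reference; putNext(Tuple) writes through it.
     *     this.writer = writer;
     * }
     * }</pre>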
     * @param writer the RecordWriter to use.
| * @throws IOException if an exception occurs during initialization |
| */ |
| @Override |
| public abstract void prepareToWrite(RecordWriter writer) throws IOException; |
| |
| /** |
| * Write a tuple to the data store. |
| * |
| * @param t the tuple to store. |
| * @throws IOException if an exception occurs during the write |
| */ |
| @Override |
| public abstract void putNext(Tuple t) throws IOException; |
| |
| /** |
| * This method will be called by Pig both in the front end and back end to |
     * pass a unique signature to the {@link StoreFunc}, which it can use to store
     * into the {@link UDFContext} any information that it needs to share between
     * method invocations in the front end and back end. This method
| * will be called before other methods in {@link StoreFunc}. This is necessary |
| * because in a Pig Latin script with multiple stores, the different |
| * instances of store functions need to be able to find their (and only their) |
| * data in the UDFContext object. The default implementation is a no-op. |
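     * <p>
     * A sketch of the usual pattern: remember the signature, then use it to key
     * entries in the UDFContext (the property name is illustrative):
     * <pre>{@code
     * public void setStoreFuncUDFContextSignature(String signature) {
     *     this.signature = signature;
     * }
     *
     * // later, e.g. in checkSchema(ResourceSchema s):
     * Properties p = UDFContext.getUDFContext().getUDFProperties(
     *         this.getClass(), new String[] {signature});
     * p.setProperty("my.storefunc.schema", s.toString());
     * }</pre>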
| * @param signature a unique signature to identify this StoreFunc |
| */ |
| @Override |
| public void setStoreFuncUDFContextSignature(String signature) { |
| // default implementation is a no-op |
| } |
| |
| /** |
| * This method will be called by Pig if the job which contains this store |
| * fails. Implementations can clean up output locations in this method to |
| * ensure that no incorrect/incomplete results are left in the output location. |
| * The default implementation deletes the output location if it |
| * is a {@link FileSystem} location. |
| * @param location Location returned by |
| * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)} |
| * @param job The {@link Job} object - this should be used only to obtain |
| * cluster properties through {@link Job#getConfiguration()} and not to set/query |
     * any runtime job information.
     * @throws IOException if the cleanup fails
| */ |
| @Override |
| public void cleanupOnFailure(String location, Job job) |
| throws IOException { |
| cleanupOnFailureImpl(location, job); |
| } |
| |
| /** |
     * This method will be called by Pig if the job which contains this store
     * is successful, and some cleanup of intermediate resources is required.
     * Implementations can use it to remove any intermediate state (for
     * example, temporary or scratch files) once the output has been
     * successfully committed. The default implementation is a no-op.
| * @param location Location returned by |
| * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)} |
| * @param job The {@link Job} object - this should be used only to obtain |
| * cluster properties through {@link Job#getConfiguration()} and not to set/query |
     * any runtime job information.
     * @throws IOException if the cleanup fails
| */ |
| @Override |
| public void cleanupOnSuccess(String location, Job job) |
| throws IOException { |
        // The default implementation is a no-op. Overrides that want to remove
        // the output on success can call cleanupOnFailureImpl(location, job).
| } |
| |
| /** |
| * Default implementation for {@link #cleanupOnFailure(String, Job)} |
| * and {@link #cleanupOnSuccess(String, Job)}. This removes a file |
| * from HDFS. |
| * @param location file name (or URI) of file to remove |
| * @param job Hadoop job, used to access the appropriate file system. |
     * @throws IOException if the file system operations fail
| */ |
| public static void cleanupOnFailureImpl(String location, Job job) |
| throws IOException { |
| Path path = new Path(location); |
| FileSystem fs = path.getFileSystem(job.getConfiguration()); |
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
| } |
| |
| // TODO When dropping support for JDK 7 move this as a default method to StoreFuncInterface |
| /** |
| * DAG execution engines like Tez support optimizing union by writing to |
| * output location in parallel from tasks of different vertices. Commit is |
     * called once all the vertices in the union are complete. This eliminates
     * the need for a separate phase that reads the output of the previous
     * phases, unions it, and writes it out again.
| * |
| * Enabling the union optimization requires the OutputFormat to |
| * |
     * 1) Support creation of different part file names for tasks of different
     * vertices. Conflicting file names can cause data corruption and loss.
     * For example, if task 0 of vertex1 and task 0 of vertex2 both create a
     * file named part-r-00000, then one of the files will be overwritten when
     * promoting from the temporary to the final location, leading to data loss.
     * FileOutputFormat has the mapreduce.output.basename config, which enables
     * naming files differently in different vertices. Classes extending
     * FileOutputFormat, and those prefixing file names with the
     * mapreduce.output.basename value, will not encounter conflicts. Storers
     * like HBaseStorage, which write to a key-value store and do not produce
     * files, should not face any conflicts either.
| * |
     * 2) Support a single commit call at the end that takes care of promoting
     * the temporary files of the different vertices into the final location.
     * For example, the FileOutputFormat commit algorithm promotes files
     * produced by tasks of different vertices into the final output location
     * without issues, as long as there is no file name conflict. In cases like
     * HBaseStorage, the TableOutputCommitter does nothing on commit.
| * |
     * If the custom OutputFormat used by the StoreFunc does not satisfy the
     * above two criteria, then false should be returned and the union
     * optimization will be disabled for the StoreFunc.
     *
     * The default implementation returns null, in which case the planner falls
     * back to the {@link PigConfiguration#PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS}
     * and {@link PigConfiguration#PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS}
     * settings to determine whether the StoreFunc supports it.
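     * <p>
     * A sketch of an override for a storer whose OutputFormat is known to
     * satisfy both criteria above:
     * <pre>{@code
     * public Boolean supportsParallelWriteToStoreLocation() {
     *     // Our OutputFormat writes per-vertex file names and its commit
     *     // handles multiple writing vertices, so opt in explicitly.
     *     return Boolean.TRUE;
     * }
     * }</pre>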
| */ |
| public Boolean supportsParallelWriteToStoreLocation() { |
| return null; |
| } |
| |
| /** |
| * Issue a warning. Warning messages are aggregated and reported to |
| * the user. |
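     * <p>
     * For example, a storer might report a record it had to skip (using one
     * of the generic {@link PigWarning} constants):
     * <pre>{@code
     * warn("Unable to serialize tuple, skipping it", PigWarning.UDF_WARNING_1);
     * }</pre>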
     * @param msg the warning message
     * @param warningEnum the type of warning
| */ |
| public final void warn(String msg, Enum warningEnum) { |
| PigHadoopLogger.getInstance().warn(this, msg, warningEnum); |
| } |
| } |