blob: 665e465afa8d7705921cb7ed1dc3f9582fc2fb1c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.builtin;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
/**
* Flatten a bag into a string. This UDF will use the character '_'
* as the default delimiter if one is not provided.
*
* Example:
*
* bag = {(a),(b),(c)}
* BagToString(bag) -> 'a_b_c'
*
* BagToString(bag, '+') -> 'a+b+c'
*
* bag = {(a,b), (c,d), (e,f)}
* BagToString(bag) --> 'a_b_c_d_e_f'
*
* If input bag is null, this UTF will return null;
*/
public class BagToString extends EvalFunc<String> {
private static final String USAGE_STRING = "Usage BagToString(dataBag) or BagToString(dataBag, delimiter)";
private static final String DEFAULT_DELIMITER = "_";
@Override
public String exec(Tuple inputTuple) throws IOException {
if ((inputTuple.size() != 1) && (inputTuple.size() != 2)) {
throw new ExecException(USAGE_STRING, PigException.INPUT);
}
Object firstArg = inputTuple.get(0);
if (firstArg == null) {
return null;
}
if (!(firstArg instanceof DataBag)) {
throw new ExecException(USAGE_STRING + " found type " + firstArg.getClass().getName(), PigException.INPUT);
}
if ((inputTuple.size() == 2) && !(inputTuple.get(1) instanceof String)) {
throw new ExecException("Usage BagToTuple(DataBag, String)", PigException.INPUT);
}
DataBag bag = (DataBag) inputTuple.get(0);
String delimeter = DEFAULT_DELIMITER;
if (inputTuple.size() == 2) {
delimeter = (String)inputTuple.get(1);
}
StringBuilder buffer = new StringBuilder();
try {
for (Tuple t : bag) {
if (t != null) {
for (int i = 0; i < t.size(); i++) {
if (buffer.length() > 0) {
buffer.append(delimeter);
}
buffer.append(t.get(i));
}
}
}
return buffer.toString();
} catch (Exception e) {
String msg = "Encourntered error while flattening a bag "
+ this.getClass().getSimpleName();
throw new ExecException(msg, PigException.BUG, e);
}
}
@Override
public Schema outputSchema(Schema inputSchema) {
try {
if ((inputSchema == null) || ((inputSchema.size() != 1) && (inputSchema.size() != 2))) {
throw new RuntimeException("Expecting 2 inputs, found: " +
((inputSchema == null) ? 0 : inputSchema.size()));
}
FieldSchema inputFieldSchema = inputSchema.getField(0);
if (inputFieldSchema.type != DataType.BAG) {
throw new RuntimeException("Expecting a bag of tuples: {()}, found data type: " +
DataType.findTypeName(inputFieldSchema.type));
}
// first field in the bag schema
FieldSchema firstFieldSchema = inputFieldSchema.schema.getField(0);
if ((firstFieldSchema == null) || (firstFieldSchema.schema == null)
|| firstFieldSchema.schema.size() < 1) {
throw new RuntimeException("Expecting a bag and a delimeter, found: " + inputSchema);
}
if (firstFieldSchema.type != DataType.TUPLE) {
throw new RuntimeException("Expecting a bag and a delimeter, found: " + inputSchema);
}
if (inputSchema.size() == 2) {
FieldSchema secondInputFieldSchema = inputSchema.getField(1);
if (secondInputFieldSchema.type != DataType.CHARARRAY) {
throw new RuntimeException("Expecting a bag and a delimeter, found: " + inputSchema);
}
}
return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY));
} catch (FrontendException e) {
e.printStackTrace();
return null;
}
}
@Override
public boolean allowCompileTimeCalculation() {
return true;
}
}