/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.pig;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.Tables;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.pig.IcebergPigInputFormat.IcebergRecordReader;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.NaNUtil;
import org.apache.pig.Expression;
import org.apache.pig.Expression.BetweenExpression;
import org.apache.pig.Expression.BinaryExpression;
import org.apache.pig.Expression.Column;
import org.apache.pig.Expression.Const;
import org.apache.pig.Expression.InExpression;
import org.apache.pig.Expression.OpType;
import org.apache.pig.Expression.UnaryExpression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.LoadPredicatePushdown;
import org.apache.pig.LoadPushDown;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
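
/**
 * A Pig {@link LoadFunc} for reading Iceberg tables, with support for column projection and
 * predicate pushdown into Iceberg scans.
 *
 * <p>Illustrative Pig Latin usage (the table path below is a placeholder):
 *
 * <pre>
 *   A = LOAD 'hdfs:///warehouse/db/table' USING org.apache.iceberg.pig.IcebergStorage();
 * </pre>
 */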
public class IcebergStorage extends LoadFunc implements LoadMetadata, LoadPredicatePushdown, LoadPushDown {
private static final Logger LOG = LoggerFactory.getLogger(IcebergStorage.class);
public static final String PIG_ICEBERG_TABLES_IMPL = "pig.iceberg.tables.impl";
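
  // Pig may instantiate this LoadFunc multiple times on both the front end and the back end, so
  // table handles and their locations are cached statically: tables by location, locations by
  // UDF context signature.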
private static Tables iceberg;
private static Map<String, Table> tables = Maps.newConcurrentMap();
private static Map<String, String> locations = Maps.newConcurrentMap();

  private String signature;
  private IcebergRecordReader reader;

  @Override
public void setLocation(String location, Job job) {
LOG.info("[{}]: setLocation() -> {}", signature, location);
locations.put(signature, location);
Configuration conf = job.getConfiguration();
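    // Schema, projected fields, and filter expression were captured in the UDFContext during
    // front-end planning; copy them into the job configuration, scoped by this loader's
    // signature, so IcebergPigInputFormat can read them on the back end.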
copyUDFContextToScopedConfiguration(conf, IcebergPigInputFormat.ICEBERG_SCHEMA);
copyUDFContextToScopedConfiguration(conf, IcebergPigInputFormat.ICEBERG_PROJECTED_FIELDS);
copyUDFContextToScopedConfiguration(conf, IcebergPigInputFormat.ICEBERG_FILTER_EXPRESSION);
}

  @Override
public InputFormat getInputFormat() {
LOG.info("[{}]: getInputFormat()", signature);
String location = locations.get(signature);
return new IcebergPigInputFormat(tables.get(location), signature);
}

  @Override
public Tuple getNext() throws IOException {
if (!reader.nextKeyValue()) {
return null;
}
return (Tuple) reader.getCurrentValue();
}

  @Override
public void prepareToRead(RecordReader newReader, PigSplit split) {
LOG.info("[{}]: prepareToRead() -> {}", signature, split);
this.reader = (IcebergRecordReader) newReader;
}

  @Override
public ResourceSchema getSchema(String location, Job job) throws IOException {
LOG.info("[{}]: getSchema() -> {}", signature, location);
Schema schema = load(location, job).schema();
storeInUDFContext(IcebergPigInputFormat.ICEBERG_SCHEMA, schema);
return SchemaUtil.convert(schema);
}

  @Override
  public ResourceStatistics getStatistics(String location, Job job) {
    LOG.info("[{}]: getStatistics() -> {}", signature, location);
    // No statistics are exposed; returning null tells Pig that none are available.
    return null;
  }

  @Override
  public String[] getPartitionKeys(String location, Job job) {
    LOG.info("[{}]: getPartitionKeys()", signature);
    // Iceberg prunes data through its own filter pushdown, so no partition keys are exposed to
    // Pig and any partition filter from Pig is ignored.
    return new String[0];
  }

  @Override
  public void setPartitionFilter(Expression partitionFilter) {
    LOG.info("[{}]: setPartitionFilter() -> {}", signature, partitionFilter);
  }

  @Override
public List<String> getPredicateFields(String location, Job job) throws IOException {
LOG.info("[{}]: getPredicateFields() -> {}", signature, location);
Schema schema = load(location, job).schema();
List<String> result = Lists.newArrayList();
for (Types.NestedField nf : schema.columns()) {
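      // Only primitive columns are offered for pushdown; MAP, LIST, and STRUCT columns are
      // skipped because their predicates cannot be pushed into Iceberg scans here.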
switch (nf.type().typeId()) {
case MAP:
case LIST:
case STRUCT:
continue;
default:
result.add(nf.name());
}
}
return result;
}

  @Override
public ImmutableList<OpType> getSupportedExpressionTypes() {
LOG.info("[{}]: getSupportedExpressionTypes()", signature);
return ImmutableList.of(OpType.OP_AND, OpType.OP_OR, OpType.OP_EQ, OpType.OP_NE, OpType.OP_NOT, OpType.OP_GE,
OpType.OP_GT, OpType.OP_LE, OpType.OP_LT, OpType.OP_BETWEEN, OpType.OP_IN, OpType.OP_NULL);
}

  @Override
public void setPushdownPredicate(Expression predicate) throws IOException {
LOG.info("[{}]: setPushdownPredicate()", signature);
LOG.info("[{}]: Pig predicate expression: {}", signature, predicate);
org.apache.iceberg.expressions.Expression icebergExpression = convert(predicate);
LOG.info("[{}]: Iceberg predicate expression: {}", signature, icebergExpression);
storeInUDFContext(IcebergPigInputFormat.ICEBERG_FILTER_EXPRESSION, icebergExpression);
}
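
  // Recursively translates a Pig predicate into an Iceberg expression. BETWEEN is rewritten as
  // a conjunction of >= and <=, and IN as a disjunction of equality predicates.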
private org.apache.iceberg.expressions.Expression convert(Expression expression) throws IOException {
OpType op = expression.getOpType();
if (expression instanceof BinaryExpression) {
Expression lhs = ((BinaryExpression) expression).getLhs();
Expression rhs = ((BinaryExpression) expression).getRhs();
switch (op) {
case OP_AND:
return Expressions.and(convert(lhs), convert(rhs));
case OP_OR:
return Expressions.or(convert(lhs), convert(rhs));
case OP_BETWEEN:
BetweenExpression between = (BetweenExpression) rhs;
return Expressions.and(
convert(OpType.OP_GE, (Column) lhs, (Const) between.getLower()),
convert(OpType.OP_LE, (Column) lhs, (Const) between.getUpper())
);
case OP_IN:
return ((InExpression) rhs).getValues().stream()
.map(value -> convert(OpType.OP_EQ, (Column) lhs, (Const) value))
.reduce(Expressions.alwaysFalse(), Expressions::or);
default:
if (lhs instanceof Column && rhs instanceof Const) {
return convert(op, (Column) lhs, (Const) rhs);
} else if (lhs instanceof Const && rhs instanceof Column) {
throw new FrontendException("Invalid expression ordering " + expression);
}
}
} else if (expression instanceof UnaryExpression) {
Expression unary = ((UnaryExpression) expression).getExpression();
switch (op) {
case OP_NOT: return Expressions.not(convert(unary));
case OP_NULL: return Expressions.isNull(((Column) unary).getName());
default: throw new FrontendException("Unsupported unary operator" + op);
}
}
throw new FrontendException("Failed to pushdown expression " + expression);
}
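
  // Translates a single column-versus-constant comparison. NaN constants are mapped to
  // isNaN/notNaN, since Iceberg expression literals cannot represent NaN.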
private org.apache.iceberg.expressions.Expression convert(OpType op, Column col, Const constant) {
String name = col.getName();
Object value = constant.getValue();
switch (op) {
case OP_GE: return Expressions.greaterThanOrEqual(name, value);
case OP_GT: return Expressions.greaterThan(name, value);
case OP_LE: return Expressions.lessThanOrEqual(name, value);
case OP_LT: return Expressions.lessThan(name, value);
case OP_EQ: return NaNUtil.isNaN(value) ? Expressions.isNaN(name) : Expressions.equal(name, value);
case OP_NE: return NaNUtil.isNaN(value) ? Expressions.notNaN(name) : Expressions.notEqual(name, value);
}
    throw new RuntimeException(
        String.format("[%s]: Failed to push down expression: %s %s %s", signature, col, op, constant));
}

  @Override
public List<OperatorSet> getFeatures() {
return Collections.singletonList(OperatorSet.PROJECTION);
}

  @Override
public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) {
LOG.info("[{}]: pushProjection() -> {}", signature, requiredFieldList);
try {
      List<String> projection = requiredFieldList.getFields().stream()
          .map(RequiredField::getAlias)
          .collect(Collectors.toList());
storeInUDFContext(IcebergPigInputFormat.ICEBERG_PROJECTED_FIELDS, (Serializable) projection);
} catch (IOException e) {
throw new RuntimeException(e);
}
return new RequiredFieldResponse(true);
}

  @Override
public void setUDFContextSignature(String newSignature) {
this.signature = newSignature;
}
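
  // Serializes a value into the UDFContext under this loader's signature, so front-end state
  // can later be copied into the back-end job configuration.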
private void storeInUDFContext(String key, Serializable value) throws IOException {
Properties properties = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{signature});
properties.setProperty(key, ObjectSerializer.serialize(value));
}
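
  // Republishes a value stored in the UDFContext into the Hadoop configuration under a
  // signature-scoped key, making it visible to the input format.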
private void copyUDFContextToScopedConfiguration(Configuration conf, String key) {
String value = UDFContext.getUDFContext()
.getUDFProperties(this.getClass(), new String[]{signature}).getProperty(key);
if (value != null) {
conf.set(key + '.' + signature, value);
}
}

  @Override
  public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
    // Iceberg table locations are already absolute, so the incoming location is used as-is.
    return location;
  }
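
  // Lazily instantiates the Tables implementation (HadoopTables by default, overridable via
  // pig.iceberg.tables.impl) and caches each loaded table by its location.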
private Table load(String location, Job job) throws IOException {
if (iceberg == null) {
Class<?> tablesImpl = job.getConfiguration().getClass(PIG_ICEBERG_TABLES_IMPL, HadoopTables.class);
LOG.info("Initializing iceberg tables implementation: {}", tablesImpl);
iceberg = (Tables) ReflectionUtils.newInstance(tablesImpl, job.getConfiguration());
}
Table result = tables.get(location);
if (result == null) {
try {
LOG.info("[{}]: Loading table for location: {}", signature, location);
result = iceberg.load(location);
tables.put(location, result);
} catch (Exception e) {
throw new FrontendException("Failed to instantiate tables implementation", e);
}
}
return result;
}
}