/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.io.hcatalog;
import com.google.common.base.Objects;
import com.google.common.base.Strings;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.Target;
import org.apache.crunch.io.CrunchOutputs;
import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.io.MapReduceTarget;
import org.apache.crunch.io.OutputHandler;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.common.HCatUtil;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.mapreduce.CrunchHCatOutputFormat;
import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
import org.apache.thrift.TException;
import java.io.IOException;
import java.util.Map;
import javax.annotation.Nullable;
public class HCatTarget implements MapReduceTarget {
private static final PType<HCatRecord> PTYPE = Writables.writables(HCatRecord.class);
private static final PType<DefaultHCatRecord> DEFAULT_PTYPE = Writables.writables(DefaultHCatRecord.class);
private final OutputJobInfo info;
private final FormatBundle bundle = FormatBundle.forOutput(CrunchHCatOutputFormat.class);
private Table hiveTableCached;
/**
 * Constructs a new instance to write to the provided Hive {@code table} name.
 * Writes to the "default" database.
 *
 * Note: if the destination table is partitioned, this constructor should not
 * be used; it is only usable with unpartitioned tables.
*
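 * A minimal usage sketch (the table name "events" and the surrounding
 * pipeline and collection are hypothetical):
 *
 * <pre>{@code
 * PCollection<HCatRecord> records = ...;
 * pipeline.write(records, new HCatTarget("events"));
 * }</pre>
 *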
* @param table
 *          the Hive table to write to
*/
public HCatTarget(String table) {
this(null, table, null);
}
/**
 * Constructs a new instance to write to the provided Hive {@code table} name,
 * using the provided {@code database}. If {@code database} is null, the
 * "default" database is used.
 *
 * Note: if the destination table is partitioned, this constructor should not
 * be used; it is only usable with unpartitioned tables.
*
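 * For example (database and table names are hypothetical):
 * {@code new HCatTarget("analytics", "events")}.
 *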
 * @param database
 *          the Hive database to use for table namespacing
 * @param table
 *          the Hive table to write to
*/
public HCatTarget(@Nullable String database, String table) {
this(database, table, null);
}
/**
 * Constructs a new instance to write to the provided Hive {@code table} name
 * and {@code partitionValues}. Writes to the "default" database.
*
* Note: partitionValues will be assembled into a single directory path.
*
* For example, if the partition values are:
*
* <pre>
 * [year, 2017],
 * [month, 11],
 * [day, 10]
*
* The constructed directory path will be
* "[dataLocationRoot]/year=2017/month=11/day=10"
* </pre>
*
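 * A minimal usage sketch (the table name "events" and the partition values
 * are hypothetical; {@code ImmutableMap} is Guava's):
 *
 * <pre>{@code
 * Map<String, String> partition =
 *     ImmutableMap.of("year", "2017", "month", "11", "day", "10");
 * pipeline.write(records, new HCatTarget("events", partition));
 * }</pre>
 *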
 * @param table
 *          the Hive table to write to
 * @param partitionValues
 *          the partition within the table to which the data should be written
*/
public HCatTarget(String table, Map<String, String> partitionValues) {
this(null, table, partitionValues);
}
/**
 * Constructs a new instance to write to the provided {@code database},
 * {@code table}, and the specified {@code partitionValues}. If
 * {@code database} is not specified, the "default" database is used.
*
* Note: partitionValues will be assembled into a single directory path.
*
* For example, if the partition values are:
*
* <pre>
 * [year, 2017],
 * [month, 11],
 * [day, 10]
*
* The constructed directory path will be
* "[dataLocationRoot]/year=2017/month=11/day=10"
* </pre>
*
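 * A minimal usage sketch (the database name "analytics", the table name
 * "events", and the partition values are hypothetical):
 *
 * <pre>{@code
 * Map<String, String> partition = ImmutableMap.of("year", "2017");
 * pipeline.write(records, new HCatTarget("analytics", "events", partition));
 * }</pre>
 *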
 * @param database
 *          the Hive database to use for table namespacing
 * @param table
 *          the Hive table to write to
 * @param partitionValues
 *          the partition within the table to which the data should be written
*/
public HCatTarget(@Nullable String database, String table, @Nullable Map<String, String> partitionValues) {
this.info = OutputJobInfo.create(database, table, partitionValues);
}
@Override
public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
if (Strings.isNullOrEmpty(name)) {
throw new AssertionError("Named output wasn't generated. This shouldn't happen");
}
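    // register this target as a named output so it can coexist with other
    // outputs in the same job; records are written as NullWritable keys
    // paired with HCatRecord values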
CrunchOutputs.addNamedOutput(job, name, bundle, NullWritable.class, HCatRecord.class);
try {
CrunchHCatOutputFormat.setOutput(job, info);
      // Set the schema into the job configuration. This is necessary if any
      // downstream tasks need the schema translated between a format (e.g.
      // Avro) and HCatRecord for the destination table.
Table table = getHiveTable(job.getConfiguration());
CrunchHCatOutputFormat.setSchema(job, HCatUtil.extractSchema(table));
} catch (TException | IOException e) {
throw new CrunchRuntimeException(e);
}
}
@Override
public Target outputConf(String key, String value) {
bundle.set(key, value);
return this;
}
@Override
public Target fileSystem(FileSystem fileSystem) {
// not currently supported/applicable for HCatalog
return this;
}
@Override
public FileSystem getFileSystem() {
// not currently supported/applicable for HCatalog
return null;
}
@Override
public boolean handleExisting(WriteMode writeMode, long lastModifiedAt, Configuration conf) {
return writeMode == WriteMode.DEFAULT;
}
@Override
public boolean accept(OutputHandler handler, PType<?> ptype) {
if (!acceptType(ptype)) {
return false;
}
handler.configure(this, ptype);
return true;
}
@Override
public Converter<?, ?, ?, ?> getConverter(PType<?> ptype) {
return ptype.getConverter();
}
@Override
public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
    if (acceptType(ptype)) {
      return (SourceTarget<T>) new HCatSourceTarget(info.getDatabaseName(), info.getTableName());
    }
    return null;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("database", info.getDatabaseName())
.append("table", info.getTableName())
.append("partition", info.getPartitionValues())
.toString();
}
@Override
public int hashCode() {
return Objects.hashCode(info.getDatabaseName(), info.getTableName(), info.getPartitionValues());
}
@Override
public boolean equals(Object o) {
if (o == null || !getClass().equals(o.getClass())) {
return false;
}
HCatTarget that = (HCatTarget) o;
return Objects.equal(this.info.getDatabaseName(), that.info.getDatabaseName())
&& Objects.equal(this.info.getTableName(), that.info.getTableName())
&& Objects.equal(this.info.getPartitionValues(), that.info.getPartitionValues());
}
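  // this target only accepts writable-serialized HCatRecord or
  // DefaultHCatRecord PTypes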
private boolean acceptType(PType<?> ptype) {
return Objects.equal(ptype, PTYPE) || Objects.equal(ptype, DEFAULT_PTYPE);
}
private Table getHiveTable(Configuration conf) throws IOException, TException {
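    // look up the table definition from the metastore once and cache it for
    // subsequent calls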
if (hiveTableCached != null) {
return hiveTableCached;
}
IMetaStoreClient hiveMetastoreClient = HCatUtil.getHiveMetastoreClient(new HiveConf(conf, HCatTarget.class));
hiveTableCached = HCatUtil.getTable(hiveMetastoreClient, info.getDatabaseName(), info.getTableName());
return hiveTableCached;
}
}