/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.hcatalog;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import com.google.auto.value.AutoValue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.hadoop.WritableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.stats.StatsUtils;
import org.apache.hive.hcatalog.common.HCatConstants;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.common.HCatUtil;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
import org.apache.hive.hcatalog.data.transfer.HCatReader;
import org.apache.hive.hcatalog.data.transfer.HCatWriter;
import org.apache.hive.hcatalog.data.transfer.ReadEntity;
import org.apache.hive.hcatalog.data.transfer.ReaderContext;
import org.apache.hive.hcatalog.data.transfer.WriteEntity;
import org.apache.hive.hcatalog.data.transfer.WriterContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* IO to read and write data using HCatalog.
*
* <h3>Reading using HCatalog</h3>
*
* <p>The HCatalog source supports reading HCatRecords from an HCatalog-managed source such as Hive.
*
* <p>To configure an HCatalog source, you must specify a metastore URI and a table name. The
* database and filter parameters are optional. For instance:
*
* <pre>{@code
* Map<String, String> configProperties = new HashMap<>();
* configProperties.put("hive.metastore.uris","thrift://metastore-host:port");
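*
* // a hypothetical filter for a partitioned table; adjust the partition column and value to your schema
* String filterString = "part_col=\"part_value\"";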
*
* pipeline
* .apply(HCatalogIO.read()
* .withConfigProperties(configProperties)
* .withDatabase("default") //optional, assumes default if none specified
* .withTable("employee")
* .withFilter(filterString)); //optional, may be specified if the table is partitioned
* }</pre>
*
* <h3>Writing using HCatalog</h3>
*
* <p>The HCatalog sink supports writing HCatRecords to an HCatalog-managed destination such as Hive.
*
* <p>To configure an HCatalog sink, you must specify a metastore URI and a table name. The
* database, partition, and batch size parameters are optional. The destination table must exist
* beforehand; the transform does not create it if it is missing. For instance:
*
* <pre>{@code
* Map<String, String> configProperties = new HashMap<>();
* configProperties.put("hive.metastore.uris","thrift://metastore-host:port");
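*
* // a hypothetical partition spec (partition column -> value); only needed for partitioned tables
* Map<String, String> partitionValues = new HashMap<>();
* partitionValues.put("part_col", "part_value");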
*
* pipeline
* .apply(...)
* .apply(HCatalogIO.write()
* .withConfigProperties(configProperties)
* .withDatabase("default") //optional, assumes default if none specified
* .withTable("employee")
* .withPartition(partitionValues) //optional, may be specified if the table is partitioned
* .withBatchSize(1024L)); //optional, assumes a default batch size of 1024 if none specified
* }</pre>
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public class HCatalogIO {
private static final Logger LOG = LoggerFactory.getLogger(HCatalogIO.class);
private static final long BATCH_SIZE = 1024L;
private static final String DEFAULT_DATABASE = "default";
/** Write data to Hive. */
public static Write write() {
return new AutoValue_HCatalogIO_Write.Builder().setBatchSize(BATCH_SIZE).build();
}
/** Read data from Hive. */
public static Read read() {
return new AutoValue_HCatalogIO_Read.Builder().setDatabase(DEFAULT_DATABASE).build();
}
private HCatalogIO() {}
/** A {@link PTransform} to read data using HCatalog. */
@VisibleForTesting
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<HCatRecord>> {
@Nullable
abstract Map<String, String> getConfigProperties();
@Nullable
abstract String getDatabase();
@Nullable
abstract String getTable();
@Nullable
abstract String getFilter();
@Nullable
abstract ReaderContext getContext();
@Nullable
abstract Integer getSplitId();
abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setConfigProperties(Map<String, String> configProperties);
abstract Builder setDatabase(String database);
abstract Builder setTable(String table);
abstract Builder setFilter(String filter);
abstract Builder setSplitId(Integer splitId);
abstract Builder setContext(ReaderContext context);
abstract Read build();
}
/** Sets the configuration properties like metastore URI. */
public Read withConfigProperties(Map<String, String> configProperties) {
return toBuilder().setConfigProperties(new HashMap<>(configProperties)).build();
}
/** Sets the database name. This is optional; assumes the 'default' database if none is specified. */
public Read withDatabase(String database) {
return toBuilder().setDatabase(database).build();
}
/** Sets the table name to read from. */
public Read withTable(String table) {
return toBuilder().setTable(table).build();
}
/** Sets the filter details. This is optional; no filter is applied if none is specified. */
public Read withFilter(String filter) {
return toBuilder().setFilter(filter).build();
}
Read withSplitId(int splitId) {
checkArgument(splitId >= 0, "Invalid split id: " + splitId);
return toBuilder().setSplitId(splitId).build();
}
Read withContext(ReaderContext context) {
return toBuilder().setContext(context).build();
}
@Override
public PCollection<HCatRecord> expand(PBegin input) {
checkArgument(getTable() != null, "withTable() is required");
checkArgument(getConfigProperties() != null, "withConfigProperties() is required");
return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedHCatalogSource(this)));
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.add(DisplayData.item("configProperties", getConfigProperties().toString()));
builder.add(DisplayData.item("table", getTable()));
builder.addIfNotNull(DisplayData.item("database", getDatabase()));
builder.addIfNotNull(DisplayData.item("filter", getFilter()));
}
}
/** A HCatalog {@link BoundedSource} reading {@link HCatRecord} from a given instance. */
@VisibleForTesting
static class BoundedHCatalogSource extends BoundedSource<HCatRecord> {
private final Read spec;
BoundedHCatalogSource(Read spec) {
this.spec = spec;
}
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Coder<HCatRecord> getOutputCoder() {
return (Coder) WritableCoder.of(DefaultHCatRecord.class);
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
spec.populateDisplayData(builder);
}
@Override
public BoundedReader<HCatRecord> createReader(PipelineOptions options) {
return new BoundedHCatalogReader(this);
}
/**
* Returns the size of the table in bytes. It does not take into account any filter or partition
* details that may have been specified.
*/
@Override
public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
Configuration conf = new Configuration();
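// copy the user-supplied properties into a Hadoop Configuration for the Hive metastore client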
for (Entry<String, String> entry : spec.getConfigProperties().entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
IMetaStoreClient client = null;
try {
HiveConf hiveConf = HCatUtil.getHiveConf(conf);
client = HCatUtil.getHiveMetastoreClient(hiveConf);
Table table = HCatUtil.getTable(client, spec.getDatabase(), spec.getTable());
return StatsUtils.getFileSizeForTable(hiveConf, table);
} finally {
// IMetaStoreClient is not AutoCloseable, so close it manually
if (client != null) {
client.close();
}
}
}
/**
* Calculates the 'desired' number of splits based on desiredBundleSizeBytes, which is passed as
* a hint to the native API. It then retrieves the actual splits generated by the native API,
* which may differ from the 'desired' split count calculated from desiredBundleSizeBytes.
*/
@Override
public List<BoundedSource<HCatRecord>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
int desiredSplitCount = 1;
long estimatedSizeBytes = getEstimatedSizeBytes(options);
if (desiredBundleSizeBytes > 0 && estimatedSizeBytes > 0) {
desiredSplitCount = (int) Math.ceil((double) estimatedSizeBytes / desiredBundleSizeBytes);
}
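// e.g. an estimated size of 10 MB and a desired bundle size of 3 MB give ceil(10 / 3) = 4 desired splits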
ReaderContext readerContext = getReaderContext(desiredSplitCount);
// process the splits returned by the native API;
// their count may differ from the 'desiredSplitCount' calculated above
LOG.info(
"Splitting into bundles of {} bytes: "
+ "estimated size {}, desired split count {}, actual split count {}",
desiredBundleSizeBytes,
estimatedSizeBytes,
desiredSplitCount,
readerContext.numSplits());
List<BoundedSource<HCatRecord>> res = new ArrayList<>();
for (int split = 0; split < readerContext.numSplits(); split++) {
res.add(new BoundedHCatalogSource(spec.withContext(readerContext).withSplitId(split)));
}
return res;
}
private ReaderContext getReaderContext(long desiredSplitCount) throws HCatException {
ReadEntity entity =
new ReadEntity.Builder()
.withDatabase(spec.getDatabase())
.withTable(spec.getTable())
.withFilter(spec.getFilter())
.build();
// pass the 'desired' split count as a hint to the native API
Map<String, String> configProps = new HashMap<>(spec.getConfigProperties());
configProps.put(
HCatConstants.HCAT_DESIRED_PARTITION_NUM_SPLITS, String.valueOf(desiredSplitCount));
return DataTransferFactory.getHCatReader(entity, configProps).prepareRead();
}
static class BoundedHCatalogReader extends BoundedSource.BoundedReader<HCatRecord> {
private final BoundedHCatalogSource source;
private HCatRecord current;
private Iterator<HCatRecord> hcatIterator;
BoundedHCatalogReader(BoundedHCatalogSource source) {
this.source = source;
}
@Override
public boolean start() throws HCatException {
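// the ReaderContext and split id were set on the spec by BoundedHCatalogSource#split()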
HCatReader reader =
DataTransferFactory.getHCatReader(source.spec.getContext(), source.spec.getSplitId());
hcatIterator = reader.read();
return advance();
}
@Override
public boolean advance() {
if (hcatIterator.hasNext()) {
current = hcatIterator.next();
return true;
} else {
current = null;
return false;
}
}
@Override
public BoundedHCatalogSource getCurrentSource() {
return source;
}
@Override
public HCatRecord getCurrent() {
if (current == null) {
throw new NoSuchElementException("Current element is null");
}
return current;
}
@Override
public void close() {
// nothing to close/release
}
}
}
/** A {@link PTransform} to write to a HCatalog managed source. */
@AutoValue
public abstract static class Write extends PTransform<PCollection<HCatRecord>, PDone> {
@Nullable
abstract Map<String, String> getConfigProperties();
@Nullable
abstract String getDatabase();
@Nullable
abstract String getTable();
@Nullable
abstract Map<String, String> getPartition();
abstract long getBatchSize();
abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setConfigProperties(Map<String, String> configProperties);
abstract Builder setDatabase(String database);
abstract Builder setTable(String table);
abstract Builder setPartition(Map<String, String> partition);
abstract Builder setBatchSize(long batchSize);
abstract Write build();
}
/** Sets the configuration properties like metastore URI. */
public Write withConfigProperties(Map<String, String> configProperties) {
return toBuilder().setConfigProperties(new HashMap<>(configProperties)).build();
}
/** Sets the database name. This is optional; assumes the 'default' database if none is specified. */
public Write withDatabase(String database) {
return toBuilder().setDatabase(database).build();
}
/** Sets the table name to write to; the table should exist beforehand. */
public Write withTable(String table) {
return toBuilder().setTable(table).build();
}
/** Sets the partition details. */
public Write withPartition(Map<String, String> partition) {
return toBuilder().setPartition(partition).build();
}
/**
* Sets the batch size for the write operation. This is optional; a default batch size of 1024
* is assumed if not set.
*/
public Write withBatchSize(long batchSize) {
return toBuilder().setBatchSize(batchSize).build();
}
@Override
public PDone expand(PCollection<HCatRecord> input) {
checkArgument(getConfigProperties() != null, "withConfigProperties() is required");
checkArgument(getTable() != null, "withTable() is required");
input.apply(ParDo.of(new WriteFn(this)));
return PDone.in(input.getPipeline());
}
private static class WriteFn extends DoFn<HCatRecord, Void> {
private final Write spec;
private WriterContext writerContext;
private HCatWriter slaveWriter;
private HCatWriter masterWriter;
private List<HCatRecord> hCatRecordsBatch;
WriteFn(Write spec) {
this.spec = spec;
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.addIfNotNull(DisplayData.item("database", spec.getDatabase()));
builder.add(DisplayData.item("table", spec.getTable()));
builder.addIfNotNull(DisplayData.item("partition", String.valueOf(spec.getPartition())));
builder.add(DisplayData.item("configProperties", spec.getConfigProperties().toString()));
builder.add(DisplayData.item("batchSize", spec.getBatchSize()));
}
@Setup
public void initiateWrite() throws HCatException {
WriteEntity entity =
new WriteEntity.Builder()
.withDatabase(spec.getDatabase())
.withTable(spec.getTable())
.withPartition(spec.getPartition())
.build();
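// the master writer prepares a WriterContext once per DoFn instance; the actual record
// writes go through the slave writer created from that context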
masterWriter = DataTransferFactory.getHCatWriter(entity, spec.getConfigProperties());
writerContext = masterWriter.prepareWrite();
slaveWriter = DataTransferFactory.getHCatWriter(writerContext);
}
@StartBundle
public void startBundle() {
hCatRecordsBatch = new ArrayList<>();
}
@ProcessElement
public void processElement(ProcessContext ctx) throws HCatException {
hCatRecordsBatch.add(ctx.element());
if (hCatRecordsBatch.size() >= spec.getBatchSize()) {
flush();
}
}
@FinishBundle
public void finishBundle() throws HCatException {
flush();
}
private void flush() throws HCatException {
if (hCatRecordsBatch.isEmpty()) {
return;
}
try {
slaveWriter.write(hCatRecordsBatch.iterator());
masterWriter.commit(writerContext);
} catch (HCatException e) {
LOG.error("Exception in flush - write/commit data to Hive", e);
// abort on exception
masterWriter.abort(writerContext);
throw e;
} finally {
hCatRecordsBatch.clear();
}
}
@Teardown
public void tearDown() {
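// drop the references so the writers and context can be garbage collected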
if (slaveWriter != null) {
slaveWriter = null;
}
if (masterWriter != null) {
masterWriter = null;
}
if (writerContext != null) {
writerContext = null;
}
}
}
}
}