blob: 1f90687a26f171de1c671f6fcc94d5db1f3982d6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kudu;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.kudu.Common;
import org.apache.kudu.Schema;
import org.apache.kudu.client.AbstractKuduScannerBuilder;
import org.apache.kudu.client.AsyncKuduClient;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduScanToken;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;
import org.apache.kudu.client.SessionConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** An implementation of the {@link KuduService} that uses a Kudu instance. */
class KuduServiceImpl<T> implements KuduService<T> {
private static final Logger LOG = LoggerFactory.getLogger(KuduServiceImpl.class);
@Override
public Writer createWriter(KuduIO.Write<T> spec) throws KuduException {
return new WriterImpl(spec);
}
@Override
public BoundedSource.BoundedReader createReader(KuduIO.KuduSource source) {
return new ReaderImpl(source);
}
@Override
public List<byte[]> createTabletScanners(KuduIO.Read spec) throws KuduException {
try (KuduClient client = getKuduClient(spec.getMasterAddresses())) {
KuduTable table = client.openTable(spec.getTable());
KuduScanToken.KuduScanTokenBuilder builder = client.newScanTokenBuilder(table);
configureBuilder(spec, table.getSchema(), builder);
List<KuduScanToken> tokens = builder.build();
return tokens.stream().map(t -> uncheckCall(t::serialize)).collect(Collectors.toList());
}
}
/** Writer storing an entity into Apache Kudu table. */
class WriterImpl implements Writer<T> {
private final KuduIO.FormatFunction<T> formatFunction;
private KuduClient client;
private KuduSession session;
private KuduTable table;
WriterImpl(KuduIO.Write<T> spec) throws KuduException {
checkNotNull(spec.masterAddresses(), "masterAddresses cannot be null");
checkNotNull(spec.table(), "table cannot be null");
this.formatFunction = checkNotNull(spec.formatFn(), "formatFn cannot be null");
client =
new AsyncKuduClient.AsyncKuduClientBuilder(spec.masterAddresses()).build().syncClient();
table = client.openTable(spec.table());
}
@Override
public void openSession() throws KuduException {
// errors are collected per session so we align session with the bundle
session = client.newSession();
// async flushing as per the official kudu-spark approach
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
}
@Override
public void write(T entity) throws KuduException {
checkState(session != null, "must call openSession() before writing");
session.apply(formatFunction.apply(new TableAndRecord(table, entity)));
}
@Override
public void closeSession() throws Exception {
try {
session.close();
if (session.countPendingErrors() > 0) {
LOG.error("At least {} errors occurred writing to Kudu", session.countPendingErrors());
RowError[] errors = session.getPendingErrors().getRowErrors();
for (int i = 0; errors != null && i < 3 && i < errors.length; i++) {
LOG.error("Sample error: {}", errors[i]);
}
throw new Exception(
"At least " + session.countPendingErrors() + " error(s) occurred writing to Kudu");
}
} finally {
session = null;
}
}
@Override
public void close() throws Exception {
client.close();
client = null;
}
}
/** Bounded reader of an Apache Kudu table. */
class ReaderImpl extends BoundedSource.BoundedReader<T> {
private final KuduIO.KuduSource<T> source;
private KuduClient client;
private KuduScanner scanner;
private RowResultIterator iter;
private RowResult current;
private long recordsReturned;
ReaderImpl(KuduIO.KuduSource<T> source) {
this.source = source;
}
@Override
public boolean start() throws IOException {
LOG.debug("Starting Kudu reader");
client =
new AsyncKuduClient.AsyncKuduClientBuilder(source.spec.getMasterAddresses())
.build()
.syncClient();
if (source.serializedToken != null) {
// tokens available if the source is already split
scanner = KuduScanToken.deserializeIntoScanner(source.serializedToken, client);
} else {
KuduTable table = client.openTable(source.spec.getTable());
KuduScanner.KuduScannerBuilder builder =
table.getAsyncClient().syncClient().newScannerBuilder(table);
configureBuilder(source.spec, table.getSchema(), builder);
scanner = builder.build();
}
return advance();
}
/**
* Returns the current record transformed into the desired type.
*
* @return the current record
* @throws NoSuchElementException If the current does not exist
*/
@Override
public T getCurrent() throws NoSuchElementException {
if (current != null) {
return source.spec.getParseFn().apply(current);
} else {
throw new NoSuchElementException(
"No current record (Indicates misuse. Perhaps advance() was not called?)");
}
}
@Override
public boolean advance() throws KuduException {
// scanner pages over results, with each page holding an iterator of records
if (iter == null || (!iter.hasNext() && scanner.hasMoreRows())) {
iter = scanner.nextRows();
}
if (iter != null && iter.hasNext()) {
current = iter.next();
++recordsReturned;
return true;
}
return false;
}
@Override
public void close() throws IOException {
LOG.debug("Closing reader after reading {} records.", recordsReturned);
if (scanner != null) {
scanner.close();
scanner = null;
}
if (client != null) {
client.close();
client = null;
}
}
@Override
public synchronized KuduIO.KuduSource getCurrentSource() {
return source;
}
}
/** Creates a new synchronous client. */
private synchronized KuduClient getKuduClient(List<String> masterAddresses) {
return new AsyncKuduClient.AsyncKuduClientBuilder(masterAddresses).build().syncClient();
}
/** Configures the scanner builder to conform to the spec. */
private static <T2> void configureBuilder(
KuduIO.Read<T2> spec, Schema schema, AbstractKuduScannerBuilder builder) {
builder.cacheBlocks(true); // as per kudu-spark
if (spec.getBatchSize() != null) {
builder.batchSizeBytes(spec.getBatchSize());
}
if (spec.getProjectedColumns() != null) {
builder.setProjectedColumnNames(spec.getProjectedColumns());
}
if (spec.getFaultTolerent() != null) {
builder.setFaultTolerant(spec.getFaultTolerent());
}
if (spec.getSerializablePredicates() != null) {
for (Common.ColumnPredicatePB predicate : spec.getSerializablePredicates()) {
builder.addPredicate(KuduPredicate.fromPB(schema, predicate));
}
}
}
/** Wraps the callable converting checked to RuntimeExceptions. */
private static <T> T uncheckCall(Callable<T> callable) {
try {
return callable.call();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}