/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.bigquery;

import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
import org.apache.nifi.processors.gcp.storage.PutGCSObject;
import org.apache.nifi.util.StringUtils;
import org.threeten.bp.Duration;
import org.threeten.bp.temporal.ChronoUnit;
import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import com.google.common.collect.ImmutableList;

/**
 * A processor for batch loading flow file content into a Google BigQuery table. The content is
 * streamed to BigQuery through a {@link TableDataWriteChannel}, and the processor waits for the
 * resulting load job to complete before routing the flow file to success or failure.
 */
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({ "google", "google cloud", "bq", "bigquery" })
@CapabilityDescription("Batch loads flow files content to a Google BigQuery table.")
@SeeAlso({ PutGCSObject.class, DeleteGCSObject.class })
@WritesAttributes({
@WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
})
public class PutBigQueryBatch extends AbstractBigQueryProcessor {
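    // Load file formats accepted by BigQuery batch load jobs: newline-delimited JSON, CSV and Avro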
private static final List<String> TYPES = Arrays.asList(FormatOptions.json().getType(), FormatOptions.csv().getType(), FormatOptions.avro().getType());
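    // Accepts either Expression Language (resolved at runtime) or one of the supported load file types above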
private static final Validator FORMAT_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
final ValidationResult.Builder builder = new ValidationResult.Builder();
builder.subject(subject).input(input);
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return builder.valid(true).explanation("Contains Expression Language").build();
}
if (TYPES.contains(input.toUpperCase())) {
builder.valid(true);
} else {
builder.valid(false).explanation("Load File Type must be one of the following options: " + StringUtils.join(TYPES, ", "));
}
return builder.build();
}
};
public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
.displayName("Read Timeout")
.description(BigQueryAttributes.JOB_READ_TIMEOUT_DESC)
.required(true)
.defaultValue("5 minutes")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
.displayName("Table Schema")
.description(BigQueryAttributes.TABLE_SCHEMA_DESC)
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder()
            .name(BigQueryAttributes.SOURCE_TYPE_ATTR)
            .displayName("Load File Type")
            .description(BigQueryAttributes.SOURCE_TYPE_DESC)
            .required(true)
            .addValidator(FORMAT_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();
public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
.displayName("Create Disposition")
.description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
.required(true)
.allowableValues(BigQueryAttributes.CREATE_IF_NEEDED, BigQueryAttributes.CREATE_NEVER)
.defaultValue(BigQueryAttributes.CREATE_IF_NEEDED.getValue())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.WRITE_DISPOSITION_ATTR)
.displayName("Write Disposition")
.description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
.required(true)
.allowableValues(BigQueryAttributes.WRITE_EMPTY, BigQueryAttributes.WRITE_APPEND, BigQueryAttributes.WRITE_TRUNCATE)
.defaultValue(BigQueryAttributes.WRITE_EMPTY.getValue())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.MAX_BADRECORDS_ATTR)
.displayName("Max Bad Records")
.description(BigQueryAttributes.MAX_BADRECORDS_DESC)
.required(true)
.defaultValue("0")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
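    // The CSV-specific options below are only applied when the load file type resolves to CSV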
public static final PropertyDescriptor CSV_ALLOW_JAGGED_ROWS = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.CSV_ALLOW_JAGGED_ROWS_ATTR)
.displayName("CSV Input - Allow Jagged Rows")
.description(BigQueryAttributes.CSV_ALLOW_JAGGED_ROWS_DESC)
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
public static final PropertyDescriptor CSV_ALLOW_QUOTED_NEW_LINES = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.CSV_ALLOW_QUOTED_NEW_LINES_ATTR)
.displayName("CSV Input - Allow Quoted New Lines")
.description(BigQueryAttributes.CSV_ALLOW_QUOTED_NEW_LINES_DESC)
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
public static final PropertyDescriptor CSV_CHARSET = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.CSV_CHARSET_ATTR)
.displayName("CSV Input - Character Set")
.description(BigQueryAttributes.CSV_CHARSET_DESC)
.required(true)
.allowableValues("UTF-8", "ISO-8859-1")
.defaultValue("UTF-8")
.build();
public static final PropertyDescriptor CSV_FIELD_DELIMITER = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.CSV_FIELD_DELIMITER_ATTR)
.displayName("CSV Input - Field Delimiter")
.description(BigQueryAttributes.CSV_FIELD_DELIMITER_DESC)
.required(true)
.defaultValue(",")
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor CSV_QUOTE = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.CSV_QUOTE_ATTR)
.displayName("CSV Input - Quote")
.description(BigQueryAttributes.CSV_QUOTE_DESC)
.required(true)
.defaultValue("\"")
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor CSV_SKIP_LEADING_ROWS = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.CSV_SKIP_LEADING_ROWS_ATTR)
.displayName("CSV Input - Skip Leading Rows")
.description(BigQueryAttributes.CSV_SKIP_LEADING_ROWS_DESC)
.required(true)
.defaultValue("0")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor AVRO_USE_LOGICAL_TYPES = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.AVRO_USE_LOGICAL_TYPES_ATTR)
.displayName("Avro Input - Use Logical Types")
.description(BigQueryAttributes.AVRO_USE_LOGICAL_TYPES_DESC)
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return ImmutableList.<PropertyDescriptor>builder()
.addAll(super.getSupportedPropertyDescriptors())
.add(TABLE_SCHEMA)
.add(READ_TIMEOUT)
.add(SOURCE_TYPE)
.add(CREATE_DISPOSITION)
.add(WRITE_DISPOSITION)
.add(MAXBAD_RECORDS)
.add(CSV_ALLOW_JAGGED_ROWS)
.add(CSV_ALLOW_QUOTED_NEW_LINES)
.add(CSV_CHARSET)
.add(CSV_FIELD_DELIMITER)
.add(CSV_QUOTE)
.add(CSV_SKIP_LEADING_ROWS)
.add(AVRO_USE_LOGICAL_TYPES)
.build();
}
@Override
@OnScheduled
public void onScheduled(ProcessContext context) {
super.onScheduled(context);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String type = context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions(flowFile).getValue();
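        // Project ID is optional; when empty, the table is resolved against the BigQuery client's default project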
final TableId tableId;
if (StringUtils.isEmpty(projectId)) {
tableId = TableId.of(dataset, tableName);
} else {
tableId = TableId.of(projectId, dataset, tableName);
}
try {
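            // Build the format options for the load job; CSV gets its dedicated parsing settings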
FormatOptions formatOption;
            if (type.equalsIgnoreCase(FormatOptions.csv().getType())) {
formatOption = FormatOptions.csv().toBuilder()
.setAllowJaggedRows(context.getProperty(CSV_ALLOW_JAGGED_ROWS).asBoolean())
.setAllowQuotedNewLines(context.getProperty(CSV_ALLOW_QUOTED_NEW_LINES).asBoolean())
.setEncoding(context.getProperty(CSV_CHARSET).getValue())
.setFieldDelimiter(context.getProperty(CSV_FIELD_DELIMITER).evaluateAttributeExpressions(flowFile).getValue())
.setQuote(context.getProperty(CSV_QUOTE).evaluateAttributeExpressions(flowFile).getValue())
.setSkipLeadingRows(context.getProperty(CSV_SKIP_LEADING_ROWS).evaluateAttributeExpressions(flowFile).asInteger())
.build();
} else {
                // the validator accepts the type in any case, but the BigQuery format constants are upper-case
                formatOption = FormatOptions.of(type.toUpperCase());
}
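            // Assemble the load job configuration: dispositions, unknown-value handling, schema and format options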
final Schema schema = BigQueryUtils.schemaFromString(context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions(flowFile).getValue());
final WriteChannelConfiguration writeChannelConfiguration = WriteChannelConfiguration.newBuilder(tableId)
.setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
.setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
.setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).evaluateAttributeExpressions(flowFile).asBoolean())
.setUseAvroLogicalTypes(context.getProperty(AVRO_USE_LOGICAL_TYPES).asBoolean())
.setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
.setSchema(schema)
.setFormatOptions(formatOption)
.build();
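            // Stream the flow file content into a BigQuery write channel, one buffer at a time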
try (TableDataWriteChannel writer = getCloudService().writer(writeChannelConfiguration)) {
session.read(flowFile, rawIn -> {
ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn);
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
while (readableByteChannel.read(byteBuffer) >= 0) {
byteBuffer.flip();
writer.write(byteBuffer);
byteBuffer.clear();
}
});
// writer must be closed to get the job
writer.close();
Job job = writer.getJob();
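                // Wait for the load job to complete, up to the configured Read Timeout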
Long timePeriod = context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.SECONDS);
Duration waitFor = Duration.of(timePeriod, ChronoUnit.SECONDS);
job = job.waitFor(RetryOption.totalTimeout(waitFor));
if (job != null) {
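                    // Expose job metadata to downstream processors as flow file attributes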
final Map<String, String> attributes = new HashMap<>();
attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, Long.toString(job.getStatistics().getCreationTime()));
attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime()));
attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime()));
attributes.put(BigQueryAttributes.JOB_LINK_ATTR, job.getSelfLink());
boolean jobError = (job.getStatus().getError() != null);
if (jobError) {
attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, job.getStatus().getError().getMessage());
attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, job.getStatus().getError().getReason());
attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, job.getStatus().getError().getLocation());
} else {
// in case it got looped back from error
flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_MSG_ATTR);
flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_REASON_ATTR);
flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR);
// add the number of records successfully added
if (job.getStatistics() instanceof LoadStatistics) {
final LoadStatistics stats = (LoadStatistics) job.getStatistics();
attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(stats.getOutputRows()));
}
}
if (!attributes.isEmpty()) {
flowFile = session.putAllAttributes(flowFile, attributes);
}
if (jobError) {
getLogger().log(LogLevel.WARN, job.getStatus().getError().getMessage());
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
} else {
session.getProvenanceReporter().send(flowFile, job.getSelfLink(), job.getStatistics().getEndTime() - job.getStatistics().getStartTime());
session.transfer(flowFile, REL_SUCCESS);
}
                } else {
                    // The load job could not be retrieved after waiting for completion; route to failure so the flow file is not left unaccounted for
                    getLogger().log(LogLevel.ERROR, "The BigQuery load job could not be retrieved after waiting for completion");
                    flowFile = session.penalize(flowFile);
                    session.transfer(flowFile, REL_FAILURE);
                }
}
} catch (Exception ex) {
getLogger().log(LogLevel.ERROR, ex.getMessage(), ex);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
}