| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.hbase; |
| |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.nifi.annotation.behavior.DynamicProperties; |
| import org.apache.nifi.annotation.behavior.DynamicProperty; |
| import org.apache.nifi.annotation.lifecycle.OnScheduled; |
| import org.apache.nifi.components.AllowableValue; |
| import org.apache.nifi.components.PropertyDescriptor; |
| import org.apache.nifi.expression.ExpressionLanguageScope; |
| import org.apache.nifi.flowfile.FlowFile; |
| import org.apache.nifi.hbase.put.PutFlowFile; |
| import org.apache.nifi.processor.AbstractProcessor; |
| import org.apache.nifi.processor.ProcessContext; |
| import org.apache.nifi.processor.ProcessSession; |
| import org.apache.nifi.processor.Relationship; |
| import org.apache.nifi.processor.exception.ProcessException; |
| import org.apache.nifi.processor.util.StandardValidators; |
| import org.apache.nifi.security.krb.KerberosLoginException; |
| |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| |
| /** |
| * Base class for processors that put data to HBase. |
| */ |
| @DynamicProperties({ |
| @DynamicProperty(name = "visibility.<COLUMN FAMILY>", description = "Visibility label for everything under that column family " + |
| "when a specific label for a particular column qualifier is not available.", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, |
| value = "visibility label for <COLUMN FAMILY>" |
| ), |
| @DynamicProperty(name = "visibility.<COLUMN FAMILY>.<COLUMN QUALIFIER>", description = "Visibility label for the specified column qualifier " + |
| "qualified by a configured column family.", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, |
| value = "visibility label for <COLUMN FAMILY>:<COLUMN QUALIFIER>." |
| ) |
| }) |
| public abstract class AbstractPutHBase extends AbstractProcessor { |
| |
| protected static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder() |
| .name("HBase Client Service") |
| .description("Specifies the Controller Service to use for accessing HBase.") |
| .required(true) |
| .identifiesControllerService(HBaseClientService.class) |
| .build(); |
| protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() |
| .name("Table Name") |
| .description("The name of the HBase Table to put data into") |
| .required(true) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .build(); |
| protected static final PropertyDescriptor ROW_ID = new PropertyDescriptor.Builder() |
| .name("Row Identifier") |
| .description("Specifies the Row ID to use when inserting data into HBase") |
| .required(false) // not all sub-classes will require this |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .build(); |
| |
| static final String STRING_ENCODING_VALUE = "String"; |
| static final String BYTES_ENCODING_VALUE = "Bytes"; |
| static final String BINARY_ENCODING_VALUE = "Binary"; |
| |
| |
| protected static final AllowableValue ROW_ID_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE, |
| "Stores the value of row id as a UTF-8 String."); |
| protected static final AllowableValue ROW_ID_ENCODING_BINARY = new AllowableValue(BINARY_ENCODING_VALUE, BINARY_ENCODING_VALUE, |
| "Stores the value of the rows id as a binary byte array. It expects that the row id is a binary formatted string."); |
| |
| static final PropertyDescriptor ROW_ID_ENCODING_STRATEGY = new PropertyDescriptor.Builder() |
| .name("Row Identifier Encoding Strategy") |
| .description("Specifies the data type of Row ID used when inserting data into HBase. The default behavior is" + |
| " to convert the row id to a UTF-8 byte array. Choosing Binary will convert a binary formatted string" + |
| " to the correct byte[] representation. The Binary option should be used if you are using Binary row" + |
| " keys in HBase") |
| .required(false) // not all sub-classes will require this |
| .expressionLanguageSupported(ExpressionLanguageScope.NONE) |
| .defaultValue(ROW_ID_ENCODING_STRING.getValue()) |
| .allowableValues(ROW_ID_ENCODING_STRING,ROW_ID_ENCODING_BINARY) |
| .build(); |
| protected static final PropertyDescriptor COLUMN_FAMILY = new PropertyDescriptor.Builder() |
| .name("Column Family") |
| .description("The Column Family to use when inserting data into HBase") |
| .required(true) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .build(); |
| protected static final PropertyDescriptor COLUMN_QUALIFIER = new PropertyDescriptor.Builder() |
| .name("Column Qualifier") |
| .description("The Column Qualifier to use when inserting data into HBase") |
| .required(true) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .build(); |
| protected static final PropertyDescriptor TIMESTAMP = new PropertyDescriptor.Builder() |
| .name("timestamp") |
| .displayName("Timestamp") |
| .description("The timestamp for the cells being created in HBase. This field can be left blank and HBase will use the current time.") |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR) |
| .build(); |
| protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() |
| .name("Batch Size") |
| .description("The maximum number of FlowFiles to process in a single execution. The FlowFiles will be " + |
| "grouped by table, and a single Put per table will be performed.") |
| .required(true) |
| .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) |
| .defaultValue("25") |
| .build(); |
| |
| public static final Relationship REL_SUCCESS = new Relationship.Builder() |
| .name("success") |
| .description("A FlowFile is routed to this relationship after it has been successfully stored in HBase") |
| .build(); |
| public static final Relationship REL_FAILURE = new Relationship.Builder() |
| .name("failure") |
| .description("A FlowFile is routed to this relationship if it cannot be sent to HBase") |
| .build(); |
| |
| protected HBaseClientService clientService; |
| |
| @OnScheduled |
| public void onScheduled(final ProcessContext context) { |
| clientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); |
| } |
| |
| @Override |
| protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { |
| if (propertyDescriptorName.startsWith("visibility.")) { |
| String[] parts = propertyDescriptorName.split("\\."); |
| String displayName; |
| String description; |
| |
| if (parts.length == 2) { |
| displayName = String.format("Column Family %s Default Visibility", parts[1]); |
| description = String.format("Default visibility setting for %s", parts[1]); |
| } else if (parts.length == 3) { |
| displayName = String.format("Column Qualifier %s.%s Default Visibility", parts[1], parts[2]); |
| description = String.format("Default visibility setting for %s.%s", parts[1], parts[2]); |
| } else { |
| return null; |
| } |
| |
| return new PropertyDescriptor.Builder() |
| .name(propertyDescriptorName) |
| .displayName(displayName) |
| .description(description) |
| .addValidator(StandardValidators.NON_BLANK_VALIDATOR) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .dynamic(true) |
| .build(); |
| } |
| |
| return null; |
| } |
| |
| @Override |
| public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { |
| final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); |
| List<FlowFile> flowFiles = session.get(batchSize); |
| if (flowFiles == null || flowFiles.size() == 0) { |
| return; |
| } |
| |
| final Map<String,List<PutFlowFile>> tablePuts = new HashMap<>(); |
| |
| // Group FlowFiles by HBase Table |
| for (final FlowFile flowFile : flowFiles) { |
| final PutFlowFile putFlowFile = createPut(session, context, flowFile); |
| |
| if (putFlowFile == null) { |
| // sub-classes should log appropriate error messages before returning null |
| session.transfer(flowFile, REL_FAILURE); |
| } else if (!putFlowFile.isValid()) { |
| if (StringUtils.isBlank(putFlowFile.getTableName())) { |
| getLogger().error("Missing table name for FlowFile {}; routing to failure", new Object[]{flowFile}); |
| } else if (null == putFlowFile.getRow()) { |
| getLogger().error("Missing row id for FlowFile {}; routing to failure", new Object[]{flowFile}); |
| } else if (putFlowFile.getColumns() == null || putFlowFile.getColumns().isEmpty()) { |
| getLogger().error("No columns provided for FlowFile {}; routing to failure", new Object[]{flowFile}); |
| } else { |
| // really shouldn't get here, but just in case |
| getLogger().error("Failed to produce a put for FlowFile {}; routing to failure", new Object[]{flowFile}); |
| } |
| session.transfer(flowFile, REL_FAILURE); |
| } else { |
| List<PutFlowFile> putFlowFiles = tablePuts.get(putFlowFile.getTableName()); |
| if (putFlowFiles == null) { |
| putFlowFiles = new ArrayList<>(); |
| tablePuts.put(putFlowFile.getTableName(), putFlowFiles); |
| } |
| putFlowFiles.add(putFlowFile); |
| } |
| } |
| |
| getLogger().debug("Sending {} FlowFiles to HBase in {} put operations", new Object[]{flowFiles.size(), tablePuts.size()}); |
| |
| final long start = System.nanoTime(); |
| final List<PutFlowFile> successes = new ArrayList<>(); |
| |
| for (Map.Entry<String, List<PutFlowFile>> entry : tablePuts.entrySet()) { |
| try { |
| clientService.put(entry.getKey(), entry.getValue()); |
| successes.addAll(entry.getValue()); |
| } catch (final KerberosLoginException kle) { |
| getLogger().error("Failed to connect to HBase due to {}: Rolling back session, and penalizing flow files", kle, kle); |
| session.rollback(true); |
| } catch (final Exception e) { |
| getLogger().error(e.getMessage(), e); |
| for (PutFlowFile putFlowFile : entry.getValue()) { |
| getLogger().error("Failed to send {} to HBase due to {}; routing to failure", new Object[]{putFlowFile.getFlowFile(), e}); |
| final FlowFile failure = session.penalize(putFlowFile.getFlowFile()); |
| session.transfer(failure, REL_FAILURE); |
| } |
| } |
| } |
| |
| final long sendMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); |
| getLogger().debug("Sent {} FlowFiles to HBase successfully in {} milliseconds", new Object[]{successes.size(), sendMillis}); |
| |
| for (PutFlowFile putFlowFile : successes) { |
| session.transfer(putFlowFile.getFlowFile(), REL_SUCCESS); |
| final String details = "Put " + putFlowFile.getColumns().size() + " cells to HBase"; |
| session.getProvenanceReporter().send(putFlowFile.getFlowFile(), getTransitUri(putFlowFile), details, sendMillis); |
| } |
| } |
| |
| protected String getTransitUri(PutFlowFile putFlowFile) { |
| return clientService.toTransitUri(putFlowFile.getTableName(), new String(putFlowFile.getRow(), StandardCharsets.UTF_8)); |
| } |
| |
| protected byte[] getRow(final String row, final String encoding) { |
| //check to see if we need to modify the rowKey before we pass it down to the PutFlowFile |
| byte[] rowKeyBytes = null; |
| if(BINARY_ENCODING_VALUE.contentEquals(encoding)){ |
| rowKeyBytes = clientService.toBytesBinary(row); |
| }else{ |
| rowKeyBytes = row.getBytes(StandardCharsets.UTF_8); |
| } |
| return rowKeyBytes; |
| } |
| /** |
| * Sub-classes provide the implementation to create a put from a FlowFile. |
| * |
| * @param session |
| * the current session |
| * @param context |
| * the current context |
| * @param flowFile |
| * the FlowFile to create a Put from |
| * |
| * @return a PutFlowFile instance for the given FlowFile |
| */ |
| protected abstract PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile); |
| |
| } |