blob: 14a9bde8710e4657ec5f23e78fdd69228fe2df7c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.apache.nifi.hbase.HBaseTestUtil.getHBaseClientService;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
public class TestPutHBaseJSON {
public static final String DEFAULT_TABLE_NAME = "nifi";
public static final String DEFAULT_ROW = "row1";
public static final String DEFAULT_COLUMN_FAMILY = "family1";
public static final Long DEFAULT_TIMESTAMP = 1L;
@Test
public void testCustomValidate() throws InitializationException {
// missing row id and row id field name should be invalid
TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
getHBaseClientService(runner);
runner.assertNotValid();
// setting both properties should still be invalid
runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, "rowId");
runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowFieldName");
runner.assertNotValid();
// only a row id field name should make it valid
runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowFieldName");
runner.assertValid();
// only a row id should make it valid
runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, "rowId");
runner.assertValid();
}
@Test
public void testSingleJsonDocAndProvidedRowId() throws IOException, InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
runner.enqueue(content.getBytes("UTF-8"));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
final Map<String,byte[]> expectedColumns = new HashMap<>();
expectedColumns.put("field1", hBaseClient.toBytes("value1"));
expectedColumns.put("field2", hBaseClient.toBytes("value2"));
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
assertEquals("hbase://" + DEFAULT_TABLE_NAME + "/" + DEFAULT_ROW, event.getTransitUri());
}
@Test
public void testSingleJsonDocAndProvidedRowIdwithNonString() throws IOException, InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
runner.setProperty(PutHBaseJSON.FIELD_ENCODING_STRATEGY, PutHBaseJSON.BYTES_ENCODING_VALUE);
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
final String content = "{ \"field1\" : 1.23456, \"field2\" : 2345235, \"field3\" : false }";
runner.enqueue(content.getBytes("UTF-8"));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
final Map<String,byte[]> expectedColumns = new HashMap<>();
expectedColumns.put("field1", hBaseClient.toBytes(1.23456d));
expectedColumns.put("field2", hBaseClient.toBytes(2345235l));
expectedColumns.put("field3", hBaseClient.toBytes(false));
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
assertEquals("hbase://" + DEFAULT_TABLE_NAME + "/" + DEFAULT_ROW, event.getTransitUri());
}
@Test
public void testSingJsonDocAndExtractedRowId() throws IOException, InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowField");
final String content = "{ \"rowField\" : \"myRowId\", \"field1\" : \"value1\", \"field2\" : \"value2\" }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
// should be a put with row id of myRowId, and rowField shouldn't end up in the columns
final Map<String,byte[]> expectedColumns1 = new HashMap<>();
expectedColumns1.put("field1", hBaseClient.toBytes("value1"));
expectedColumns1.put("field2", hBaseClient.toBytes("value2"));
HBaseTestUtil.verifyPut("myRowId", DEFAULT_COLUMN_FAMILY, expectedColumns1, puts);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertEquals(1, events.size());
HBaseTestUtil.verifyEvent(runner.getProvenanceEvents(), "hbase://" + DEFAULT_TABLE_NAME + "/myRowId", ProvenanceEventType.SEND);
}
@Test
public void testSingJsonDocAndExtractedRowIdMissingField() throws IOException, InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowField");
final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_FAILURE).get(0);
outFile.assertContentEquals(content);
// should be no provenance events
assertEquals(0, runner.getProvenanceEvents().size());
// no puts should have made it to the client
assertEquals(0, hBaseClient.getFlowFilePuts().size());
}
@Test
public void testMultipleJsonDocsRouteToFailure() throws IOException, InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
final String content1 = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
final String content2 = "{ \"field3\" : \"value3\", \"field4\" : \"value4\" }";
final String content = "[ " + content1 + " , " + content2 + " ]";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_FAILURE).get(0);
outFile.assertContentEquals(content);
// should be no provenance events
assertEquals(0, runner.getProvenanceEvents().size());
// no puts should have made it to the client
assertEquals(0, hBaseClient.getFlowFilePuts().size());
}
@Test
public void testELWithProvidedRowId() throws IOException, InitializationException {
final TestRunner runner = getTestRunner("${hbase.table}", "${hbase.colFamily}", "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, "${hbase.rowId}");
final Map<String,String> attributes = new HashMap<>();
attributes.put("hbase.table", "myTable");
attributes.put("hbase.colFamily", "myColFamily");
attributes.put("hbase.rowId", "myRowId");
final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
runner.enqueue(content.getBytes("UTF-8"), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get("myTable");
assertEquals(1, puts.size());
final Map<String,byte[]> expectedColumns = new HashMap<>();
expectedColumns.put("field1", hBaseClient.toBytes("value1"));
expectedColumns.put("field2", hBaseClient.toBytes("value2"));
HBaseTestUtil.verifyPut("myRowId", "myColFamily", expectedColumns, puts);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertEquals(1, events.size());
HBaseTestUtil.verifyEvent(runner.getProvenanceEvents(), "hbase://myTable/myRowId", ProvenanceEventType.SEND);
}
@Test
public void testELWithExtractedRowId() throws IOException, InitializationException {
final TestRunner runner = getTestRunner("${hbase.table}", "${hbase.colFamily}", "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "${hbase.rowField}");
final Map<String,String> attributes = new HashMap<>();
attributes.put("hbase.table", "myTable");
attributes.put("hbase.colFamily", "myColFamily");
attributes.put("hbase.rowField", "field1");
final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
runner.enqueue(content.getBytes("UTF-8"), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get("myTable");
assertEquals(1, puts.size());
final Map<String,byte[]> expectedColumns = new HashMap<>();
expectedColumns.put("field2", hBaseClient.toBytes("value2"));
HBaseTestUtil.verifyPut("value1", "myColFamily", expectedColumns, puts);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertEquals(1, events.size());
HBaseTestUtil.verifyEvent(runner.getProvenanceEvents(), "hbase://myTable/value1", ProvenanceEventType.SEND);
}
@Test
public void testNullAndArrayElementsWithWarnStrategy() throws InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_WARN.getValue());
// should route to success because there is at least one valid field
final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
// should have skipped field1 and field3
final Map<String,byte[]> expectedColumns = new HashMap<>();
expectedColumns.put("field2", hBaseClient.toBytes("value2"));
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
}
@Test
public void testNullAndArrayElementsWithIgnoreStrategy() throws InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_IGNORE.getValue());
// should route to success because there is at least one valid field
final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
// should have skipped field1 and field3
final Map<String,byte[]> expectedColumns = new HashMap<>();
expectedColumns.put("field2", hBaseClient.toBytes("value2"));
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
}
@Test
public void testNullAndArrayElementsWithFailureStrategy() throws InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_FAIL.getValue());
// should route to success because there is at least one valid field
final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_FAILURE).get(0);
outFile.assertContentEquals(content);
// should be no provenance events
assertEquals(0, runner.getProvenanceEvents().size());
// no puts should have made it to the client
assertEquals(0, hBaseClient.getFlowFilePuts().size());
}
@Test
public void testNullAndArrayElementsWithTextStrategy() throws InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_TEXT.getValue());
// should route to success because there is at least one valid field
final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
// should have skipped field1 and field3
final Map<String,byte[]> expectedColumns = new HashMap<>();
expectedColumns.put("field1", hBaseClient.toBytes("[{\"child_field1\":\"child_value1\"}]"));
expectedColumns.put("field2", hBaseClient.toBytes("value2"));
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
}
@Test
public void testNestedDocWithTextStrategy() throws InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_TEXT.getValue());
// should route to success because there is at least one valid field
final String content = "{ \"field1\" : { \"child_field1\" : \"child_value1\" }, \"field2\" : \"value2\", \"field3\" : null }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
// should have skipped field1 and field3
final Map<String,byte[]> expectedColumns = new HashMap<>();
expectedColumns.put("field1", hBaseClient.toBytes("{\"child_field1\":\"child_value1\"}"));
expectedColumns.put("field2", hBaseClient.toBytes("value2"));
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts);
}
@Test
public void testAllElementsAreNullOrArrays() throws InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_WARN.getValue());
// should route to failure since it would produce a put with no columns
final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : null }";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_FAILURE).get(0);
outFile.assertContentEquals(content);
// should be no provenance events
assertEquals(0, runner.getProvenanceEvents().size());
// no puts should have made it to the client
assertEquals(0, hBaseClient.getFlowFilePuts().size());
}
@Test
public void testInvalidJson() throws InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
final String content = "NOT JSON";
runner.enqueue(content.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
}
@Test
public void testTimestamp() throws UnsupportedEncodingException, InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
runner.setProperty(PutHBaseJSON.TIMESTAMP, DEFAULT_TIMESTAMP.toString());
final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
runner.enqueue(content.getBytes("UTF-8"));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
final Map<String,byte[]> expectedColumns = new HashMap<>();
expectedColumns.put("field1", hBaseClient.toBytes("value1"));
expectedColumns.put("field2", hBaseClient.toBytes("value2"));
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, DEFAULT_TIMESTAMP, expectedColumns, puts);
}
@Test
public void testTimestampWithEL() throws UnsupportedEncodingException, InitializationException {
final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
runner.setProperty(PutHBaseJSON.TIMESTAMP, "${hbase.timestamp}");
final Map<String,String> attributes = new HashMap<>();
attributes.put("hbase.timestamp", DEFAULT_TIMESTAMP.toString());
final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
runner.enqueue(content.getBytes("UTF-8"), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getFlowFilePuts());
assertEquals(1, hBaseClient.getFlowFilePuts().size());
final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
assertEquals(1, puts.size());
final Map<String,byte[]> expectedColumns = new HashMap<>();
expectedColumns.put("field1", hBaseClient.toBytes("value1"));
expectedColumns.put("field2", hBaseClient.toBytes("value2"));
HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, DEFAULT_TIMESTAMP, expectedColumns, puts);
}
private TestRunner getTestRunner(String table, String columnFamily, String batchSize) {
final TestRunner runner = TestRunners.newTestRunner(PutHBaseJSON.class);
runner.setProperty(PutHBaseJSON.TABLE_NAME, table);
runner.setProperty(PutHBaseJSON.COLUMN_FAMILY, columnFamily);
runner.setProperty(PutHBaseJSON.BATCH_SIZE, batchSize);
return runner;
}
}