/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.bigquery;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.nio.file.Paths;
import java.util.Iterator;

import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Field.Mode;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TableResult;
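
/**
 * Integration tests for {@link PutBigQueryStreaming}. These tests run against a real BigQuery
 * dataset, using the credentials and project configured through {@link AbstractBigQueryIT}.
 * Each test creates its own table (named after the test method) and drops it afterwards.
 */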
public class PutBigQueryStreamingIT extends AbstractBigQueryIT {

    private Schema schema;

    @Before
    public void setup() throws InitializationException {
        runner = TestRunners.newTestRunner(PutBigQueryStreaming.class);
        runner = setCredentialsControllerService(runner);
        runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
        runner.setProperty(AbstractGCPProcessor.PROJECT_ID, PROJECT_ID);
    }
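
    /**
     * Creates a table matching the schema of the test data: a REQUIRED INTEGER id, a NULLABLE
     * STRING name, a REPEATED STRING alias, a REPEATED RECORD addresses (zip, city), and a
     * NULLABLE RECORD job (position, company).
     */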
    private void createTable(String tableName) {
        TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName);

        // Table field definition
        Field id = Field.newBuilder("id", LegacySQLTypeName.INTEGER).setMode(Mode.REQUIRED).build();
        Field name = Field.newBuilder("name", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
        Field alias = Field.newBuilder("alias", LegacySQLTypeName.STRING).setMode(Mode.REPEATED).build();
        Field zip = Field.newBuilder("zip", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
        Field city = Field.newBuilder("city", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
        Field addresses = Field.newBuilder("addresses", LegacySQLTypeName.RECORD, zip, city).setMode(Mode.REPEATED).build();
        Field position = Field.newBuilder("position", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
        Field company = Field.newBuilder("company", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
        Field job = Field.newBuilder("job", LegacySQLTypeName.RECORD, position, company).setMode(Mode.NULLABLE).build();

        // Table schema definition
        schema = Schema.of(id, name, alias, addresses, job);
        TableDefinition tableDefinition = StandardTableDefinition.of(schema);
        TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();

        // create table
        bigquery.create(tableInfo);
    }

    private void deleteTable(String tableName) {
        TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName);
        bigquery.delete(tableId);
    }
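
    /**
     * Happy path: the input flow file (streaming-correct-data.json) is expected to hold two
     * valid records, "John Doe" and "Jane Doe". Judging from the assertions below, the data
     * looks roughly like this (illustrative shape only, not the literal file contents):
     *
     * <pre>
     * { "id": ..., "name": "John Doe", "alias": [ ..., ... ],
     *   "addresses": [ { "zip": "...000", "city": ... }, ... ] }
     * { "id": ..., "name": "Jane Doe", "job": { "position": "Director", "company": ... } }
     * </pre>
     *
     * Both rows should be streamed into the table, the flow file routed to success, and
     * JOB_NB_RECORDS_ATTR set to "2".
     */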
    @Test
    public void PutBigQueryStreamingNoError() throws Exception {
        String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
        createTable(tableName);

        runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
        runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);

        final JsonTreeReader jsonReader = new JsonTreeReader();
        runner.addControllerService("reader", jsonReader);
        runner.enableControllerService(jsonReader);
        runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");

        runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-correct-data.json"));
        runner.run();

        runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_SUCCESS, 1);
        runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_SUCCESS).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "2");

        TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
        Iterator<FieldValueList> iterator = result.getValues().iterator();
        FieldValueList firstElt = iterator.next();
        FieldValueList sndElt = iterator.next();
        assertTrue(firstElt.get("name").getStringValue().endsWith("Doe"));
        assertTrue(sndElt.get("name").getStringValue().endsWith("Doe"));

        FieldValueList john = firstElt.get("name").getStringValue().equals("John Doe") ? firstElt : sndElt;
        FieldValueList jane = firstElt.get("name").getStringValue().equals("Jane Doe") ? firstElt : sndElt;

        assertEquals("Director", jane.get("job").getRecordValue().get(0).getStringValue());
        assertEquals(2, john.get("alias").getRepeatedValue().size());
        assertTrue(john.get("addresses").getRepeatedValue().get(0).getRecordValue().get(0).getStringValue().endsWith("000"));

        deleteTable(tableName);
    }
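
    /**
     * Full failure: streaming-bad-data.json is expected to contain at least one record that
     * BigQuery rejects. Since invalid rows are not skipped, the whole insert fails: the table
     * stays empty, the flow file is routed to failure, and JOB_NB_RECORDS_ATTR is "0".
     */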
    @Test
    public void PutBigQueryStreamingFullError() throws Exception {
        String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
        createTable(tableName);

        runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
        runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);

        final JsonTreeReader jsonReader = new JsonTreeReader();
        runner.addControllerService("reader", jsonReader);
        runner.enableControllerService(jsonReader);
        runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");

        runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-bad-data.json"));
        runner.run();

        runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_FAILURE, 1);
        runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_FAILURE).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "0");

        TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
        assertFalse(result.getValues().iterator().hasNext());

        deleteTable(tableName);
    }
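
    /**
     * Partial failure: same bad input as above, but with SKIP_INVALID_ROWS set to true the
     * invalid record is dropped while the valid "Jane Doe" record is inserted. The flow file
     * is still routed to failure, with JOB_NB_RECORDS_ATTR reporting the one inserted record.
     */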
    @Test
    public void PutBigQueryStreamingPartialError() throws Exception {
        String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
        createTable(tableName);

        runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
        runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);

        final JsonTreeReader jsonReader = new JsonTreeReader();
        runner.addControllerService("reader", jsonReader);
        runner.enableControllerService(jsonReader);
        runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
        runner.setProperty(BigQueryAttributes.SKIP_INVALID_ROWS_ATTR, "true");

        runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-bad-data.json"));
        runner.run();

        runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_FAILURE, 1);
        runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_FAILURE).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "1");

        TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
        Iterator<FieldValueList> iterator = result.getValues().iterator();
        FieldValueList firstElt = iterator.next();
        assertFalse(iterator.hasNext());
        assertEquals("Jane Doe", firstElt.get("name").getStringValue());

        deleteTable(tableName);
    }
}