Merge pull request #458 from peihe/fixup-bq-diff
Forward port fixups in Beam PR-1032
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java
index 552c604..8f4ff79 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java
@@ -177,13 +177,11 @@
list.setPageToken(pageToken);
}
- TableDataList result =
- executeWithBackOff(
- list,
- String.format(
- "Error reading from BigQuery table %s of dataset %s.",
- ref.getTableId(),
- ref.getDatasetId()));
+ TableDataList result = executeWithBackOff(
+ list,
+ String.format(
+ "Error reading from BigQuery table %s of dataset %s.",
+ ref.getTableId(), ref.getDatasetId()));
pageToken = result.getPageToken();
iteratorOverCurrentBatch =
@@ -370,7 +368,7 @@
executeWithBackOff(
client.datasets().insert(projectId, dataset),
String.format(
- "Error when trying to create the temporary dataset %s in project %s",
+ "Error when trying to create the temporary dataset %s in project %s.",
datasetId, projectId));
}
@@ -407,7 +405,7 @@
Job dryRunJob = new Job()
.setConfiguration(new JobConfiguration()
.setQuery(new JobConfigurationQuery()
- .setQuery(query))
+ .setQuery(query))
.setDryRun(true));
JobStatistics jobStats = executeWithBackOff(
client.jobs().insert(projectId, dryRunJob),
@@ -508,7 +506,6 @@
}
}
}
-
return result;
}
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java
index 4516d5d..d6ac5b3 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java
@@ -133,6 +133,17 @@
new TableFieldSchema().setName("anniversary_time").setType("TIME"))));
}
+ private static Table noTableQuerySchema() { // Test fixture: a Table carrying only a three-column schema.
+ return new Table() // No other Table property is set here; only the schema below.
+ .setSchema(
+ new TableSchema()
+ .setFields(
+ Arrays.asList( // Column order: name, count, photo.
+ new TableFieldSchema().setName("name").setType("STRING"),
+ new TableFieldSchema().setName("count").setType("INTEGER"),
+ new TableFieldSchema().setName("photo").setType("BYTES"))));
+ }
+
private static Table tableWithLocation() {
return new Table()
.setLocation("EU");
@@ -206,6 +217,7 @@
assertEquals("2000-01-01", row.get("anniversary_date"));
assertEquals("2000-01-01 00:00:00.000005", row.get("anniversary_datetime"));
assertEquals("00:00:00.000005", row.get("anniversary_time"));
+
assertFalse(iterator.advance());
}
@@ -257,14 +269,13 @@
when(mockJobsGet.execute()).thenReturn(getJob);
// Mock table schema fetch.
- when(mockTablesGet.execute()).thenReturn(tableWithBasicSchema());
+ when(mockTablesGet.execute()).thenReturn(noTableQuerySchema());
byte[] photoBytes = "photograph".getBytes();
String photoBytesEncoded = BaseEncoding.base64().encode(photoBytes);
// Mock table data fetch.
when(mockTabledataList.execute()).thenReturn(
- rawDataList(rawRow("Arthur", 42, photoBytesEncoded,
- "2000-01-01", "2000-01-01 00:00:00.000005", "00:00:00.000005")));
+ rawDataList(rawRow("Arthur", 42, photoBytesEncoded)));
// Run query and verify
String query = String.format(
@@ -277,10 +288,10 @@
TableRow row = iterator.getCurrent();
assertTrue(row.containsKey("name"));
- assertTrue(row.containsKey("answer"));
+ assertTrue(row.containsKey("count"));
assertTrue(row.containsKey("photo"));
assertEquals("Arthur", row.get("name"));
- assertEquals(42, row.get("answer"));
+ assertEquals(42, row.get("count"));
assertEquals(photoBytesEncoded, row.get("photo"));
assertFalse(iterator.advance());