Merge pull request #14767 from [BEAM-7819] Add missing fields to python apache_beam.io.gcp.pubsub.PubsubMessage
[BEAM-7819] Add missing fields to python apache_beam.io.gcp.pubsub.PubsubMessage
diff --git a/CHANGES.md b/CHANGES.md
index c35b778..90d0123 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -119,24 +119,20 @@
* Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-# [2.29.0] - Release branch cut, any updates should be cherry-picked
+# [2.29.0] - 2021-04-29
## Highlights
-* New highly anticipated feature X added to Python SDK ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-* New highly anticipated feature Y added to Java SDK ([BEAM-Y](https://issues.apache.org/jira/browse/BEAM-Y)).
* Spark Classic and Portable runners officially support Spark 3 ([BEAM-7093](https://issues.apache.org/jira/browse/BEAM-7093)).
* Official Java 11 support for most runners (Dataflow, Flink, Spark) ([BEAM-2530](https://issues.apache.org/jira/browse/BEAM-2530)).
* DataFrame API now supports GroupBy.apply ([BEAM-11628](https://issues.apache.org/jira/browse/BEAM-11628)).
## I/Os
-* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* Added support for S3 filesystem on AWS SDK V2 (Java) ([BEAM-7637](https://issues.apache.org/jira/browse/BEAM-7637))
## New Features / Improvements
-* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* DataFrame API now supports pandas 1.2.x ([BEAM-11531](https://issues.apache.org/jira/browse/BEAM-11531)).
* Multiple DataFrame API bugfixes ([BEAM-12071](https://issues.apache.org/jira/browse/BEAM-12071), [BEAM-11929](https://issues.apache.org/jira/browse/BEAM-11929))
@@ -146,17 +142,11 @@
To restore the old behavior, one can register `FakeDeterministicFastPrimitivesCoder` with
`beam.coders.registry.register_fallback_coder(beam.coders.coders.FakeDeterministicFastPrimitivesCoder())`
or use the `allow_non_deterministic_key_coders` pipeline option.
-* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
## Deprecations
-* X behavior is deprecated and will be removed in X versions ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* Support for Flink 1.8 and 1.9 will be removed in the next release (2.30.0) ([BEAM-11948](https://issues.apache.org/jira/browse/BEAM-11948)).
-## Known Issues
-
-* Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-
# [2.28.0] - 2021-02-22
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index f6d61b5..d2e2898 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -417,7 +417,6 @@
// a dependency version which should match across multiple
// Maven artifacts.
def activemq_version = "5.14.5"
- def antlr_version = "4.9.2"
def autovalue_version = "1.8.1"
def aws_java_sdk_version = "1.11.974"
def aws_java_sdk2_version = "2.15.31"
@@ -467,8 +466,8 @@
activemq_junit : "org.apache.activemq.tooling:activemq-junit:$activemq_version",
activemq_kahadb_store : "org.apache.activemq:activemq-kahadb-store:$activemq_version",
activemq_mqtt : "org.apache.activemq:activemq-mqtt:$activemq_version",
- antlr : "org.antlr:antlr4:$antlr_version",
- antlr_runtime : "org.antlr:antlr4-runtime:$antlr_version",
+ antlr : "org.antlr:antlr4:4.7",
+ antlr_runtime : "org.antlr:antlr4-runtime:4.7",
args4j : "args4j:args4j:2.33",
auto_value_annotations : "com.google.auto.value:auto-value-annotations:$autovalue_version",
avro : "org.apache.avro:avro:1.8.2",
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expand.go b/sdks/go/pkg/beam/core/runtime/xlangx/expand.go
index ff320f7..8748ddc 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/expand.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/expand.go
@@ -13,6 +13,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+// Package xlangx contains various low-level utilities needed for adding
+// cross-language transforms to the pipeline.
package xlangx
import (
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/payload.go b/sdks/go/pkg/beam/core/runtime/xlangx/payload.go
index c038463..cdb3457 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/payload.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/payload.go
@@ -19,10 +19,10 @@
"bytes"
"reflect"
+ pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/schema"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
- "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
"google.golang.org/protobuf/proto"
)
@@ -53,7 +53,7 @@
}
// Put schema and row into payload proto, and marshal it.
- ecp := &pipeline_v1.ExternalConfigurationPayload{
+ ecp := &pipepb.ExternalConfigurationPayload{
Schema: scm,
Payload: buf.Bytes(),
}
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go b/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
index 1ba1ac4..b434650 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
@@ -100,7 +100,7 @@
var resolvedDeps []*pipepb.ArtifactInformation
for _, a := range resolvedArtifacts {
- name, _ := artifact.MustExtractFilePayload(a)
+ name, sha256 := artifact.MustExtractFilePayload(a)
fullTmpPath := filepath.Join(tmpPath, "/", name)
fullSdkPath := fullTmpPath
if len(cfg.SdkPath) > 0 {
@@ -111,7 +111,8 @@
TypeUrn: "beam:artifact:type:file:v1",
TypePayload: protox.MustEncode(
&pipepb.ArtifactFilePayload{
- Path: fullSdkPath,
+ Path: fullSdkPath,
+ Sha256: sha256,
},
),
RoleUrn: a.RoleUrn,
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImplReflectiveFunctionBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImplReflectiveFunctionBase.java
index 9146e05..70d7965 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImplReflectiveFunctionBase.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImplReflectiveFunctionBase.java
@@ -20,6 +20,7 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
@@ -113,8 +114,7 @@
return add(type, name, false);
}
- public ParameterListBuilder add(
- final Class<?> type, final String name, final boolean optional) {
+ public ParameterListBuilder add(final Type type, final String name, final boolean optional) {
final int ordinal = builder.size();
builder.add(
new FunctionParameter() {
@@ -142,7 +142,7 @@
}
public ParameterListBuilder addMethodParameters(Method method) {
- final Class<?>[] types = method.getParameterTypes();
+ final Type[] types = method.getGenericParameterTypes();
for (int i = 0; i < types.length; i++) {
add(
types[i],
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java
index 9299473..5ad634fb 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java
@@ -142,7 +142,20 @@
sqlScalarUdfs.put(createFunctionStmt.getNamePath(), createFunctionStmt);
break;
case USER_DEFINED_JAVA_SCALAR_FUNCTIONS:
- validateJavaUdf(createFunctionStmt);
+ String functionName = String.join(".", createFunctionStmt.getNamePath());
+ for (FunctionArgumentType argumentType :
+ createFunctionStmt.getSignature().getFunctionArgumentList()) {
+ Type type = argumentType.getType();
+ if (type == null) {
+ throw new UnsupportedOperationException(
+ "UDF templated argument types are not supported.");
+ }
+ validateJavaUdfZetaSqlType(type, functionName);
+ }
+ if (createFunctionStmt.getReturnType() == null) {
+ throw new IllegalArgumentException("UDF return type must not be null.");
+ }
+ validateJavaUdfZetaSqlType(createFunctionStmt.getReturnType(), functionName);
String jarPath = getJarPath(createFunctionStmt);
ScalarFn scalarFn =
javaUdfLoader.loadScalarFunction(createFunctionStmt.getNamePath(), jarPath);
@@ -174,29 +187,14 @@
ImmutableList.of(createFunctionStmt.getSignature())));
}
- void validateJavaUdf(ResolvedNodes.ResolvedCreateFunctionStmt createFunctionStmt) {
- for (FunctionArgumentType argumentType :
- createFunctionStmt.getSignature().getFunctionArgumentList()) {
- Type type = argumentType.getType();
- if (type == null) {
- throw new UnsupportedOperationException("UDF templated argument types are not supported.");
- }
- validateJavaUdfZetaSqlType(type);
- }
- if (createFunctionStmt.getReturnType() == null) {
- throw new IllegalArgumentException("UDF return type must not be null.");
- }
- validateJavaUdfZetaSqlType(createFunctionStmt.getReturnType());
- }
-
/**
* Throws {@link UnsupportedOperationException} if ZetaSQL type is not supported in Java UDF.
* Supported types are a subset of the types supported by {@link BeamJavaUdfCalcRule}.
*
- * <p>Supported types should be kept in sync with {@link
- * #validateJavaUdfCalciteType(RelDataType)}.
+ * <p>Supported types should be kept in sync with {@link #validateJavaUdfCalciteType(RelDataType,
+ * String)}.
*/
- void validateJavaUdfZetaSqlType(Type type) {
+ void validateJavaUdfZetaSqlType(Type type, String functionName) {
switch (type.getKind()) {
case TYPE_BOOL:
case TYPE_BYTES:
@@ -204,17 +202,20 @@
case TYPE_DOUBLE:
case TYPE_INT64:
case TYPE_STRING:
+ case TYPE_TIMESTAMP:
// These types are supported.
break;
+ case TYPE_ARRAY:
+ validateJavaUdfZetaSqlType(type.asArray().getElementType(), functionName);
+ break;
case TYPE_NUMERIC:
case TYPE_TIME:
case TYPE_DATETIME:
- case TYPE_TIMESTAMP:
- case TYPE_ARRAY:
case TYPE_STRUCT:
default:
throw new UnsupportedOperationException(
- "ZetaSQL type not allowed in Java UDF: " + type.getKind().name());
+ String.format(
+ "ZetaSQL type %s not allowed in function %s", type.getKind().name(), functionName));
}
}
@@ -365,7 +366,10 @@
ScalarFunctionImpl scalarFunction = (ScalarFunctionImpl) function;
// Validate types before converting from Calcite to ZetaSQL, since the conversion may fail
// for unsupported types.
- validateScalarFunctionImpl(scalarFunction);
+ for (FunctionParameter parameter : scalarFunction.getParameters()) {
+ validateJavaUdfCalciteType(parameter.getType(typeFactory), functionName);
+ }
+ validateJavaUdfCalciteType(scalarFunction.getReturnType(typeFactory), functionName);
Method method = scalarFunction.method;
javaScalarUdfs.put(path, UserFunctionDefinitions.JavaScalarFunction.create(method, ""));
FunctionArgumentType resultType =
@@ -414,39 +418,37 @@
.collect(Collectors.toList());
}
- private void validateScalarFunctionImpl(ScalarFunctionImpl scalarFunction) {
- for (FunctionParameter parameter : scalarFunction.getParameters()) {
- validateJavaUdfCalciteType(parameter.getType(typeFactory));
- }
- validateJavaUdfCalciteType(scalarFunction.getReturnType(typeFactory));
- }
-
/**
* Throws {@link UnsupportedOperationException} if Calcite type is not supported in Java UDF.
* Supported types are a subset of the corresponding Calcite types supported by {@link
* BeamJavaUdfCalcRule}.
*
- * <p>Supported types should be kept in sync with {@link #validateJavaUdfZetaSqlType(Type)}.
+ * <p>Supported types should be kept in sync with {@link #validateJavaUdfZetaSqlType(Type,
+ * String)}.
*/
- private void validateJavaUdfCalciteType(RelDataType type) {
+ private void validateJavaUdfCalciteType(RelDataType type, String functionName) {
switch (type.getSqlTypeName()) {
case BIGINT:
+ case BOOLEAN:
case DATE:
case DOUBLE:
- case BOOLEAN:
+ case TIMESTAMP:
case VARCHAR:
case VARBINARY:
// These types are supported.
break;
+ case ARRAY:
+ validateJavaUdfCalciteType(type.getComponentType(), functionName);
+ break;
case DECIMAL:
case TIME:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- case TIMESTAMP:
- case ARRAY:
case ROW:
default:
throw new UnsupportedOperationException(
- "Calcite type not allowed in ZetaSQL Java UDF: " + type.getSqlTypeName().getName());
+ String.format(
+ "Calcite type %s not allowed in function %s",
+ type.getSqlTypeName().getName(), functionName));
}
}
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalogTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalogTest.java
index 2196046..94c984c 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalogTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalogTest.java
@@ -25,6 +25,7 @@
import com.google.zetasql.AnalyzerOptions;
import com.google.zetasql.resolvedast.ResolvedNodes;
import java.lang.reflect.Method;
+import java.sql.Time;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
@@ -53,14 +54,14 @@
}
}
- public static class ReturnsArrayFn implements BeamSqlUdf {
- public List<Long> eval() {
- return ImmutableList.of(1L, 2L, 3L);
+ public static class ReturnsArrayTimeFn implements BeamSqlUdf {
+ public List<Time> eval() {
+ return ImmutableList.of(new Time(0));
}
}
- public static class TakesArrayFn implements BeamSqlUdf {
- public Long eval(List<Long> ls) {
+ public static class TakesArrayTimeFn implements BeamSqlUdf {
+ public Long eval(List<Time> ls) {
return 0L;
}
}
@@ -92,10 +93,10 @@
public void rejectsScalarFunctionImplWithUnsupportedReturnType() throws NoSuchMethodException {
JdbcConnection jdbcConnection = createJdbcConnection();
SchemaPlus calciteSchema = jdbcConnection.getCurrentSchemaPlus();
- Method method = ReturnsArrayFn.class.getMethod("eval");
+ Method method = ReturnsArrayTimeFn.class.getMethod("eval");
calciteSchema.add("return_array", ScalarFunctionImpl.create(method));
thrown.expect(UnsupportedOperationException.class);
- thrown.expectMessage("Calcite type not allowed in ZetaSQL Java UDF: ARRAY");
+ thrown.expectMessage("Calcite type TIME not allowed in function return_array");
BeamZetaSqlCatalog beamCatalog =
BeamZetaSqlCatalog.create(
calciteSchema, jdbcConnection.getTypeFactory(), SqlAnalyzer.baseAnalyzerOptions());
@@ -105,10 +106,10 @@
public void rejectsScalarFunctionImplWithUnsupportedParameterType() throws NoSuchMethodException {
JdbcConnection jdbcConnection = createJdbcConnection();
SchemaPlus calciteSchema = jdbcConnection.getCurrentSchemaPlus();
- Method method = TakesArrayFn.class.getMethod("eval", List.class);
+ Method method = TakesArrayTimeFn.class.getMethod("eval", List.class);
calciteSchema.add("take_array", ScalarFunctionImpl.create(method));
thrown.expect(UnsupportedOperationException.class);
- thrown.expectMessage("Calcite type not allowed in ZetaSQL Java UDF: ARRAY");
+ thrown.expectMessage("Calcite type TIME not allowed in function take_array");
BeamZetaSqlCatalog beamCatalog =
BeamZetaSqlCatalog.create(
calciteSchema, jdbcConnection.getTypeFactory(), SqlAnalyzer.baseAnalyzerOptions());
@@ -125,14 +126,14 @@
analyzerOptions);
String sql =
- "CREATE FUNCTION foo() RETURNS ARRAY<INT64> LANGUAGE java OPTIONS (path='/does/not/exist');";
+ "CREATE FUNCTION foo() RETURNS ARRAY<TIME> LANGUAGE java OPTIONS (path='/does/not/exist');";
ResolvedNodes.ResolvedStatement resolvedStatement =
Analyzer.analyzeStatement(sql, analyzerOptions, beamCatalog.getZetaSqlCatalog());
ResolvedNodes.ResolvedCreateFunctionStmt createFunctionStmt =
(ResolvedNodes.ResolvedCreateFunctionStmt) resolvedStatement;
thrown.expect(UnsupportedOperationException.class);
- thrown.expectMessage("ZetaSQL type not allowed in Java UDF: TYPE_ARRAY");
+ thrown.expectMessage("ZetaSQL type TYPE_TIME not allowed in function foo");
beamCatalog.addFunction(createFunctionStmt);
}
@@ -147,14 +148,14 @@
analyzerOptions);
String sql =
- "CREATE FUNCTION foo(a ARRAY<INT64>) RETURNS INT64 LANGUAGE java OPTIONS (path='/does/not/exist');";
+ "CREATE FUNCTION foo(a ARRAY<TIME>) RETURNS INT64 LANGUAGE java OPTIONS (path='/does/not/exist');";
ResolvedNodes.ResolvedStatement resolvedStatement =
Analyzer.analyzeStatement(sql, analyzerOptions, beamCatalog.getZetaSqlCatalog());
ResolvedNodes.ResolvedCreateFunctionStmt createFunctionStmt =
(ResolvedNodes.ResolvedCreateFunctionStmt) resolvedStatement;
thrown.expect(UnsupportedOperationException.class);
- thrown.expectMessage("ZetaSQL type not allowed in Java UDF: TYPE_ARRAY");
+ thrown.expectMessage("ZetaSQL type TYPE_TIME not allowed in function foo");
beamCatalog.addFunction(createFunctionStmt);
}
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTypeTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTypeTest.java
index 98fb25f..fda9ef8 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTypeTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTypeTest.java
@@ -18,7 +18,9 @@
package org.apache.beam.sdk.extensions.sql.zetasql;
import java.sql.Date;
+import java.sql.Timestamp;
import java.time.LocalDate;
+import java.util.List;
import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
@@ -36,7 +38,10 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.SchemaPlus;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.Frameworks;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.junit.Before;
import org.junit.Rule;
@@ -77,6 +82,8 @@
.addDoubleField("float64_neg_inf")
.addDoubleField("float64_nan")
.addLogicalTypeField("f_date", SqlTypes.DATE)
+ .addDateTimeField("f_timestamp")
+ .addArrayField("array_int64", Schema.FieldType.INT64)
.build())
.addRows(
true /* boolean_true */,
@@ -101,7 +108,9 @@
Double.POSITIVE_INFINITY /* float64_inf */,
Double.NEGATIVE_INFINITY /* float64_neg_inf */,
Double.NaN /* float64_nan */,
- LocalDate.of(2021, 4, 26) /* f_date */);
+ LocalDate.of(2021, 4, 26) /* f_date */,
+ new DateTime(2021, 5, 6, 3, 48, 32, DateTimeZone.UTC) /* f_timestamp */,
+ ImmutableList.of(1L, 2L, 3L) /* array_int64 */);
@Before
public void setUp() throws NoSuchMethodException {
@@ -132,6 +141,12 @@
ScalarFunctionImpl.create(DoubleIdentityFn.class.getMethod("eval", Double.class)));
schema.add(
"test_date", ScalarFunctionImpl.create(DateIdentityFn.class.getMethod("eval", Date.class)));
+ schema.add(
+ "test_timestamp",
+ ScalarFunctionImpl.create(TimestampIdentityFn.class.getMethod("eval", Timestamp.class)));
+ schema.add(
+ "test_array",
+ ScalarFunctionImpl.create(ListIdentityFn.class.getMethod("eval", List.class)));
this.config = Frameworks.newConfigBuilder(config).defaultSchema(schema).build();
}
@@ -172,6 +187,18 @@
}
}
+ public static class TimestampIdentityFn implements BeamSqlUdf {
+ public Timestamp eval(Timestamp input) {
+ return input;
+ }
+ }
+
+ public static class ListIdentityFn implements BeamSqlUdf {
+ public List<Long> eval(List<Long> input) {
+ return input;
+ }
+ }
+
private void runUdfTypeTest(String query, Object result, Schema.TypeName typeName) {
runUdfTypeTest(query, result, Schema.FieldType.of(typeName));
}
@@ -452,8 +479,7 @@
@Test
public void testDateLiteral() {
- runUdfTypeTest(
- "SELECT test_date('2021-04-26') FROM table;", LocalDate.of(2021, 4, 26), SqlTypes.DATE);
+ runUdfTypeTest("SELECT test_date('2021-04-26');", LocalDate.of(2021, 4, 26), SqlTypes.DATE);
}
@Test
@@ -461,4 +487,36 @@
runUdfTypeTest(
"SELECT test_date(f_date) FROM table;", LocalDate.of(2021, 4, 26), SqlTypes.DATE);
}
+
+ @Test
+ public void testTimestampLiteral() {
+ runUdfTypeTest(
+ "SELECT test_timestamp('2021-05-06 03:48:32Z');",
+ new DateTime(2021, 5, 6, 3, 48, 32, DateTimeZone.UTC),
+ Schema.TypeName.DATETIME);
+ }
+
+ @Test
+ public void testTimestampInput() {
+ runUdfTypeTest(
+ "SELECT test_timestamp(f_timestamp) FROM table;",
+ new DateTime(2021, 5, 6, 3, 48, 32, DateTimeZone.UTC),
+ Schema.TypeName.DATETIME);
+ }
+
+ @Test
+ public void testArrayLiteral() {
+ runUdfTypeTest(
+ "SELECT test_array(ARRAY<INT64>[1, 2, 3]);",
+ ImmutableList.of(1L, 2L, 3L),
+ Schema.FieldType.array(Schema.FieldType.INT64));
+ }
+
+ @Test
+ public void testArrayInput() {
+ runUdfTypeTest(
+ "SELECT test_array(array_int64) FROM table;",
+ ImmutableList.of(1L, 2L, 3L),
+ Schema.FieldType.array(Schema.FieldType.INT64));
+ }
}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 76fa14c..fd14ab4 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -208,7 +208,7 @@
include(":sdks:python:test-suites:tox:py38")
include(":vendor:grpc-1_26_0")
include(":vendor:grpc-1_36_0")
-include(":vendor:bytebuddy-1_10_8")
+include(":vendor:bytebuddy-1_11_0")
include(":vendor:calcite-1_26_0")
include(":vendor:guava-26_0-jre")
include(":website")
diff --git a/vendor/bytebuddy-1_10_8/build.gradle.kts b/vendor/bytebuddy-1_11_0/build.gradle.kts
similarity index 89%
rename from vendor/bytebuddy-1_10_8/build.gradle.kts
rename to vendor/bytebuddy-1_11_0/build.gradle.kts
index dee66ae..b55c4ed 100644
--- a/vendor/bytebuddy-1_10_8/build.gradle.kts
+++ b/vendor/bytebuddy-1_11_0/build.gradle.kts
@@ -18,7 +18,7 @@
plugins { id("org.apache.beam.vendor-java") }
-description = "Apache Beam :: Vendored Dependencies :: ByteBuddy :: 1.10.8"
+description = "Apache Beam :: Vendored Dependencies :: ByteBuddy :: 1.11.0"
group = "org.apache.beam"
version = "0.1"
@@ -26,14 +26,14 @@
val vendorJava = project.extensions.extraProperties.get("vendorJava") as groovy.lang.Closure<*>
vendorJava(
mapOf(
- "dependencies" to listOf("net.bytebuddy:byte-buddy:1.10.8"),
+ "dependencies" to listOf("net.bytebuddy:byte-buddy:1.11.0"),
"relocations" to mapOf(
- "net.bytebuddy" to "org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy"),
+ "net.bytebuddy" to "org.apache.beam.vendor.bytebuddy.v1_11_0.net.bytebuddy"),
"exclusions" to listOf(
"**/module-info.class"
),
"groupId" to group,
- "artifactId" to "beam-vendor-bytebuddy-1_10_8",
+ "artifactId" to "beam-vendor-bytebuddy-1_11_0",
"version" to version
)
)