[FLINK-9311] [pubsub] Clean up / add documentation and style issues in the PubSub connector
diff --git a/flink-examples-streaming-gcp-pubsub/pom.xml b/flink-examples-streaming-gcp-pubsub/pom.xml
new file mode 100644
index 0000000..ab1c91f
--- /dev/null
+++ b/flink-examples-streaming-gcp-pubsub/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-examples-build-helper</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.9-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-examples-streaming-gcp-pubsub_${scala.binary.version}</artifactId>
+ <name>flink-examples-streaming-gcp-pubsub</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-gcp-pubsub_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-examples-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <finalName>PubSub</finalName>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadeTestJar>false</shadeTestJar>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.flink.streaming.examples.gcp.pubsub.PubSubExample</mainClass>
+ </transformer>
+ </transformers>
+ <artifactSet>
+ <includes>
+ <include>org.apache.flink:flink-connector-gcp-pubsub*</include>
+ <include>com.google.cloud:google-cloud-pubsub</include>
+ <include>com.google.*:*</include>
+ <include>org.threeten:*</include>
+ <include>io.grpc:*</include>
+ <include>io.opencensus:*</include>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>com.google</pattern>
+ <shadedPattern>org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google</shadedPattern>
+ </relocation>
+ </relocations>
+
+ <filters>
+ <filter>
+ <artifact>org.apache.flink:flink-examples-streaming_*</artifact>
+ <includes>
+ <include>org/apache/flink/streaming/examples/gcp/pubsub/**</include>
+ </includes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/IntegerSerializer.java b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/IntegerSerializer.java
new file mode 100644
index 0000000..3c1eab4
--- /dev/null
+++ b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/IntegerSerializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.gcp.pubsub;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+
+import com.google.pubsub.v1.PubsubMessage;
+
+import java.io.IOException;
+import java.math.BigInteger;
+
+/**
+ * Deserialization schema to deserialize messages produced by {@link PubSubPublisher}.
+ * The byte[] received by this schema must contain a single Integer.
+ */
+class IntegerSerializer implements PubSubDeserializationSchema<Integer>, SerializationSchema<Integer> {
+
+ @Override
+ public Integer deserialize(PubsubMessage message) throws IOException {
+ return new BigInteger(message.getData().toByteArray()).intValue();
+ }
+
+ @Override
+ public boolean isEndOfStream(Integer integer) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<Integer> getProducedType() {
+ return TypeInformation.of(Integer.class);
+ }
+
+ @Override
+ public byte[] serialize(Integer integer) {
+ return BigInteger.valueOf(integer).toByteArray();
+ }
+}
diff --git a/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
new file mode 100644
index 0000000..7b66577
--- /dev/null
+++ b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.gcp.pubsub;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
+import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple PubSub example.
+ *
+ * <p>Before starting a flink job it will publish 10 messages on the input topic.
+ *
+ * Then a flink job is started to read these 10 messages from the input-subscription,
+ * it will print them to stdout
+ * and then write them to a the output-topic.</p>
+ */
+public class PubSubExample {
+ private static final Logger LOG = LoggerFactory.getLogger(PubSubExample.class);
+
+ public static void main(String[] args) throws Exception {
+ // parse input arguments
+ final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+ if (parameterTool.getNumberOfParameters() < 3) {
+ System.out.println("Missing parameters!\n" +
+ "Usage: flink run PubSub.jar --input-subscription <subscription> --input-topicName <topic> --output-topicName <output-topic> " +
+ "--google-project <google project name> ");
+ return;
+ }
+
+ String projectName = parameterTool.getRequired("google-project");
+ String inputTopicName = parameterTool.getRequired("input-topicName");
+ String subscriptionName = parameterTool.getRequired("input-subscription");
+ String outputTopicName = parameterTool.getRequired("output-topicName");
+
+ PubSubPublisher pubSubPublisher = new PubSubPublisher(projectName, inputTopicName);
+ pubSubPublisher.publish(10);
+
+ runFlinkJob(projectName, subscriptionName, outputTopicName);
+ }
+
+ private static void runFlinkJob(String projectName, String subscriptionName, String outputTopicName) throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(1000L);
+
+ env.addSource(PubSubSource.newBuilder(Integer.class)
+ .withDeserializationSchema(new IntegerSerializer())
+ .withProjectName(projectName)
+ .withSubscriptionName(subscriptionName)
+ .build())
+ .map(PubSubExample::printAndReturn).disableChaining()
+ .addSink(PubSubSink.newBuilder(Integer.class)
+ .withSerializationSchema(new IntegerSerializer())
+ .withProjectName(projectName)
+ .withTopicName(outputTopicName).build());
+
+ env.execute("Flink Streaming PubSubReader");
+ }
+
+ private static Integer printAndReturn(Integer i) {
+ LOG.info("Processed message with payload: " + i);
+ return i;
+ }
+}
diff --git a/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubPublisher.java b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubPublisher.java
new file mode 100644
index 0000000..8f7bfe6
--- /dev/null
+++ b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubPublisher.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.gcp.pubsub;
+
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+
+import java.math.BigInteger;
+
+/**
+ * Helper class to send PubSubMessages to a PubSub topic.
+ */
+class PubSubPublisher {
+ private final String projectName;
+ private final String topicName;
+
+ PubSubPublisher(String projectName, String topicName) {
+ this.projectName = projectName;
+ this.topicName = topicName;
+ }
+
+ /**
+ * Publish messages with as payload a single integer.
+ * The integers inside the messages start from 0 and increase by one for each message send.
+ * @param amountOfMessages amount of messages to send
+ */
+ void publish(int amountOfMessages) {
+ Publisher publisher = null;
+ try {
+ publisher = Publisher.newBuilder(ProjectTopicName.of(projectName, topicName)).build();
+ for (int i = 0; i < amountOfMessages; i++) {
+ ByteString messageData = ByteString.copyFrom(BigInteger.valueOf(i).toByteArray());
+ PubsubMessage message = PubsubMessage.newBuilder().setData(messageData).build();
+ publisher.publish(message).get();
+
+ System.out.println("Published message: " + i);
+ Thread.sleep(100L);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ if (publisher != null) {
+ publisher.shutdown();
+ }
+ } catch (Exception e) {
+ }
+ }
+ }
+}