[FLINK-9311] [pubsub] Clean up / add documentation and style issues in the PubSub connector
diff --git a/flink-examples-streaming-gcp-pubsub/pom.xml b/flink-examples-streaming-gcp-pubsub/pom.xml
new file mode 100644
index 0000000..ab1c91f
--- /dev/null
+++ b/flink-examples-streaming-gcp-pubsub/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<artifactId>flink-examples-build-helper</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-examples-streaming-gcp-pubsub_${scala.binary.version}</artifactId>
+	<name>flink-examples-streaming-gcp-pubsub</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-gcp-pubsub_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-examples-streaming_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<finalName>PubSub</finalName>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-deploy-plugin</artifactId>
+				<configuration>
+					<skip>true</skip>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<shadeTestJar>false</shadeTestJar>
+							<shadedArtifactAttached>false</shadedArtifactAttached>
+							<createDependencyReducedPom>false</createDependencyReducedPom>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.examples.gcp.pubsub.PubSubExample</mainClass>
+								</transformer>
+							</transformers>
+							<artifactSet>
+								<includes>
+									<include>org.apache.flink:flink-connector-gcp-pubsub*</include>
+									<include>com.google.cloud:google-cloud-pubsub</include>
+									<include>com.google.*:*</include>
+									<include>org.threeten:*</include>
+									<include>io.grpc:*</include>
+									<include>io.opencensus:*</include>
+								</includes>
+							</artifactSet>
+							<relocations>
+								<relocation>
+									<pattern>com.google</pattern>
+									<shadedPattern>org.apache.flink.streaming.examples.gcp.pubsub.shaded.com.google</shadedPattern>
+								</relocation>
+							</relocations>
+
+							<filters>
+								<filter>
+									<artifact>org.apache.flink:flink-examples-streaming_*</artifact>
+									<includes>
+										<include>org/apache/flink/streaming/examples/gcp/pubsub/**</include>
+									</includes>
+								</filter>
+							</filters>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
diff --git a/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/IntegerSerializer.java b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/IntegerSerializer.java
new file mode 100644
index 0000000..3c1eab4
--- /dev/null
+++ b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/IntegerSerializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.gcp.pubsub;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+
+import com.google.pubsub.v1.PubsubMessage;
+
+import java.io.IOException;
+import java.math.BigInteger;
+
+/**
+ * Deserialization schema to deserialize messages produced by {@link PubSubPublisher}.
+ * The byte[] received by this schema must contain a single Integer.
+ */
+class IntegerSerializer implements PubSubDeserializationSchema<Integer>, SerializationSchema<Integer> {
+
+	@Override
+	public Integer deserialize(PubsubMessage message) throws IOException {
+		return new BigInteger(message.getData().toByteArray()).intValue();
+	}
+
+	@Override
+	public boolean isEndOfStream(Integer integer) {
+		return false;
+	}
+
+	@Override
+	public TypeInformation<Integer> getProducedType() {
+		return TypeInformation.of(Integer.class);
+	}
+
+	@Override
+	public byte[] serialize(Integer integer) {
+		return BigInteger.valueOf(integer).toByteArray();
+	}
+}
diff --git a/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
new file mode 100644
index 0000000..7b66577
--- /dev/null
+++ b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubExample.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.gcp.pubsub;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
+import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple PubSub example.
+ *
+ * <p>Before starting a flink job it will publish 10 messages on the input topic.
+ *
+ * Then a flink job is started to read these 10 messages from the input-subscription,
+ * it will print them to stdout
+ * and then write them to a the output-topic.</p>
+ */
+public class PubSubExample {
+	private static final Logger LOG = LoggerFactory.getLogger(PubSubExample.class);
+
+	public static void main(String[] args) throws Exception {
+		// parse input arguments
+		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+		if (parameterTool.getNumberOfParameters() < 3) {
+			System.out.println("Missing parameters!\n" +
+								"Usage: flink run PubSub.jar --input-subscription <subscription> --input-topicName <topic> --output-topicName <output-topic> " +
+								"--google-project <google project name> ");
+			return;
+		}
+
+		String projectName = parameterTool.getRequired("google-project");
+		String inputTopicName = parameterTool.getRequired("input-topicName");
+		String subscriptionName = parameterTool.getRequired("input-subscription");
+		String outputTopicName = parameterTool.getRequired("output-topicName");
+
+		PubSubPublisher pubSubPublisher = new PubSubPublisher(projectName, inputTopicName);
+		pubSubPublisher.publish(10);
+
+		runFlinkJob(projectName, subscriptionName, outputTopicName);
+	}
+
+	private static void runFlinkJob(String projectName, String subscriptionName, String outputTopicName) throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.enableCheckpointing(1000L);
+
+		env.addSource(PubSubSource.newBuilder(Integer.class)
+								.withDeserializationSchema(new IntegerSerializer())
+								.withProjectName(projectName)
+								.withSubscriptionName(subscriptionName)
+								.build())
+			.map(PubSubExample::printAndReturn).disableChaining()
+			.addSink(PubSubSink.newBuilder(Integer.class)
+								.withSerializationSchema(new IntegerSerializer())
+								.withProjectName(projectName)
+								.withTopicName(outputTopicName).build());
+
+		env.execute("Flink Streaming PubSubReader");
+	}
+
+	private static Integer printAndReturn(Integer i) {
+		LOG.info("Processed message with payload: " + i);
+		return i;
+	}
+}
diff --git a/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubPublisher.java b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubPublisher.java
new file mode 100644
index 0000000..8f7bfe6
--- /dev/null
+++ b/flink-examples-streaming-gcp-pubsub/src/main/java/org/apache/flink/streaming/examples/gcp/pubsub/PubSubPublisher.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.gcp.pubsub;
+
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+
+import java.math.BigInteger;
+
+/**
+ * Helper class to send PubSubMessages to a PubSub topic.
+ */
+class PubSubPublisher {
+	private final String projectName;
+	private final String topicName;
+
+	PubSubPublisher(String projectName, String topicName) {
+		this.projectName = projectName;
+		this.topicName = topicName;
+	}
+
+	/**
+	 * Publish messages with as payload a single integer.
+	 * The integers inside the messages start from 0 and increase by one for each message send.
+	 * @param amountOfMessages amount of messages to send
+	 */
+	void publish(int amountOfMessages) {
+		Publisher publisher = null;
+		try {
+			publisher = Publisher.newBuilder(ProjectTopicName.of(projectName, topicName)).build();
+			for (int i = 0; i < amountOfMessages; i++) {
+				ByteString messageData = ByteString.copyFrom(BigInteger.valueOf(i).toByteArray());
+				PubsubMessage message = PubsubMessage.newBuilder().setData(messageData).build();
+				publisher.publish(message).get();
+
+				System.out.println("Published message: " + i);
+				Thread.sleep(100L);
+			}
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		} finally {
+			try {
+				if (publisher != null) {
+					publisher.shutdown();
+				}
+			} catch (Exception e) {
+			}
+		}
+	}
+}