[FLINK-8983] End-to-end test: Confluent schema registry

This closes #6083.
diff --git a/flink-confluent-schema-registry-e2e-tests/pom.xml b/flink-confluent-schema-registry-e2e-tests/pom.xml
new file mode 100644
index 0000000..576fca6
--- /dev/null
+++ b/flink-confluent-schema-registry-e2e-tests/pom.xml
@@ -0,0 +1,155 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-confluent-schema-registry</artifactId>
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<confluent.version>4.1.0</confluent.version>
+	</properties>
+
+	<repositories>
+		<repository>
+			<id>confluent</id>
+			<url>https://packages.confluent.io/maven/</url>
+		</repository>
+	</repositories>
+
+	<dependencies>
+		<!-- Apache Flink dependencies -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<!-- This dependency is required to actually execute jobs. It is currently pulled in by
+                flink-streaming-java, but we explicitly depend on it to safeguard against future changes. -->
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10 -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro-confluent-registry</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>TestAvroConsumerConfluent</finalName>
+							<artifactSet>
+								<excludes>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+								</excludes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.schema.registry.test.TestAvroConsumerConfluent</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.avro</groupId>
+				<artifactId>avro-maven-plugin</artifactId>
+				<version>${avro.version}</version>
+				<executions>
+					<execution>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>schema</goal>
+						</goals>
+						<configuration>
+							<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
+							<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
+							<fieldVisibility>PRIVATE</fieldVisibility>
+							<includes>
+								<include>**/*.avsc</include>
+							</includes>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-enforcer-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>dependency-convergence</id>
+						<goals>
+							<goal>enforce</goal>
+						</goals>
+						<configuration>
+							<skip>true</skip>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
diff --git a/flink-confluent-schema-registry-e2e-tests/src/main/avro/user.avsc b/flink-confluent-schema-registry-e2e-tests/src/main/avro/user.avsc
new file mode 100644
index 0000000..aca9b83
--- /dev/null
+++ b/flink-confluent-schema-registry-e2e-tests/src/main/avro/user.avsc
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ {"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string", "default": ""},
+     {"name": "favoriteNumber",  "type": "string", "default": ""},
+     {"name": "favoriteColor", "type": "string", "default": ""},
+     {"name": "eventType","type": {"name": "EventType","type": "enum", "symbols": ["meeting"] }}
+ ]
+}
diff --git a/flink-confluent-schema-registry-e2e-tests/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java b/flink-confluent-schema-registry-e2e-tests/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
new file mode 100644
index 0000000..9149832
--- /dev/null
+++ b/flink-confluent-schema-registry-e2e-tests/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.schema.registry.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
+
+import example.avro.User;
+
+import java.util.Properties;
+
+/**
+ * A simple example that shows how to read from and write to Kafka with Confluent Schema Registry.
+ * It reads Avro messages from the input topic and deserializes them into {@link User} POJOs by fetching the writer schema from the Schema Registry.
+ * Each record is then written back to Kafka as its plain string representation, serialized with {@link SimpleStringSchema}.
+ * --input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --schema-registry-url http://localhost:8081 --group.id myconsumer
+ */
+public class TestAvroConsumerConfluent {
+
+	public static void main(String[] args) throws Exception {
+		Properties config = new Properties();
+		// parse input arguments
+		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+		if (parameterTool.getNumberOfParameters() < 6) {
+			System.out.println("Missing parameters!\n" +
+				"Usage: Kafka --input-topic <topic> --output-topic <topic> " +
+				"--bootstrap.servers <kafka brokers> " +
+				"--zookeeper.connect <zk quorum> " +
+				"--schema-registry-url <confluent schema registry> --group.id <some id>");
+			return;
+		}
+		config.setProperty("bootstrap.servers", parameterTool.getRequired("bootstrap.servers"));
+		config.setProperty("group.id", parameterTool.getRequired("group.id"));
+		config.setProperty("zookeeper.connect", parameterTool.getRequired("zookeeper.connect"));
+		String schemaRegistryUrl = parameterTool.getRequired("schema-registry-url");
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+
+		// Typed consumer (diamond) instead of a raw type: the element type (User)
+		// is inferred from the Confluent registry deserialization schema argument.
+		DataStreamSource<User> input = env
+			.addSource(
+				new FlinkKafkaConsumer010<>(
+					parameterTool.getRequired("input-topic"),
+					ConfluentRegistryAvroDeserializationSchema.forSpecific(User.class, schemaRegistryUrl),
+					config).setStartFromEarliest());
+
+		SingleOutputStreamOperator<String> mapToString = input
+			.map(new MapFunction<User, String>() {
+				@Override
+				public String map(User value) throws Exception {
+					return value.toString();
+				}
+			});
+
+		FlinkKafkaProducer010<String> stringFlinkKafkaProducer010 = new FlinkKafkaProducer010<>(
+			parameterTool.getRequired("output-topic"),
+			new SimpleStringSchema(),
+			config);
+
+		mapToString.addSink(stringFlinkKafkaProducer010);
+		env.execute("Kafka 0.10 Confluent Schema Registry AVRO Example");
+	}
+}