| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.apex.malhar.sql; |
| |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.Date; |
| import java.util.List; |
| import java.util.Properties; |
| import java.util.TimeZone; |
| |
| import javax.validation.ConstraintViolationException; |
| |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import org.apache.apex.malhar.contrib.formatter.CsvFormatter; |
| import org.apache.apex.malhar.contrib.parser.CsvParser; |
| import org.apache.apex.malhar.kafka.EmbeddedKafka; |
| import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator; |
| import org.apache.apex.malhar.kafka.KafkaSinglePortOutputOperator; |
| import org.apache.apex.malhar.sql.table.KafkaEndpoint; |
| import org.apache.apex.malhar.sql.table.StreamEndpoint; |
| |
| import org.apache.hadoop.conf.Configuration; |
| |
| import org.apache.kafka.clients.consumer.ConsumerConfig; |
| import org.apache.kafka.clients.producer.ProducerConfig; |
| |
| import com.google.common.collect.ImmutableMap; |
| |
| import com.datatorrent.api.DAG; |
| import com.datatorrent.api.LocalMode; |
| import com.datatorrent.api.StreamingApplication; |
| |
| public class StreamEndpointTest |
| { |
| private String testTopicData0 = "dataTopic0"; |
| private String testTopicResult = "resultTopic"; |
| |
| private EmbeddedKafka kafka; |
| |
| private TimeZone defaultTZ; |
| |
| @Before |
| public void setup() throws IOException |
| { |
| defaultTZ = TimeZone.getDefault(); |
| TimeZone.setDefault(TimeZone.getTimeZone("GMT")); |
| |
| kafka = new EmbeddedKafka(); |
| kafka.start(); |
| kafka.createTopic(testTopicData0); |
| kafka.createTopic(testTopicResult); |
| } |
| |
| @After |
| public void tearDown() throws IOException |
| { |
| kafka.stop(); |
| |
| TimeZone.setDefault(defaultTZ); |
| } |
| |
| @Test |
| public void testApplicationWithPortEndpoint() |
| { |
| try { |
| LocalMode lma = LocalMode.newInstance(); |
| Configuration conf = new Configuration(false); |
| lma.prepareDAG(new KafkaPortApplication(kafka.getBroker(), testTopicData0, testTopicResult), conf); |
| LocalMode.Controller lc = lma.getController(); |
| lc.runAsync(); |
| |
| kafka.publish(testTopicData0, Arrays.asList("15/02/2016 10:15:00 +0000,1,paint1,11", |
| "15/02/2016 10:16:00 +0000,2,paint2,12", |
| "15/02/2016 10:17:00 +0000,3,paint3,13", "15/02/2016 10:18:00 +0000,4,paint4,14", |
| "15/02/2016 10:19:00 +0000,5,paint5,15", "15/02/2016 10:10:00 +0000,6,abcde6,16")); |
| |
| // TODO: Workaround to add \r\n char to test results because of bug in CsvFormatter which adds new line char. |
| String[] expectedLines = new String[] {"15/02/2016 10:18:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT4\r\n", |
| "15/02/2016 10:19:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT5\r\n"}; |
| |
| List<String> consume = kafka.consume(testTopicResult, 30000); |
| |
| Assert.assertTrue(Arrays.deepEquals(consume.toArray(new String[consume.size()]), expectedLines)); |
| |
| lc.shutdown(); |
| } catch (ConstraintViolationException e) { |
| Assert.fail("constraint violations: " + e.getConstraintViolations()); |
| } catch (Exception e) { |
| Assert.fail("Exception: " + e); |
| } |
| } |
| |
| public static class KafkaPortApplication implements StreamingApplication |
| { |
| private String broker; |
| private String sourceTopic; |
| private String destTopic; |
| |
| public KafkaPortApplication(String broker, String sourceTopic, String destTopic) |
| { |
| this.broker = broker; |
| this.sourceTopic = sourceTopic; |
| this.destTopic = destTopic; |
| } |
| |
| @Override |
| public void populateDAG(DAG dag, Configuration conf) |
| { |
| String schemaIn = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" + |
| "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," + |
| "{\"name\":\"id\",\"type\":\"Integer\"}," + |
| "{\"name\":\"Product\",\"type\":\"String\"}," + |
| "{\"name\":\"units\",\"type\":\"Integer\"}]}"; |
| String schemaOut = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" + |
| "{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," + |
| "{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," + |
| "{\"name\":\"Product\",\"type\":\"String\"}]}"; |
| |
| KafkaSinglePortInputOperator kafkaInput = dag.addOperator("KafkaInput", KafkaSinglePortInputOperator.class); |
| kafkaInput.setTopics(sourceTopic); |
| kafkaInput.setInitialOffset("EARLIEST"); |
| Properties props = new Properties(); |
| props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker); |
| props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaEndpoint.KEY_DESERIALIZER); |
| props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaEndpoint.VALUE_DESERIALIZER); |
| kafkaInput.setConsumerProps(props); |
| kafkaInput.setClusters(broker); |
| |
| CsvParser csvParser = dag.addOperator("CSVParser", CsvParser.class); |
| csvParser.setSchema(schemaIn); |
| |
| dag.addStream("KafkaToCSV", kafkaInput.outputPort, csvParser.in); |
| |
| CsvFormatter formatter = dag.addOperator("CSVFormatter", CsvFormatter.class); |
| formatter.setSchema(schemaOut); |
| |
| KafkaSinglePortOutputOperator kafkaOutput = dag.addOperator("KafkaOutput", KafkaSinglePortOutputOperator.class); |
| kafkaOutput.setTopic(destTopic); |
| |
| props = new Properties(); |
| props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaEndpoint.VALUE_SERIALIZER); |
| props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaEndpoint.KEY_SERIALIZER); |
| props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker); |
| kafkaOutput.setProperties(props); |
| |
| dag.addStream("CSVToKafka", formatter.out, kafkaOutput.inputPort); |
| |
| SQLExecEnvironment.getEnvironment() |
| .registerTable("ORDERS", new StreamEndpoint(csvParser.out, InputPOJO.class)) |
| .registerTable("SALES", new StreamEndpoint(formatter.in, |
| ImmutableMap.<String, Class>of("RowTime1", Date.class, "RowTime2", Date.class, "Product", String.class))) |
| .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str") |
| .executeSQL(dag, "INSERT INTO SALES " + "SELECT STREAM ROWTIME, " + "FLOOR(ROWTIME TO DAY), " + |
| "APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) " + "FROM ORDERS WHERE ID > 3 " + "AND " + |
| "PRODUCT LIKE 'paint%'"); |
| } |
| } |
| } |