| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.extensions.sql.jdbc; |
| |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| import static org.apache.beam.sdk.extensions.sql.jdbc.BeamSqlLineTestingUtils.buildArgs; |
| import static org.apache.beam.sdk.extensions.sql.jdbc.BeamSqlLineTestingUtils.toLines; |
| import static org.hamcrest.CoreMatchers.everyItem; |
| import static org.junit.Assert.assertThat; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.Serializable; |
| import java.text.ParseException; |
| import java.text.SimpleDateFormat; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.TimeZone; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; |
| import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; |
| import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub; |
| import org.apache.beam.sdk.testing.TestPipeline; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; |
| import org.codehaus.jackson.map.ObjectMapper; |
| import org.codehaus.jackson.node.ObjectNode; |
| import org.hamcrest.collection.IsIn; |
| import org.joda.time.Duration; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Rule; |
| import org.junit.Test; |
| |
| /** BeamSqlLine integration tests. */ |
| public class BeamSqlLineIT implements Serializable { |
| |
| @Rule public transient TestPubsub eventsTopic = TestPubsub.create(); |
| |
| private static String project; |
| private static String createPubsubTableStatement; |
| private static String setProject; |
| private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
| |
| private ExecutorService pool; |
| |
| @BeforeClass |
| public static void setUpClass() { |
| project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); |
| |
| setProject = String.format("SET project = '%s';", project); |
| |
| createPubsubTableStatement = |
| "CREATE EXTERNAL TABLE taxi_rides (\n" |
| + " event_timestamp TIMESTAMP,\n" |
| + " attributes MAP<VARCHAR, VARCHAR>,\n" |
| + " payload ROW<\n" |
| + " ride_id VARCHAR,\n" |
| + " point_idx INT,\n" |
| + " latitude DOUBLE,\n" |
| + " longitude DOUBLE,\n" |
| + " meter_reading DOUBLE,\n" |
| + " meter_increment DOUBLE,\n" |
| + " ride_status VARCHAR,\n" |
| + " passenger_count TINYINT>)\n" |
| + " TYPE pubsub \n" |
| + " LOCATION '%s'\n" |
| + " TBLPROPERTIES '{\"timestampAttributeKey\": \"ts\"}';"; |
| |
| dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); |
| } |
| |
| @Before |
| public void setUp() { |
| pool = Executors.newFixedThreadPool(1); |
| } |
| |
| @After |
| public void tearDown() { |
| pool.shutdown(); |
| } |
| |
| @Test |
| public void testSelectFromPubsub() throws Exception { |
| String[] args = |
| buildArgs( |
| String.format(createPubsubTableStatement, eventsTopic.topicPath()), |
| setProject, |
| "SELECT event_timestamp, taxi_rides.payload.ride_status, taxi_rides.payload.latitude, " |
| + "taxi_rides.payload.longitude from taxi_rides LIMIT 3;"); |
| |
| Future<List<List<String>>> expectedResult = runQueryInBackground(args); |
| eventsTopic.checkIfAnySubscriptionExists(project, Duration.standardMinutes(1)); |
| |
| List<PubsubMessage> messages = |
| ImmutableList.of( |
| message( |
| convertTimestampToMillis("2018-07-01 21:25:20"), |
| taxiRideJSON("id1", 1, 40.702, -74.001, 1000, 10, "enroute", 2)), |
| message( |
| convertTimestampToMillis("2018-07-01 21:26:06"), |
| taxiRideJSON("id2", 2, 40.703, -74.002, 1000, 10, "enroute", 4)), |
| message( |
| convertTimestampToMillis("2018-07-02 13:26:06"), |
| taxiRideJSON("id3", 3, 30.0, -72.32324, 2000, 20, "enroute", 7))); |
| |
| eventsTopic.publish(messages); |
| |
| assertThat( |
| Arrays.asList( |
| Arrays.asList("2018-07-01 21:25:20", "enroute", "40.702", "-74.001"), |
| Arrays.asList("2018-07-01 21:26:06", "enroute", "40.703", "-74.002"), |
| Arrays.asList("2018-07-02 13:26:06", "enroute", "30.0", "-72.32324")), |
| everyItem(IsIn.isOneOf(expectedResult.get(30, TimeUnit.SECONDS).toArray()))); |
| } |
| |
| @Test |
| public void testFilterForSouthManhattan() throws Exception { |
| String[] args = |
| buildArgs( |
| String.format(createPubsubTableStatement, eventsTopic.topicPath()), |
| setProject, |
| "SELECT event_timestamp, taxi_rides.payload.ride_status, \n" |
| + "taxi_rides.payload.latitude, taxi_rides.payload.longitude from taxi_rides\n" |
| + " WHERE taxi_rides.payload.longitude > -74.747\n" |
| + " AND taxi_rides.payload.longitude < -73.969\n" |
| + " AND taxi_rides.payload.latitude > 40.699\n" |
| + " AND taxi_rides.payload.latitude < 40.720 LIMIT 2;"); |
| |
| Future<List<List<String>>> expectedResult = runQueryInBackground(args); |
| eventsTopic.checkIfAnySubscriptionExists(project, Duration.standardMinutes(1)); |
| |
| List<PubsubMessage> messages = |
| ImmutableList.of( |
| message( |
| convertTimestampToMillis("2018-07-01 21:25:20"), |
| taxiRideJSON("id1", 1, 40.701, -74.001, 1000, 10, "enroute", 2)), |
| message( |
| convertTimestampToMillis("2018-07-01 21:26:06"), |
| taxiRideJSON("id2", 2, 40.702, -74.002, 1000, 10, "enroute", 4)), |
| message( |
| convertTimestampToMillis("2018-07-02 13:26:06"), |
| taxiRideJSON("id3", 3, 30, -72.32324, 2000, 20, "enroute", 7)), |
| message( |
| convertTimestampToMillis("2018-07-02 14:28:22"), |
| taxiRideJSON("id4", 4, 34, -73.32324, 2000, 20, "enroute", 8))); |
| |
| eventsTopic.publish(messages); |
| |
| assertThat( |
| Arrays.asList( |
| Arrays.asList("2018-07-01 21:25:20", "enroute", "40.701", "-74.001"), |
| Arrays.asList("2018-07-01 21:26:06", "enroute", "40.702", "-74.002")), |
| everyItem(IsIn.isOneOf(expectedResult.get(30, TimeUnit.SECONDS).toArray()))); |
| } |
| |
| private String taxiRideJSON( |
| String rideId, |
| int pointIdex, |
| double latitude, |
| double longitude, |
| int meterReading, |
| int meterIncrement, |
| String rideStatus, |
| int passengerCount) { |
| ObjectMapper mapper = new ObjectMapper(); |
| ObjectNode objectNode = mapper.createObjectNode(); |
| objectNode.put("ride_id", rideId); |
| objectNode.put("point_idx", pointIdex); |
| objectNode.put("latitude", latitude); |
| objectNode.put("longitude", longitude); |
| objectNode.put("meter_reading", meterReading); |
| objectNode.put("meter_increment", meterIncrement); |
| objectNode.put("ride_status", rideStatus); |
| objectNode.put("passenger_count", passengerCount); |
| return objectNode.toString(); |
| } |
| |
| private Future<List<List<String>>> runQueryInBackground(String[] args) { |
| return pool.submit( |
| (Callable) |
| () -> { |
| ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); |
| BeamSqlLine.runSqlLine(args, null, outputStream, null); |
| return toLines(outputStream); |
| }); |
| } |
| |
| private long convertTimestampToMillis(String timestamp) throws ParseException { |
| return dateFormat.parse(timestamp).getTime(); |
| } |
| |
| private PubsubMessage message(long timestampInMillis, String jsonPayload) { |
| return new PubsubMessage( |
| jsonPayload.getBytes(UTF_8), ImmutableMap.of("ts", String.valueOf(timestampInMillis))); |
| } |
| } |