| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.lens.server.query; |
| |
| import static javax.ws.rs.core.MediaType.APPLICATION_XML_TYPE; |
| import static javax.ws.rs.core.Response.Status.*; |
| |
| import static org.apache.lens.server.LensServerTestUtil.DB_WITH_JARS; |
| import static org.apache.lens.server.LensServerTestUtil.DB_WITH_JARS_2; |
| |
| import static org.apache.lens.server.api.LensServerAPITestUtil.getLensConf; |
| |
| import static org.apache.lens.server.api.user.MockDriverQueryHook.*; |
| |
| import static org.apache.lens.server.common.RestAPITestUtil.*; |
| |
| import static org.testng.Assert.*; |
| |
| import java.io.*; |
| import java.net.URLEncoder; |
| import java.sql.Connection; |
| import java.sql.DriverManager; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.util.*; |
| import java.util.function.Predicate; |
| import java.util.stream.Collectors; |
| |
| import javax.ws.rs.NotFoundException; |
| import javax.ws.rs.client.Entity; |
| import javax.ws.rs.client.WebTarget; |
| import javax.ws.rs.core.Application; |
| import javax.ws.rs.core.GenericType; |
| import javax.ws.rs.core.MediaType; |
| import javax.ws.rs.core.Response; |
| |
| import org.apache.lens.api.APIResult; |
| import org.apache.lens.api.LensConf; |
| import org.apache.lens.api.LensSessionHandle; |
| import org.apache.lens.api.Priority; |
| import org.apache.lens.api.jaxb.LensJAXBContextResolver; |
| import org.apache.lens.api.query.*; |
| import org.apache.lens.api.query.QueryStatus.Status; |
| import org.apache.lens.api.result.LensAPIResult; |
| import org.apache.lens.api.result.LensErrorTO; |
| import org.apache.lens.api.result.QueryCostTO; |
| import org.apache.lens.cube.error.LensCubeErrorCode; |
| import org.apache.lens.driver.hive.HiveDriver; |
| import org.apache.lens.lib.query.FilePersistentFormatter; |
| import org.apache.lens.lib.query.FileSerdeFormatter; |
| import org.apache.lens.server.LensJerseyTest; |
| import org.apache.lens.server.LensServerTestUtil; |
| import org.apache.lens.server.LensServices; |
| import org.apache.lens.server.api.LensConfConstants; |
| import org.apache.lens.server.api.driver.*; |
| import org.apache.lens.server.api.error.LensDriverErrorCode; |
| import org.apache.lens.server.api.error.LensException; |
| import org.apache.lens.server.api.metrics.LensMetricsRegistry; |
| import org.apache.lens.server.api.metrics.MetricsService; |
| import org.apache.lens.server.api.query.AbstractQueryContext; |
| import org.apache.lens.server.api.query.QueryContext; |
| import org.apache.lens.server.api.query.QueryExecutionService; |
| import org.apache.lens.server.api.session.SessionService; |
| import org.apache.lens.server.common.ErrorResponseExpectedData; |
| import org.apache.lens.server.common.RestAPITestUtil; |
| import org.apache.lens.server.common.TestDataUtils; |
| import org.apache.lens.server.common.TestResourceFile; |
| import org.apache.lens.server.error.GenericExceptionMapper; |
| import org.apache.lens.server.session.HiveSessionService; |
| import org.apache.lens.server.session.LensSessionImpl; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hive.conf.HiveConf; |
| import org.apache.hadoop.io.IOUtils; |
| |
| import org.glassfish.jersey.media.multipart.FormDataBodyPart; |
| import org.glassfish.jersey.media.multipart.FormDataContentDisposition; |
| import org.glassfish.jersey.media.multipart.FormDataMultiPart; |
| import org.glassfish.jersey.test.TestProperties; |
| import org.junit.Assert; |
| import org.testng.annotations.*; |
| |
| import com.codahale.metrics.MetricRegistry; |
| import com.google.common.base.Optional; |
| import lombok.extern.slf4j.Slf4j; |
| |
/**
 * Jersey-based unit tests for the query service REST endpoints
 * ({@code queryapi/queries} and {@code queryapi/preparedqueries}).
 */
| @Slf4j |
| @Test(groups = "unit-test") |
| public class TestQueryService extends LensJerseyTest { |
| |
/** Query execution service under test; looked up from {@link LensServices} in {@link #create()}. */
QueryExecutionServiceImpl queryService;

/** Metrics service; looked up from {@link LensServices} in {@link #create()}. */
MetricsService metricsSvc;

/** Handle of the session opened in {@link #create()} and closed in {@link #drop()}. */
LensSessionHandle lensSessionId;
| |
| public static class QueryServiceTestApp extends QueryApp { |
| |
| @Override |
| public Set<Class<?>> getClasses() { |
| final Set<Class<?>> classes = super.getClasses(); |
| classes.add(GenericExceptionMapper.class); |
| classes.add(LensJAXBContextResolver.class); |
| classes.add(TestQueryNotifictaionResource.class); |
| return classes; |
| } |
| } |
| |
/**
 * Delegates to the parent class's {@code setUp()}; annotated {@code @BeforeTest} so TestNG
 * invokes it as part of test set-up.
 *
 * @throws Exception if the parent set-up fails
 * @see org.glassfish.jersey.test.JerseyTest#setUp()
 */
@BeforeTest
public void setUp() throws Exception {
  super.setUp();
}
| |
| @BeforeClass |
| public void create() throws Exception { |
| queryService = LensServices.get().getService(QueryExecutionService.NAME); |
| metricsSvc = LensServices.get().getService(MetricsService.NAME); |
| Map<String, String> sessionconf = new HashMap<>(); |
| sessionconf.put("test.session.key", "svalue"); |
| // @localhost should be removed automatically |
| lensSessionId = queryService.openSession("foo@localhost", "bar", sessionconf); |
| |
| //Create Hive table and load data |
| createTable(TEST_TABLE); |
| loadData(TEST_TABLE, TestResourceFile.TEST_DATA2_FILE.getValue()); |
| |
| //Create HSQLDB table and load data |
| createHSQLTableAndLoadData(); |
| } |
| |
| private void createHSQLTableAndLoadData() throws SQLException { |
| Connection conn = DriverManager.getConnection("jdbc:hsqldb:mem:jdbcTestDB;MODE=MYSQL", "sa", ""); |
| String createTableCmd = "create table " + TEST_JDBC_TABLE + " (ID integer, IDSTR varchar(10))"; |
| String loadTableCmd = "Insert into " + TEST_JDBC_TABLE + " values " |
| + "(1, 'one'), (NULL, 'two'), (3, NULL), (NULL, NULL), (5, '')"; |
| Statement statement = conn.createStatement(); |
| int result = statement.executeUpdate(createTableCmd); |
| System.out.print(result); |
| conn.commit(); |
| result = statement.executeUpdate(loadTableCmd); |
| System.out.print(result); |
| statement.close(); |
| conn.commit(); |
| conn.close(); |
| } |
| |
/**
 * Delegates to the parent class's {@code tearDown()}; annotated {@code @AfterTest} so TestNG
 * invokes it as part of test tear-down.
 *
 * @throws Exception if the parent tear-down fails
 * @see org.glassfish.jersey.test.JerseyTest#tearDown()
 */
@AfterTest
public void tearDown() throws Exception {
  super.tearDown();
}
| |
| @AfterClass |
| public void drop() throws Exception { |
| dropTable(TEST_TABLE); |
| queryService.closeSession(lensSessionId); |
| for (LensDriver driver : queryService.getDrivers()) { |
| if (driver instanceof HiveDriver) { |
| assertFalse(((HiveDriver) driver).hasLensSession(lensSessionId)); |
| } |
| } |
| } |
| |
/**
 * Enables Jersey test traffic logging and entity dumping, then returns the application used by
 * these tests.
 *
 * @return a new {@link QueryServiceTestApp}
 * @see org.glassfish.jersey.test.JerseyTest#configure()
 */
@Override
protected Application configure() {
  enable(TestProperties.LOG_TRAFFIC);
  enable(TestProperties.DUMP_ENTITY);
  return new QueryServiceTestApp();
}
| |
/** Name of the Hive table created in {@link #create()} and dropped in {@link #drop()}. */
public static final String TEST_TABLE = "TEST_TABLE";

/** Name of the HSQLDB table created by {@link #createHSQLTableAndLoadData()}. */
public static final String TEST_JDBC_TABLE = "TEST_JDBC_TABLE";
| |
/**
 * Creates a table by delegating to {@link LensServerTestUtil#createTable}, using this test's web
 * target, session and {@code defaultMT}.
 *
 * @param tblName name of the table to create
 * @throws InterruptedException propagated from the delegate
 */
private void createTable(String tblName) throws InterruptedException {
  LensServerTestUtil.createTable(tblName, target(), lensSessionId, defaultMT);
}
| |
/**
 * Loads a classpath data file into a table by delegating to
 * {@link LensServerTestUtil#loadDataFromClasspath}, using this test's web target, session and
 * {@code defaultMT}.
 *
 * @param tblName      name of the table to load into
 * @param testDataFile the test data file passed to the delegate
 * @throws InterruptedException propagated from the delegate
 */
private void loadData(String tblName, final String testDataFile) throws InterruptedException {
  LensServerTestUtil.loadDataFromClasspath(tblName, testDataFile, target(), lensSessionId, defaultMT);
}
| |
/**
 * Drops a table by delegating to {@link LensServerTestUtil#dropTable}, using this test's web
 * target, session and {@code defaultMT}.
 *
 * @param tblName name of the table to drop
 * @throws InterruptedException propagated from the delegate
 */
private void dropTable(String tblName) throws InterruptedException {
  LensServerTestUtil.dropTable(tblName, target(), lensSessionId, defaultMT);
}
| |
| /** |
| * Test get random query. should return 400 |
| */ |
| @Test(dataProvider = "mediaTypeData") |
| public void testGetRandomQuery(MediaType mt) { |
| final WebTarget target = target().path("queryapi/queries"); |
| |
| Response rs = target.path("random").queryParam("sessionid", lensSessionId).request(mt).get(); |
| assertEquals(rs.getStatus(), 400); |
| } |
| |
| @Test |
| public void testLoadingMultipleDrivers() { |
| Collection<LensDriver> drivers = queryService.getDrivers(); |
| assertEquals(drivers.size(), 4); |
| Set<String> driverNames = new HashSet<>(drivers.size()); |
| for (LensDriver driver : drivers) { |
| assertEquals(driver.getConf().get("lens.driver.test.drivername"), driver.getFullyQualifiedName()); |
| driverNames.add(driver.getFullyQualifiedName()); |
| } |
| assertTrue(driverNames.containsAll(Arrays.asList("hive/hive1", "hive/hive2", "jdbc/jdbc1", "mock/fail1"))); |
| } |
| |
| /** |
| * Test rewrite failure in execute operation. |
| * |
| * @throws InterruptedException the interrupted exception |
| */ |
| @Test(dataProvider = "mediaTypeData") |
| public void testRewriteFailureInExecute(MediaType mt) throws InterruptedException { |
| final WebTarget target = target().path("queryapi/queries"); |
| LensConf conf = new LensConf(); |
| final FormDataMultiPart mp = new FormDataMultiPart(); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, mt)); |
| mp.bodyPart( |
| new FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select ID from non_exist_table")); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "execute")); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), conf, mt)); |
| final Response response = target.request(mt).post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE)); |
| assertEquals(response.getStatus(), BAD_REQUEST.getStatusCode()); |
| } |
| |
| /** |
| * Test launch failure in execute operation. |
| * |
| * @throws InterruptedException the interrupted exception |
| */ |
| @Test(dataProvider = "mediaTypeData") |
| public void testLaunchFail(MediaType mt) throws InterruptedException { |
| LensQuery lensQuery = executeAndWaitForQueryToFinish(target(), lensSessionId, "select fail from non_exist", |
| Optional.<LensConf>absent(), Optional.of(Status.FAILED), mt); |
| assertTrue(lensQuery.getSubmissionTime() > 0); |
| assertTrue(lensQuery.getLaunchTime() > 0); |
| assertEquals(lensQuery.getDriverStartTime(), 0); |
| assertEquals(lensQuery.getDriverFinishTime(), 0); |
| assertTrue(lensQuery.getFinishTime() > 0); |
| } |
| |
| /** |
| * Test multiple launches and failure in execute operation. |
| * |
| * @throws InterruptedException the interrupted exception |
| */ |
| @Test(dataProvider = "mediaTypeData") |
| public void testMultipleLaunches(MediaType mt) throws Exception { |
| QueryHandle handle = executeAndGetHandle(target(), Optional.of(lensSessionId), |
| Optional.of("select wait,fail from non_exist"), |
| Optional.<LensConf>absent(), mt); |
| // launch one more. |
| QueryHandle handle2 = executeAndGetHandle(target(), Optional.of(lensSessionId), |
| Optional.of("select wait,fail2 from non_exist"), |
| Optional.<LensConf>absent(), mt); |
| assertNotEquals(handle, handle2); |
| // put a small sleep sothat querysubmitter picks handle2 |
| Thread.sleep(50); |
| assertTrue(queryService.getQueryContext(handle).isLaunching()); |
| assertTrue(queryService.getQueryContext(handle2).isLaunching()); |
| assertTrue(queryService.getLaunchingQueriesCount() > 1); |
| waitForQueryToFinish(target(), lensSessionId, handle, mt); |
| waitForQueryToFinish(target(), lensSessionId, handle2, mt); |
| } |
| |
| @Test |
| public void testPriorityOnMockQuery() throws Exception { |
| String query = "select mock, fail from " + TEST_TABLE; |
| QueryContext ctx = queryService.createContext(query, null, new LensConf(), new Configuration(), 5000L); |
| ctx.setLensSessionIdentifier(lensSessionId.getPublicId().toString()); |
| queryService.acquire(lensSessionId); |
| try { |
| queryService.rewriteAndSelect(ctx); |
| } finally { |
| queryService.release(lensSessionId); |
| } |
| assertNotNull(ctx.getSelectedDriver()); |
| assertEquals(ctx.getPriority(), Priority.NORMAL); |
| } |
| |
| // test with execute async post, get all queries, get query context, |
| // get wrong uuid query |
| |
| /** |
| * Test queries api. |
| * |
| * @throws InterruptedException the interrupted exception |
| */ |
| @Test(dataProvider = "mediaTypeData") |
| public void testQueriesAPI(MediaType mt) throws InterruptedException { |
| // test post execute op |
| final WebTarget target = target().path("queryapi/queries"); |
| |
| long queuedQueries = metricsSvc.getQueuedQueries(); |
| long runningQueries = metricsSvc.getRunningQueries(); |
| long finishedQueries = metricsSvc.getFinishedQueries(); |
| |
| int noOfQueriesBeforeExecution = queryService.allQueries.size(); |
| long before = System.currentTimeMillis(); |
| QueryHandle theHandle = executeAndGetHandle(target(), Optional.of(lensSessionId), Optional.of("select ID from " |
| + TEST_TABLE), Optional.<LensConf>absent(), mt); |
| |
| List<QueryHandle> allQueries = target.queryParam("sessionid", lensSessionId).request(mt) |
| .get(new GenericType<List<QueryHandle>>() {}); |
| assertTrue(allQueries.size() >= 1); |
| assertTrue(allQueries.contains(theHandle)); |
| // time filter |
| allQueries = target.queryParam("sessionid", lensSessionId).queryParam("fromDate", before) |
| .queryParam("toDate", "now").request(mt).get(new GenericType<List<QueryHandle>>() {}); |
| assertEquals(allQueries.size(), 1); |
| assertTrue(allQueries.contains(theHandle)); |
| |
| // status filter |
| allQueries = target.queryParam("sessionid", lensSessionId).queryParam("state", "SUCCESSFUL, CANCELED") |
| .request(mt).get(new GenericType<List<QueryHandle>>() {}); |
| assertTrue(allQueries.size() >= 1); |
| |
| String queryXML = target.path(theHandle.toString()).queryParam("sessionid", lensSessionId) |
| .request(MediaType.APPLICATION_XML).get(String.class); |
| log.debug("query XML:{}", queryXML); |
| |
| Response response = |
| target.path(theHandle.toString() + "001").queryParam("sessionid", lensSessionId).request(mt).get(); |
| assertEquals(response.getStatus(), 404); |
| |
| LensQuery query = target.path(theHandle.toString()).queryParam("sessionid", lensSessionId).request(mt) |
| .get(LensQuery.class); |
| |
| // wait till the query finishes |
| QueryStatus stat = query.getStatus(); |
| while (!stat.finished()) { |
| Thread.sleep(1000); |
| query = target.path(theHandle.toString()).queryParam("sessionid", lensSessionId).request(mt).get(LensQuery.class); |
| stat = query.getStatus(); |
| /* |
| Commented due to same issue as: https://issues.apache.org/jira/browse/LENS-683 |
| switch (stat.getStatus()) { |
| case RUNNING: |
| assertEquals(metricsSvc.getRunningQueries(), runningQueries + 1); |
| break; |
| case QUEUED: |
| assertEquals(metricsSvc.getQueuedQueries(), queuedQueries + 1); |
| break; |
| default: // nothing |
| }*/ |
| } |
| |
| assertTrue(query.getSubmissionTime() > 0); |
| assertTrue(query.getFinishTime() > 0); |
| assertEquals(query.getStatus().getStatus(), Status.SUCCESSFUL); |
| |
| assertEquals(query.getPriority(), Priority.LOW); |
| //Check Query Priority can be read even after query is purged i,e query details are read from DB. |
| boolean isPurged = false; |
| while (!isPurged) { |
| isPurged = true; |
| for (QueryHandle aHandle : queryService.allQueries.keySet()) { |
| if (aHandle.equals(theHandle)) { |
| isPurged = false; //current query is still not purged |
| Thread.sleep(1000); |
| break; |
| } |
| } |
| } |
| assertEquals(query.getPriority(), Priority.LOW); |
| |
| // Update conf for query |
| final FormDataMultiPart confpart = new FormDataMultiPart(); |
| LensConf conf = new LensConf(); |
| conf.addProperty("my.property", "myvalue"); |
| confpart.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, |
| mt)); |
| confpart.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), conf, |
| mt)); |
| APIResult updateConf = target.path(theHandle.toString()).request(mt) |
| .put(Entity.entity(confpart, MediaType.MULTIPART_FORM_DATA_TYPE), APIResult.class); |
| assertEquals(updateConf.getStatus(), APIResult.Status.FAILED); |
| } |
| |
| // Test explain query |
| |
| /** |
| * Test explain query. |
| * |
| * @throws InterruptedException the interrupted exception |
| */ |
| @Test(dataProvider = "mediaTypeData") |
| public void testExplainQuery(MediaType mt) throws InterruptedException { |
| final WebTarget target = target().path("queryapi/queries"); |
| |
| final FormDataMultiPart mp = new FormDataMultiPart(); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, |
| mt)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select ID from " + TEST_TABLE)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "explain")); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), new LensConf(), |
| mt)); |
| |
| final QueryPlan plan = target.request(mt) |
| .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE), |
| new GenericType<LensAPIResult<QueryPlan>>() {}).getData(); |
| assertEquals(plan.getTablesQueried().size(), 1); |
| assertTrue(plan.getTablesQueried().get(0).endsWith(TEST_TABLE.toLowerCase())); |
| assertNull(plan.getPrepareHandle()); |
| |
| // Test explain and prepare |
| final WebTarget ptarget = target().path("queryapi/preparedqueries"); |
| |
| final FormDataMultiPart mp2 = new FormDataMultiPart(); |
| mp2.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, |
| mt)); |
| mp2.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), |
| "select ID from " + TEST_TABLE)); |
| mp2.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "explain_and_prepare")); |
| mp2.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), new LensConf(), |
| mt)); |
| |
| final QueryPlan plan2 = ptarget.request(mt).post(Entity.entity(mp2, MediaType.MULTIPART_FORM_DATA_TYPE), |
| new GenericType<LensAPIResult<QueryPlan>>() {}).getData(); |
| assertEquals(plan2.getTablesQueried().size(), 1); |
| assertTrue(plan2.getTablesQueried().get(0).endsWith(TEST_TABLE.toLowerCase())); |
| assertNotNull(plan2.getPrepareHandle()); |
| } |
| |
| // Test explain failure |
| |
| /** |
| * Test explain failure. |
| * |
| * @throws InterruptedException the interrupted exception |
| * @throws UnsupportedEncodingException |
| */ |
| @Test(dataProvider = "mediaTypeData") |
| public void testExplainFailure(MediaType mt) throws InterruptedException, UnsupportedEncodingException { |
| final WebTarget target = target().path("queryapi/queries"); |
| |
| final FormDataMultiPart mp = new FormDataMultiPart(); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, |
| mt)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select NO_ID from " |
| + TEST_TABLE)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "explain")); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), new LensConf(), |
| mt)); |
| |
| final Response responseExplain = target.request(mt).post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE)); |
| |
| assertEquals(responseExplain.getStatus(), BAD_REQUEST.getStatusCode()); |
| |
| // Test explain and prepare |
| final WebTarget ptarget = target().path("queryapi/preparedqueries"); |
| |
| final FormDataMultiPart mp2 = new FormDataMultiPart(); |
| mp2.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, |
| mt)); |
| mp2.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select NO_ID from " |
| + TEST_TABLE)); |
| mp2.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "explain_and_prepare")); |
| mp2.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), new LensConf(), |
| mt)); |
| |
| final Response responseExplainAndPrepare = ptarget.request(mt).post( |
| Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE)); |
| |
| assertEquals(responseExplainAndPrepare.getStatus(), BAD_REQUEST.getStatusCode()); |
| } |
| |
| /** |
| * Test semantic error for hive query on non-existent table. |
| * |
| * @throws IOException Signals that an I/O exception has occurred. |
| * @throws InterruptedException the interrupted exception |
| */ |
| @Test(dataProvider = "mediaTypeData") |
| public void testHiveSemanticFailure(MediaType mt) throws InterruptedException, IOException { |
| final WebTarget target = target().path("queryapi/queries"); |
| final FormDataMultiPart mp = new FormDataMultiPart(); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, mt)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), " select ID from NOT_EXISTS")); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "execute")); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), new LensConf(), |
| mt)); |
| |
| Response response = target.request(mt).post(Entity.entity(mp, MediaType |
| .MULTIPART_FORM_DATA_TYPE)); |
| LensAPIResult result = response.readEntity(LensAPIResult.class); |
| List<LensErrorTO> childErrors = result.getLensErrorTO().getChildErrors(); |
| boolean hiveSemanticErrorExists = false; |
| for (LensErrorTO error : childErrors) { |
| if (error.getCode() == LensDriverErrorCode.SEMANTIC_ERROR.getLensErrorInfo().getErrorCode()) { |
| hiveSemanticErrorExists = true; |
| break; |
| } |
| } |
| assertTrue(hiveSemanticErrorExists); |
| } |
| |
| // post to preparedqueries |
| // get all prepared queries |
| // get a prepared query |
| // update a prepared query |
| // post to prepared query multiple times |
| // delete a prepared query |
| |
| /** |
| * Test prepare query. |
| * |
| * @throws InterruptedException the interrupted exception |
| */ |
| @Test(dataProvider = "mediaTypeData") |
| public void testPrepareQuery(MediaType mt) throws InterruptedException { |
| final WebTarget target = target().path("queryapi/preparedqueries"); |
| |
| final FormDataMultiPart mp = new FormDataMultiPart(); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, |
| mt)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select ID from " + TEST_TABLE)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "prepare")); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("queryName").build(), "testQuery1")); |
| |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), new LensConf(), |
| mt)); |
| |
| final QueryPrepareHandle pHandle = target.request(mt).post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE), |
| new GenericType<LensAPIResult<QueryPrepareHandle>>() {}).getData(); |
| |
| // Get all prepared queries |
| List<QueryPrepareHandle> allQueries = target.queryParam("sessionid", lensSessionId) |
| .queryParam("queryName", "testQuery1").request(mt).get(new GenericType<List<QueryPrepareHandle>>() { |
| }); |
| assertTrue(allQueries.size() >= 1); |
| assertTrue(allQueries.contains(pHandle)); |
| |
| LensPreparedQuery ctx = target.path(pHandle.toString()).queryParam("sessionid", lensSessionId).request(mt) |
| .get(LensPreparedQuery.class); |
| assertTrue(ctx.getUserQuery().equalsIgnoreCase("select ID from " + TEST_TABLE)); |
| assertTrue(ctx.getDriverQuery().equalsIgnoreCase("select ID from " + TEST_TABLE)); |
| //both drivers hive/hive1 and hive/hive2 are capable of handling the query as they point to the same hive server |
| assertTrue(ctx.getSelectedDriverName().equals("hive/hive1") || ctx.getSelectedDriverName().equals("hive/hive2")); |
| assertNull(ctx.getConf().getProperties().get("my.property")); |
| |
| // Update conf for prepared query |
| final FormDataMultiPart confpart = new FormDataMultiPart(); |
| LensConf conf = new LensConf(); |
| conf.addProperty("my.property", "myvalue"); |
| confpart.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, |
| mt)); |
| confpart.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), conf, |
| mt)); |
| APIResult updateConf = target.path(pHandle.toString()).request(mt) |
| .put(Entity.entity(confpart, MediaType.MULTIPART_FORM_DATA_TYPE), APIResult.class); |
| assertEquals(updateConf.getStatus(), APIResult.Status.SUCCEEDED); |
| |
| ctx = target.path(pHandle.toString()).queryParam("sessionid", lensSessionId).request(mt).get(LensPreparedQuery |
| .class); |
| assertEquals(ctx.getConf().getProperties().get("my.property"), "myvalue"); |
| |
| QueryHandle handle1 = target.path(pHandle.toString()).request(mt) |
| .post(Entity.entity(confpart, MediaType.MULTIPART_FORM_DATA_TYPE), QueryHandle.class); |
| |
| // Override query name |
| confpart.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("queryName").build(), "testQueryName2")); |
| // do post once again |
| QueryHandle handle2 = target.path(pHandle.toString()).request(mt) |
| .post(Entity.entity(confpart, MediaType.MULTIPART_FORM_DATA_TYPE), QueryHandle.class); |
| assertNotEquals(handle1, handle2); |
| |
| LensQuery ctx1 = waitForQueryToFinish(target(), lensSessionId, handle1, Status.SUCCESSFUL, mt); |
| assertEquals(ctx1.getQueryName().toLowerCase(), "testquery1"); |
| |
| LensQuery ctx2 = waitForQueryToFinish(target(), lensSessionId, handle2, Status.SUCCESSFUL, mt); |
| assertEquals(ctx2.getQueryName().toLowerCase(), "testqueryname2"); |
| |
| // destroy prepared |
| APIResult result = target.path(pHandle.toString()).queryParam("sessionid", lensSessionId).request(mt) |
| .delete(APIResult.class); |
| assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); |
| |
| // Post on destroyed query |
| Response response = target.path(pHandle.toString()).request(mt) |
| .post(Entity.entity(confpart, MediaType.MULTIPART_FORM_DATA_TYPE), Response.class); |
| assertEquals(response.getStatus(), 404); |
| } |
| |
| /** |
| * Test explain and prepare query. |
| * |
| * @throws InterruptedException the interrupted exception |
| */ |
| @Test(dataProvider = "mediaTypeData") |
| public void testExplainAndPrepareQuery(MediaType mt) throws InterruptedException { |
| final WebTarget target = target().path("queryapi/preparedqueries"); |
| |
| final FormDataMultiPart mp = new FormDataMultiPart(); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, |
| mt)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select ID from " + TEST_TABLE)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "explain_and_prepare")); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), new LensConf(), |
| mt)); |
| |
| final QueryPlan plan = target.request(mt) |
| .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE), |
| new GenericType<LensAPIResult<QueryPlan>>() {}).getData(); |
| |
| assertEquals(plan.getTablesQueried().size(), 1); |
| assertTrue(plan.getTablesQueried().get(0).endsWith(TEST_TABLE.toLowerCase())); |
| assertNotNull(plan.getPrepareHandle()); |
| |
| LensPreparedQuery ctx = target.path(plan.getPrepareHandle().toString()).queryParam("sessionid", lensSessionId) |
| .request(mt).get(LensPreparedQuery.class); |
| assertTrue(ctx.getUserQuery().equalsIgnoreCase("select ID from " + TEST_TABLE)); |
| assertTrue(ctx.getDriverQuery().equalsIgnoreCase("select ID from " + TEST_TABLE)); |
| //both drivers hive/hive1 and hive/hive2 are capable of handling the query as they point to the same hive server |
| assertTrue(ctx.getSelectedDriverName().equals("hive/hive1") || ctx.getSelectedDriverName().equals("hive/hive2")); |
| assertNull(ctx.getConf().getProperties().get("my.property")); |
| |
| // Update conf for prepared query |
| final FormDataMultiPart confpart = new FormDataMultiPart(); |
| LensConf conf = new LensConf(); |
| conf.addProperty("my.property", "myvalue"); |
| confpart.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, |
| mt)); |
| confpart.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), conf, |
| mt)); |
| APIResult updateConf = target.path(plan.getPrepareHandle().toString()).request(mt) |
| .put(Entity.entity(confpart, MediaType.MULTIPART_FORM_DATA_TYPE), APIResult.class); |
| assertEquals(updateConf.getStatus(), APIResult.Status.SUCCEEDED); |
| |
| ctx = target.path(plan.getPrepareHandle().toString()).queryParam("sessionid", lensSessionId).request(mt) |
| .get(LensPreparedQuery.class); |
| assertEquals(ctx.getConf().getProperties().get("my.property"), "myvalue"); |
| |
| QueryHandle handle1 = target.path(plan.getPrepareHandle().toString()).request(mt) |
| .post(Entity.entity(confpart, MediaType.MULTIPART_FORM_DATA_TYPE), QueryHandle.class); |
| |
| // do post once again |
| QueryHandle handle2 = target.path(plan.getPrepareHandle().toString()).request(mt) |
| .post(Entity.entity(confpart, MediaType.MULTIPART_FORM_DATA_TYPE), QueryHandle.class); |
| assertNotEquals(handle1, handle2); |
| |
| waitForQueryToFinish(target(), lensSessionId, handle1, Status.SUCCESSFUL, mt); |
| waitForQueryToFinish(target(), lensSessionId, handle2, Status.SUCCESSFUL, mt); |
| |
| // destroy prepared |
| APIResult result = target.path(plan.getPrepareHandle().toString()).queryParam("sessionid", lensSessionId) |
| .request(mt).delete(APIResult.class); |
| assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); |
| |
| // Post on destroyed query |
| Response response = target.path(plan.getPrepareHandle().toString()).request(mt) |
| .post(Entity.entity(confpart, MediaType.MULTIPART_FORM_DATA_TYPE), Response.class); |
| assertEquals(response.getStatus(), 404); |
| |
| } |
| |
| // test with execute async post, get query, get results |
| // test cancel query |
| |
| /** |
| * Test execute async. |
| * |
| * <p>Runs three scenarios against the {@code queryapi/queries} resource:</p> |
| * <ol> |
| * <li>submits a query with operation {@code execute}, polls it once per second until it is finished, then |
| * checks the entries expected in the selected driver conf, the timestamps reported on the query, the |
| * rewritten query and the persisted result;</li> |
| * <li>submits the same form again and cancels the query right away; a failed cancel is accepted only when |
| * the query is already successful, a successful cancel only when the query is cancelled;</li> |
| * <li>submits the query with driver and server persistence and {@code DeferredPersistentResultFormatter} |
| * deferring persistence by 5 seconds, checks that the result cannot be fetched before the query finishes, |
| * then checks the persisted result path and the http download end point.</li> |
| * </ol> |
| * |
| * @param mt the media type used for the requests and for the session id and conf body parts |
| * @throws InterruptedException the interrupted exception |
| * @throws IOException Signals that an I/O exception has occurred. |
| * @throws LensException declared on the signature; presumably raised by the direct query service calls |
| */ |
| @Test(dataProvider = "mediaTypeData") |
| public void testExecuteAsync(MediaType mt) throws InterruptedException, IOException, LensException { |
| // test post execute op |
| final WebTarget target = target().path("queryapi/queries"); |
| |
| long queuedQueries = metricsSvc.getQueuedQueries(); |
| long runningQueries = metricsSvc.getRunningQueries(); |
| |
| final FormDataMultiPart mp = new FormDataMultiPart(); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, |
| mt)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select ID, IDSTR from " |
| + TEST_TABLE)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "execute")); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), new LensConf(), |
| mt)); |
| final QueryHandle handle = target.request(mt).post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE), |
| new GenericType<LensAPIResult<QueryHandle>>() {}).getData(); |
| |
| assertNotNull(handle); |
| QueryContext ctx = queryService.getUpdatedQueryContext(lensSessionId, handle); |
| assertEquals(ctx.getSelectedDriverConf().get(KEY_POST_SELECT), VALUE_POST_SELECT); |
| |
| // Get query |
| LensQuery lensQuery = target.path(handle.toString()).queryParam("sessionid", lensSessionId).request(mt) |
| .get(LensQuery.class); |
| assertTrue(lensQuery.getStatus().getStatus().equals(Status.QUEUED) |
| || lensQuery.getStatus().getStatus().equals(Status.LAUNCHED) |
| || lensQuery.getStatus().getStatus().equals(Status.RUNNING) |
| || lensQuery.getStatus().getStatus().equals(Status.SUCCESSFUL), lensQuery.getStatus().toString()); |
| |
| // wait till the query finishes |
| QueryStatus stat = lensQuery.getStatus(); |
| while (!stat.finished()) { |
| lensQuery = target.path(handle.toString()).queryParam("sessionid", lensSessionId).request(mt).get(LensQuery |
| .class); |
| stat = lensQuery.getStatus(); |
| /* Commented and jira ticket raised for correction: https://issues.apache.org/jira/browse/LENS-683 |
| switch (stat.getStatus()) { |
| case RUNNING: |
| assertEquals(metricsSvc.getRunningQueries(), runningQueries + 1, |
| "Asserting queries for " + ctx.getQueryHandle()); |
| break; |
| case QUEUED: |
| assertEquals(metricsSvc.getQueuedQueries(), queuedQueries + 1); |
| break; |
| default: // nothing |
| }*/ |
| Thread.sleep(1000); |
| } |
| assertEquals(ctx.getSelectedDriverConf().get(KEY_PRE_LAUNCH), VALUE_PRE_LAUNCH); |
| assertEquals(ctx.getSelectedDriverConf().get(PRE_REWRITE), PRE_REWRITE); |
| assertEquals(ctx.getSelectedDriverConf().get(POST_REWRITE), POST_REWRITE); |
| assertEquals(ctx.getSelectedDriverConf().get(PRE_ESTIMATE), PRE_ESTIMATE); |
| assertEquals(ctx.getSelectedDriverConf().get(POST_ESTIMATE), POST_ESTIMATE); |
| assertTrue(lensQuery.getSubmissionTime() > 0); |
| assertTrue(lensQuery.getLaunchTime() > 0); |
| assertTrue(lensQuery.getDriverStartTime() > 0); |
| assertTrue(lensQuery.getDriverFinishTime() > 0); |
| assertTrue(lensQuery.getFinishTime() > 0); |
| ctx = queryService.getUpdatedQueryContext(lensSessionId, lensQuery.getQueryHandle()); |
| |
| assertNotNull(ctx.getPhase1RewrittenQuery()); |
| assertEquals(ctx.getPhase1RewrittenQuery(), ctx.getUserQuery()); //Since there is no rewriter in this test |
| assertEquals(lensQuery.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL); |
| |
| validatePersistedResult(handle, target(), lensSessionId, new String[][]{{"ID", "INT"}, {"IDSTR", "STRING"}}, true, |
| false, mt); |
| |
| // test cancel query |
| final QueryHandle handle2 = target.request(mt).post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE), |
| new GenericType<LensAPIResult<QueryHandle>>() {}).getData(); |
| |
| assertNotNull(handle2); |
| APIResult result = target.path(handle2.toString()).queryParam("sessionid", lensSessionId).request(mt) |
| .delete(APIResult.class); |
| // cancel is expected to fail only when the query has already finished successfully |
| LensQuery ctx2 = target.path(handle2.toString()).queryParam("sessionid", lensSessionId).request(mt) |
| .get(LensQuery.class); |
| if (result.getStatus().equals(APIResult.Status.FAILED)) { |
| assertEquals(ctx2.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL, |
| "cancel failed without query having been succeeded"); |
| } else if (result.getStatus().equals(APIResult.Status.SUCCEEDED)) { |
| assertEquals(ctx2.getStatus().getStatus(), QueryStatus.Status.CANCELED, |
| "cancel succeeded but query wasn't cancelled"); |
| } else { |
| fail("unexpected cancel status: " + result.getStatus()); |
| } |
| |
| // 1. Test http download end point and result path should be correct (when both driver and server persist) |
| // 2. Test Fetch result should fail before query is marked successful |
| log.info("Starting httpendpoint test"); |
| final FormDataMultiPart mp3 = new FormDataMultiPart(); |
| mp3.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, |
| mt)); |
| mp3.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select ID, IDSTR from " |
| + TEST_TABLE)); |
| mp3.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "execute")); |
| LensConf conf = new LensConf(); |
| conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, "true"); |
| conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, "true"); |
| conf.addProperty(LensConfConstants.QUERY_OUTPUT_FORMATTER, DeferredPersistentResultFormatter.class.getName()); |
| conf.addProperty("deferPersistenceByMillis", 5000); // defer persistence for 5 secs |
| |
| mp3.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), conf, |
| mt)); |
| final QueryHandle handle3 = target.request(mt).post(Entity.entity(mp3, MediaType.MULTIPART_FORM_DATA_TYPE), |
| new GenericType<LensAPIResult<QueryHandle>>() {}).getData(); |
| |
| QueryContext ctx3 = queryService.getQueryContext(handle3); |
| assertFalse(ctx3.finished()); //Formatting is deferred so query will take time to finish |
| try { |
| queryService.fetchResultSet(lensSessionId, handle3, 0, 100); |
| fail("client should not be allowed to fetch result before query finishes successfully"); |
| } catch (NotFoundException e) { |
| // Expected. Ignore |
| } |
| waitForQueryToFinish(target(), lensSessionId, handle3, Status.SUCCESSFUL, mt); |
| LensResultSet rs = queryService.getResultset(handle3); |
| //check persisted result path |
| String expectedPath = |
| ctx3.getSelectedDriverConf().get(LensConfConstants.RESULT_SET_PARENT_DIR) + "/" + handle3.getHandleIdString() |
| + ctx3.getConf().get(LensConfConstants.QUERY_OUTPUT_FILE_EXTN); |
| assertTrue(((PersistentResultSet) rs).getOutputPath().endsWith(expectedPath) |
| , "Result Path " + ((PersistentResultSet) rs).getOutputPath() |
| + " does not contain expected path: " + expectedPath); |
| |
| validateHttpEndPoint(target(), null, handle3, null); |
| } |
| |
| /** |
| * Validate persisted result. |
| * |
| * @param handle the handle |
| * @param parent the parent |
| * @param lensSessionId the lens session id |
| * @param isDir the is dir |
| * @param isCSVFormat the result format is csv. |
| * @throws IOException Signals that an I/O exception has occurred. |
| */ |
| static void validatePersistedResult(QueryHandle handle, WebTarget parent, LensSessionHandle lensSessionId, |
| String[][] schema, boolean isDir, boolean isCSVFormat, MediaType mt) throws IOException { |
| final WebTarget target = parent.path("queryapi/queries"); |
| // fetch results |
| validateResultSetMetadata(handle, "", schema, parent, lensSessionId, mt); |
| |
| String presultset = target.path(handle.toString()).path("resultset").queryParam("sessionid", lensSessionId) |
| .request(mt).get(String.class); |
| System.out.println("PERSISTED RESULT:" + presultset); |
| |
| PersistentQueryResult resultset = target.path(handle.toString()).path("resultset") |
| .queryParam("sessionid", lensSessionId).request().get(PersistentQueryResult.class); |
| validatePersistentResult(resultset, handle, isDir, isCSVFormat); |
| |
| if (isDir) { |
| validNotFoundForHttpResult(parent, lensSessionId, handle); |
| } |
| } |
| |
| /** |
|  * Read result set. |
|  * |
|  * <p>Reads every line found at the persisted URI of the result. When the URI is a directory, the lines of all |
|  * its direct non-directory children are collected; otherwise the lines of the single file are collected.</p> |
|  * |
|  * @param resultset the resultset |
|  * @param handle    the handle; its string form must occur in the persisted URI |
|  * @param isDir     whether the persisted URI is expected to be a directory |
|  * @return the list of lines read |
|  * @throws IOException Signals that an I/O exception has occurred. |
|  */ |
| public static List<String> readResultSet(PersistentQueryResult resultset, QueryHandle handle, boolean isDir) |
|   throws IOException { |
|   assertTrue(resultset.getPersistedURI().contains(handle.toString()), |
|     "Persisted URI " + resultset.getPersistedURI() + " does not contain handle " + handle); |
|   Path actualPath = new Path(resultset.getPersistedURI()); |
|   FileSystem fs = actualPath.getFileSystem(new Configuration()); |
|   List<String> actualRows = new ArrayList<>(); |
|   // isDirectory() replaces the deprecated isDir() and matches the check used on the children below |
|   if (fs.getFileStatus(actualPath).isDirectory()) { |
|     assertTrue(isDir, "Result path " + actualPath + " is a directory but a file was expected"); |
|     for (FileStatus fstat : fs.listStatus(actualPath)) { |
|       if (!fstat.isDirectory()) { |
|         addRowsFromFile(actualRows, fs, fstat.getPath()); |
|       } |
|     } |
|   } else { |
|     assertFalse(isDir, "Result path " + actualPath + " is a file but a directory was expected"); |
|     addRowsFromFile(actualRows, fs, actualPath); |
|   } |
|   return actualRows; |
| } |
| |
| /** |
|  * Returns the size of result set file when result path is a file, null otherwise |
|  * |
|  * @param resultset the resultset whose persisted URI is inspected |
|  * @param handle    the handle; its string form must occur in the persisted URI |
|  * @param isDir     whether the persisted URI is expected to be a directory |
|  * @return the length of the file, or null when the persisted URI is a directory |
|  * @throws IOException Signals that an I/O exception has occurred. |
|  */ |
| public static Long readResultFileSize(PersistentQueryResult resultset, QueryHandle handle, boolean isDir) |
|   throws IOException { |
|   assertTrue(resultset.getPersistedURI().contains(handle.toString()), |
|     "Persisted URI " + resultset.getPersistedURI() + " does not contain handle " + handle); |
|   Path actualPath = new Path(resultset.getPersistedURI()); |
|   FileSystem fs = actualPath.getFileSystem(new Configuration()); |
|   FileStatus fileStatus = fs.getFileStatus(actualPath); |
|   // isDirectory() replaces the deprecated isDir() |
|   if (fileStatus.isDirectory()) { |
|     assertTrue(isDir, "Result path " + actualPath + " is a directory but a file was expected"); |
|     return null; |
|   } else { |
|     assertFalse(isDir, "Result path " + actualPath + " is a file but a directory was expected"); |
|     return fileStatus.getLen(); |
|   } |
| } |
| |
| |
| /** |
|  * Adds the rows from file. |
|  * |
|  * <p>Appends every line of the file to {@code actualRows}. The stream and the reader are closed even when |
|  * reading fails.</p> |
|  * |
|  * @param actualRows the list the lines are appended to |
|  * @param fs         the fs |
|  * @param path       the path |
|  * @throws IOException Signals that an I/O exception has occurred. |
|  */ |
| static void addRowsFromFile(List<String> actualRows, FileSystem fs, Path path) throws IOException { |
|   // Decode explicitly instead of relying on the platform default charset. |
|   // NOTE(review): assumes result files are written as UTF-8 - confirm if another output charset is configured. |
|   try (FSDataInputStream in = fs.open(path); |
|        BufferedReader br = new BufferedReader( |
|          new InputStreamReader(in, java.nio.charset.StandardCharsets.UTF_8))) { |
|     String line; |
|     while ((line = br.readLine()) != null) { |
|       actualRows.add(line); |
|     } |
|   } |
| } |
| |
| /** |
| * Validate persistent result. |
| * |
| * @param resultset the resultset |
| * @param handle the handle |
| * @param isDir the is dir |
| * @param isCSVFormat the result format is csv. |
| * @throws IOException Signals that an I/O exception has occurred. |
| */ |
| static void validatePersistentResult(PersistentQueryResult resultset, QueryHandle handle, boolean isDir, |
| boolean isCSVFormat)throws IOException { |
| List<String> actualRows = readResultSet(resultset, handle, isDir); |
| validatePersistentResult(actualRows, isCSVFormat); |
| if (!isDir) { |
| assertEquals(resultset.getNumRows().intValue(), actualRows.size()); |
| } |
| Long fileSize = readResultFileSize(resultset, handle, isDir); |
| assertEquals(resultset.getFileSize(), fileSize); |
| } |
| |
| static void validatePersistentResult(List<String> actualRows, boolean isCSVFormat) { |
| String[] expected1 = null; |
| String[] expected2 = null; |
| if (isCSVFormat) { |
| //This case will be hit when the result is persisted by the server (CSV result) |
| expected1 = new String[]{ |
| "\"1\",\"one\"", |
| "\"NULL\",\"two\"", |
| "\"3\",\"NULL\"", |
| "\"NULL\",\"NULL\"", |
| "\"5\",\"\"", |
| }; |
| } else { |
| //This is case of hive driver persistence |
| expected1 = new String[] { |
| "1one", |
| "\\Ntwo123item1item2", |
| "3\\Nitem1item2", |
| "\\N\\N", |
| "5nothing", |
| }; |
| expected2 = new String[] { |
| "1one[][]", |
| "\\Ntwo[1,2,3][\"item1\",\"item2\"]", |
| "3\\N[][\"item1\",\"item2\"]", |
| "\\N\\N[][]", |
| "5[][\"nothing\"]", |
| }; |
| } |
| |
| for (int i = 0; i < actualRows.size(); i++) { |
| assertEquals(expected1[i].indexOf(actualRows.get(i)) == 0 |
| || (expected2 != null && expected2[i].indexOf(actualRows.get(i)) == 0), true); |
| } |
| } |
| |
| /** |
|  * Validate http end point. |
|  * |
|  * <p>Requests the {@code httpresultset} resource of the query and checks that the content-disposition header |
|  * mentions the handle. Without a redirect url the response body is read and validated as a non-CSV persisted |
|  * result; with a redirect url the response must be a SEE_OTHER whose Location header contains that url.</p> |
|  * |
|  * @param parent        the parent |
|  * @param lensSessionId the lens session id |
|  * @param handle        the handle |
|  * @param redirectUrl   the redirect url, or null when the result is expected in the response body |
|  * @throws IOException Signals that an I/O exception has occurred. |
|  */ |
| static void validateHttpEndPoint(WebTarget parent, LensSessionHandle lensSessionId, QueryHandle handle, |
|   String redirectUrl) throws IOException { |
|   log.info("@@@ validateHttpEndPoint sessionid " + lensSessionId); |
|   Response response = parent.path("queryapi/queries/" + handle.toString() + "/httpresultset") |
|     .queryParam("sessionid", lensSessionId).request().get(); |
| |
|   // report a missing header as an assertion failure rather than a NullPointerException |
|   String contentDisposition = response.getHeaderString("content-disposition"); |
|   assertNotNull(contentDisposition, "content-disposition header missing, status: " + response.getStatus()); |
|   assertTrue(contentDisposition.contains(handle.toString())); |
| |
|   if (redirectUrl == null) { |
|     ByteArrayOutputStream bos = new ByteArrayOutputStream(); |
|     // the entity stream is closed even when copying fails |
|     try (InputStream in = (InputStream) response.getEntity()) { |
|       IOUtils.copyBytes(in, bos, new Configuration()); |
|     } |
| |
|     // NOTE(review): assumes the result is served as UTF-8 - confirm if another output charset is configured |
|     String result = new String(bos.toByteArray(), java.nio.charset.StandardCharsets.UTF_8); |
|     List<String> actualRows = Arrays.asList(result.split("\n")); |
|     validatePersistentResult(actualRows, false); |
|   } else { |
|     // TestNG order is (actual, expected), as used by the other assertions in this file |
|     assertEquals(response.getStatus(), SEE_OTHER.getStatusCode()); |
|     assertTrue(response.getHeaderString("Location").contains(redirectUrl)); |
|   } |
| } |
| |
| /** |
| * Valid not found for http result. |
| * |
| * @param parent the parent |
| * @param lensSessionId the lens session id |
| * @param handle the handle |
| */ |
| static void validNotFoundForHttpResult(WebTarget parent, LensSessionHandle lensSessionId, QueryHandle handle) { |
| try { |
| Response response = parent.path("queryapi/queries/" + handle.toString() + "/httpresultset") |
| .queryParam("sessionid", lensSessionId).request().get(); |
| if (NOT_FOUND.getStatusCode() != response.getStatus()) { |
| fail("Expected not found excepiton, but got:" + response.getStatus()); |
| } |
| assertEquals(response.getStatus(), NOT_FOUND.getStatusCode()); |
| } catch (NotFoundException e) { |
| // expected |
| log.error("Resource not found.", e); |
| } |
| |
| } |
| |
| // test with execute async post, get query, get results |
| // test cancel query |
| |
| /** |
| * Test execute async in memory result. |
| * |
| * @throws InterruptedException the interrupted exception |
| * @throws IOException Signals that an I/O exception has occurred. |
| */ |
| @Test(dataProvider = "mediaTypeData") |
| public void testExecuteAsyncInMemoryResult(MediaType mt) throws InterruptedException, IOException { |
| // test post execute op |
| final WebTarget target = target().path("queryapi/queries"); |
| |
| final FormDataMultiPart mp = new FormDataMultiPart(); |
| LensConf conf = new LensConf(); |
| conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, "false"); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, |
| mt)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select ID, IDSTR from " |
| + TEST_TABLE)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "execute")); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), conf, |
| mt)); |
| final QueryHandle handle = target.request(mt).post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE), |
| new GenericType<LensAPIResult<QueryHandle>>() {}).getData(); |
| |
| assertNotNull(handle); |
| |
| // Get query |
| waitForQueryToFinish(target(), lensSessionId, handle, Status.SUCCESSFUL, mt); |
| |
| // fetch results |
| validateResultSetMetadata(handle, "", |
| new String[][]{{"ID", "INT"}, {"IDSTR", "STRING"}}, |
| target(), lensSessionId, mt); |
| |
| validateInmemoryResult(target, handle, mt); |
| |
| validNotFoundForHttpResult(target(), lensSessionId, handle); |
| waitForPurge(0, queryService.finishedQueries); |
| APIResult result=target.path(handle.toString()).path("resultset") |
| .queryParam("sessionid", lensSessionId).request().delete(APIResult.class); |
| assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); |
| } |
| |
| /** |
|  * Test TTL for in memory result. |
|  * |
|  * <p>Lowers the in-memory result set TTL to 5 seconds, runs a query whose result stays in memory and checks |
|  * once per second that neither the finished query nor its result set can be purged until one second before |
|  * the TTL has elapsed. After a further sleep no finished query may be left. The original TTL is restored in |
|  * all cases.</p> |
|  */ |
| @Test |
| public void testTTLForInMemoryResult() throws InterruptedException, IOException, LensException { |
|   final long originalTtlMillis = queryService.getInMemoryResultsetTTLMillis(); |
|   queryService.setInMemoryResultsetTTLMillis(5000); // 5 secs |
|   try { |
|     final WebTarget queriesResource = target().path("queryapi/queries"); |
|     final String selectQuery = "select ID, IDSTR from " + TEST_TABLE; |
| |
|     final LensConf inMemoryConf = new LensConf(); |
|     inMemoryConf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, "false"); |
|     inMemoryConf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, "false"); |
|     inMemoryConf.addProperty(LensConfConstants.QUERY_MAIL_NOTIFY, "false"); |
| |
|     // test post execute op |
|     final FormDataMultiPart form = new FormDataMultiPart(); |
|     form.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, |
|       defaultMT)); |
|     form.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), selectQuery)); |
|     form.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "execute")); |
|     form.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), |
|       inMemoryConf, defaultMT)); |
| |
|     final LensAPIResult<QueryHandle> submission = queriesResource.request().post( |
|       Entity.entity(form, MediaType.MULTIPART_FORM_DATA_TYPE), new GenericType<LensAPIResult<QueryHandle>>() {}); |
|     final QueryHandle queryHandle = submission.getData(); |
|     assertNotNull(queryHandle); |
| |
|     waitForQueryToFinish(target(), lensSessionId, queryHandle, Status.SUCCESSFUL, defaultMT); |
| |
|     // Check TTL |
|     final QueryContext finishedCtx = queryService.getUpdatedQueryContext(lensSessionId, queryHandle); |
|     final long driverFinishTime = finishedCtx.getDriverStatus().getDriverFinishTime(); |
|     final long ttlMillis = queryService.getInMemoryResultsetTTLMillis(); |
|     final long softExpiryTime = driverFinishTime + ttlMillis - 1000; //Keeping buffer of 1 secs |
| |
|     int ttlChecks = 0; |
|     while (System.currentTimeMillis() < softExpiryTime) { |
|       assertEquals(queryService.getFinishedQueriesCount(), 1); |
|       assertEquals(queryService.finishedQueries.peek().canBePurged(), false); |
|       assertEquals(((InMemoryResultSet) queryService.getResultset(queryHandle)).canBePurged(), false); |
|       ttlChecks++; |
|       Thread.sleep(1000); // sleep for 1 secs and then check again |
|     } |
|     assertTrue(ttlChecks >= 2, "CheckCount = " + ttlChecks); // TTl check at least twice |
| |
|     Thread.sleep(3000); // should be past TTL after this sleep . purge thread runs every 1 secs for Tests |
|     assertEquals(queryService.getFinishedQueriesCount(), 0); |
|   } finally { |
|     queryService.setInMemoryResultsetTTLMillis(originalTtlMillis); |
|   } |
| } |
| |
| /** |
|  * Test execute async temp table. |
|  * |
|  * <p>Drops {@code temp_output} if it exists, recreates it from a select on the test table, selects everything |
|  * back from it and validates the metadata and the in-memory rows of that last query. Each query is submitted |
|  * with driver persistence switched off and awaited before the next one is sent.</p> |
|  * |
|  * @param mt the media type used for the requests and for the session id and conf body parts |
|  * @throws InterruptedException the interrupted exception |
|  * @throws IOException Signals that an I/O exception has occurred. |
|  */ |
| @Test(dataProvider = "mediaTypeData") |
| public void testExecuteAsyncTempTable(MediaType mt) throws InterruptedException, IOException { |
|   // test post execute op |
|   final WebTarget target = target().path("queryapi/queries"); |
| |
|   LensConf conf = new LensConf(); |
|   conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, "false"); |
|   final QueryHandle dropHandle = submitTempTableQuery(target, "drop table if exists temp_output", conf, mt); |
| |
|   assertNotNull(dropHandle); |
| |
|   // Get query |
|   waitForQueryToFinish(target(), lensSessionId, dropHandle, Status.SUCCESSFUL, mt); |
| |
|   // a fresh conf; it is shared by the create and the select below |
|   conf = new LensConf(); |
|   conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, "false"); |
|   final QueryHandle handle = submitTempTableQuery(target, |
|     "create table temp_output as select ID, IDSTR from " + TEST_TABLE, conf, mt); |
| |
|   assertNotNull(handle); |
| |
|   // Get query |
|   waitForQueryToFinish(target(), lensSessionId, handle, Status.SUCCESSFUL, mt); |
| |
|   String select = "SELECT * FROM temp_output"; |
|   final QueryHandle handle2 = submitTempTableQuery(target, select, conf, mt); |
| |
|   assertNotNull(handle2); |
| |
|   // Get query |
|   waitForQueryToFinish(target(), lensSessionId, handle2, Status.SUCCESSFUL, mt); |
| |
|   // fetch results |
|   validateResultSetMetadata(handle2, "temp_output.", new String[][]{{"ID", "INT"}, {"IDSTR", "STRING"}}, |
|     target(), lensSessionId, mt); |
| |
|   validateInmemoryResult(target, handle2, mt); |
| } |
| |
| /** |
|  * Posts one {@code execute} request for the temp table test and returns the handle found in the response. |
|  * |
|  * @param target the queries resource the form is posted to |
|  * @param query  the query text |
|  * @param conf   the conf sent along with the query |
|  * @param mt     the media type used for the request and for the session id and conf body parts |
|  * @return the query handle carried by the response |
|  */ |
| private QueryHandle submitTempTableQuery(WebTarget target, String query, LensConf conf, MediaType mt) { |
|   final FormDataMultiPart form = new FormDataMultiPart(); |
|   form.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, |
|     mt)); |
|   form.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), query)); |
|   form.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "execute")); |
|   form.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), conf, |
|     mt)); |
|   return target.request(mt).post(Entity.entity(form, MediaType.MULTIPART_FORM_DATA_TYPE), |
|     new GenericType<LensAPIResult<QueryHandle>>() {}).getData(); |
| } |
| |
| /** |
| * Validate result set metadata. |
| * |
| * @param handle the handle |
| * @param parent the parent |
| * @param lensSessionId the lens session id |
| */ |
| static void validateResultSetMetadata(QueryHandle handle, WebTarget parent, LensSessionHandle lensSessionId, |
| MediaType mt) { |
| validateResultSetMetadata(handle, "", |
| new String[][]{{"ID", "INT"}, {"IDSTR", "STRING"}, {"IDARR", "ARRAY"}, {"IDSTRARR", "ARRAY"}}, |
| parent, lensSessionId, mt); |
| } |
| |
| /** |
|  * Validate result set metadata. |
|  * |
|  * <p>Fetches the {@code resultsetmetadata} of the query and checks the number of columns and, per column, the |
|  * name and the type. Names and types are compared ignoring case; a name may optionally carry the output table |
|  * prefix.</p> |
|  * |
|  * @param handle         the handle |
|  * @param outputTablePfx the output table pfx |
|  * @param columns        the expected columns, one name/type pair per column |
|  * @param parent         the parent |
|  * @param lensSessionId  the lens session id |
|  * @param mt             the media type used for the metadata request |
|  */ |
| static void validateResultSetMetadata(QueryHandle handle, String outputTablePfx, String[][] columns, WebTarget parent, |
|   LensSessionHandle lensSessionId, MediaType mt) { |
|   final WebTarget target = parent.path("queryapi/queries"); |
| |
|   QueryResultSetMetadata metadata = target.path(handle.toString()).path("resultsetmetadata") |
|     .queryParam("sessionid", lensSessionId).request(mt).get(QueryResultSetMetadata.class); |
|   assertEquals(metadata.getColumns().size(), columns.length); |
|   for (int i = 0; i < columns.length; i++) { |
|     final String actualName = metadata.getColumns().get(i).getName(); |
|     final String expectedName = columns[i][0]; |
|     // equalsIgnoreCase does not depend on the default locale, unlike toLowerCase() |
|     assertTrue( |
|       actualName.equalsIgnoreCase(outputTablePfx + expectedName) || actualName.equalsIgnoreCase(expectedName), |
|       "Column " + i + " is named [" + actualName + "], expected [" + expectedName + "] with or without prefix [" |
|         + outputTablePfx + "]"); |
|     // TestNG order is (actual, expected) |
|     assertEquals(metadata.getColumns().get(i).getType().name().toLowerCase(Locale.ROOT), |
|       columns[i][1].toLowerCase(Locale.ROOT), "Unexpected type for column [" + actualName + "]"); |
|   } |
| } |
| private void validateInmemoryResult(WebTarget target, QueryHandle handle, MediaType mt) throws IOException { |
| if (mt.equals(MediaType.APPLICATION_JSON_TYPE)) { |
| String resultSet = target.path(handle.toString()).path("resultset") |
| .queryParam("sessionid", lensSessionId).request(mt).get(String.class); |
| // this is being done because json unmarshalling does not work to construct java Objects back |
| assertEquals(resultSet.replaceAll("\\W", ""), expectedJsonResult().replaceAll("\\W", "")); |
| } else { |
| InMemoryQueryResult resultSet = target.path(handle.toString()).path("resultset") |
| .queryParam("sessionid", lensSessionId).request(mt).get(InMemoryQueryResult.class); |
| validateInmemoryResult(resultSet); |
| } |
| } |
| /** |
|  * Builds the JSON text expected for the five-row in-memory result of the ID, IDSTR select. |
|  * |
|  * @return the expected JSON |
|  */ |
| private String expectedJsonResult() { |
|   return String.join("", |
|     "{\"inMemoryQueryResult\" : {\"rows\" : [ ", |
|     "{\"values\" : [ {\n\"type\" : \"int\",\n\"value\" : 1}, {\"type\" : \"string\",\"value\" : \"one\"} ]},", |
|     "{\"values\" : [ null, {\"type\" : \"string\",\"value\" : \"two\"} ]},", |
|     "{\"values\" : [ {\"type\" : \"int\",\"value\" : 3}, null ]},", |
|     "{\"values\" : [ null, null ]},", |
|     "{\"values\" : [ {\"type\" : \"int\",\"value\" : 5}, {\"type\" : \"string\",\"value\" : \"\"} ]} ]}}"); |
| } |
| |
| /** |
| * Validate inmemory result. |
| * |
| * @param resultset the resultset |
| */ |
| private void validateInmemoryResult(InMemoryQueryResult resultset) { |
| assertEquals(resultset.getRows().size(), 5); |
| assertEquals(resultset.getRows().get(0).getValues().get(0), 1); |
| assertEquals((String) resultset.getRows().get(0).getValues().get(1), "one"); |
| |
| assertNull(resultset.getRows().get(1).getValues().get(0)); |
| assertEquals((String) resultset.getRows().get(1).getValues().get(1), "two"); |
| |
| assertEquals(resultset.getRows().get(2).getValues().get(0), 3); |
| assertNull(resultset.getRows().get(2).getValues().get(1)); |
| |
| assertNull(resultset.getRows().get(3).getValues().get(0)); |
| assertNull(resultset.getRows().get(3).getValues().get(1)); |
| assertEquals(resultset.getRows().get(4).getValues().get(0), 5); |
| assertEquals(resultset.getRows().get(4).getValues().get(1), ""); |
| } |
| |
| // test execute with timeout, fetch results |
| // cancel the query with execute_with_timeout |
| |
| /** |
|  * Test execute with timeout query. |
|  * |
|  * <p>Posts the same select twice with operation {@code execute_with_timeout}: first with an empty conf, |
|  * expecting a persistent result in the response, then with driver persistence switched off, expecting an |
|  * in-memory result.</p> |
|  * |
|  * @param mt the media type used for the requests and for the session id and conf body parts |
|  * @throws IOException Signals that an I/O exception has occurred. |
|  * @throws InterruptedException the interrupted exception |
|  */ |
| @Test(dataProvider = "mediaTypeData") |
| public void testExecuteWithTimeoutQuery(MediaType mt) throws IOException, InterruptedException { |
|   final WebTarget queriesResource = target().path("queryapi/queries"); |
|   final String selectQuery = "select ID, IDSTR from " + TEST_TABLE; |
|   final String operation = "execute_with_timeout"; |
|   // set a timeout value enough for tests |
|   final String timeoutMillis = "300000"; |
| |
|   //1. Validate Persistent result |
|   final FormDataMultiPart persistentForm = new FormDataMultiPart(); |
|   persistentForm.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), |
|     lensSessionId, mt)); |
|   persistentForm.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), selectQuery)); |
|   persistentForm.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), operation)); |
|   persistentForm.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("timeoutmillis").build(), |
|     timeoutMillis)); |
|   persistentForm.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), |
|     new LensConf(), mt)); |
| |
|   final LensAPIResult<QueryHandleWithResultSet> persistentResponse = queriesResource.request(mt).post( |
|     Entity.entity(persistentForm, MediaType.MULTIPART_FORM_DATA_TYPE), |
|     new GenericType<LensAPIResult<QueryHandleWithResultSet>>() {}); |
|   final QueryHandleWithResultSet persistent = persistentResponse.getData(); |
|   assertNotNull(persistent.getQueryHandle()); |
|   assertNotNull(persistent.getResult()); |
|   validatePersistentResult((PersistentQueryResult) persistent.getResult(), persistent.getQueryHandle(), true, |
|     false); |
| |
|   //2. Validate InMemory result |
|   final LensConf inMemoryConf = new LensConf(); |
|   inMemoryConf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, "false"); |
| |
|   final FormDataMultiPart inMemoryForm = new FormDataMultiPart(); |
|   inMemoryForm.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), |
|     lensSessionId, mt)); |
|   inMemoryForm.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), selectQuery)); |
|   inMemoryForm.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), operation)); |
|   inMemoryForm.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("timeoutmillis").build(), |
|     timeoutMillis)); |
|   inMemoryForm.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), |
|     inMemoryConf, mt)); |
| |
|   validateInmemoryResultForTimeoutQuery(queriesResource, inMemoryForm, mt); |
| } |
| |
| /** |
|  * Test auto cancel on time out. |
|  * |
|  * <p>Pauses the query submitter, submits a dummy query and then a second query through {@code execute} with a |
|  * timeout of 100, and expects the second query to be reported as queued and to become cancelled within |
|  * roughly ten seconds. The submitter is resumed even when an assertion fails, so that a failure here does not |
|  * leave it paused for the tests that follow.</p> |
|  * |
|  * @throws Exception declared on the signature |
|  */ |
| @Test |
| public void testAutoCancelOnTimeOut() throws Exception { |
|   queryService.pauseQuerySubmitter(true); |
|   try { |
|     //First query will not be queued. @see QueryExecutionServiceImpl.QuerySubmitter.run |
|     queryService.executeAsync(lensSessionId, "select 1 from " + TEST_TABLE, new LensConf(), "dummyQuery"); |
| |
|     //Second query after pause will be queued |
|     QueryHandleWithResultSet result = queryService.execute(lensSessionId, "select ID, IDSTR from "+ TEST_TABLE, 100, |
|       new LensConf(), "testQuery"); |
|     assertNotNull(result.getQueryHandle()); |
|     assertTrue(result.getStatus().queued()); |
|     int checkCtr = 0; |
|     boolean cancelled = false; |
|     while (!cancelled && checkCtr++ < 100) { //Max 10 secs wait |
|       Thread.sleep(100); //wait for query to get auto cancelled |
|       cancelled = queryService.getUpdatedQueryContext(lensSessionId, result.getQueryHandle()).getStatus() |
|         .cancelled(); |
|     } |
|     assertTrue(cancelled); //auto cancelled beyond timeout |
|   } finally { |
|     // always resume, otherwise a failed assertion leaves the submitter paused |
|     queryService.pauseQuerySubmitter(false); |
|   } |
| } |
| |
| private void validateInmemoryResultForTimeoutQuery(WebTarget target, FormDataMultiPart mp, MediaType mt) { |
| if (mt.equals(MediaType.APPLICATION_JSON_TYPE)) { |
| String result = target.request(mt).post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE), String.class); |
| assertTrue(result.contains("\"type\" : \"queryHandleWithResultSet\"")); |
| assertTrue(result.contains("\"status\" : \"SUCCESSFUL\"")); |
| assertTrue(result.contains("\"isResultSetAvailable\" : true")); |
| assertTrue(result.replaceAll("\\W", "").contains(expectedJsonResult().replaceAll("\\W", ""))); |
| } else { |
| QueryHandleWithResultSet result = target.request(mt).post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE), |
| new GenericType<LensAPIResult<QueryHandleWithResultSet>>() { |
| }).getData(); |
| assertNotNull(result.getQueryHandle()); |
| assertNotNull(result.getResult()); |
| validateInmemoryResult((InMemoryQueryResult) result.getResult()); |
| } |
| } |
| /** |
|  * Data provider for test case |
|  * {@link #testExecuteWithTimeoutAndPreFetchAndServerPersistence(long, int, boolean, long, String, int)} |
|  * |
|  * <p>Produces the same five scenarios first for the hive test table and then for the jdbc test table.</p> |
|  * |
|  * @return one row per scenario with the columns timeOutMillis, preFetchRows, isStreamingResultAvailable, |
|  *         deferPersistenceByMillis, query and rows in result |
|  */ |
| @DataProvider |
| public Object[][] executeWithTimeoutAndPreFetechAndServerPersistenceDP() { |
|   final List<Object[]> scenarios = new ArrayList<>(); |
|   // hive table first, jdbc table second |
|   for (String table : new String[] {TEST_TABLE, TEST_JDBC_TABLE}) { |
|     final String query5Rows = "select ID, IDSTR from " + table; |
|     final String query0Rows = query5Rows + " where ID=99"; |
| |
|     //Columns: timeOutMillis, preFetchRows, isStreamingResultAvailable, deferPersistenceByMillis,query,rows in result |
|     //All 5 rows are requested to be pre-fetched |
|     scenarios.add(new Object[] {30000, 5, true, 0, query5Rows, 5}); |
|     //10 rows are requested to be pre-fetched. |
|     scenarios.add(new Object[] {30000, 10, true, 6000, query5Rows, 5}); |
|     //2 rows are requested to be pre-fetched. Will not stream |
|     scenarios.add(new Object[] {30000, 2, false, 4000, query5Rows, 5}); |
|     //5 rows requested. Timeout is less (10ms). Will not stream |
|     scenarios.add(new Object[] {10, 5, false, 0, query5Rows, 5}); |
|     //Result has no rows |
|     scenarios.add(new Object[] {30000, 5, true, 0, query0Rows, 0}); |
|   } |
|   return scenarios.toArray(new Object[0][]); |
| } |
| |
| @Test |
| public void testExecuteAsyncJDBCQuery() throws InterruptedException { |
| String query = "select ID, IDSTR from " + TEST_JDBC_TABLE; |
| QueryHandle handle = RestAPITestUtil.executeAndGetHandle(target(), Optional.of(lensSessionId), Optional.of(query), |
| Optional.of(getLensConf(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, false)), APPLICATION_XML_TYPE); |
| // fetch results so that it can be purged |
| InMemoryQueryResult queryResult = RestAPITestUtil.getLensQueryResult(target(), lensSessionId, handle, |
| InMemoryQueryResult.class, APPLICATION_XML_TYPE); |
| assertEquals(queryResult.getRows().size(), 5); |
| } |
| |
| /** |
| * @param timeOutMillis : wait time for execute with timeout api |
| * @param preFetchRows : number of rows to pre-fetch in case of InMemoryResultSet |
| * @param isStreamingResultAvailable : whether the execute call is expected to return InMemoryQueryResult |
| * @param deferPersistenceByMillis : The time in millis by which Result formatter will be deferred by. |
| * @throws IOException |
| * @throws InterruptedException |
| */ |
| @Test(dataProvider = "executeWithTimeoutAndPreFetechAndServerPersistenceDP") |
| public void testExecuteWithTimeoutAndPreFetchAndServerPersistence(long timeOutMillis, int preFetchRows, |
| boolean isStreamingResultAvailable, long deferPersistenceByMillis, String query, int rowsInResult) |
| throws Exception { |
| final WebTarget target = target().path("queryapi/queries"); |
| |
| final FormDataMultiPart mp = new FormDataMultiPart(); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, |
| APPLICATION_XML_TYPE)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), query)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "execute_with_timeout")); |
| // Set a timeout value enough for tests |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("timeoutmillis").build(), timeOutMillis + "")); |
| LensConf conf = new LensConf(); |
| conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, "true"); |
| conf.addProperty(LensConfConstants.CANCEL_QUERY_ON_TIMEOUT, "false"); |
| conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, "false"); |
| conf.addProperty(LensConfConstants.PREFETCH_INMEMORY_RESULTSET, "true"); |
| conf.addProperty(LensConfConstants.PREFETCH_INMEMORY_RESULTSET_ROWS, preFetchRows); |
| conf.addProperty(LensConfConstants.QUERY_OUTPUT_FORMATTER, DeferredInMemoryResultFormatter.class.getName()); |
| conf.addProperty("deferPersistenceByMillis", deferPersistenceByMillis); // property used for test only |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), conf, |
| APPLICATION_XML_TYPE)); |
| QueryHandleWithResultSet result = target.request(APPLICATION_XML_TYPE) |
| .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE), |
| new GenericType<LensAPIResult<QueryHandleWithResultSet>>() {}).getData(); |
| QueryHandle handle = result.getQueryHandle(); |
| assertNotNull(handle); |
| assertNotEquals(result.getStatus().getStatus(), QueryStatus.Status.FAILED); |
| |
| if (isStreamingResultAvailable) { |
| // TEST streamed result |
| assertTrue(result.getStatus().getStatus() == QueryStatus.Status.EXECUTED |
| || result.getStatus().getStatus() == QueryStatus.Status.SUCCESSFUL, |
| "Check if timeoutmillis need to be increased based on query status " + result.getStatus()); |
| assertEquals(result.getResultMetadata().getColumns().size(), 2); |
| assertNotNull(result.getResult()); |
| if (rowsInResult > 0) { |
| validateInmemoryResult((InMemoryQueryResult) result.getResult()); |
| } else { |
| assertEquals(((InMemoryQueryResult) result.getResult()).getRows().size(), 0); |
| } |
| } else { |
| // IF timeout is sufficient for query to finish , we should receive PersistentQueryResult |
| // Else we will get null result |
| assertTrue(result.getResult()==null || result.getResult() instanceof PersistentQueryResult); |
| } |
| |
| waitForQueryToFinish(target(), lensSessionId, handle, Status.SUCCESSFUL, APPLICATION_XML_TYPE); |
| |
| // Test Persistent Result |
| validatePersistedResult(handle, target(), lensSessionId, new String[][] { { "ID", "INT" }, { "IDSTR", "STRING" } }, |
| false, true, APPLICATION_XML_TYPE); |
| } |
| |
| public static class DeferredInMemoryResultFormatter extends FileSerdeFormatter { |
| /** |
| * Defer init so that this output formatter takes significant time. |
| */ |
| @Override |
| public void init(QueryContext ctx, LensResultSetMetadata metadata) throws IOException { |
| super.init(ctx, metadata); |
| deferFormattingIfApplicable(ctx); |
| } |
| } |
| |
| public static class DeferredPersistentResultFormatter extends FilePersistentFormatter { |
| /** |
| * Defer init so that this output formatter takes significant time. |
| */ |
| @Override |
| public void init(QueryContext ctx, LensResultSetMetadata metadata) throws IOException { |
| super.init(ctx, metadata); |
| deferFormattingIfApplicable(ctx); |
| } |
| } |
| |
| private static void deferFormattingIfApplicable(QueryContext ctx) { |
| long deferPersistenceByMillis = ctx.getConf().getLong("deferPersistenceByMillis", 0); |
| if (deferPersistenceByMillis > 0) { |
| try { |
| log.info("Deferring result formatting by {} millis", deferPersistenceByMillis); |
| Thread.sleep(deferPersistenceByMillis); |
| } catch (InterruptedException e) { |
| // Ignore |
| } |
| } |
| } |
| |
| /** |
| * Test execute with timeout query. |
| * |
| * @throws IOException Signals that an I/O exception has occurred. |
| * @throws InterruptedException the interrupted exception |
| */ |
| @Test(dataProvider = "mediaTypeData") |
| public void testExecuteWithTimeoutFailingQuery(MediaType mt) throws IOException, InterruptedException { |
| final WebTarget target = target().path("queryapi/queries"); |
| |
| final FormDataMultiPart mp = new FormDataMultiPart(); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, |
| mt)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select ID from nonexist")); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "execute_with_timeout")); |
| // set a timeout value enough for tests |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("timeoutmillis").build(), "300000")); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), new LensConf(), |
| mt)); |
| |
| Response response = target.request(mt).post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE)); |
| assertEquals(response.getStatus(), BAD_REQUEST.getStatusCode()); |
| } |
| |
| /** |
| * Test default config. |
| * |
| * @throws LensException the lens exception |
| */ |
| @Test |
| public void testDefaultConfig() throws LensException { |
| LensConf queryConf = new LensConf(); |
| queryConf.addProperty("test.query.conf", "qvalue"); |
| Configuration conf = queryService.getLensConf(lensSessionId, queryConf); |
| |
| // session specific conf |
| assertEquals(conf.get("test.session.key"), "svalue"); |
| // query specific conf |
| assertEquals(conf.get("test.query.conf"), "qvalue"); |
| // lenssession default should be loaded |
| assertNotNull(conf.get("lens.query.enable.persistent.resultset")); |
| // lens site should be loaded |
| assertEquals(conf.get("test.lens.site.key"), "gsvalue"); |
| // hive default variables should not be set |
| assertNull(conf.get("hive.exec.local.scratchdir")); |
| // hive site variables should not be set |
| assertNull(conf.get("hive.metastore.warehouse.dir")); |
| // core default should not be loaded |
| assertNull(conf.get("fs.default.name")); |
| // server configuration should not set |
| assertNull(conf.get("lens.server.persist.location")); |
| |
| // Test server config. Hive configs overriden should be set |
| assertFalse(queryService.getHiveConf().getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)); |
| |
| final String query = "select ID from " + TEST_TABLE; |
| QueryContext ctx = new QueryContext(query, null, queryConf, conf, queryService.getDrivers()); |
| Map<LensDriver, String> driverQueries = new HashMap<>(); |
| for (LensDriver driver : queryService.getDrivers()) { |
| driverQueries.put(driver, query); |
| } |
| ctx.setDriverQueries(driverQueries); |
| |
| // This still holds since current database is default |
| assertEquals(queryService.getSession(lensSessionId).getCurrentDatabase(), "default"); |
| assertEquals(queryService.getSession(lensSessionId).getHiveConf().getClassLoader(), ctx.getConf() |
| .getClassLoader()); |
| assertEquals(queryService.getSession(lensSessionId).getHiveConf().getClassLoader(), |
| ctx.getDriverContext().getDriverConf(queryService.getDrivers().iterator().next()).getClassLoader()); |
| assertTrue(ctx.isDriverQueryExplicitlySet()); |
| for (LensDriver driver : queryService.getDrivers()) { |
| Configuration dconf = ctx.getDriverConf(driver); |
| assertEquals(dconf.get("test.session.key"), "svalue"); |
| // query specific conf |
| assertEquals(dconf.get("test.query.conf"), "qvalue"); |
| // lenssession default should be loaded |
| assertNotNull(dconf.get("lens.query.enable.persistent.resultset")); |
| // lens site should be loaded |
| assertEquals(dconf.get("test.lens.site.key"), "gsvalue"); |
| // hive default variables should not be set |
| assertNull(conf.get("hive.exec.local.scratchdir")); |
| // driver site should be loaded |
| assertEquals(dconf.get("lens.driver.test.key"), "set"); |
| // core default should not be loaded |
| assertNull(dconf.get("fs.default.name")); |
| // server configuration should not set |
| assertNull(dconf.get("lens.server.persist.location")); |
| } |
| |
| checkDefaultConfigConsistency(); |
| } |
| |
| public void checkDefaultConfigConsistency() { |
| Configuration conf = LensSessionImpl.createDefaultConf(); |
| assertNotNull(conf.get("lens.query.enable.persistent.resultset")); |
| boolean isDriverPersistent = conf.getBoolean("lens.query.enable.persistent.resultset", false); |
| conf.setBoolean("lens.query.enable.persistent.resultset", isDriverPersistent ? false : true); |
| conf.set("new_random_property", "new_random_property"); |
| |
| // Get the default conf again and verify its not modified by previous operations |
| conf = LensSessionImpl.createDefaultConf(); |
| boolean isDriverPersistentNow = conf.getBoolean("lens.query.enable.persistent.resultset", false); |
| assertEquals(isDriverPersistentNow, isDriverPersistent); |
| assertNull(conf.get("new_random_property")); |
| } |
| |
| /** |
| * Test estimate native query. |
| * |
| * @throws InterruptedException the interrupted exception |
| */ |
| @Test(dataProvider = "mediaTypeData") |
| public void testEstimateNativeQuery(MediaType mt) throws InterruptedException { |
| final WebTarget target = target().path("queryapi/queries"); |
| |
| // estimate native query |
| final FormDataMultiPart mp = new FormDataMultiPart(); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, |
| mt)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select ID from " + TEST_TABLE)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "estimate")); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), new LensConf(), |
| mt)); |
| |
| final QueryCostTO result = target.request(mt) |
| .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE), |
| new GenericType<LensAPIResult<QueryCostTO>>() {}).getData(); |
| assertNotNull(result); |
| assertEquals(result.getEstimatedExecTimeMillis(), null); |
| assertEquals(result.getEstimatedResourceUsage(), Double.MAX_VALUE); |
| } |
| |
| |
| /** |
| * Check if DB static jars get passed to Hive driver |
| * @throws Exception |
| */ |
| @Test(dataProvider = "mediaTypeData") |
| public void testHiveDriverGetsDBJars(MediaType mt) throws Exception { |
| // Set DB to a db with static jars |
| HiveSessionService sessionService = LensServices.get().getService(SessionService.NAME); |
| |
| // Open session with a DB which has static jars |
| LensSessionHandle sessionHandle = |
| sessionService.openSession("foo@localhost", "bar", DB_WITH_JARS, new HashMap<String, String>()); |
| |
| // Add a jar in the session |
| File testJarFile = new File("target/testjars/test2.jar"); |
| sessionService.addResource(sessionHandle, "jar", "file://" + testJarFile.getAbsolutePath()); |
| |
| log.info("@@@ Opened session " + sessionHandle.getPublicId() + " with database " + DB_WITH_JARS); |
| LensSessionImpl session = sessionService.getSession(sessionHandle); |
| |
| // Jars should be pending until query is run |
| assertEquals(session.getPendingSessionResourcesForDatabase(DB_WITH_JARS).size(), 1); |
| assertEquals(session.getPendingSessionResourcesForDatabase(DB_WITH_JARS_2).size(), 1); |
| |
| final String tableInDBWithJars = "testHiveDriverGetsDBJars"; |
| try { |
| // First execute query on the session with db should load jars from DB |
| LensServerTestUtil.createTable(tableInDBWithJars, target(), sessionHandle, "(ID INT, IDSTR STRING) " |
| + "ROW FORMAT SERDE \"DatabaseJarSerde\"", mt); |
| |
| boolean addedToHiveDriver = false; |
| |
| for (LensDriver driver : queryService.getDrivers()) { |
| if (driver instanceof HiveDriver) { |
| addedToHiveDriver = |
| ((HiveDriver) driver).areDBResourcesAddedForSession(sessionHandle.getPublicId().toString(), DB_WITH_JARS); |
| if (addedToHiveDriver) { |
| break; //There are two Hive drivers now both pointing to same hive server. So break after first success |
| } |
| } |
| } |
| assertTrue(addedToHiveDriver); |
| |
| // Switch database |
| log.info("@@@# database switch test"); |
| session.setCurrentDatabase(DB_WITH_JARS_2); |
| LensServerTestUtil.createTable(tableInDBWithJars + "_2", target(), sessionHandle, "(ID INT, IDSTR STRING) " |
| + "ROW FORMAT SERDE \"DatabaseJarSerde\"", mt); |
| |
| // All db jars should have been added |
| assertTrue(session.getDBResources(DB_WITH_JARS_2).isEmpty()); |
| assertTrue(session.getDBResources(DB_WITH_JARS).isEmpty()); |
| |
| // All session resources must have been added to both DBs |
| assertFalse(session.getLensSessionPersistInfo().getResources().isEmpty()); |
| for (LensSessionImpl.ResourceEntry resource : session.getLensSessionPersistInfo().getResources()) { |
| assertTrue(resource.isAddedToDatabase(DB_WITH_JARS_2)); |
| assertTrue(resource.isAddedToDatabase(DB_WITH_JARS)); |
| } |
| |
| assertTrue(session.getPendingSessionResourcesForDatabase(DB_WITH_JARS).isEmpty()); |
| assertTrue(session.getPendingSessionResourcesForDatabase(DB_WITH_JARS_2).isEmpty()); |
| |
| } finally { |
| log.info("@@@ TEST_OVER"); |
| try { |
| LensServerTestUtil.dropTable(tableInDBWithJars, target(), sessionHandle, mt); |
| LensServerTestUtil.dropTable(tableInDBWithJars + "_2", target(), sessionHandle, mt); |
| } catch (Throwable th) { |
| log.error("Exception while dropping table.", th); |
| } |
| sessionService.closeSession(sessionHandle); |
| } |
| } |
| |
| @Test(dataProvider = "mediaTypeData") |
| public void testRewriteFailure(MediaType mt) { |
| final WebTarget target = target().path("queryapi/queries"); |
| |
| // estimate cube query which fails semantic analysis |
| final FormDataMultiPart mp = new FormDataMultiPart(); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, |
| mt)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), |
| "sdfelect ID from cube_nonexist")); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "estimate")); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), new LensConf(), |
| mt)); |
| |
| final Response response = target.request(mt) |
| .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE)); |
| |
| |
| LensErrorTO expectedLensErrorTO = LensErrorTO.composedOf( |
| LensCubeErrorCode.SYNTAX_ERROR.getLensErrorInfo().getErrorCode(), |
| "Syntax Error: line 1:0 cannot recognize input near 'sdfelect' 'ID' 'from'", |
| TestDataUtils.MOCK_STACK_TRACE); |
| ErrorResponseExpectedData expectedData = new ErrorResponseExpectedData(BAD_REQUEST, expectedLensErrorTO); |
| |
| expectedData.verify(response); |
| } |
| |
| @Test |
| public void testDriverEstimateSkippingForRewritefailure() throws LensException { |
| Configuration conf = queryService.getLensConf(lensSessionId, new LensConf()); |
| QueryContext ctx = new QueryContext("cube select ID from nonexist", "user", new LensConf(), conf, |
| queryService.getDrivers()); |
| for (LensDriver driver : queryService.getDrivers()) { |
| ctx.setDriverRewriteError(driver, new LensException()); |
| } |
| |
| // All estimates should be skipped. |
| Map<LensDriver, AbstractQueryContext.DriverEstimateRunnable> estimateRunnables = ctx.getDriverEstimateRunnables(); |
| for (LensDriver driver : estimateRunnables.keySet()) { |
| estimateRunnables.get(driver).run(); |
| assertFalse(estimateRunnables.get(driver).isSucceeded(), driver + " estimate should have been skipped"); |
| } |
| |
| for (LensDriver driver : queryService.getDrivers()) { |
| assertNull(ctx.getDriverQueryCost(driver)); |
| } |
| } |
| |
| @Test(dataProvider = "mediaTypeData") |
| public void testNonSelectQueriesWithPersistResult(MediaType mt) throws InterruptedException { |
| LensConf conf = new LensConf(); |
| conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, "true"); |
| String tblName = "testNonSelectQueriesWithPersistResult"; |
| LensServerTestUtil.dropTableWithConf(tblName, target(), lensSessionId, conf, mt); |
| conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, "true"); |
| LensServerTestUtil.dropTableWithConf(tblName, target(), lensSessionId, conf, mt); |
| conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, "false"); |
| LensServerTestUtil.dropTableWithConf(tblName, target(), lensSessionId, conf, mt); |
| conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, "false"); |
| LensServerTestUtil.dropTableWithConf(tblName, target(), lensSessionId, conf, mt); |
| } |
| |
| @Test(dataProvider = "mediaTypeData") |
| public void testEstimateGauges(MediaType mt) { |
| final WebTarget target = target().path("queryapi/queries"); |
| |
| LensConf conf = new LensConf(); |
| String gaugeKey = "TestQueryService-testEstimateGauges" + mt.getSubtype(); |
| conf.addProperty(LensConfConstants.QUERY_METRIC_UNIQUE_ID_CONF_KEY, gaugeKey); |
| // estimate native query |
| final FormDataMultiPart mp = new FormDataMultiPart(); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, |
| mt)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select ID from " + TEST_TABLE)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "estimate")); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), conf, |
| mt)); |
| |
| final QueryCostTO queryCostTO = target.request(mt) |
| .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE), |
| new GenericType<LensAPIResult<QueryCostTO>>() { |
| }).getData(); |
| assertNotNull(queryCostTO); |
| |
| MetricRegistry reg = LensMetricsRegistry.getStaticRegistry(); |
| |
| assertTrue(reg.getGauges().keySet().containsAll(Arrays.asList( |
| "lens.MethodMetricGauge." + gaugeKey + "-DRIVER_SELECTION", |
| "lens.MethodMetricGauge." + gaugeKey + "-hive/hive1-CUBE_REWRITE", |
| "lens.MethodMetricGauge." + gaugeKey + "-hive/hive1-DRIVER_ESTIMATE", |
| "lens.MethodMetricGauge." + gaugeKey + "-hive/hive1-RewriteUtil-rewriteQuery", |
| "lens.MethodMetricGauge." + gaugeKey + "-hive/hive2-CUBE_REWRITE", |
| "lens.MethodMetricGauge." + gaugeKey + "-hive/hive2-DRIVER_ESTIMATE", |
| "lens.MethodMetricGauge." + gaugeKey + "-hive/hive2-RewriteUtil-rewriteQuery", |
| "lens.MethodMetricGauge." + gaugeKey + "-jdbc/jdbc1-CUBE_REWRITE", |
| "lens.MethodMetricGauge." + gaugeKey + "-jdbc/jdbc1-DRIVER_ESTIMATE", |
| "lens.MethodMetricGauge." + gaugeKey + "-jdbc/jdbc1-RewriteUtil-rewriteQuery", |
| "lens.MethodMetricGauge." + gaugeKey + "-PARALLEL_ESTIMATE")), |
| reg.getGauges().keySet().toString()); |
| } |
| |
| @Test(dataProvider = "mediaTypeData") |
| public void testQueryRejection(MediaType mt) throws InterruptedException, IOException { |
| final WebTarget target = target().path("queryapi/queries"); |
| |
| final FormDataMultiPart mp = new FormDataMultiPart(); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, |
| mt)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), "blah select ID from " |
| + TEST_TABLE)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "execute")); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), new LensConf(), |
| mt)); |
| |
| Response response = target.request(mt).post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE)); |
| assertEquals(response.getStatus(), 400); |
| } |
| |
| /** |
| * Test query purger |
| * |
| * @throws InterruptedException the interrupted exception |
| * @throws IOException Signals that an I/O exception has occurred. |
| */ |
| @Test(dataProvider = "mediaTypeData") |
| public void testQueryPurger(MediaType mt) throws InterruptedException, IOException { |
| waitForPurge(); |
| LensConf conf = getLensConf(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, "false"); |
| // test post execute op |
| LensQuery ctx1 = executeAndWaitForQueryToFinish(target(), lensSessionId, |
| "select ID, IDSTR from " + TEST_TABLE, |
| Optional.of(conf), Optional.of(Status.SUCCESSFUL), mt); |
| LensQuery ctx2 = executeAndWaitForQueryToFinish(target(), lensSessionId, |
| "select ID, IDSTR from " + TEST_TABLE, |
| Optional.of(conf), Optional.of(Status.SUCCESSFUL), mt); |
| LensQuery ctx3 = executeAndWaitForQueryToFinish(target(), lensSessionId, |
| "select ID, IDSTR from " + TEST_TABLE, |
| Optional.of(conf), Optional.of(Status.SUCCESSFUL), mt); |
| waitForPurge(3, queryService.finishedQueries); |
| assertEquals(queryService.finishedQueries.size(), 3); |
| getLensQueryResultAsString(target(), lensSessionId, ctx3.getQueryHandle(), mt); |
| waitForPurge(2, queryService.finishedQueries); |
| assertTrue(queryService.finishedQueries.size() == 2); |
| getLensQueryResultAsString(target(), lensSessionId, ctx2.getQueryHandle(), mt); |
| waitForPurge(1, queryService.finishedQueries); |
| assertTrue(queryService.finishedQueries.size() == 1); |
| getLensQueryResultAsString(target(), lensSessionId, ctx1.getQueryHandle(), mt); |
| } |
| |
| /** |
| * Test session close when a query is active on the session |
| * |
| * @throws Exception |
| */ |
| @Test(dataProvider = "mediaTypeData") |
| public void testSessionClose(MediaType mt) throws Exception { |
| // Query with group by, will run long enough to close the session before finish |
| String query = "select ID, IDSTR, count(*) from " + TEST_TABLE + " group by ID, IDSTR"; |
| SessionService sessionService = LensServices.get().getService(HiveSessionService.NAME); |
| Map<String, String> sessionconf = new HashMap<>(); |
| LensSessionHandle sessionHandle = sessionService.openSession("foo", "bar", "default", sessionconf); |
| LensConf conf = getLensConf(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, "true"); |
| QueryHandle qHandle = |
| executeAndGetHandle(target(), Optional.of(sessionHandle), Optional.of(query), Optional.of(conf), mt); |
| sessionService.closeSession(sessionHandle); |
| sessionHandle = sessionService.openSession("foo", "bar", "default", sessionconf); |
| waitForQueryToFinish(target(), sessionHandle, qHandle, Status.SUCCESSFUL, mt); |
| } |
| |
| @AfterMethod |
| private void waitForPurge() throws InterruptedException { |
| waitForPurge(0, queryService.finishedQueries); |
| } |
| |
| |
| @Test(dataProvider = "mediaTypeData") |
| public void testFinishedNotification(MediaType mt) throws Exception { |
| try { |
| String query = "select ID, IDSTR, count(*) from " + TEST_TABLE + " group by ID, IDSTR"; |
| String endpoint = getBaseUri() + "/queryapi/notifictaion/finished"; |
| String encodedHttpEndPoint1 = endpoint + "?access_token=" + URLEncoder.encode("ABC123", "UTF-8"); |
| System.out.println("encodedHttpEndPoint1 :" + encodedHttpEndPoint1); |
| String encodedHttpEndPoint2 = endpoint + "?access_token=" + URLEncoder.encode("ABC123", "UTF-8") + "&data=" |
| + URLEncoder.encode("x<>yz,\"abc", "UTF-8"); |
| System.out.println("encodedHttpEndPoint2 :" + encodedHttpEndPoint2); |
| LensConf conf = new LensConf(); |
| conf.addProperty(LensConfConstants.QUERY_HTTP_NOTIFICATION_TYPE_FINISHED, "true"); |
| conf.addProperty(LensConfConstants.QUERY_HTTP_NOTIFICATION_MEDIATYPE, mt); |
| conf.addProperty(LensConfConstants.QUERY_HTTP_NOTIFICATION_URLS, encodedHttpEndPoint1 + " , " |
| + encodedHttpEndPoint2); |
| |
| //Test for SUCCESSFUL FINISH notification |
| QueryHandle handle1 = queryService.executeAsync(lensSessionId, query, conf, |
| "testHttpNotificationQuerySuccessful"); |
| |
| //TEST for CANCELLED FINISH notification |
| conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, "true"); |
| conf.addProperty(LensConfConstants.QUERY_OUTPUT_FORMATTER, DeferredPersistentResultFormatter.class.getName()); |
| conf.addProperty("deferPersistenceByMillis", 5000); // defer persistence for 5 secs |
| QueryHandle handle2 = queryService.executeAsync(lensSessionId, query, conf, "testHttpNotificationQueryCanceled"); |
| queryService.cancelQuery(lensSessionId, handle2); |
| |
| //Test for FAILED FINISH notification |
| conf.addProperty(LensConfConstants.QUERY_OUTPUT_FORMATTER, "wrong.formatter"); |
| QueryHandle handle3 = queryService.executeAsync(lensSessionId, query, conf, "testHttpNotificationQueryFailed"); |
| |
| for (QueryHandle handle : new QueryHandle[]{handle1, handle2, handle3}) { |
| LensQuery lensQuery = queryService.getQuery(lensSessionId, handle); |
| while (!lensQuery.getStatus().finished()) { |
| Thread.sleep(100); |
| lensQuery = queryService.getQuery(lensSessionId, handle); |
| } |
| assertTrue(lensQuery.getQueryName().toUpperCase().contains(lensQuery.getStatus().getStatus().name()), |
| "Query finished with wrong status: " + lensQuery); |
| log.info("query {} finished", lensQuery); |
| } |
| // sleep more to allow notifications to go |
| Thread.sleep(3000); |
| assertEquals(TestQueryNotifictaionResource.getFinishedCount(), 6); |
| assertEquals(TestQueryNotifictaionResource.getSuccessfulCount(), 2); |
| assertEquals(TestQueryNotifictaionResource.getCancelledCount(), 2); |
| assertEquals(TestQueryNotifictaionResource.getFailedCount(), 2); |
| assertEquals(TestQueryNotifictaionResource.getAccessTokenCount(), 6); |
| assertEquals(TestQueryNotifictaionResource.getDataCount(), 3); |
| } finally { |
| TestQueryNotifictaionResource.clearState(); |
| } |
| } |
| |
| @Test |
| public void testGetQueryDetails() throws IOException, InterruptedException, LensException { |
| |
| UUID queryName = UUID.randomUUID(); |
| WebTarget target = target().path("queryapi/queries"); |
| final FormDataMultiPart mp = new FormDataMultiPart(); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, |
| APPLICATION_XML_TYPE)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select ID, IDSTR from " |
| + TEST_TABLE)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "execute_with_timeout")); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("timeoutmillis").build(), "300000")); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("queryName").build(), queryName.toString())); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), new LensConf(), |
| APPLICATION_XML_TYPE)); |
| |
| QueryHandleWithResultSet result = target.request(APPLICATION_XML_TYPE) |
| .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE), |
| new GenericType<LensAPIResult<QueryHandleWithResultSet>>() {}).getData(); |
| assertNotNull(result.getQueryHandle()); |
| assertNotNull(result.getResult()); |
| |
| target = target().path("queryapi/queries/detail"); |
| List<LensQuery> results = target.queryParam("queryName", queryName) |
| .queryParam("sessionid", lensSessionId) |
| .request(APPLICATION_XML_TYPE) |
| .get(new GenericType<List<LensQuery>>(){}); |
| Assert.assertNotNull(results); |
| Assert.assertEquals(1, results.size()); |
| Assert.assertEquals(queryName.toString(), results.get(0).getQueryName()); |
| Assert.assertEquals(result.getQueryHandle(), results.get(0).getQueryHandle()); |
| } |
| |
| @Test |
| public void testQueryTimeoutOnWaitingQuery() throws Exception { |
| String query = "select mock, fail, autocancel from " + TEST_TABLE; |
| LensConf lconf = new LensConf(); |
| lconf.addProperty(LensConfConstants.QUERY_TIMEOUT_MILLIS, 300); |
| QueryHandle handle = executeAndGetHandle(target(), Optional.of(lensSessionId), Optional.of(query), |
| Optional.of(lconf), defaultMT); |
| // query should have been cancelled, as query timeout is 300 millis |
| waitForQueryToFinish(target(), lensSessionId, handle, Status.CANCELED, defaultMT); |
| LensQuery lensQuery = getLensQuery(target(), lensSessionId, handle, defaultMT); |
| assertTrue((lensQuery.getFinishTime() - lensQuery.getLaunchTime()) >= 300, "Query time is " |
| + (lensQuery.getFinishTime() - lensQuery.getLaunchTime())); |
| assertTrue((lensQuery.getFinishTime() - lensQuery.getLaunchTime()) < 400, "Query time is " |
| + (lensQuery.getFinishTime() - lensQuery.getLaunchTime())); |
| } |
| |
| //@Test(dataProvider = "mediaTypeData") |
| public void testEstimateRejectionException(MediaType mt) throws Exception { |
| class EstimateRunnable implements Runnable { |
| boolean failed = false; |
| boolean completed = false; |
| boolean wrongMessage = true; |
| |
| @Override |
| public void run() { |
| Map<String, String> sessionconf = new HashMap<>(); |
| sessionconf.put("test.session.key", "svalue"); |
| LensSessionHandle sessionhandle = null; |
| try { |
| sessionhandle = queryService.openSession("foo@localhost", "bar", sessionconf); |
| final WebTarget target = target().path("queryapi/queries"); |
| LensConf lensConf = new LensConf(); |
| lensConf.addProperty("mock.driver.sleep", "true"); |
| lensConf.addProperty("mock.driver.sleep.ms", "500"); |
| final FormDataMultiPart mp = new FormDataMultiPart(); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), sessionhandle, mt)); |
| mp.bodyPart( |
| new FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select ID from " + TEST_TABLE)); |
| mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "estimate")); |
| mp.bodyPart( |
| new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), lensConf, mt)); |
| Response response = target.request(mt).post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE)); |
| LensAPIResult result = response.readEntity(LensAPIResult.class); |
| if (response.getStatus() == 500) { |
| failed = true; |
| if (result.getLensErrorTO().getMessage().equals("Server is overloaded at this time")) { |
| wrongMessage = false; |
| } |
| } |
| if (response.getStatus() == 200) { |
| completed = true; |
| } |
| } catch (LensException e) { |
| log.error("Error in the EstimateRunnable thread while creating a session", e); |
| } finally { |
| try { |
| queryService.closeSession(sessionhandle); |
| } catch (LensException e) { |
| log.error("Error in the EstimateRunnable thread while closing the session", e); |
| } |
| } |
| } |
| } |
| List<Thread> threads = new ArrayList<>(); |
| List<EstimateRunnable> estimateRunnables = new ArrayList<>(); |
| |
| int totalTasks = 10; |
| for (int i = 0; i < totalTasks; i++) { |
| EstimateRunnable r = new EstimateRunnable(); |
| estimateRunnables.add(r); |
| Thread t = new Thread(r); |
| threads.add(t); |
| t.start(); |
| } |
| // Wait for them to finish |
| for (Thread t : threads) { |
| t.join(); |
| } |
| List<EstimateRunnable> completedTaks = estimateRunnables.stream().filter(new Predicate<EstimateRunnable>() { |
| @Override |
| public boolean test(EstimateRunnable estimateRunnable) { |
| return estimateRunnable.completed; |
| } |
| }).collect(Collectors.toList()); |
| List<EstimateRunnable> inCompleteTasks = estimateRunnables.stream().filter(new Predicate<EstimateRunnable>() { |
| @Override |
| public boolean test(EstimateRunnable estimateRunnable) { |
| // If estimate was failed, it should only be because of the server getting overloaded. |
| return estimateRunnable.failed && !estimateRunnable.wrongMessage; |
| } |
| }).collect(Collectors.toList()); |
| |
| assertTrue(completedTaks.size() > 0); |
| assertTrue(inCompleteTasks.size() > 0); |
| assertEquals(completedTaks.size() + inCompleteTasks.size(), totalTasks); |
| } |
| } |