| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.msq.sql.resources; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.SettableFuture; |
| import org.apache.calcite.sql.type.SqlTypeName; |
| import org.apache.druid.client.indexing.TaskPayloadResponse; |
| import org.apache.druid.client.indexing.TaskStatusResponse; |
| import org.apache.druid.error.ErrorResponse; |
| import org.apache.druid.indexer.TaskLocation; |
| import org.apache.druid.indexer.TaskState; |
| import org.apache.druid.indexer.TaskStatusPlus; |
| import org.apache.druid.indexer.report.TaskReport; |
| import org.apache.druid.indexing.common.task.IndexTask; |
| import org.apache.druid.java.util.common.DateTimes; |
| import org.apache.druid.java.util.common.Intervals; |
| import org.apache.druid.java.util.common.StringUtils; |
| import org.apache.druid.java.util.common.granularity.Granularities; |
| import org.apache.druid.java.util.common.guava.Sequences; |
| import org.apache.druid.java.util.common.guava.Yielders; |
| import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; |
| import org.apache.druid.msq.counters.ChannelCounters; |
| import org.apache.druid.msq.counters.CounterSnapshots; |
| import org.apache.druid.msq.counters.CounterSnapshotsTree; |
| import org.apache.druid.msq.indexing.MSQControllerTask; |
| import org.apache.druid.msq.indexing.MSQSpec; |
| import org.apache.druid.msq.indexing.MSQTuningConfig; |
| import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; |
| import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination; |
| import org.apache.druid.msq.indexing.report.MSQResultsReport; |
| import org.apache.druid.msq.indexing.report.MSQStagesReport; |
| import org.apache.druid.msq.indexing.report.MSQStatusReport; |
| import org.apache.druid.msq.indexing.report.MSQTaskReport; |
| import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; |
| import org.apache.druid.msq.indexing.report.MSQTaskReportTest; |
| import org.apache.druid.msq.sql.MSQTaskQueryMaker; |
| import org.apache.druid.msq.sql.SqlStatementState; |
| import org.apache.druid.msq.sql.entity.ColumnNameAndTypes; |
| import org.apache.druid.msq.sql.entity.PageInformation; |
| import org.apache.druid.msq.sql.entity.ResultSetInformation; |
| import org.apache.druid.msq.sql.entity.SqlStatementResult; |
| import org.apache.druid.msq.test.MSQTestBase; |
| import org.apache.druid.query.Druids; |
| import org.apache.druid.query.Query; |
| import org.apache.druid.query.scan.ScanQuery; |
| import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; |
| import org.apache.druid.rpc.HttpResponseException; |
| import org.apache.druid.rpc.indexing.OverlordClient; |
| import org.apache.druid.segment.TestHelper; |
| import org.apache.druid.segment.column.ColumnType; |
| import org.apache.druid.segment.column.RowSignature; |
| import org.apache.druid.segment.column.ValueType; |
| import org.apache.druid.server.mocks.MockHttpServletRequest; |
| import org.apache.druid.server.security.Access; |
| import org.apache.druid.server.security.Action; |
| import org.apache.druid.server.security.AuthConfig; |
| import org.apache.druid.server.security.AuthenticationResult; |
| import org.apache.druid.server.security.Authorizer; |
| import org.apache.druid.server.security.AuthorizerMapper; |
| import org.apache.druid.server.security.ResourceType; |
| import org.apache.druid.sql.calcite.planner.ColumnMappings; |
| import org.apache.druid.sql.calcite.util.CalciteTests; |
| import org.apache.druid.sql.http.ResultFormat; |
| import org.apache.druid.sql.http.SqlResourceTest; |
| import org.apache.druid.storage.local.LocalFileStorageConnector; |
| import org.jboss.netty.handler.codec.http.DefaultHttpResponse; |
| import org.jboss.netty.handler.codec.http.HttpResponseStatus; |
| import org.jboss.netty.handler.codec.http.HttpVersion; |
| import org.joda.time.DateTime; |
| import org.junit.Assert; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Test; |
| import org.mockito.ArgumentMatchers; |
| import org.mockito.Mock; |
| import org.mockito.Mockito; |
| |
| import javax.ws.rs.core.Response; |
| import java.io.IOException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayDeque; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.function.Supplier; |
| |
| public class SqlStatementResourceTest extends MSQTestBase |
| { |
| public static final DateTime CREATED_TIME = DateTimes.of("2023-05-31T12:00Z"); |
| private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper(); |
| |
| private static final String ACCEPTED_SELECT_MSQ_QUERY = "QUERY_ID_1"; |
| private static final String RUNNING_SELECT_MSQ_QUERY = "QUERY_ID_2"; |
| private static final String FINISHED_SELECT_MSQ_QUERY = "QUERY_ID_3"; |
| private static final String ERRORED_SELECT_MSQ_QUERY = "QUERY_ID_4"; |
| |
| private static final String RUNNING_NON_MSQ_TASK = "QUERY_ID_5"; |
| private static final String FAILED_NON_MSQ_TASK = "QUERY_ID_6"; |
| private static final String FINISHED_NON_MSQ_TASK = "QUERY_ID_7"; |
| |
| private static final String ACCEPTED_INSERT_MSQ_TASK = "QUERY_ID_8"; |
| private static final String RUNNING_INSERT_MSQ_QUERY = "QUERY_ID_9"; |
| private static final String FINISHED_INSERT_MSQ_QUERY = "QUERY_ID_10"; |
| private static final String ERRORED_INSERT_MSQ_QUERY = "QUERY_ID_11"; |
| |
| |
| private static final Query<?> QUERY = new Druids.ScanQueryBuilder().resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) |
| .legacy(false) |
| .intervals(new MultipleIntervalSegmentSpec( |
| Collections.singletonList(Intervals.of( |
| "2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z")))) |
| .dataSource("target") |
| .context(ImmutableMap.of( |
| MSQTaskQueryMaker.USER_KEY, |
| AuthConfig.ALLOW_ALL_NAME |
| )) |
| .build(); |
| |
| |
| private static final MSQControllerTask MSQ_CONTROLLER_SELECT_PAYLOAD = new MSQControllerTask( |
| ACCEPTED_SELECT_MSQ_QUERY, |
| MSQSpec.builder() |
| .query(QUERY) |
| .columnMappings( |
| ColumnMappings.identity( |
| RowSignature.builder() |
| .add( |
| "_time", |
| ColumnType.LONG |
| ) |
| .add( |
| "alias", |
| ColumnType.STRING |
| ) |
| .add( |
| "market", |
| ColumnType.STRING |
| ) |
| .build())) |
| .destination(TaskReportMSQDestination.instance()) |
| .tuningConfig( |
| MSQTuningConfig.defaultConfig()) |
| .build(), |
| "select _time,alias,market from test", |
| new HashMap<>(), |
| null, |
| ImmutableList.of( |
| SqlTypeName.TIMESTAMP, |
| SqlTypeName.VARCHAR, |
| SqlTypeName.VARCHAR |
| ), |
| ImmutableList.of( |
| ColumnType.LONG, |
| ColumnType.STRING, |
| ColumnType.STRING |
| ), |
| null |
| ); |
| |
| private static final MSQControllerTask MSQ_CONTROLLER_INSERT_PAYLOAD = new MSQControllerTask( |
| ACCEPTED_SELECT_MSQ_QUERY, |
| MSQSpec.builder() |
| .query(QUERY) |
| .columnMappings( |
| ColumnMappings.identity( |
| RowSignature.builder() |
| .add( |
| "_time", |
| ColumnType.LONG |
| ) |
| .add( |
| "alias", |
| ColumnType.STRING |
| ) |
| .add( |
| "market", |
| ColumnType.STRING |
| ) |
| .build())) |
| .destination(new DataSourceMSQDestination( |
| "test", |
| Granularities.DAY, |
| null, |
| null |
| )) |
| .tuningConfig( |
| MSQTuningConfig.defaultConfig()) |
| .build(), |
| "insert into test select _time,alias,market from test", |
| new HashMap<>(), |
| null, |
| ImmutableList.of( |
| SqlTypeName.TIMESTAMP, |
| SqlTypeName.VARCHAR, |
| SqlTypeName.VARCHAR |
| ), |
| ImmutableList.of( |
| ColumnType.LONG, |
| ColumnType.STRING, |
| ColumnType.STRING |
| ), |
| null |
| ); |
| |
| private static final List<Object[]> RESULT_ROWS = ImmutableList.of( |
| new Object[]{123, "foo", "bar"}, |
| new Object[]{234, "foo1", "bar1"} |
| ); |
| |
| private final Supplier<MSQTaskReport> selectTaskReport = () -> new MSQTaskReport( |
| FINISHED_SELECT_MSQ_QUERY, |
| new MSQTaskReportPayload( |
| new MSQStatusReport( |
| TaskState.SUCCESS, |
| null, |
| new ArrayDeque<>(), |
| null, |
| 0, |
| new HashMap<>(), |
| 1, |
| 2, |
| null, |
| null |
| ), |
| MSQStagesReport.create( |
| MSQTaskReportTest.QUERY_DEFINITION, |
| ImmutableMap.of(), |
| ImmutableMap.of(), |
| ImmutableMap.of(0, 1), |
| ImmutableMap.of(0, 1) |
| |
| ), |
| CounterSnapshotsTree.fromMap(ImmutableMap.of( |
| 0, |
| ImmutableMap.of( |
| 0, |
| new CounterSnapshots(ImmutableMap.of( |
| "output", |
| new ChannelCounters.Snapshot( |
| new long[]{1L, 2L}, |
| new long[]{3L, 5L}, |
| new long[]{}, |
| new long[]{}, |
| new long[]{} |
| ) |
| ) |
| ) |
| ) |
| )), |
| new MSQResultsReport( |
| ImmutableList.of( |
| new MSQResultsReport.ColumnAndType( |
| "_time", |
| ColumnType.LONG |
| ), |
| new MSQResultsReport.ColumnAndType( |
| "alias", |
| ColumnType.STRING |
| ), |
| new MSQResultsReport.ColumnAndType( |
| "market", |
| ColumnType.STRING |
| ) |
| ), |
| ImmutableList.of( |
| SqlTypeName.TIMESTAMP, |
| SqlTypeName.VARCHAR, |
| SqlTypeName.VARCHAR |
| ), |
| Yielders.each( |
| Sequences.simple( |
| RESULT_ROWS)), |
| null |
| ) |
| ) |
| ); |
| |
| private static final MSQTaskReport MSQ_INSERT_TASK_REPORT = new MSQTaskReport( |
| FINISHED_INSERT_MSQ_QUERY, |
| new MSQTaskReportPayload( |
| new MSQStatusReport( |
| TaskState.SUCCESS, |
| null, |
| new ArrayDeque<>(), |
| null, |
| 0, |
| new HashMap<>(), |
| 1, |
| 2, |
| null, |
| null |
| ), |
| MSQStagesReport.create( |
| MSQTaskReportTest.QUERY_DEFINITION, |
| ImmutableMap.of(), |
| ImmutableMap.of(), |
| ImmutableMap.of(), |
| ImmutableMap.of() |
| ), |
| new CounterSnapshotsTree(), |
| null |
| ) |
| ); |
| private static final DateTime QUEUE_INSERTION_TIME = DateTimes.of("2023-05-31T12:01Z"); |
| public static final ImmutableList<ColumnNameAndTypes> COL_NAME_AND_TYPES = ImmutableList.of( |
| new ColumnNameAndTypes( |
| "_time", |
| SqlTypeName.TIMESTAMP.getName(), |
| ValueType.LONG.name() |
| ), |
| new ColumnNameAndTypes( |
| "alias", |
| SqlTypeName.VARCHAR.getName(), |
| ValueType.STRING.name() |
| ), |
| new ColumnNameAndTypes( |
| "market", |
| SqlTypeName.VARCHAR.getName(), |
| ValueType.STRING.name() |
| ) |
| ); |
| private static final String FAILURE_MSG = "failure msg"; |
| private static SqlStatementResource resource; |
| |
| private static final String SUPERUSER = "superuser"; |
| private static final String STATE_R_USER = "stateR"; |
| private static final String STATE_W_USER = "stateW"; |
| private static final String STATE_RW_USER = "stateRW"; |
| |
| private AuthorizerMapper authorizerMapper = new AuthorizerMapper(null) |
| { |
| @Override |
| public Authorizer getAuthorizer(String name) |
| { |
| return (authenticationResult, resource, action) -> { |
| if (SUPERUSER.equals(authenticationResult.getIdentity())) { |
| return Access.OK; |
| } |
| |
| switch (resource.getType()) { |
| case ResourceType.DATASOURCE: |
| case ResourceType.VIEW: |
| case ResourceType.QUERY_CONTEXT: |
| case ResourceType.EXTERNAL: |
| return Access.OK; |
| case ResourceType.STATE: |
| String identity = authenticationResult.getIdentity(); |
| if (action == Action.READ) { |
| if (STATE_R_USER.equals(identity) || STATE_RW_USER.equals(identity)) { |
| return Access.OK; |
| } |
| } else if (action == Action.WRITE) { |
| if (STATE_W_USER.equals(identity) || STATE_RW_USER.equals(identity)) { |
| return Access.OK; |
| } |
| } |
| return Access.DENIED; |
| |
| default: |
| return Access.DENIED; |
| } |
| }; |
| } |
| }; |
| |
| @Mock |
| private OverlordClient overlordClient; |
| |
| private void setupMocks(OverlordClient indexingServiceClient) |
| { |
| Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(ACCEPTED_SELECT_MSQ_QUERY))) |
| .thenReturn(Futures.immediateFuture(new TaskStatusResponse(ACCEPTED_SELECT_MSQ_QUERY, new TaskStatusPlus( |
| ACCEPTED_SELECT_MSQ_QUERY, |
| null, |
| MSQControllerTask.TYPE, |
| CREATED_TIME, |
| QUEUE_INSERTION_TIME, |
| null, |
| null, |
| null, |
| TaskLocation.unknown(), |
| null, |
| null |
| )))); |
| |
| Mockito.when(indexingServiceClient.taskPayload(ArgumentMatchers.eq(ACCEPTED_SELECT_MSQ_QUERY))) |
| .thenReturn(Futures.immediateFuture(new TaskPayloadResponse( |
| ACCEPTED_SELECT_MSQ_QUERY, |
| MSQ_CONTROLLER_SELECT_PAYLOAD |
| ))); |
| |
| Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(RUNNING_SELECT_MSQ_QUERY))) |
| .thenReturn(Futures.immediateFuture(new TaskStatusResponse(RUNNING_SELECT_MSQ_QUERY, new TaskStatusPlus( |
| RUNNING_SELECT_MSQ_QUERY, |
| null, |
| MSQControllerTask.TYPE, |
| CREATED_TIME, |
| QUEUE_INSERTION_TIME, |
| TaskState.RUNNING, |
| null, |
| null, |
| TaskLocation.create("test", 0, 0), |
| null, |
| null |
| )))); |
| |
| Mockito.when(indexingServiceClient.taskPayload(RUNNING_SELECT_MSQ_QUERY)) |
| .thenReturn(Futures.immediateFuture(new TaskPayloadResponse( |
| RUNNING_SELECT_MSQ_QUERY, |
| MSQ_CONTROLLER_SELECT_PAYLOAD |
| ))); |
| |
| |
| Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(FINISHED_SELECT_MSQ_QUERY))) |
| .thenReturn(Futures.immediateFuture(new TaskStatusResponse(FINISHED_SELECT_MSQ_QUERY, new TaskStatusPlus( |
| FINISHED_SELECT_MSQ_QUERY, |
| null, |
| MSQControllerTask.TYPE, |
| CREATED_TIME, |
| QUEUE_INSERTION_TIME, |
| TaskState.SUCCESS, |
| null, |
| 100L, |
| TaskLocation.create("test", 0, 0), |
| null, |
| null |
| )))); |
| |
| Mockito.when(indexingServiceClient.taskPayload(FINISHED_SELECT_MSQ_QUERY)) |
| .thenReturn(Futures.immediateFuture(new TaskPayloadResponse( |
| FINISHED_SELECT_MSQ_QUERY, |
| MSQ_CONTROLLER_SELECT_PAYLOAD |
| ))); |
| |
| |
| Mockito.when(indexingServiceClient.taskReportAsMap(FINISHED_SELECT_MSQ_QUERY)) |
| .thenAnswer(inv -> Futures.immediateFuture(TaskReport.buildTaskReports(selectTaskReport.get()))); |
| |
| Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(ERRORED_SELECT_MSQ_QUERY))) |
| .thenReturn(Futures.immediateFuture(new TaskStatusResponse(ERRORED_SELECT_MSQ_QUERY, new TaskStatusPlus( |
| ERRORED_SELECT_MSQ_QUERY, |
| null, |
| MSQControllerTask.TYPE, |
| CREATED_TIME, |
| QUEUE_INSERTION_TIME, |
| TaskState.FAILED, |
| null, |
| -1L, |
| TaskLocation.unknown(), |
| null, |
| FAILURE_MSG |
| )))); |
| |
| Mockito.when(indexingServiceClient.taskPayload(ArgumentMatchers.eq(ERRORED_SELECT_MSQ_QUERY))) |
| .thenReturn(Futures.immediateFuture(new TaskPayloadResponse( |
| FINISHED_INSERT_MSQ_QUERY, |
| MSQ_CONTROLLER_SELECT_PAYLOAD |
| ))); |
| |
| Mockito.when(indexingServiceClient.taskReportAsMap(ArgumentMatchers.eq(ERRORED_SELECT_MSQ_QUERY))) |
| .thenReturn(Futures.immediateFuture(null)); |
| |
| Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(RUNNING_NON_MSQ_TASK))) |
| .thenReturn(Futures.immediateFuture(new TaskStatusResponse(RUNNING_NON_MSQ_TASK, new TaskStatusPlus( |
| RUNNING_NON_MSQ_TASK, |
| null, |
| null, |
| CREATED_TIME, |
| QUEUE_INSERTION_TIME, |
| TaskState.RUNNING, |
| null, |
| -1L, |
| TaskLocation.unknown(), |
| null, |
| null |
| )))); |
| |
| |
| Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(FAILED_NON_MSQ_TASK))) |
| .thenReturn(Futures.immediateFuture(new TaskStatusResponse(FAILED_NON_MSQ_TASK, new TaskStatusPlus( |
| FAILED_NON_MSQ_TASK, |
| null, |
| null, |
| CREATED_TIME, |
| QUEUE_INSERTION_TIME, |
| TaskState.FAILED, |
| null, |
| -1L, |
| TaskLocation.unknown(), |
| null, |
| FAILURE_MSG |
| )))); |
| |
| |
| Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(FINISHED_NON_MSQ_TASK))) |
| .thenReturn(Futures.immediateFuture(new TaskStatusResponse(FINISHED_NON_MSQ_TASK, new TaskStatusPlus( |
| FINISHED_NON_MSQ_TASK, |
| null, |
| IndexTask.TYPE, |
| CREATED_TIME, |
| QUEUE_INSERTION_TIME, |
| TaskState.SUCCESS, |
| null, |
| -1L, |
| TaskLocation.unknown(), |
| null, |
| null |
| )))); |
| |
| |
| Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(ACCEPTED_INSERT_MSQ_TASK))) |
| .thenReturn(Futures.immediateFuture(new TaskStatusResponse(ACCEPTED_SELECT_MSQ_QUERY, new TaskStatusPlus( |
| ACCEPTED_SELECT_MSQ_QUERY, |
| null, |
| MSQControllerTask.TYPE, |
| CREATED_TIME, |
| QUEUE_INSERTION_TIME, |
| null, |
| null, |
| null, |
| TaskLocation.unknown(), |
| null, |
| null |
| )))); |
| |
| Mockito.when(indexingServiceClient.taskPayload(ArgumentMatchers.eq(ACCEPTED_INSERT_MSQ_TASK))) |
| .thenReturn(Futures.immediateFuture(new TaskPayloadResponse( |
| ACCEPTED_INSERT_MSQ_TASK, |
| MSQ_CONTROLLER_INSERT_PAYLOAD |
| ))); |
| |
| Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(RUNNING_INSERT_MSQ_QUERY))) |
| .thenReturn(Futures.immediateFuture(new TaskStatusResponse(RUNNING_INSERT_MSQ_QUERY, new TaskStatusPlus( |
| RUNNING_INSERT_MSQ_QUERY, |
| null, |
| MSQControllerTask.TYPE, |
| CREATED_TIME, |
| QUEUE_INSERTION_TIME, |
| TaskState.RUNNING, |
| null, |
| null, |
| TaskLocation.create("test", 0, 0), |
| null, |
| null |
| )))); |
| |
| Mockito.when(indexingServiceClient.taskPayload(RUNNING_INSERT_MSQ_QUERY)) |
| .thenReturn(Futures.immediateFuture(new TaskPayloadResponse( |
| RUNNING_INSERT_MSQ_QUERY, |
| MSQ_CONTROLLER_INSERT_PAYLOAD |
| ))); |
| |
| |
| Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(FINISHED_INSERT_MSQ_QUERY))) |
| .thenReturn(Futures.immediateFuture(new TaskStatusResponse(FINISHED_INSERT_MSQ_QUERY, new TaskStatusPlus( |
| FINISHED_INSERT_MSQ_QUERY, |
| null, |
| MSQControllerTask.TYPE, |
| CREATED_TIME, |
| QUEUE_INSERTION_TIME, |
| TaskState.SUCCESS, |
| null, |
| 100L, |
| TaskLocation.create("test", 0, 0), |
| null, |
| null |
| )))); |
| |
| Mockito.when(indexingServiceClient.taskReportAsMap(ArgumentMatchers.eq(FINISHED_INSERT_MSQ_QUERY))) |
| .thenReturn(Futures.immediateFuture(TaskReport.buildTaskReports(MSQ_INSERT_TASK_REPORT))); |
| |
| Mockito.when(indexingServiceClient.taskPayload(FINISHED_INSERT_MSQ_QUERY)) |
| .thenReturn(Futures.immediateFuture(new TaskPayloadResponse( |
| FINISHED_INSERT_MSQ_QUERY, |
| MSQ_CONTROLLER_INSERT_PAYLOAD |
| ))); |
| |
| Mockito.when(indexingServiceClient.taskStatus(ArgumentMatchers.eq(ERRORED_INSERT_MSQ_QUERY))) |
| .thenReturn(Futures.immediateFuture(new TaskStatusResponse(ERRORED_INSERT_MSQ_QUERY, new TaskStatusPlus( |
| ERRORED_INSERT_MSQ_QUERY, |
| null, |
| MSQControllerTask.TYPE, |
| CREATED_TIME, |
| QUEUE_INSERTION_TIME, |
| TaskState.FAILED, |
| null, |
| -1L, |
| TaskLocation.unknown(), |
| null, |
| FAILURE_MSG |
| )))); |
| |
| Mockito.when(indexingServiceClient.taskPayload(ArgumentMatchers.eq(ERRORED_INSERT_MSQ_QUERY))) |
| .thenReturn(Futures.immediateFuture(new TaskPayloadResponse( |
| ERRORED_INSERT_MSQ_QUERY, |
| MSQ_CONTROLLER_INSERT_PAYLOAD |
| ))); |
| |
| Mockito.when(indexingServiceClient.taskReportAsMap(ArgumentMatchers.eq(ERRORED_INSERT_MSQ_QUERY))) |
| .thenReturn(Futures.immediateFuture(null)); |
| |
| } |
| |
| public static void assertNotFound(Response response, String queryId) |
| { |
| assertExceptionMessage(response, StringUtils.format("Query [%s] was not found. The query details are no longer present or might not be of the type [%s]. Verify that the id is correct.", queryId, MSQControllerTask.TYPE), Response.Status.NOT_FOUND); |
| } |
| |
| public static void assertExceptionMessage( |
| Response response, |
| String exceptionMessage, |
| Response.Status expectectedStatus |
| ) |
| { |
| Assert.assertEquals(expectectedStatus.getStatusCode(), response.getStatus()); |
| Assert.assertEquals(exceptionMessage, getQueryExceptionFromResponse(response)); |
| } |
| |
| public static List getResultRowsFromResponse(Response resultsResponse) throws IOException |
| { |
| byte[] bytes = SqlResourceTest.responseToByteArray(resultsResponse); |
| if (bytes == null) { |
| return null; |
| } |
| return JSON_MAPPER.readValue(bytes, List.class); |
| } |
| |
| private static String getQueryExceptionFromResponse(Response response) |
| { |
| if (response.getEntity() instanceof SqlStatementResult) { |
| return ((SqlStatementResult) response.getEntity()).getErrorResponse().getUnderlyingException().getMessage(); |
| } else { |
| return ((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage(); |
| } |
| } |
| |
| public static MockHttpServletRequest makeOkRequest() |
| { |
| return makeExpectedReq(CalciteTests.REGULAR_USER_AUTH_RESULT); |
| } |
| |
| private static MockHttpServletRequest makeExpectedReq(AuthenticationResult authenticationResult) |
| { |
| MockHttpServletRequest req = new MockHttpServletRequest(); |
| req.attributes.put(AuthConfig.DRUID_AUTHENTICATION_RESULT, authenticationResult); |
| req.remoteAddr = "1.2.3.4"; |
| return req; |
| } |
| |
| private static AuthenticationResult makeAuthResultForUser(String user) |
| { |
| return new AuthenticationResult( |
| user, |
| AuthConfig.ALLOW_ALL_NAME, |
| null, |
| null |
| ); |
| } |
| |
| @BeforeEach |
| public void init() throws Exception |
| { |
| overlordClient = Mockito.mock(OverlordClient.class); |
| setupMocks(overlordClient); |
| resource = new SqlStatementResource( |
| sqlStatementFactory, |
| objectMapper, |
| overlordClient, |
| new LocalFileStorageConnector(newTempFolder("local")), |
| authorizerMapper |
| ); |
| } |
| |
| @Test |
| public void testMSQSelectAcceptedQuery() |
| { |
| Response response = resource.doGetStatus(ACCEPTED_SELECT_MSQ_QUERY, makeOkRequest()); |
| Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); |
| Assert.assertEquals( |
| new SqlStatementResult( |
| ACCEPTED_SELECT_MSQ_QUERY, |
| SqlStatementState.ACCEPTED, |
| CREATED_TIME, |
| COL_NAME_AND_TYPES, |
| null, |
| null, |
| null |
| ), |
| response.getEntity() |
| ); |
| |
| assertExceptionMessage( |
| resource.doGetResults(ACCEPTED_SELECT_MSQ_QUERY, 0L, null, makeOkRequest()), |
| StringUtils.format( |
| "Query[%s] is currently in [%s] state. Please wait for it to complete.", |
| ACCEPTED_SELECT_MSQ_QUERY, |
| SqlStatementState.ACCEPTED |
| ), |
| Response.Status.BAD_REQUEST |
| ); |
| Assert.assertEquals( |
| Response.Status.ACCEPTED.getStatusCode(), |
| resource.deleteQuery(ACCEPTED_SELECT_MSQ_QUERY, makeOkRequest()).getStatus() |
| ); |
| } |
| |
| @Test |
| public void testMSQSelectRunningQuery() |
| { |
| |
| Response response = resource.doGetStatus(RUNNING_SELECT_MSQ_QUERY, makeOkRequest()); |
| Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); |
| Assert.assertEquals( |
| new SqlStatementResult( |
| RUNNING_SELECT_MSQ_QUERY, |
| SqlStatementState.RUNNING, |
| CREATED_TIME, |
| COL_NAME_AND_TYPES, |
| null, |
| null, |
| null |
| ), |
| response.getEntity() |
| ); |
| |
| assertExceptionMessage( |
| resource.doGetResults(RUNNING_SELECT_MSQ_QUERY, 0L, null, makeOkRequest()), |
| StringUtils.format( |
| "Query[%s] is currently in [%s] state. Please wait for it to complete.", |
| RUNNING_SELECT_MSQ_QUERY, |
| SqlStatementState.RUNNING |
| ), |
| Response.Status.BAD_REQUEST |
| ); |
| Assert.assertEquals( |
| Response.Status.ACCEPTED.getStatusCode(), |
| resource.deleteQuery(RUNNING_SELECT_MSQ_QUERY, makeOkRequest()).getStatus() |
| ); |
| } |
| |
| @Test |
| public void testFinishedSelectMSQQuery() throws Exception |
| { |
| Response response = resource.doGetStatus(FINISHED_SELECT_MSQ_QUERY, makeOkRequest()); |
| Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); |
| Assert.assertEquals(objectMapper.writeValueAsString(new SqlStatementResult( |
| FINISHED_SELECT_MSQ_QUERY, |
| SqlStatementState.SUCCESS, |
| CREATED_TIME, |
| COL_NAME_AND_TYPES, |
| 100L, |
| new ResultSetInformation( |
| 3L, |
| 8L, |
| null, |
| MSQControllerTask.DUMMY_DATASOURCE_FOR_SELECT, |
| RESULT_ROWS, |
| ImmutableList.of(new PageInformation(0, 3L, 8L)) |
| ), |
| null |
| )), objectMapper.writeValueAsString(response.getEntity())); |
| |
| Response resultsResponse = resource.doGetResults(FINISHED_SELECT_MSQ_QUERY, 0L, ResultFormat.OBJECTLINES.name(), makeOkRequest()); |
| Assert.assertEquals(Response.Status.OK.getStatusCode(), resultsResponse.getStatus()); |
| |
| String expectedResult = "{\"_time\":123,\"alias\":\"foo\",\"market\":\"bar\"}\n" |
| + "{\"_time\":234,\"alias\":\"foo1\",\"market\":\"bar1\"}\n\n"; |
| |
| assertExpectedResults(expectedResult, resultsResponse); |
| |
| Assert.assertEquals( |
| Response.Status.OK.getStatusCode(), |
| resource.deleteQuery(FINISHED_SELECT_MSQ_QUERY, makeOkRequest()).getStatus() |
| ); |
| |
| assertExpectedResults( |
| expectedResult, |
| resource.doGetResults( |
| FINISHED_SELECT_MSQ_QUERY, |
| 0L, |
| ResultFormat.OBJECTLINES.name(), |
| makeOkRequest() |
| ) |
| ); |
| |
| assertExpectedResults( |
| expectedResult, |
| resource.doGetResults( |
| FINISHED_SELECT_MSQ_QUERY, |
| null, |
| ResultFormat.OBJECTLINES.name(), |
| makeOkRequest() |
| ) |
| ); |
| |
| Assert.assertEquals( |
| Response.Status.BAD_REQUEST.getStatusCode(), |
| resource.doGetResults(FINISHED_SELECT_MSQ_QUERY, -1L, null, makeOkRequest()).getStatus() |
| ); |
| } |
| |
| private void assertExpectedResults(String expectedResult, Response resultsResponse) throws IOException |
| { |
| byte[] bytes = SqlResourceTest.responseToByteArray(resultsResponse); |
| Assert.assertEquals(expectedResult, new String(bytes, StandardCharsets.UTF_8)); |
| } |
| |
| @Test |
| public void testFailedMSQQuery() |
| { |
| for (String queryID : ImmutableList.of(ERRORED_SELECT_MSQ_QUERY, ERRORED_INSERT_MSQ_QUERY)) { |
| assertExceptionMessage(resource.doGetStatus(queryID, makeOkRequest()), FAILURE_MSG, Response.Status.OK); |
| assertExceptionMessage( |
| resource.doGetResults(queryID, 0L, null, makeOkRequest()), |
| StringUtils.format( |
| "Query[%s] failed. Check the status api for more details.", |
| queryID |
| ), |
| Response.Status.BAD_REQUEST |
| ); |
| |
| Assert.assertEquals( |
| Response.Status.OK.getStatusCode(), |
| resource.deleteQuery(queryID, makeOkRequest()).getStatus() |
| ); |
| } |
| } |
| |
| @Test |
| public void testFinishedInsertMSQQuery() |
| { |
| Response response = resource.doGetStatus(FINISHED_INSERT_MSQ_QUERY, makeOkRequest()); |
| Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); |
| Assert.assertEquals(new SqlStatementResult( |
| FINISHED_INSERT_MSQ_QUERY, |
| SqlStatementState.SUCCESS, |
| CREATED_TIME, |
| null, |
| 100L, |
| new ResultSetInformation(null, null, null, "test", null, null), |
| null |
| ), response.getEntity()); |
| |
| Assert.assertEquals( |
| Response.Status.OK.getStatusCode(), |
| resource.doGetResults(FINISHED_INSERT_MSQ_QUERY, 0L, null, makeOkRequest()).getStatus() |
| ); |
| Assert.assertEquals( |
| Response.Status.OK.getStatusCode(), |
| resource.doGetResults(FINISHED_INSERT_MSQ_QUERY, null, null, makeOkRequest()).getStatus() |
| ); |
| |
| Assert.assertEquals( |
| Response.Status.BAD_REQUEST.getStatusCode(), |
| resource.doGetResults(FINISHED_INSERT_MSQ_QUERY, -1L, null, makeOkRequest()).getStatus() |
| ); |
| } |
| |
| @Test |
| public void testNonMSQTasks() |
| { |
| for (String queryID : ImmutableList.of(RUNNING_NON_MSQ_TASK, FAILED_NON_MSQ_TASK, FINISHED_NON_MSQ_TASK)) { |
| assertNotFound(resource.doGetStatus(queryID, makeOkRequest()), queryID); |
| assertNotFound(resource.doGetResults(queryID, 0L, null, makeOkRequest()), queryID); |
| assertNotFound(resource.deleteQuery(queryID, makeOkRequest()), queryID); |
| } |
| } |
| |
| @Test |
| public void testMSQInsertAcceptedQuery() |
| { |
| Response response = resource.doGetStatus(ACCEPTED_INSERT_MSQ_TASK, makeOkRequest()); |
| Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); |
| Assert.assertEquals( |
| new SqlStatementResult( |
| ACCEPTED_INSERT_MSQ_TASK, |
| SqlStatementState.ACCEPTED, |
| CREATED_TIME, |
| null, |
| null, |
| null, |
| null |
| ), |
| response.getEntity() |
| ); |
| |
| assertExceptionMessage( |
| resource.doGetResults(ACCEPTED_INSERT_MSQ_TASK, 0L, null, makeOkRequest()), |
| StringUtils.format( |
| "Query[%s] is currently in [%s] state. Please wait for it to complete.", |
| ACCEPTED_INSERT_MSQ_TASK, |
| SqlStatementState.ACCEPTED |
| ), |
| Response.Status.BAD_REQUEST |
| ); |
| Assert.assertEquals( |
| Response.Status.ACCEPTED.getStatusCode(), |
| resource.deleteQuery(ACCEPTED_INSERT_MSQ_TASK, makeOkRequest()).getStatus() |
| ); |
| } |
| |
| @Test |
| public void testMSQInsertRunningQuery() |
| { |
| Response response = resource.doGetStatus(RUNNING_INSERT_MSQ_QUERY, makeOkRequest()); |
| Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); |
| Assert.assertEquals( |
| new SqlStatementResult( |
| RUNNING_INSERT_MSQ_QUERY, |
| SqlStatementState.RUNNING, |
| CREATED_TIME, |
| null, |
| null, |
| null, |
| null |
| ), |
| response.getEntity() |
| ); |
| |
| assertExceptionMessage( |
| resource.doGetResults(RUNNING_INSERT_MSQ_QUERY, 0L, null, makeOkRequest()), |
| StringUtils.format( |
| "Query[%s] is currently in [%s] state. Please wait for it to complete.", |
| RUNNING_INSERT_MSQ_QUERY, |
| SqlStatementState.RUNNING |
| ), |
| Response.Status.BAD_REQUEST |
| ); |
| Assert.assertEquals( |
| Response.Status.ACCEPTED.getStatusCode(), |
| resource.deleteQuery(RUNNING_INSERT_MSQ_QUERY, makeOkRequest()).getStatus() |
| ); |
| } |
| |
| @Test |
| public void testAPIBehaviourWithSuperUsers() |
| { |
| Assert.assertEquals( |
| Response.Status.OK.getStatusCode(), |
| resource.doGetStatus( |
| RUNNING_SELECT_MSQ_QUERY, |
| makeExpectedReq(makeAuthResultForUser(SUPERUSER)) |
| ).getStatus() |
| ); |
| Assert.assertEquals( |
| Response.Status.BAD_REQUEST.getStatusCode(), |
| resource.doGetResults( |
| RUNNING_SELECT_MSQ_QUERY, |
| 1L, |
| null, |
| makeExpectedReq(makeAuthResultForUser(SUPERUSER)) |
| ).getStatus() |
| ); |
| Assert.assertEquals( |
| Response.Status.ACCEPTED.getStatusCode(), |
| resource.deleteQuery( |
| RUNNING_SELECT_MSQ_QUERY, |
| makeExpectedReq(makeAuthResultForUser(SUPERUSER)) |
| ).getStatus() |
| ); |
| } |
| |
| @Test |
| public void testAPIBehaviourWithDifferentUserAndNoStatePermission() |
| { |
| AuthenticationResult differentUserAuthResult = makeAuthResultForUser("differentUser"); |
| Assert.assertEquals( |
| Response.Status.FORBIDDEN.getStatusCode(), |
| resource.doGetStatus( |
| RUNNING_SELECT_MSQ_QUERY, |
| makeExpectedReq(differentUserAuthResult) |
| ).getStatus() |
| ); |
| Assert.assertEquals( |
| Response.Status.FORBIDDEN.getStatusCode(), |
| resource.doGetResults( |
| RUNNING_SELECT_MSQ_QUERY, |
| 1L, |
| null, |
| makeExpectedReq(differentUserAuthResult) |
| ).getStatus() |
| ); |
| Assert.assertEquals( |
| Response.Status.FORBIDDEN.getStatusCode(), |
| resource.deleteQuery( |
| RUNNING_SELECT_MSQ_QUERY, |
| makeExpectedReq(differentUserAuthResult) |
| ).getStatus() |
| ); |
| } |
| |
| @Test |
| public void testAPIBehaviourWithDifferentUserAndStateRPermission() |
| { |
| AuthenticationResult differentUserAuthResult = makeAuthResultForUser(STATE_R_USER); |
| Assert.assertEquals( |
| Response.Status.OK.getStatusCode(), |
| resource.doGetStatus( |
| RUNNING_SELECT_MSQ_QUERY, |
| makeExpectedReq(differentUserAuthResult) |
| ).getStatus() |
| ); |
| Assert.assertEquals( |
| Response.Status.BAD_REQUEST.getStatusCode(), |
| resource.doGetResults( |
| RUNNING_SELECT_MSQ_QUERY, |
| 1L, |
| null, |
| makeExpectedReq(differentUserAuthResult) |
| ).getStatus() |
| ); |
| Assert.assertEquals( |
| Response.Status.FORBIDDEN.getStatusCode(), |
| resource.deleteQuery( |
| RUNNING_SELECT_MSQ_QUERY, |
| makeExpectedReq(differentUserAuthResult) |
| ).getStatus() |
| ); |
| } |
| |
| @Test |
| public void testAPIBehaviourWithDifferentUserAndStateWPermission() |
| { |
| AuthenticationResult differentUserAuthResult = makeAuthResultForUser(STATE_W_USER); |
| Assert.assertEquals( |
| Response.Status.FORBIDDEN.getStatusCode(), |
| resource.doGetStatus( |
| RUNNING_SELECT_MSQ_QUERY, |
| makeExpectedReq(differentUserAuthResult) |
| ).getStatus() |
| ); |
| Assert.assertEquals( |
| Response.Status.FORBIDDEN.getStatusCode(), |
| resource.doGetResults( |
| RUNNING_SELECT_MSQ_QUERY, |
| 1L, |
| null, |
| makeExpectedReq(differentUserAuthResult) |
| ).getStatus() |
| ); |
| Assert.assertEquals( |
| Response.Status.ACCEPTED.getStatusCode(), |
| resource.deleteQuery( |
| RUNNING_SELECT_MSQ_QUERY, |
| makeExpectedReq(differentUserAuthResult) |
| ).getStatus() |
| ); |
| } |
| |
| @Test |
| public void testAPIBehaviourWithDifferentUserAndStateRWPermission() |
| { |
| AuthenticationResult differentUserAuthResult = makeAuthResultForUser(STATE_RW_USER); |
| Assert.assertEquals( |
| Response.Status.OK.getStatusCode(), |
| resource.doGetStatus( |
| RUNNING_SELECT_MSQ_QUERY, |
| makeExpectedReq(differentUserAuthResult) |
| ).getStatus() |
| ); |
| Assert.assertEquals( |
| Response.Status.BAD_REQUEST.getStatusCode(), |
| resource.doGetResults( |
| RUNNING_SELECT_MSQ_QUERY, |
| 1L, |
| null, |
| makeExpectedReq(differentUserAuthResult) |
| ).getStatus() |
| ); |
| Assert.assertEquals( |
| Response.Status.ACCEPTED.getStatusCode(), |
| resource.deleteQuery( |
| RUNNING_SELECT_MSQ_QUERY, |
| makeExpectedReq(differentUserAuthResult) |
| ).getStatus() |
| ); |
| } |
| |
| @Test |
| public void testTaskIdNotFound() |
| { |
| String taskIdNotFound = "notFound"; |
| final DefaultHttpResponse incorrectResponse = |
| new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND); |
| SettableFuture<TaskStatusResponse> settableFuture = SettableFuture.create(); |
| settableFuture.setException(new HttpResponseException(new StringFullResponseHolder( |
| incorrectResponse, |
| StandardCharsets.UTF_8 |
| ))); |
| Mockito.when(overlordClient.taskStatus(taskIdNotFound)).thenReturn(settableFuture); |
| |
| Assert.assertEquals( |
| Response.Status.NOT_FOUND.getStatusCode(), |
| resource.doGetStatus(taskIdNotFound, makeOkRequest()).getStatus() |
| ); |
| Assert.assertEquals( |
| Response.Status.NOT_FOUND.getStatusCode(), |
| resource.doGetResults(taskIdNotFound, null, null, makeOkRequest()).getStatus() |
| ); |
| Assert.assertEquals( |
| Response.Status.NOT_FOUND.getStatusCode(), |
| resource.deleteQuery(taskIdNotFound, makeOkRequest()).getStatus() |
| ); |
| } |
| |
| @Test |
| public void testIsEnabled() |
| { |
| Assert.assertEquals(Response.Status.OK.getStatusCode(), resource.isEnabled(makeOkRequest()).getStatus()); |
| } |
| } |