| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.msq.indexing; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.inject.Injector; |
| import com.google.inject.Key; |
| import org.apache.druid.client.coordinator.CoordinatorClient; |
| import org.apache.druid.guice.annotations.Self; |
| import org.apache.druid.indexer.report.TaskReport; |
| import org.apache.druid.indexing.common.TaskToolbox; |
| import org.apache.druid.indexing.common.actions.TaskActionClient; |
| import org.apache.druid.java.util.common.io.Closer; |
| import org.apache.druid.java.util.emitter.service.ServiceEmitter; |
| import org.apache.druid.msq.exec.Controller; |
| import org.apache.druid.msq.exec.ControllerContext; |
| import org.apache.druid.msq.exec.WorkerClient; |
| import org.apache.druid.msq.exec.WorkerManagerClient; |
| import org.apache.druid.msq.indexing.client.ControllerChatHandler; |
| import org.apache.druid.msq.indexing.client.IndexerWorkerClient; |
| import org.apache.druid.msq.indexing.client.IndexerWorkerManagerClient; |
| import org.apache.druid.rpc.ServiceClientFactory; |
| import org.apache.druid.rpc.indexing.OverlordClient; |
| import org.apache.druid.segment.realtime.firehose.ChatHandler; |
| import org.apache.druid.server.DruidNode; |
| |
| /** |
| * Implementation for {@link ControllerContext} required to run multi-stage queries as indexing tasks. |
| */ |
| public class IndexerControllerContext implements ControllerContext |
| { |
| private final TaskToolbox toolbox; |
| private final Injector injector; |
| private final ServiceClientFactory clientFactory; |
| private final OverlordClient overlordClient; |
| private final WorkerManagerClient workerManager; |
| |
| public IndexerControllerContext( |
| final TaskToolbox toolbox, |
| final Injector injector, |
| final ServiceClientFactory clientFactory, |
| final OverlordClient overlordClient |
| ) |
| { |
| this.toolbox = toolbox; |
| this.injector = injector; |
| this.clientFactory = clientFactory; |
| this.overlordClient = overlordClient; |
| this.workerManager = new IndexerWorkerManagerClient(overlordClient); |
| } |
| |
| @Override |
| public ServiceEmitter emitter() |
| { |
| return toolbox.getEmitter(); |
| } |
| |
| @Override |
| public ObjectMapper jsonMapper() |
| { |
| return toolbox.getJsonMapper(); |
| } |
| |
| @Override |
| public Injector injector() |
| { |
| return injector; |
| } |
| |
| @Override |
| public DruidNode selfNode() |
| { |
| return injector.getInstance(Key.get(DruidNode.class, Self.class)); |
| } |
| |
| @Override |
| public CoordinatorClient coordinatorClient() |
| { |
| return toolbox.getCoordinatorClient(); |
| } |
| |
| @Override |
| public TaskActionClient taskActionClient() |
| { |
| return toolbox.getTaskActionClient(); |
| } |
| |
| @Override |
| public WorkerClient taskClientFor(Controller controller) |
| { |
| // Ignore controller parameter. |
| return new IndexerWorkerClient(clientFactory, overlordClient, jsonMapper()); |
| } |
| |
| @Override |
| public void registerController(Controller controller, final Closer closer) |
| { |
| ChatHandler chatHandler = new ControllerChatHandler(toolbox, controller); |
| toolbox.getChatHandlerProvider().register(controller.id(), chatHandler, false); |
| closer.register(() -> toolbox.getChatHandlerProvider().unregister(controller.id())); |
| } |
| |
| @Override |
| public WorkerManagerClient workerManager() |
| { |
| return workerManager; |
| } |
| |
| @Override |
| public void writeReports(String controllerTaskId, TaskReport.ReportMap reports) |
| { |
| toolbox.getTaskReportFileWriter().write(controllerTaskId, reports); |
| } |
| } |