| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| **/ |
| |
| package org.apache.kafka.connect.runtime.rest.resources; |
| |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import org.apache.kafka.connect.runtime.ConnectorConfig; |
| import org.apache.kafka.connect.runtime.Herder; |
| import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException; |
| import org.apache.kafka.connect.runtime.distributed.RequestTargetException; |
| import org.apache.kafka.connect.runtime.rest.RestServer; |
| import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; |
| import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; |
| import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest; |
| import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; |
| import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; |
| import org.apache.kafka.connect.util.ConnectorTaskId; |
| import org.apache.kafka.connect.util.FutureCallback; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.servlet.ServletContext; |
| import javax.ws.rs.BadRequestException; |
| import javax.ws.rs.Consumes; |
| import javax.ws.rs.DELETE; |
| import javax.ws.rs.GET; |
| import javax.ws.rs.POST; |
| import javax.ws.rs.PUT; |
| import javax.ws.rs.Path; |
| import javax.ws.rs.PathParam; |
| import javax.ws.rs.Produces; |
| import javax.ws.rs.QueryParam; |
| import javax.ws.rs.core.MediaType; |
| import javax.ws.rs.core.Response; |
| import javax.ws.rs.core.UriBuilder; |
| import java.net.URI; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| |
| @Path("/connectors") |
| @Produces(MediaType.APPLICATION_JSON) |
| @Consumes(MediaType.APPLICATION_JSON) |
| public class ConnectorsResource { |
| private static final Logger log = LoggerFactory.getLogger(ConnectorsResource.class); |
| |
| // TODO: This should not be so long. However, due to potentially long rebalances that may have to wait a full |
| // session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but |
| // we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases, |
| // but currently a worker simply leaving the group can take this long as well. |
| private static final long REQUEST_TIMEOUT_MS = 90 * 1000; |
| |
| private final Herder herder; |
| @javax.ws.rs.core.Context |
| private ServletContext context; |
| |
| public ConnectorsResource(Herder herder) { |
| this.herder = herder; |
| } |
| |
| @GET |
| @Path("/") |
| public Collection<String> listConnectors(final @QueryParam("forward") Boolean forward) throws Throwable { |
| FutureCallback<Collection<String>> cb = new FutureCallback<>(); |
| herder.connectors(cb); |
| return completeOrForwardRequest(cb, "/connectors", "GET", null, new TypeReference<Collection<String>>() { |
| }, forward); |
| } |
| |
| @POST |
| @Path("/") |
| public Response createConnector(final @QueryParam("forward") Boolean forward, |
| final CreateConnectorRequest createRequest) throws Throwable { |
| String name = createRequest.name(); |
| Map<String, String> configs = createRequest.config(); |
| if (!configs.containsKey(ConnectorConfig.NAME_CONFIG)) |
| configs.put(ConnectorConfig.NAME_CONFIG, name); |
| |
| FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(); |
| herder.putConnectorConfig(name, configs, false, cb); |
| Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors", "POST", createRequest, |
| new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward); |
| return Response.created(URI.create("/connectors/" + name)).entity(info.result()).build(); |
| } |
| |
| @GET |
| @Path("/{connector}") |
| public ConnectorInfo getConnector(final @PathParam("connector") String connector, |
| final @QueryParam("forward") Boolean forward) throws Throwable { |
| FutureCallback<ConnectorInfo> cb = new FutureCallback<>(); |
| herder.connectorInfo(connector, cb); |
| return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, forward); |
| } |
| |
| @GET |
| @Path("/{connector}/config") |
| public Map<String, String> getConnectorConfig(final @PathParam("connector") String connector, |
| final @QueryParam("forward") Boolean forward) throws Throwable { |
| FutureCallback<Map<String, String>> cb = new FutureCallback<>(); |
| herder.connectorConfig(connector, cb); |
| return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null, forward); |
| } |
| |
| @GET |
| @Path("/{connector}/status") |
| public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") String connector) throws Throwable { |
| return herder.connectorStatus(connector); |
| } |
| |
| @PUT |
| @Path("/{connector}/config") |
| public Response putConnectorConfig(final @PathParam("connector") String connector, |
| final @QueryParam("forward") Boolean forward, |
| final Map<String, String> connectorConfig) throws Throwable { |
| FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(); |
| String includedName = connectorConfig.get(ConnectorConfig.NAME_CONFIG); |
| if (includedName != null) { |
| if (!includedName.equals(connector)) |
| throw new BadRequestException("Connector name configuration (" + includedName + ") doesn't match connector name in the URL (" + connector + ")"); |
| } else { |
| connectorConfig.put(ConnectorConfig.NAME_CONFIG, connector); |
| } |
| |
| herder.putConnectorConfig(connector, connectorConfig, true, cb); |
| Herder.Created<ConnectorInfo> createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config", |
| "PUT", connectorConfig, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward); |
| Response.ResponseBuilder response; |
| if (createdInfo.created()) |
| response = Response.created(URI.create("/connectors/" + connector)); |
| else |
| response = Response.ok(); |
| return response.entity(createdInfo.result()).build(); |
| } |
| |
| @POST |
| @Path("/{connector}/restart") |
| public void restartConnector(final @PathParam("connector") String connector, |
| final @QueryParam("forward") Boolean forward) throws Throwable { |
| FutureCallback<Void> cb = new FutureCallback<>(); |
| herder.restartConnector(connector, cb); |
| completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", null, forward); |
| } |
| |
| @PUT |
| @Path("/{connector}/pause") |
| public Response pauseConnector(@PathParam("connector") String connector) { |
| herder.pauseConnector(connector); |
| return Response.accepted().build(); |
| } |
| |
| @PUT |
| @Path("/{connector}/resume") |
| public Response resumeConnector(@PathParam("connector") String connector) { |
| herder.resumeConnector(connector); |
| return Response.accepted().build(); |
| } |
| |
| @GET |
| @Path("/{connector}/tasks") |
| public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String connector, |
| final @QueryParam("forward") Boolean forward) throws Throwable { |
| FutureCallback<List<TaskInfo>> cb = new FutureCallback<>(); |
| herder.taskConfigs(connector, cb); |
| return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", null, new TypeReference<List<TaskInfo>>() { |
| }, forward); |
| } |
| |
| @POST |
| @Path("/{connector}/tasks") |
| public void putTaskConfigs(final @PathParam("connector") String connector, |
| final @QueryParam("forward") Boolean forward, |
| final List<Map<String, String>> taskConfigs) throws Throwable { |
| FutureCallback<Void> cb = new FutureCallback<>(); |
| herder.putTaskConfigs(connector, taskConfigs, cb); |
| completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs, forward); |
| } |
| |
| @GET |
| @Path("/{connector}/tasks/{task}/status") |
| public ConnectorStateInfo.TaskState getTaskStatus(final @PathParam("connector") String connector, |
| final @PathParam("task") Integer task) throws Throwable { |
| return herder.taskStatus(new ConnectorTaskId(connector, task)); |
| } |
| |
| @POST |
| @Path("/{connector}/tasks/{task}/restart") |
| public void restartTask(final @PathParam("connector") String connector, |
| final @PathParam("task") Integer task, |
| final @QueryParam("forward") Boolean forward) throws Throwable { |
| FutureCallback<Void> cb = new FutureCallback<>(); |
| ConnectorTaskId taskId = new ConnectorTaskId(connector, task); |
| herder.restartTask(taskId, cb); |
| completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" + task + "/restart", "POST", null, forward); |
| } |
| |
| @DELETE |
| @Path("/{connector}") |
| public void destroyConnector(final @PathParam("connector") String connector, |
| final @QueryParam("forward") Boolean forward) throws Throwable { |
| FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(); |
| herder.putConnectorConfig(connector, null, true, cb); |
| completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null, forward); |
| } |
| |
| // Wait for a FutureCallback to complete. If it succeeds, return the parsed response. If it fails, try to forward the |
| // request to the leader. |
| private <T, U> T completeOrForwardRequest(FutureCallback<T> cb, |
| String path, |
| String method, |
| Object body, |
| TypeReference<U> resultType, |
| Translator<T, U> translator, |
| Boolean forward) throws Throwable { |
| try { |
| return cb.get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); |
| } catch (ExecutionException e) { |
| Throwable cause = e.getCause(); |
| |
| if (cause instanceof RequestTargetException) { |
| if (forward == null || forward) { |
| // the only time we allow recursive forwarding is when no forward flag has |
| // been set, which should only be seen by the first worker to handle a user request. |
| // this gives two total hops to resolve the request before giving up. |
| boolean recursiveForward = forward == null; |
| RequestTargetException targetException = (RequestTargetException) cause; |
| String forwardUrl = UriBuilder.fromUri(targetException.forwardUrl()) |
| .path(path) |
| .queryParam("forward", recursiveForward) |
| .build() |
| .toString(); |
| log.debug("Forwarding request {} {} {}", forwardUrl, method, body); |
| return translator.translate(RestServer.httpRequest(forwardUrl, method, body, resultType)); |
| } else { |
| // we should find the right target for the query within two hops, so if |
| // we don't, it probably means that a rebalance has taken place. |
| throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), |
| "Cannot complete request because of a conflicting operation (e.g. worker rebalance)"); |
| } |
| } else if (cause instanceof RebalanceNeededException) { |
| throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), |
| "Cannot complete request momentarily due to stale configuration (typically caused by a concurrent config change)"); |
| } |
| |
| throw cause; |
| } catch (TimeoutException e) { |
| // This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server |
| // error is the best option |
| throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out"); |
| } catch (InterruptedException e) { |
| throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted"); |
| } |
| } |
| |
| private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, Object body, |
| TypeReference<T> resultType, Boolean forward) throws Throwable { |
| return completeOrForwardRequest(cb, path, method, body, resultType, new IdentityTranslator<T>(), forward); |
| } |
| |
| private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, |
| Object body, Boolean forward) throws Throwable { |
| return completeOrForwardRequest(cb, path, method, body, null, new IdentityTranslator<T>(), forward); |
| } |
| |
| private interface Translator<T, U> { |
| T translate(RestServer.HttpResponse<U> response); |
| } |
| |
| private class IdentityTranslator<T> implements Translator<T, T> { |
| @Override |
| public T translate(RestServer.HttpResponse<T> response) { |
| return response.body(); |
| } |
| } |
| |
| private class CreatedConnectorInfoTranslator implements Translator<Herder.Created<ConnectorInfo>, ConnectorInfo> { |
| @Override |
| public Herder.Created<ConnectorInfo> translate(RestServer.HttpResponse<ConnectorInfo> response) { |
| boolean created = response.status() == 201; |
| return new Herder.Created<>(created, response.body()); |
| } |
| } |
| } |