blob: b6e9f61a75b372005c29372196d007b7c51f1346 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.runtime.rest.resources;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.ServletContext;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
/**
 * JAX-RS resource rooted at {@code /connectors} that exposes connector and task management operations
 * backed by a {@link Herder}.
 *
 * <p>Most operations hand a {@link FutureCallback} to the herder and then block on it (see
 * {@code completeOrForwardRequest}). If the herder reports a {@link NotLeaderException}, the request is
 * re-issued against the leader URL carried by that exception. The two status endpoints read from the
 * herder synchronously and are never forwarded.
 */
@Path("/connectors")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class ConnectorsResource {
    private static final Logger log = LoggerFactory.getLogger(ConnectorsResource.class);

    // TODO: This should not be so long. However, rebalances can be lengthy and may have to wait a full session
    // timeout to complete, and during that time we cannot serve some requests. Ideally we could reduce this, but
    // we need to consider all the scenarios in which a shorter timeout could fail. It might be ok to fail with a
    // timeout in rare cases, but currently a worker simply leaving the group can take this long as well.
    private static final long REQUEST_TIMEOUT_MS = 90 * 1000;

    private final Herder herder;

    @javax.ws.rs.core.Context
    private ServletContext context;

    /**
     * @param herder the herder that every request handled by this resource is delegated to
     */
    public ConnectorsResource(Herder herder) {
        this.herder = herder;
    }

    /**
     * Handles {@code GET /connectors/}.
     *
     * @return the collection of strings the herder reports via {@code Herder#connectors}
     * @throws Throwable the cause of a failed herder operation, or a {@link ConnectRestException} on
     *                   timeout or interruption
     */
    @GET
    @Path("/")
    public Collection<String> listConnectors() throws Throwable {
        FutureCallback<Collection<String>> cb = new FutureCallback<>();
        herder.connectors(cb);
        return completeOrForwardRequest(cb, "/connectors", "GET", null, new TypeReference<Collection<String>>() {
        });
    }

    /**
     * Handles {@code POST /connectors/}: submits a new connector configuration to the herder.
     *
     * <p>If the supplied config map has no {@link ConnectorConfig#NAME_CONFIG} entry, the request's name
     * is inserted into it before submission. Note that this mutates the map held by the request.
     *
     * @param createRequest the request carrying the connector name and its configuration
     * @return a {@code 201 Created} response whose location is {@code /connectors/<name>} and whose entity
     *         is the resulting {@link ConnectorInfo}
     * @throws ConnectRestException with status 400 if the request carries no config map
     * @throws Throwable the cause of a failed herder operation, or a {@link ConnectRestException} on
     *                   timeout or interruption
     */
    @POST
    @Path("/")
    public Response createConnector(final CreateConnectorRequest createRequest) throws Throwable {
        String name = createRequest.name();
        Map<String, String> configs = createRequest.config();
        // A missing config would otherwise surface as a NullPointerException (i.e. a server error) for
        // what is really a malformed client request.
        if (configs == null) {
            throw new ConnectRestException(Response.Status.BAD_REQUEST.getStatusCode(),
                    "Connector creation request must include a config");
        }
        if (!configs.containsKey(ConnectorConfig.NAME_CONFIG)) {
            configs.put(ConnectorConfig.NAME_CONFIG, name);
        }

        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
        herder.putConnectorConfig(name, configs, false, cb);
        Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors", "POST", createRequest,
                new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator());
        return Response.created(URI.create("/connectors/" + name)).entity(info.result()).build();
    }

    /**
     * Handles {@code GET /connectors/{connector}}.
     *
     * @param connector the connector name taken from the request path
     * @return the {@link ConnectorInfo} the herder reports for that connector
     * @throws Throwable the cause of a failed herder operation, or a {@link ConnectRestException} on
     *                   timeout or interruption
     */
    @GET
    @Path("/{connector}")
    public ConnectorInfo getConnector(final @PathParam("connector") String connector) throws Throwable {
        FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
        herder.connectorInfo(connector, cb);
        return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null);
    }

    /**
     * Handles {@code GET /connectors/{connector}/config}.
     *
     * @param connector the connector name taken from the request path
     * @return the configuration map the herder reports for that connector
     * @throws Throwable the cause of a failed herder operation, or a {@link ConnectRestException} on
     *                   timeout or interruption
     */
    @GET
    @Path("/{connector}/config")
    public Map<String, String> getConnectorConfig(final @PathParam("connector") String connector) throws Throwable {
        FutureCallback<Map<String, String>> cb = new FutureCallback<>();
        herder.connectorConfig(connector, cb);
        return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null);
    }

    /**
     * Handles {@code GET /connectors/{connector}/status}. The herder is queried synchronously; this
     * request is never forwarded to the leader.
     *
     * @param connector the connector name taken from the request path
     * @return the state information returned by {@code Herder#connectorStatus}
     * @throws Throwable whatever the herder call throws
     */
    @GET
    @Path("/{connector}/status")
    public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") String connector) throws Throwable {
        return herder.connectorStatus(connector);
    }

    /**
     * Handles {@code PUT /connectors/{connector}/config}: submits a configuration for the named connector.
     *
     * @param connector       the connector name taken from the request path
     * @param connectorConfig the configuration from the request body
     * @return {@code 201 Created} (with location {@code /connectors/<connector>}) when the herder reports
     *         that the connector was created, otherwise {@code 200 OK}; in both cases the entity is the
     *         resulting {@link ConnectorInfo}
     * @throws Throwable the cause of a failed herder operation, or a {@link ConnectRestException} on
     *                   timeout or interruption
     */
    @PUT
    @Path("/{connector}/config")
    public Response putConnectorConfig(final @PathParam("connector") String connector,
                                       final Map<String, String> connectorConfig) throws Throwable {
        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
        herder.putConnectorConfig(connector, connectorConfig, true, cb);
        Herder.Created<ConnectorInfo> createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
                "PUT", connectorConfig, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator());

        Response.ResponseBuilder response;
        if (createdInfo.created()) {
            response = Response.created(URI.create("/connectors/" + connector));
        } else {
            response = Response.ok();
        }
        return response.entity(createdInfo.result()).build();
    }

    /**
     * Handles {@code GET /connectors/{connector}/tasks}.
     *
     * @param connector the connector name taken from the request path
     * @return the list of {@link TaskInfo} the herder reports for that connector
     * @throws Throwable the cause of a failed herder operation, or a {@link ConnectRestException} on
     *                   timeout or interruption
     */
    @GET
    @Path("/{connector}/tasks")
    public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String connector) throws Throwable {
        FutureCallback<List<TaskInfo>> cb = new FutureCallback<>();
        herder.taskConfigs(connector, cb);
        return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", null, new TypeReference<List<TaskInfo>>() {
        });
    }

    /**
     * Handles {@code POST /connectors/{connector}/tasks}: submits task configurations for the named
     * connector to the herder.
     *
     * @param connector   the connector name taken from the request path
     * @param taskConfigs the task configurations from the request body
     * @throws Throwable the cause of a failed herder operation, or a {@link ConnectRestException} on
     *                   timeout or interruption
     */
    @POST
    @Path("/{connector}/tasks")
    public void putTaskConfigs(final @PathParam("connector") String connector,
                               final List<Map<String, String>> taskConfigs) throws Throwable {
        FutureCallback<Void> cb = new FutureCallback<>();
        herder.putTaskConfigs(connector, taskConfigs, cb);
        completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs);
    }

    /**
     * Handles {@code GET /connectors/{connector}/tasks/{task}/status}. The herder is queried
     * synchronously; this request is never forwarded to the leader.
     *
     * @param connector the connector name taken from the request path
     * @param task      the task number taken from the request path
     * @return the task state returned by {@code Herder#taskStatus}
     * @throws Throwable whatever the herder call throws
     */
    @GET
    @Path("/{connector}/tasks/{task}/status")
    public ConnectorStateInfo.TaskState getTaskStatus(@PathParam("connector") String connector,
                                                      @PathParam("task") Integer task) throws Throwable {
        return herder.taskStatus(new ConnectorTaskId(connector, task));
    }

    /**
     * Handles {@code DELETE /connectors/{connector}} by submitting a {@code null} configuration for the
     * named connector to the herder.
     *
     * @param connector the connector name taken from the request path
     * @throws Throwable the cause of a failed herder operation, or a {@link ConnectRestException} on
     *                   timeout or interruption
     */
    @DELETE
    @Path("/{connector}")
    public void destroyConnector(final @PathParam("connector") String connector) throws Throwable {
        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
        herder.putConnectorConfig(connector, null, true, cb);
        completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null);
    }

    /**
     * Waits up to {@link #REQUEST_TIMEOUT_MS} for a {@link FutureCallback} to complete. If it succeeds,
     * its result is returned. If it fails with a {@link NotLeaderException}, the request is re-issued
     * against the leader and the leader's response is converted with {@code translator}.
     *
     * @param cb         the callback previously handed to the herder
     * @param path       the request path, appended to the leader URL when forwarding
     * @param method     the HTTP method to use when forwarding
     * @param body       the request body to send when forwarding; may be {@code null}
     * @param resultType the type passed to {@code RestServer.httpRequest} for the leader's response body
     * @param translator converts the leader's HTTP response into the value this method returns
     * @param <T>        the type produced by the callback and returned to the caller
     * @param <U>        the type of the leader's response body
     * @return the callback's result, or the translated response from the leader
     * @throws ConnectRestException with status 500 if the wait times out or the thread is interrupted
     * @throws Throwable            the underlying cause of any other failure reported by the callback
     */
    private <T, U> T completeOrForwardRequest(
            FutureCallback<T> cb, String path, String method, Object body, TypeReference<U> resultType,
            Translator<T, U> translator) throws Throwable {
        try {
            return cb.get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof NotLeaderException) {
                NotLeaderException notLeaderError = (NotLeaderException) cause;
                String forwardUrl = RestServer.urlJoin(notLeaderError.leaderUrl(), path);
                log.debug("Forwarding request to leader: {} {} {}", forwardUrl, method, body);
                return translator.translate(RestServer.httpRequest(forwardUrl, method, body, resultType));
            }
            // Surface the real failure. Fall back to the wrapper itself so that a cause-less
            // ExecutionException is not turned into an opaque NullPointerException by "throw null".
            throw cause != null ? cause : e;
        } catch (TimeoutException e) {
            // This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server
            // error is the best option
            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the code that owns this thread can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted");
        }
    }

    /**
     * Variant of {@code completeOrForwardRequest} for requests where the leader's response body already
     * has the type the caller wants, so it is returned untranslated.
     */
    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, Object body, TypeReference<T> resultType) throws Throwable {
        return completeOrForwardRequest(cb, path, method, body, resultType, new IdentityTranslator<T>());
    }

    /**
     * Variant of {@code completeOrForwardRequest} that passes a {@code null} result type when forwarding
     * and returns the leader's response body untranslated.
     */
    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, Object body) throws Throwable {
        return completeOrForwardRequest(cb, path, method, body, null, new IdentityTranslator<T>());
    }

    /**
     * Converts the HTTP response received from the leader into the value a handler returns.
     *
     * @param <T> the type handed back to the handler
     * @param <U> the type of the leader's response body
     */
    private interface Translator<T, U> {
        T translate(RestServer.HttpResponse<U> response);
    }

    /** Translator that returns the leader's response body unchanged. */
    private static class IdentityTranslator<T> implements Translator<T, T> {
        @Override
        public T translate(RestServer.HttpResponse<T> response) {
            return response.body();
        }
    }

    /**
     * Translator that rebuilds a {@link Herder.Created} from the leader's response: the connector counts
     * as created exactly when the leader answered with HTTP status 201.
     */
    private static class CreatedConnectorInfoTranslator implements Translator<Herder.Created<ConnectorInfo>, ConnectorInfo> {
        @Override
        public Herder.Created<ConnectorInfo> translate(RestServer.HttpResponse<ConnectorInfo> response) {
            boolean created = response.status() == 201;
            return new Herder.Created<>(created, response.body());
        }
    }
}