blob: 10126d0d2e94a6f97902e704295c303f72ad5631 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.admin.internal;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.ServerErrorException;
import javax.ws.rs.ServiceUnavailableException;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation.Builder;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.admin.PulsarAdminException.ConnectException;
import org.apache.pulsar.client.admin.PulsarAdminException.GettingAuthenticationDataException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotAllowedException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.admin.PulsarAdminException.ServerSideErrorException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Abstract base class for all admin resources.
*/
public abstract class BaseResource {
private static final Logger log = LoggerFactory.getLogger(BaseResource.class);
protected final Authentication auth;
protected final long readTimeoutMs;
protected BaseResource(Authentication auth, long readTimeoutMs) {
this.auth = auth;
this.readTimeoutMs = readTimeoutMs;
}
public Builder request(final WebTarget target) throws PulsarAdminException {
try {
return requestAsync(target).get();
} catch (Exception e) {
throw new GettingAuthenticationDataException(e);
}
}
// do the authentication stage, and once authentication completed return a Builder
public CompletableFuture<Builder> requestAsync(final WebTarget target) {
CompletableFuture<Builder> builderFuture = new CompletableFuture<>();
CompletableFuture<Map<String, String>> authFuture = new CompletableFuture<>();
try {
AuthenticationDataProvider authData = auth.getAuthData(target.getUri().getHost());
if (authData.hasDataForHttp()) {
auth.authenticationStage(target.getUri().toString(), authData, null, authFuture);
} else {
authFuture.complete(null);
}
// auth complete, return a new Builder
authFuture.whenComplete((respHeaders, ex) -> {
if (ex != null) {
log.warn("[{}] Failed to perform http request at auth stage: {}", target.getUri(),
ex.getMessage());
builderFuture.completeExceptionally(new PulsarClientException(ex));
return;
}
try {
Builder builder = target.request(MediaType.APPLICATION_JSON);
if (authData.hasDataForHttp()) {
Set<Entry<String, String>> headers =
auth.newRequestHeader(target.getUri().toString(), authData, respHeaders);
if (headers != null) {
headers.forEach(entry -> builder.header(entry.getKey(), entry.getValue()));
}
}
builderFuture.complete(builder);
} catch (Throwable t) {
builderFuture.completeExceptionally(new GettingAuthenticationDataException(t));
}
});
} catch (Throwable t) {
builderFuture.completeExceptionally(new GettingAuthenticationDataException(t));
}
return builderFuture;
}
public <T> CompletableFuture<Void> asyncPutRequest(final WebTarget target, Entity<T> entity) {
final CompletableFuture<Void> future = new CompletableFuture<>();
try {
request(target).async().put(entity, new InvocationCallback<ErrorData>() {
@Override
public void completed(ErrorData response) {
future.complete(null);
}
@Override
public void failed(Throwable throwable) {
log.warn("[{}] Failed to perform http put request: {}", target.getUri(), throwable.getMessage());
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
} catch (PulsarAdminException cae) {
future.completeExceptionally(cae);
}
return future;
}
public <T> CompletableFuture<Void> asyncPostRequest(final WebTarget target, Entity<T> entity) {
final CompletableFuture<Void> future = new CompletableFuture<>();
try {
request(target).async().post(entity, new InvocationCallback<ErrorData>() {
@Override
public void completed(ErrorData response) {
future.complete(null);
}
@Override
public void failed(Throwable throwable) {
log.warn("[{}] Failed to perform http post request: {}", target.getUri(), throwable.getMessage());
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
} catch (PulsarAdminException cae) {
future.completeExceptionally(cae);
}
return future;
}
public <T> void asyncGetRequest(final WebTarget target, InvocationCallback<T> callback) {
try {
request(target).async().get(callback);
} catch (PulsarAdminException cae) {
callback.failed(cae);
}
}
public CompletableFuture<Void> asyncDeleteRequest(final WebTarget target) {
final CompletableFuture<Void> future = new CompletableFuture<>();
try {
request(target).async().delete(new InvocationCallback<ErrorData>() {
@Override
public void completed(ErrorData response) {
future.complete(null);
}
@Override
public void failed(Throwable throwable) {
log.warn("[{}] Failed to perform http delete request: {}", target.getUri(), throwable.getMessage());
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
} catch (PulsarAdminException cae) {
future.completeExceptionally(cae);
}
return future;
}
public PulsarAdminException getApiException(Throwable e) {
if (e instanceof PulsarAdminException) {
return (PulsarAdminException) e;
} else if (e instanceof ServiceUnavailableException) {
if (e.getCause() instanceof java.net.ConnectException) {
return new ConnectException(e.getCause());
} else {
return new PulsarAdminException((ServerErrorException) e);
}
} else if (e instanceof WebApplicationException) {
// Handle 5xx exceptions
if (e instanceof ServerErrorException) {
ServerErrorException see = (ServerErrorException) e;
return new ServerSideErrorException(see, e.getMessage());
} else if (e instanceof ClientErrorException) {
// Handle 4xx exceptions
ClientErrorException cee = (ClientErrorException) e;
int statusCode = cee.getResponse().getStatus();
switch (statusCode) {
case 401:
case 403:
return new NotAuthorizedException(cee);
case 404:
return new NotFoundException(cee);
case 405:
return new NotAllowedException(cee);
case 409:
return new ConflictException(cee);
case 412:
return new PreconditionFailedException(cee);
default:
return new PulsarAdminException(cee);
}
} else {
return new PulsarAdminException((WebApplicationException) e);
}
} else {
return new PulsarAdminException(e);
}
}
public PulsarAdminException getApiException(Response response) {
if (response.getStatusInfo().equals(Response.Status.OK)) {
return null;
}
try {
if (response.getStatus() >= 500) {
throw new ServerErrorException(response);
} else if (response.getStatus() >= 400) {
throw new ClientErrorException(response);
} else {
throw new WebApplicationException(response);
}
} catch (Exception e) {
return getApiException(e);
}
}
}