| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.client.impl; |
| |
import java.io.Closeable;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.ssl.SslContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.SecurityUtility;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.Request;
import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
| |
| |
| @Slf4j |
| public class HttpClient implements Closeable { |
| |
| protected final static int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10; |
| protected final static int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30; |
| |
| protected final AsyncHttpClient httpClient; |
| protected final ServiceNameResolver serviceNameResolver; |
| protected final Authentication authentication; |
| |
| protected HttpClient(String serviceUrl, Authentication authentication, |
| EventLoopGroup eventLoopGroup, boolean tlsAllowInsecureConnection, String tlsTrustCertsFilePath) |
| throws PulsarClientException { |
| this(serviceUrl, authentication, eventLoopGroup, tlsAllowInsecureConnection, |
| tlsTrustCertsFilePath, DEFAULT_CONNECT_TIMEOUT_IN_SECONDS, DEFAULT_READ_TIMEOUT_IN_SECONDS); |
| } |
| |
| protected HttpClient(String serviceUrl, Authentication authentication, |
| EventLoopGroup eventLoopGroup, boolean tlsAllowInsecureConnection, String tlsTrustCertsFilePath, |
| int connectTimeoutInSeconds, int readTimeoutInSeconds) throws PulsarClientException { |
| this.authentication = authentication; |
| this.serviceNameResolver = new PulsarServiceNameResolver(); |
| this.serviceNameResolver.updateServiceUrl(serviceUrl); |
| |
| DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); |
| confBuilder.setFollowRedirect(true); |
| confBuilder.setConnectTimeout(connectTimeoutInSeconds * 1000); |
| confBuilder.setReadTimeout(readTimeoutInSeconds * 1000); |
| confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion())); |
| confBuilder.setKeepAliveStrategy(new DefaultKeepAliveStrategy() { |
| @Override |
| public boolean keepAlive(Request ahcRequest, HttpRequest request, HttpResponse response) { |
| // Close connection upon a server error or per HTTP spec |
| return (response.status().code() / 100 != 5) && super.keepAlive(ahcRequest, request, response); |
| } |
| }); |
| |
| if ("https".equals(serviceNameResolver.getServiceUri().getServiceName())) { |
| try { |
| SslContext sslCtx = null; |
| |
| // Set client key and certificate if available |
| AuthenticationDataProvider authData = authentication.getAuthData(); |
| if (authData.hasDataForTls()) { |
| sslCtx = SecurityUtility.createNettySslContextForClient(tlsAllowInsecureConnection, tlsTrustCertsFilePath, |
| authData.getTlsCertificates(), authData.getTlsPrivateKey()); |
| } else { |
| sslCtx = SecurityUtility.createNettySslContextForClient(tlsAllowInsecureConnection, tlsTrustCertsFilePath); |
| } |
| |
| confBuilder.setSslContext(sslCtx); |
| confBuilder.setUseInsecureTrustManager(tlsAllowInsecureConnection); |
| } catch (Exception e) { |
| throw new PulsarClientException.InvalidConfigurationException(e); |
| } |
| } |
| confBuilder.setEventLoopGroup(eventLoopGroup); |
| AsyncHttpClientConfig config = confBuilder.build(); |
| httpClient = new DefaultAsyncHttpClient(config); |
| |
| log.debug("Using HTTP url: {}", serviceUrl); |
| } |
| |
| String getServiceUrl() { |
| return this.serviceNameResolver.getServiceUrl(); |
| } |
| |
| void setServiceUrl(String serviceUrl) throws PulsarClientException { |
| this.serviceNameResolver.updateServiceUrl(serviceUrl); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| httpClient.close(); |
| } |
| |
| public <T> CompletableFuture<T> get(String path, Class<T> clazz) { |
| final CompletableFuture<T> future = new CompletableFuture<>(); |
| try { |
| String requestUrl = new URL(serviceNameResolver.resolveHostUri().toURL(), path).toString(); |
| String remoteHostName = serviceNameResolver.resolveHostUri().getHost(); |
| AuthenticationDataProvider authData = authentication.getAuthData(remoteHostName); |
| |
| CompletableFuture<Map<String, String>> authFuture = new CompletableFuture<>(); |
| |
| // bring a authenticationStage for sasl auth. |
| if (authData.hasDataForHttp()) { |
| authentication.authenticationStage(requestUrl, authData, null, authFuture); |
| } else { |
| authFuture.complete(null); |
| } |
| |
| // auth complete, do real request |
| authFuture.whenComplete((respHeaders, ex) -> { |
| if (ex != null) { |
| log.warn("[{}] Failed to perform http request at authentication stage: {}", |
| requestUrl, ex.getMessage()); |
| future.completeExceptionally(new PulsarClientException(ex)); |
| return; |
| } |
| |
| // auth complete, use a new builder |
| BoundRequestBuilder builder = httpClient.prepareGet(requestUrl) |
| .setHeader("Accept", "application/json"); |
| |
| if (authData.hasDataForHttp()) { |
| Set<Entry<String, String>> headers; |
| try { |
| headers = authentication.newRequestHeader(requestUrl, authData, respHeaders); |
| } catch (Exception e) { |
| log.warn("[{}] Error during HTTP get headers: {}", requestUrl, e.getMessage()); |
| future.completeExceptionally(new PulsarClientException(e)); |
| return; |
| } |
| if (headers != null) { |
| headers.forEach(entry -> builder.addHeader(entry.getKey(), entry.getValue())); |
| } |
| } |
| |
| builder.execute().toCompletableFuture().whenComplete((response2, t) -> { |
| if (t != null) { |
| log.warn("[{}] Failed to perform http request: {}", requestUrl, t.getMessage()); |
| future.completeExceptionally(new PulsarClientException(t)); |
| return; |
| } |
| |
| // request not success |
| if (response2.getStatusCode() != HttpURLConnection.HTTP_OK) { |
| log.warn("[{}] HTTP get request failed: {}", requestUrl, response2.getStatusText()); |
| Exception e; |
| if (response2.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { |
| e = new NotFoundException("Not found: " + response2.getStatusText()); |
| } else { |
| e = new PulsarClientException("HTTP get request failed: " + response2.getStatusText()); |
| } |
| future.completeExceptionally(e); |
| return; |
| } |
| |
| try { |
| T data = ObjectMapperFactory.getThreadLocal().readValue(response2.getResponseBodyAsBytes(), clazz); |
| future.complete(data); |
| } catch (Exception e) { |
| log.warn("[{}] Error during HTTP get request: {}", requestUrl, e.getMessage()); |
| future.completeExceptionally(new PulsarClientException(e)); |
| } |
| }); |
| }); |
| } catch (Exception e) { |
| log.warn("[{}]PulsarClientImpl: {}", path, e.getMessage()); |
| if (e instanceof PulsarClientException) { |
| future.completeExceptionally(e); |
| } else { |
| future.completeExceptionally(new PulsarClientException(e)); |
| } |
| } |
| |
| return future; |
| } |
| } |