blob: 5cb99918173c7ddf51387b0d132296f2273ebbbf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.http.util;
import okhttp3.Authenticator;
import okhttp3.Cache;
import okhttp3.Credentials;
import okhttp3.FormBody;
import okhttp3.HttpUrl;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.OkHttpClient.Builder;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.Route;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.store.http.HttpApiConfig;
import org.apache.drill.exec.store.http.HttpApiConfig.HttpMethod;
import org.apache.drill.exec.store.http.HttpStoragePluginConfig;
import org.apache.drill.exec.store.http.HttpSubScan;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
/**
* Performs the actual HTTP requests for the HTTP Storage Plugin. The core
* method is the getInputStream() method which accepts a url and opens an
* InputStream with that URL's contents.
*/
public class SimpleHttp {
private static final Logger logger = LoggerFactory.getLogger(SimpleHttp.class);
private final OkHttpClient client;
private final HttpSubScan scanDefn;
private final File tempDir;
private final HttpProxyConfig proxyConfig;
private final CustomErrorContext errorContext;
private final HttpUrl url;
public SimpleHttp(HttpSubScan scanDefn, HttpUrl url, File tempDir,
HttpProxyConfig proxyConfig, CustomErrorContext errorContext) {
this.scanDefn = scanDefn;
this.url = url;
this.tempDir = tempDir;
this.proxyConfig = proxyConfig;
this.errorContext = errorContext;
this.client = setupHttpClient();
}
/**
* Configures the OkHTTP3 server object with configuration info from the user.
*
* @return OkHttpClient configured server
*/
private OkHttpClient setupHttpClient() {
Builder builder = new OkHttpClient.Builder();
// Set up the HTTP Cache. Future possibilities include making the cache size and retention configurable but
// right now it is on or off. The writer will write to the Drill temp directory if it is accessible and
// output a warning if not.
HttpStoragePluginConfig config = scanDefn.tableSpec().config();
if (config.cacheResults()) {
setupCache(builder);
}
// If the API uses basic authentication add the authentication code.
HttpApiConfig apiConfig = scanDefn.tableSpec().connectionConfig();
if (apiConfig.authType().toLowerCase().equals("basic")) {
logger.debug("Adding Interceptor");
builder.addInterceptor(new BasicAuthInterceptor(apiConfig.userName(), apiConfig.password()));
}
// Set timeouts
int timeout = Math.max(1, config.timeout());
builder.connectTimeout(timeout, TimeUnit.SECONDS);
builder.writeTimeout(timeout, TimeUnit.SECONDS);
builder.readTimeout(timeout, TimeUnit.SECONDS);
// Set the proxy configuration
Proxy.Type proxyType;
switch (proxyConfig.type) {
case SOCKS:
proxyType = Proxy.Type.SOCKS;
break;
case HTTP:
proxyType = Proxy.Type.HTTP;
break;
default:
proxyType = Proxy.Type.DIRECT;
}
if (proxyType != Proxy.Type.DIRECT) {
builder.proxy(new Proxy(proxyType,
new InetSocketAddress(proxyConfig.host, proxyConfig.port)));
if (proxyConfig.username != null) {
builder.proxyAuthenticator(new Authenticator() {
@Override public Request authenticate(Route route, Response response) {
String credential = Credentials.basic(proxyConfig.username, proxyConfig.password);
return response.request().newBuilder()
.header("Proxy-Authorization", credential)
.build();
}
});
}
}
return builder.build();
}
public String url() { return url.toString(); }
public InputStream getInputStream() {
Request.Builder requestBuilder = new Request.Builder()
.url(url);
// The configuration does not allow for any other request types other than POST and GET.
HttpApiConfig apiConfig = scanDefn.tableSpec().connectionConfig();
if (apiConfig.getMethodType() == HttpMethod.POST) {
// Handle POST requests
FormBody.Builder formBodyBuilder = buildPostBody(apiConfig.postBody());
requestBuilder.post(formBodyBuilder.build());
}
// Log the URL and method to aid in debugging user issues.
logger.info("Connection: {}, Method {}, URL: {}",
scanDefn.tableSpec().connection(),
apiConfig.getMethodType().name(), url());
// Add headers to request
if (apiConfig.headers() != null) {
for (Map.Entry<String, String> entry : apiConfig.headers().entrySet()) {
requestBuilder.addHeader(entry.getKey(), entry.getValue());
}
}
// Build the request object
Request request = requestBuilder.build();
try {
// Execute the request
Response response = client
.newCall(request)
.execute();
// If the request is unsuccessful, throw a UserException
if (!response.isSuccessful()) {
throw UserException
.dataReadError()
.message("HTTP request failed")
.addContext("Response code", response.code())
.addContext("Response message", response.message())
.addContext(errorContext)
.build(logger);
}
logger.debug("HTTP Request for {} successful.", url());
logger.debug("Response Headers: {} ", response.headers().toString());
// Return the InputStream of the response
return Objects.requireNonNull(response.body()).byteStream();
} catch (IOException e) {
throw UserException
.dataReadError(e)
.message("Failed to read the HTTP response body")
.addContext("Error message", e.getMessage())
.addContext(errorContext)
.build(logger);
}
}
/**
* Configures response caching using a provided temp directory.
*
* @param builder
* Builder the Builder object to which the caching is to be
* configured
*/
private void setupCache(Builder builder) {
int cacheSize = 10 * 1024 * 1024; // TODO Add cache size in MB to config
File cacheDirectory = new File(tempDir, "http-cache");
if (!cacheDirectory.exists()) {
if (!cacheDirectory.mkdirs()) {
throw UserException
.dataWriteError()
.message("Could not create the HTTP cache directory")
.addContext("Path", cacheDirectory.getAbsolutePath())
.addContext("Please check the temp directory or disable HTTP caching.")
.addContext(errorContext)
.build(logger);
}
}
try {
Cache cache = new Cache(cacheDirectory, cacheSize);
logger.debug("Caching HTTP Query Results at: {}", cacheDirectory);
builder.cache(cache);
} catch (Exception e) {
throw UserException.dataWriteError(e)
.message("Could not create the HTTP cache")
.addContext("Path", cacheDirectory.getAbsolutePath())
.addContext("Please check the temp directory or disable HTTP caching.")
.addContext(errorContext)
.build(logger);
}
}
/**
* Accepts text from a post body in the format:<br>
* {@code key1=value1}<br>
* {@code key2=value2}
* <p>
* and creates the appropriate headers.
*
* @return FormBody.Builder The populated formbody builder
*/
private FormBody.Builder buildPostBody(String postBody) {
final Pattern postBodyPattern = Pattern.compile("^.+=.+$");
FormBody.Builder formBodyBuilder = new FormBody.Builder();
String[] lines = postBody.split("\\r?\\n");
for(String line : lines) {
// If the string is in the format key=value split it,
// Otherwise ignore
if (postBodyPattern.matcher(line).find()) {
//Split into key/value
String[] parts = line.split("=");
formBodyBuilder.add(parts[0], parts[1]);
}
}
return formBodyBuilder;
}
/**
* Intercepts requests and adds authentication headers to the request
*/
public static class BasicAuthInterceptor implements Interceptor {
private final String credentials;
public BasicAuthInterceptor(String user, String password) {
credentials = Credentials.basic(user, password);
}
@NotNull
@Override
public Response intercept(Chain chain) throws IOException {
// Get the existing request
Request request = chain.request();
// Replace with new request containing the authorization headers and previous headers
Request authenticatedRequest = request.newBuilder().header("Authorization", credentials).build();
return chain.proceed(authenticatedRequest);
}
}
}