| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to you under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.calcite.adapter.elasticsearch; |
| |
| import org.apache.calcite.schema.Schema; |
| import org.apache.calcite.schema.SchemaFactory; |
| import org.apache.calcite.schema.SchemaPlus; |
| import org.apache.calcite.util.UnsafeX509ExtendedTrustManager; |
| |
| import org.apache.http.HttpHost; |
| import org.apache.http.auth.AuthScope; |
| import org.apache.http.auth.UsernamePasswordCredentials; |
| import org.apache.http.client.CredentialsProvider; |
| import org.apache.http.impl.client.BasicCredentialsProvider; |
| |
| import com.fasterxml.jackson.core.JsonParser; |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.base.Strings; |
| import com.google.common.cache.Cache; |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.cache.RemovalListener; |
| import com.google.common.cache.RemovalNotification; |
| import com.google.common.collect.ImmutableList; |
| |
| import org.elasticsearch.client.RestClient; |
| import org.elasticsearch.client.RestClientBuilder; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.security.KeyManagementException; |
| import java.security.NoSuchAlgorithmException; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| import java.util.stream.Collectors; |
| import javax.net.ssl.SSLContext; |
| import javax.net.ssl.TrustManager; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| |
| /** |
| * Factory that creates an {@link ElasticsearchSchema}. |
| * |
| * <p>Allows a custom schema to be included in a model.json file. |
| */ |
| @SuppressWarnings("UnusedDeclaration") |
| public class ElasticsearchSchemaFactory implements SchemaFactory { |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchSchemaFactory.class); |
| |
| private static final int REST_CLIENT_CACHE_SIZE = 100; |
| |
| // RestClient objects allocate system resources and are thread safe. Here, we cache |
| // them using a key derived from the parameters that define a RestClient. The primary |
| // reason to do this is to limit the resource leak that results from Calcite's |
| // current inability to close clients that it creates. Amongst the OS resources |
| // leaked are file descriptors which are limited to 1024 per process by default on |
| // Linux at the time of writing. |
| private static final Cache<List, RestClient> REST_CLIENTS = CacheBuilder.newBuilder() |
| .maximumSize(REST_CLIENT_CACHE_SIZE) |
| .removalListener(new RemovalListener<List, RestClient>() { |
| @Override public void onRemoval(RemovalNotification<List, RestClient> notice) { |
| LOGGER.warn( |
| "Will close an ES REST client to keep the number of open clients under {}. " |
| + "Any schema objects that might still have been relying on this client are now " |
| + "broken! Do not try to access more than {} distinct ES REST APIs through this " |
| + "adapter.", |
| REST_CLIENT_CACHE_SIZE, |
| REST_CLIENT_CACHE_SIZE); |
| |
| try { |
| // Free resources allocated by this RestClient |
| notice.getValue().close(); |
| } catch (IOException ex) { |
| LOGGER.warn("Could not close RestClient {}", notice.getValue(), ex); |
| } |
| } |
| }) |
| .build(); |
| |
| public ElasticsearchSchemaFactory() { |
| } |
| |
| /** |
| * Create an ElasticSearch {@link Schema}. |
| * The operand property accepts the following key/value pairs: |
| * |
| * <ul> |
| * <li><b>username</b>: The username for the ES cluster</li> |
| * <li><b>password</b>: The password for the ES cluster</li> |
| * <li><b>hosts</b>: A {@link List} of hosts for the ES cluster. Either the hosts or |
| * coordinates must be populated.</li> |
| * <li><b>coordinates</b>: A {@link List} of coordinates for the ES cluster. Either the hosts |
| * list or |
| * the coordinates list must be populated.</li> |
| * <li><b>disableSSLVerification</b>: A boolean parameter to disable SSL verification. Defaults |
| * to false. This should always be set to false for production systems.</li> |
| * </ul> |
| * |
| * @param parentSchema Parent schema |
| * @param name Name of this schema |
| * @param operand The "operand" JSON property |
| * @return Returns a {@link Schema} for the ES cluster. |
| */ |
| @Override public Schema create(SchemaPlus parentSchema, String name, |
| Map<String, Object> operand) { |
| |
| final Map map = (Map) operand; |
| |
| final ObjectMapper mapper = new ObjectMapper(); |
| mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true); |
| |
| try { |
| |
| List<HttpHost> hosts; |
| |
| if (map.containsKey("hosts")) { |
| final List<String> configHosts = |
| mapper.readValue((String) map.get("hosts"), |
| new TypeReference<List<String>>() { }); |
| |
| hosts = |
| configHosts.stream() |
| .map(host -> HttpHost.create(host)) |
| .collect(Collectors.toList()); |
| } else if (map.containsKey("coordinates")) { |
| final Map<String, Integer> coordinates = |
| mapper.readValue((String) map.get("coordinates"), |
| new TypeReference<Map<String, Integer>>() { }); |
| |
| hosts = |
| coordinates.entrySet() |
| .stream() |
| .map(entry -> new HttpHost(entry.getKey(), entry.getValue())) |
| .collect(Collectors.toList()); |
| |
| LOGGER.warn("Prefer using hosts, coordinates is deprecated."); |
| } else { |
| throw new IllegalArgumentException |
| ("Both 'coordinates' and 'hosts' is missing in configuration. Provide one of them."); |
| } |
| final String pathPrefix = (String) map.get("pathPrefix"); |
| |
| // Enable or Disable SSL Verification |
| boolean disableSSLVerification; |
| if (map.containsKey("disableSSLVerification")) { |
| String temp = (String) map.get("disableSSLVerification"); |
| disableSSLVerification = Boolean.getBoolean(temp.toLowerCase(Locale.ROOT)); |
| } else { |
| disableSSLVerification = false; |
| } |
| |
| // create client |
| String username = (String) map.get("username"); |
| String password = (String) map.get("password"); |
| final RestClient client = |
| connect(hosts, pathPrefix, username, password, disableSSLVerification); |
| final String index = (String) map.get("index"); |
| |
| return new ElasticsearchSchema(client, new ObjectMapper(), index); |
| } catch (IOException e) { |
| throw new RuntimeException("Cannot parse values from json", e); |
| } |
| } |
| |
| /** |
| * Builds Elastic rest client from user configuration. |
| * |
| * @param hosts list of ES HTTP Hosts to connect to |
| * @param username the username of ES |
| * @param password the password of ES |
| * @return new or cached low-level rest http client for ES |
| */ |
| @SuppressWarnings({"java:S4830", "java:S5527"}) |
| private static RestClient connect(List<HttpHost> hosts, String pathPrefix, |
| String username, String password, |
| boolean disableSSLVerification) { |
| |
| Objects.requireNonNull(hosts, "hosts or coordinates"); |
| checkArgument(!hosts.isEmpty(), "no ES hosts specified"); |
| // Two lists are considered equal when all of their corresponding elements are equal |
| // making a list of RestClient params a suitable cache key. |
| List cacheKey = ImmutableList.of(hosts, pathPrefix, username, password); |
| |
| try { |
| return REST_CLIENTS.get(cacheKey, new Callable<RestClient>() { |
| @Override public RestClient call() throws NoSuchAlgorithmException, KeyManagementException { |
| RestClientBuilder builder = RestClient.builder(hosts.toArray(new HttpHost[hosts.size()])); |
| |
| if (!Strings.isNullOrEmpty(username) && !Strings.isNullOrEmpty(password)) { |
| CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); |
| credentialsProvider.setCredentials(AuthScope.ANY, |
| new UsernamePasswordCredentials(username, password)); |
| builder.setHttpClientConfigCallback(httpClientBuilder -> |
| httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); |
| } |
| |
| if (disableSSLVerification) { |
| SSLContext sslContext = SSLContext.getInstance("TLS"); |
| sslContext.init(null, new TrustManager[] {UnsafeX509ExtendedTrustManager.getInstance()}, |
| null); |
| |
| builder.setHttpClientConfigCallback(httpClientBuilder -> |
| httpClientBuilder.setSSLContext(sslContext) |
| .setSSLHostnameVerifier((host, session) -> true)); |
| } |
| |
| if (pathPrefix != null && !pathPrefix.isEmpty()) { |
| builder.setPathPrefix(pathPrefix); |
| } |
| return builder.build(); |
| } |
| }); |
| } catch (ExecutionException ex) { |
| throw new RuntimeException("Cannot return a cached RestClient", ex); |
| } |
| } |
| } |