| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.unomi.shell.migration.utils; |
| |
| import org.apache.commons.io.IOUtils; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.http.HttpStatus; |
| import org.apache.http.client.methods.CloseableHttpResponse; |
| import org.apache.http.client.methods.HttpGet; |
| import org.apache.http.impl.client.CloseableHttpClient; |
| import org.apache.http.util.EntityUtils; |
| import org.json.JSONObject; |
| import org.osgi.framework.Bundle; |
| import org.osgi.framework.BundleContext; |
| |
| import java.io.BufferedReader; |
| import java.io.DataInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.net.URL; |
| import java.nio.charset.StandardCharsets; |
| import java.util.Collections; |
| import java.util.Enumeration; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| |
| /** |
| * @author dgaillard |
| */ |
| public class MigrationUtils { |
| |
| public static JSONObject queryWithScroll(CloseableHttpClient httpClient, String url) throws IOException { |
| url += "?scroll=1m"; |
| |
| return new JSONObject(HttpUtils.executeGetRequest(httpClient, url, null)); |
| } |
| |
| public static JSONObject continueQueryWithScroll(CloseableHttpClient httpClient, String url, String scrollId) throws IOException { |
| url += "/_search/scroll?scroll=1m&scroll_id=" + scrollId; |
| |
| return new JSONObject(HttpUtils.executeGetRequest(httpClient, url, null)); |
| } |
| |
| public static void bulkUpdate(CloseableHttpClient httpClient, String url, String jsonData) throws IOException { |
| HttpUtils.executePostRequest(httpClient, url, jsonData, null); |
| } |
| |
| public static String resourceAsString(BundleContext bundleContext, final String resource) { |
| final URL url = bundleContext.getBundle().getResource(resource); |
| try (InputStream stream = url.openStream()) { |
| return IOUtils.toString(stream, StandardCharsets.UTF_8); |
| } catch (final Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public static String getFileWithoutComments(BundleContext bundleContext, final String resource) { |
| final URL url = bundleContext.getBundle().getResource(resource); |
| try (InputStream stream = url.openStream()) { |
| DataInputStream in = new DataInputStream(stream); |
| BufferedReader br = new BufferedReader(new InputStreamReader(in)); |
| String line; |
| StringBuilder value = new StringBuilder(); |
| while ((line = br.readLine()) != null) { |
| if (!line.startsWith("/*") && !line.startsWith(" *") && !line.startsWith("*/")) |
| value.append(line); |
| } |
| in.close(); |
| return value.toString(); |
| } catch (final Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public static Set<String> getIndexesPrefixedBy(CloseableHttpClient httpClient, String esAddress, String prefix) throws IOException { |
| try (CloseableHttpResponse response = httpClient.execute(new HttpGet(esAddress + "/_aliases"))) { |
| if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { |
| JSONObject indexesAsJson = new JSONObject(EntityUtils.toString(response.getEntity())); |
| return indexesAsJson.keySet().stream(). |
| filter(alias -> alias.startsWith(prefix)). |
| collect(Collectors.toSet()); |
| } |
| } |
| return Collections.emptySet(); |
| } |
| |
| public static String extractMappingFromBundles(BundleContext bundleContext, String fileName) throws IOException { |
| for (Bundle bundle : bundleContext.getBundles()) { |
| Enumeration<URL> predefinedMappings = bundle.findEntries("META-INF/cxs/mappings", fileName, true); |
| if (predefinedMappings == null) { |
| continue; |
| } |
| while (predefinedMappings.hasMoreElements()) { |
| URL predefinedMappingURL = predefinedMappings.nextElement(); |
| return IOUtils.toString(predefinedMappingURL); |
| } |
| } |
| |
| throw new RuntimeException("no mapping found in bundles for: " + fileName); |
| } |
| |
| public static String buildIndexCreationRequest(CloseableHttpClient httpClient, String esAddress, String baseIndexSettings, |
| String originalIndexForSettingsExtraction, String mapping) throws IOException { |
| |
| String settings = baseIndexSettings; |
| |
| // Extract existing settings on index that still exists |
| if (originalIndexForSettingsExtraction != null) { |
| JSONObject originalIndexSettings = new JSONObject(HttpUtils.executeGetRequest(httpClient, esAddress + "/" + originalIndexForSettingsExtraction + "/_settings", null)); |
| settings = settings |
| .replace("#numberOfShards", originalIndexSettings.getJSONObject(originalIndexForSettingsExtraction).getJSONObject("settings").getJSONObject("index").getString("number_of_shards")) |
| .replace("#numberOfReplicas", originalIndexSettings.getJSONObject(originalIndexForSettingsExtraction).getJSONObject("settings").getJSONObject("index").getString("number_of_replicas")) |
| .replace("#maxDocValueFieldsSearch", originalIndexSettings.getJSONObject(originalIndexForSettingsExtraction).getJSONObject("settings").getJSONObject("index").getString("max_docvalue_fields_search")) |
| .replace("#mappingTotalFieldsLimit", originalIndexSettings.getJSONObject(originalIndexForSettingsExtraction).getJSONObject("settings").getJSONObject("index").getJSONObject("mapping").getJSONObject("total_fields").getString("limit")); |
| } |
| |
| return settings.replace("#mappings", mapping); |
| } |
| |
| public static void reIndex(CloseableHttpClient httpClient, BundleContext bundleContext, String esAddress, String indexName, |
| String newIndexSettings, String painlessScript) throws IOException { |
| String indexNameCloned = indexName + "-cloned"; |
| |
| String reIndexRequest = resourceAsString(bundleContext, "requestBody/2.0.0/base_reindex_request.json") |
| .replace("#source", indexNameCloned).replace("#dest", indexName) |
| .replace("#painless", StringUtils.isNotEmpty(painlessScript) ? getScriptPart(painlessScript) : ""); |
| |
| String setIndexReadOnlyRequest = resourceAsString(bundleContext, "requestBody/2.0.0/base_set_index_readonly_request.json"); |
| |
| // Set original index as readOnly |
| HttpUtils.executePutRequest(httpClient, esAddress + "/" + indexName + "/_settings", setIndexReadOnlyRequest, null); |
| // Clone the original index for backup |
| HttpUtils.executePostRequest(httpClient, esAddress + "/" + indexName + "/_clone/" + indexNameCloned, null, null); |
| // Delete original index |
| HttpUtils.executeDeleteRequest(httpClient, esAddress + "/" + indexName, null); |
| // Recreate the original index with new mappings |
| HttpUtils.executePutRequest(httpClient, esAddress + "/" + indexName, newIndexSettings, null); |
| // Reindex data from clone |
| HttpUtils.executePostRequest(httpClient, esAddress + "/_reindex", reIndexRequest, null); |
| // Remove clone |
| HttpUtils.executeDeleteRequest(httpClient, esAddress + "/" + indexNameCloned, null); |
| } |
| |
| private static String getScriptPart(String painlessScript) { |
| return ", \"script\": {\"source\": \"" + painlessScript + "\", \"lang\": \"painless\"}"; |
| } |
| } |