UNOMI-505 Study replication of existing profileIDs into new alias index (#440)
diff --git a/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java b/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
index 9dc873b..410fbd1 100644
--- a/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
+++ b/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
@@ -19,6 +19,7 @@
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
@@ -28,6 +29,7 @@
import org.apache.karaf.shell.api.console.Session;
import org.apache.unomi.shell.migration.Migration;
import org.apache.unomi.shell.migration.utils.ConsoleUtils;
+import org.json.JSONArray;
import org.json.JSONObject;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Version;
@@ -35,8 +37,10 @@
import java.io.IOException;
import java.io.InputStream;
+import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
+import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -65,7 +69,8 @@
public String getDescription() {
return "Updates mapping for an index \"event\" with prefix \"context\" by default. Adds the \"sourceId\" field and copies value "
+ "from the \"scope\" field to it."
- + "Creates the scope entries in the index \"scope\" from the existing sopes of the events";
+ + "Creates the scope entries in the index \"scope\" from the existing scopes of the events. "
+ + "Creates the \"profileAlias\" documents based on \"profile\".";
}
@Override
@@ -86,6 +91,7 @@
}
createScopeMapping(indexPrefix);
createScopes(getSetOfScopes(indexes), indexPrefix);
+ createProfileAliasDocumentsFromProfile();
}
private void updateMapping(final String indexName) throws IOException {
@@ -246,6 +252,125 @@
return scopes;
}
+ private void createProfileAliasDocumentsFromProfile() throws IOException {
+ System.out.println("Migration \"Create profileAlias from profile\" started");
+ Instant migrationTime = Instant.now();
+ int size = 1000;
+ doProcessProfiles(migrationTime, size);
+ System.out.println("Migration \"Create profileAlias from profile\" completed.");
+ }
+
+ private void doProcessProfiles(Instant migrationTime, int offset) throws IOException {
+ CloseableHttpResponse response = null;
+ try {
+ response = httpClient.execute(createSearchRequest(offset));
+
+ while (true) {
+ JSONObject responseAsJson = getResponseAsJSON(response);
+ String scrollId = responseAsJson.getString("_scroll_id");
+ JSONArray hits = getProfileHits(responseAsJson);
+
+ if (hits.length() == 0) {
+ if (scrollId != null) {
+ CloseableHttpResponse deleteScrollResponse = httpClient.execute(createDeleteScrollRequest(scrollId));
+ if (deleteScrollResponse != null) {
+ deleteScrollResponse.close();
+ }
+ }
+ break;
+ }
+
+ StringBuilder bulkCreateRequest = new StringBuilder();
+ for (Object o : hits) {
+ JSONObject hit = (JSONObject) o;
+ if (hit.has("_source")) {
+ JSONObject profile = hit.getJSONObject("_source");
+ if (profile.has("itemId")) {
+ String itemId = profile.getString("itemId");
+ String bulkSaveProfileAliases = resourceAsString("requestBody/bulkSaveProfileAliases.ndjson");
+ bulkCreateRequest.append(bulkSaveProfileAliases.
+ replace("$itemId", itemId).
+ replace("$migrationTime", migrationTime.toString()));
+ }
+ }
+ }
+
+ CloseableHttpResponse bulkResponse = httpClient.execute(createProfileAliasRequest(bulkCreateRequest.toString()));
+ if (bulkResponse != null) {
+ bulkResponse.close();
+ }
+
+ response = httpClient.execute(createSearchRequestWithScrollId(scrollId));
+ }
+ } finally {
+ if (response != null) {
+ response.close();
+ }
+ }
+ }
+
+ private JSONObject getResponseAsJSON(CloseableHttpResponse response) throws IOException {
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+ return new JSONObject(EntityUtils.toString(response.getEntity()));
+ }
+ return new JSONObject();
+ }
+
+ private JSONArray getProfileHits(JSONObject responseAsJson) {
+ if (responseAsJson.has("hits")) {
+ JSONObject hitsObject = responseAsJson.getJSONObject("hits");
+ if (hitsObject.has("hits")) {
+ return hitsObject.getJSONArray("hits");
+ }
+ }
+ return new JSONArray();
+ }
+
+ private HttpPost createSearchRequestWithScrollId(final String scrollId) throws IOException {
+ final String requestBody = "{\n" +
+ " \"scroll_id\": \"" + scrollId + "\",\n" +
+ " \"scroll\": \"1h\"\n" +
+ "}";
+
+ final HttpPost request = new HttpPost(esAddress + "/_search/scroll");
+
+ request.addHeader("Accept", "application/json");
+ request.addHeader("Content-Type", "application/json");
+ request.setEntity(new StringEntity(requestBody));
+
+ return request;
+ }
+
+ private HttpGet createSearchRequest(int size) {
+ return new HttpGet(esAddress + "/context-profile/_search?&scroll=1h&_source_includes=itemId&size=" + size);
+ }
+
+ private HttpEntityEnclosingRequestBase createDeleteScrollRequest(final String scrollId) throws IOException {
+ final HttpEntityEnclosingRequestBase deleteRequest = new HttpEntityEnclosingRequestBase() {
+ @Override
+ public String getMethod() {
+ return "DELETE";
+ }
+ };
+
+ deleteRequest.setURI(URI.create(esAddress + "/_search/scroll"));
+ deleteRequest.setEntity(new StringEntity("{ \"scroll_id\": \"" + scrollId + "\" }"));
+ deleteRequest.addHeader("Accept", "application/json");
+ deleteRequest.addHeader("Content-Type", "application/json");
+
+ return deleteRequest;
+ }
+
+ private HttpPost createProfileAliasRequest(String bulkRequestAsString) throws IOException {
+ final HttpPost bulkRequest = new HttpPost(esAddress + "/context-profilealias/_bulk");
+
+ bulkRequest.addHeader("Accept", "application/json");
+ bulkRequest.addHeader("Content-Type", "application/json");
+ bulkRequest.setEntity(new StringEntity(bulkRequestAsString));
+
+ return bulkRequest;
+ }
+
protected String resourceAsString(final String resource) {
final URL url = bundleContext.getBundle().getResource(resource);
try (InputStream stream = url.openStream()) {
diff --git a/tools/shell-commands/src/main/resources/requestBody/bulkSaveProfileAliases.ndjson b/tools/shell-commands/src/main/resources/requestBody/bulkSaveProfileAliases.ndjson
new file mode 100644
index 0000000..ad886ca
--- /dev/null
+++ b/tools/shell-commands/src/main/resources/requestBody/bulkSaveProfileAliases.ndjson
@@ -0,0 +1,2 @@
+{ "create" : { "_id": "$itemId" }}
+{ "itemId": "$itemId", "itemType": "profileAlias", "profileID": "$itemId", "scope": null, "clientID": "defaultClientId", "creationTime": "$migrationTime", "modifiedTime": "$migrationTime"}