UNOMI-505 Study replication of existing profileIDs into new alias index (#440)

diff --git a/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java b/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
index 9dc873b..410fbd1 100644
--- a/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
+++ b/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
@@ -19,6 +19,7 @@
 import org.apache.commons.io.IOUtils;
 import org.apache.http.HttpStatus;
 import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpPut;
@@ -28,6 +29,7 @@
 import org.apache.karaf.shell.api.console.Session;
 import org.apache.unomi.shell.migration.Migration;
 import org.apache.unomi.shell.migration.utils.ConsoleUtils;
+import org.json.JSONArray;
 import org.json.JSONObject;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Version;
@@ -35,8 +37,10 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URI;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
@@ -65,7 +69,8 @@
     public String getDescription() {
         return "Updates mapping for an index \"event\" with prefix \"context\" by default. Adds the \"sourceId\" field and copies value "
                 + "from the \"scope\" field to it."
-                + "Creates the scope entries in the index \"scope\" from the existing sopes of the events";
+                + "Creates the scope entries in the index \"scope\" from the existing scopes of the events. "
+                + "Creates the \"profileAlias\" documents based on \"profile\".";
     }
 
     @Override
@@ -86,6 +91,7 @@
         }
         createScopeMapping(indexPrefix);
         createScopes(getSetOfScopes(indexes), indexPrefix);
+        createProfileAliasDocumentsFromProfile();
     }
 
     private void updateMapping(final String indexName) throws IOException {
@@ -246,6 +252,125 @@
         return scopes;
     }
 
+    private void createProfileAliasDocumentsFromProfile() throws IOException {
+        System.out.println("Migration \"Create profileAlias from profile\" started");
+        Instant migrationTime = Instant.now();
+        int size = 1000;
+        doProcessProfiles(migrationTime, size);
+        System.out.println("Migration \"Create profileAlias from profile\" completed.");
+    }
+
+    private void doProcessProfiles(Instant migrationTime, int offset) throws IOException {
+        CloseableHttpResponse response = null;
+        try {
+            response = httpClient.execute(createSearchRequest(offset));
+
+            while (true) {
+                JSONObject responseAsJson = getResponseAsJSON(response);
+                String scrollId = responseAsJson.getString("_scroll_id");
+                JSONArray hits = getProfileHits(responseAsJson);
+
+                if (hits.length() == 0) {
+                    if (scrollId != null) {
+                        CloseableHttpResponse deleteScrollResponse = httpClient.execute(createDeleteScrollRequest(scrollId));
+                        if (deleteScrollResponse != null) {
+                            deleteScrollResponse.close();
+                        }
+                    }
+                    break;
+                }
+
+                StringBuilder bulkCreateRequest = new StringBuilder();
+                for (Object o : hits) {
+                    JSONObject hit = (JSONObject) o;
+                    if (hit.has("_source")) {
+                        JSONObject profile = hit.getJSONObject("_source");
+                        if (profile.has("itemId")) {
+                            String itemId = profile.getString("itemId");
+                            String bulkSaveProfileAliases = resourceAsString("requestBody/bulkSaveProfileAliases.ndjson");
+                            bulkCreateRequest.append(bulkSaveProfileAliases.
+                                    replace("$itemId", itemId).
+                                    replace("$migrationTime", migrationTime.toString()));
+                        }
+                    }
+                }
+
+                CloseableHttpResponse bulkResponse = httpClient.execute(createProfileAliasRequest(bulkCreateRequest.toString()));
+                if (bulkResponse != null) {
+                    bulkResponse.close();
+                }
+
+                response = httpClient.execute(createSearchRequestWithScrollId(scrollId));
+            }
+        } finally {
+            if (response != null) {
+                response.close();
+            }
+        }
+    }
+
+    private JSONObject getResponseAsJSON(CloseableHttpResponse response) throws IOException {
+        if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+            return new JSONObject(EntityUtils.toString(response.getEntity()));
+        }
+        return new JSONObject();
+    }
+
+    private JSONArray getProfileHits(JSONObject responseAsJson) {
+        if (responseAsJson.has("hits")) {
+            JSONObject hitsObject = responseAsJson.getJSONObject("hits");
+            if (hitsObject.has("hits")) {
+                return hitsObject.getJSONArray("hits");
+            }
+        }
+        return new JSONArray();
+    }
+
+    private HttpPost createSearchRequestWithScrollId(final String scrollId) throws IOException {
+        final String requestBody = "{\n" +
+                "  \"scroll_id\": \"" + scrollId + "\",\n" +
+                "  \"scroll\": \"1h\"\n" +
+                "}";
+
+        final HttpPost request = new HttpPost(esAddress + "/_search/scroll");
+
+        request.addHeader("Accept", "application/json");
+        request.addHeader("Content-Type", "application/json");
+        request.setEntity(new StringEntity(requestBody));
+
+        return request;
+    }
+
+    private HttpGet createSearchRequest(int size) {
+        return new HttpGet(esAddress + "/context-profile/_search?&scroll=1h&_source_includes=itemId&size=" + size);
+    }
+
+    private HttpEntityEnclosingRequestBase createDeleteScrollRequest(final String scrollId) throws IOException {
+        final HttpEntityEnclosingRequestBase deleteRequest = new HttpEntityEnclosingRequestBase() {
+            @Override
+            public String getMethod() {
+                return "DELETE";
+            }
+        };
+
+        deleteRequest.setURI(URI.create(esAddress + "/_search/scroll"));
+        deleteRequest.setEntity(new StringEntity("{ \"scroll_id\": \"" + scrollId + "\" }"));
+        deleteRequest.addHeader("Accept", "application/json");
+        deleteRequest.addHeader("Content-Type", "application/json");
+
+        return deleteRequest;
+    }
+
+    private HttpPost createProfileAliasRequest(String bulkRequestAsString) throws IOException {
+        final HttpPost bulkRequest = new HttpPost(esAddress + "/context-profilealias/_bulk");
+
+        bulkRequest.addHeader("Accept", "application/json");
+        bulkRequest.addHeader("Content-Type", "application/json");
+        bulkRequest.setEntity(new StringEntity(bulkRequestAsString));
+
+        return bulkRequest;
+    }
+
     protected String resourceAsString(final String resource) {
         final URL url = bundleContext.getBundle().getResource(resource);
         try (InputStream stream = url.openStream()) {
diff --git a/tools/shell-commands/src/main/resources/requestBody/bulkSaveProfileAliases.ndjson b/tools/shell-commands/src/main/resources/requestBody/bulkSaveProfileAliases.ndjson
new file mode 100644
index 0000000..ad886ca
--- /dev/null
+++ b/tools/shell-commands/src/main/resources/requestBody/bulkSaveProfileAliases.ndjson
@@ -0,0 +1,2 @@
+{ "create" : { "_id": "$itemId" }}
+{ "itemId":  "$itemId", "itemType":  "profileAlias", "profileID":  "$itemId", "scope":  null, "clientID": "defaultClientId", "creationTime":  "$migrationTime", "modifiedTime":  "$migrationTime"}