Add tool for auditing versions of entities in Elasticsearch.
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/EntityVersionAudit.java b/stack/tools/src/main/java/org/apache/usergrid/tools/EntityVersionAudit.java
new file mode 100644
index 0000000..c9f4860
--- /dev/null
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/EntityVersionAudit.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.tools;
+
+
+import com.google.common.base.Optional;
+import com.netflix.astyanax.MutationBatch;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
+import org.apache.usergrid.corepersistence.service.CollectionService;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.SimpleEntityRef;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.index.IndexLocationStrategy;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.index.impl.EsProvider;
+import org.apache.usergrid.persistence.index.utils.UUIDUtils;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.schema.CollectionInfo;
+import org.apache.usergrid.utils.InflectionUtils;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+
+import java.io.*;
+import java.net.URLEncoder;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.commons.lang.StringUtils.isBlank;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createSearchEdgeFromSource;
+import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createIndexDocId;
+
+
+public class EntityVersionAudit extends ToolBase {
+
+ /*
+
+ Writes to files in the current directory:
+ entity_es_urls.txt (contains the relative URLs for the elasticsearch API to GET a document for an entity and version
+ entity_version_agg.txt ( contains the number of versions per entity on a line)
+ */
+
+ private static final Logger logger = LoggerFactory.getLogger( EntityVersionAudit.class );
+
+ private static final String APPLICATION_ARG = "app";
+
+ private static final String ENTITY_TYPE_ARG = "entityType";
+
+ private static final String USE_LATEST_VERSION_ARG = "useLatestVersion";
+
+ private static final String ENTITY_UUID = "entityUUID";
+
+
+ private EntityManager em;
+
+ @Override
+ @SuppressWarnings( "static-access" )
+ public Options createOptions() {
+
+
+ Options options = super.createOptions();
+
+
+ Option appOption = OptionBuilder.withArgName( APPLICATION_ARG ).hasArg().isRequired( true )
+ .withDescription( "application id" ).create( APPLICATION_ARG );
+
+
+ options.addOption( appOption );
+
+ Option collectionOption =
+ OptionBuilder.withArgName(ENTITY_TYPE_ARG).hasArg().isRequired( true ).withDescription( "singular collection name" )
+ .create(ENTITY_TYPE_ARG);
+
+ options.addOption( collectionOption );
+
+
+ Option useLatestVersion =
+ OptionBuilder.withArgName(USE_LATEST_VERSION_ARG).hasArg().isRequired( false ).withDescription( "use latest version" )
+ .create(USE_LATEST_VERSION_ARG);
+
+ options.addOption( useLatestVersion );
+
+ Option entityUUID =
+ OptionBuilder.withArgName(ENTITY_UUID).hasArg().isRequired( false ).withDescription( "specific entity uuid" )
+ .create(ENTITY_UUID);
+
+ options.addOption( entityUUID );
+
+
+
+ return options;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.usergrid.tools.ToolBase#runTool(org.apache.commons.cli.CommandLine)
+ */
+ @Override
+ public void runTool( CommandLine line ) throws Exception {
+
+ logger.info("Starting Tool: EntityVersionAudit");
+ logger.info("Using Cassandra consistency level: {}", System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM"));
+
+ startSpring();
+
+ String applicationOption = line.getOptionValue(APPLICATION_ARG);
+ String entityTypeOption = line.getOptionValue(ENTITY_TYPE_ARG);
+
+ if (isBlank(applicationOption)) {
+ throw new RuntimeException("Application ID not provided.");
+ }
+ final UUID app = UUID.fromString(line.getOptionValue(APPLICATION_ARG));
+
+ if (isBlank(entityTypeOption)) {
+ throw new RuntimeException("Entity type (singular collection name) not provided.");
+ }
+ String entityType = entityTypeOption;
+
+
+ boolean useLatestVersion =
+ line.getOptionValue(USE_LATEST_VERSION_ARG) != null && line.getOptionValue(USE_LATEST_VERSION_ARG).equalsIgnoreCase("true");
+ logger.info("useLatestVersion {}", useLatestVersion);
+
+ final String entityUUID = line.getOptionValue(ENTITY_UUID);
+ logger.info("entityUUID {}", entityUUID);
+
+
+ em = emf.getEntityManager( app );
+
+ String collectionName = InflectionUtils.pluralize(entityType);
+ String simpleEdgeType = CpNamingUtils.getEdgeTypeFromCollectionName(collectionName);
+ logger.info("simpleEdgeType: {}", simpleEdgeType);
+
+ ApplicationScope applicationScope = new ApplicationScopeImpl(new SimpleId(app, "application"));
+ Id applicationScopeId = applicationScope.getApplication();
+ logger.info("applicationScope.getApplication(): {}", applicationScopeId);
+
+ GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class );
+ GraphManager gm = gmf.createEdgeManager(applicationScope);
+
+ EntityCollectionManagerFactory emf = injector.getInstance( EntityCollectionManagerFactory.class );
+ EntityCollectionManager ecm = emf.createCollectionManager(applicationScope);
+
+ final SimpleSearchByEdgeType search =
+ new SimpleSearchByEdgeType( applicationScopeId, simpleEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+ Optional.absent(), false );
+
+ final IndexLocationStrategyFactory ilsf = injector.getInstance(IndexLocationStrategyFactory.class);
+ final Writer versionAuditWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("entity_version_audit.txt"), "utf-8"));
+ final Writer versionAggWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("entity_version_agg.txt"), "utf-8"));
+
+ versionAuditWriter.write("collection,entityUUID,cassandraTimestamp,elasticsearchTimestamp,indexDelayMillis,existsInElasticsearch\n");
+ versionAuditWriter.flush();
+
+ final EsProvider esProvider = injector.getInstance(EsProvider.class);
+
+ gm.loadEdgesFromSource(search).map(markedEdge -> {
+
+ UUID uuid = markedEdge.getTargetNode().getUuid();
+
+ if (entityUUID == null || uuid.equals(UUID.fromString(entityUUID))){
+ logger.info("matched uuid: {}", uuid);
+ try {
+ EntityRef entityRef = new SimpleEntityRef(entityType, uuid);
+ org.apache.usergrid.persistence.Entity retrieved = em.get(entityRef);
+
+ if ( retrieved != null ){
+
+ final AtomicInteger versionCount = new AtomicInteger();
+ Observable<MvccLogEntry> versionObs = ecm.getVersionsFromMaxToMin( retrieved.asId(), org.apache.usergrid.utils.UUIDUtils.newTimeUUID() );
+ if (useLatestVersion) {
+ versionObs = versionObs.take(1);
+ }
+ versionObs.forEach( mvccLogEntry -> {
+
+ IndexLocationStrategy strategy = ilsf.getIndexLocationStrategy(applicationScope);
+ final String readAlias = strategy.getAlias().getReadAlias();
+
+ final SearchEdge searchEdge = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(),
+ CpNamingUtils.getEdgeTypeFromCollectionName( InflectionUtils.pluralize( retrieved.asId().getType() ) ), retrieved.asId(),
+ Long.MAX_VALUE ) );
+
+ final String esDocId = createIndexDocId( applicationScope, retrieved.asId(), mvccLogEntry.getVersion(), searchEdge);
+ GetResponse response = esProvider.getClient().prepareGet(readAlias, "entity", esDocId)
+ .execute()
+ .actionGet();
+ boolean exists = response.isExists();
+
+ long indexTimestamp = response.getField("_timestamp") == null ? 0 : (long)response.getField("_timestamp").getValue();
+ long uuidTimestamp = UUIDUtils.getTimestampInMillis(retrieved.getUuid());
+
+ long diff = 0;
+ if (indexTimestamp > 0) {
+ diff = uuidTimestamp = indexTimestamp;
+ }
+
+ try {
+
+ String csvLine =
+ collectionName + "," +
+ uuid + "," +
+ uuidTimestamp + "," +
+ indexTimestamp + "," +
+ diff + "," +
+ exists;
+
+ //final String url = "/"+readAlias+"/entity/"+URLEncoder.encode(esDocId, "UTF-8");
+ versionAuditWriter.write(csvLine+"\n");
+ versionAuditWriter.flush();
+ versionCount.incrementAndGet();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+
+ versionAggWriter.write(versionCount.toString()+","+retrieved.asId().getUuid()+"\n");
+ versionAggWriter.flush();
+
+ }else{
+ logger.info("entity: {} NOT FOUND", uuid);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ return markedEdge;
+ }).toBlocking().lastOrDefault(null);
+
+ versionAuditWriter.close();
+ versionAggWriter.close();
+
+ }
+}