Merge commit 'refs/pull/617/head' of github.com:apache/usergrid
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueRepairer.java b/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueRepairer.java
new file mode 100644
index 0000000..04915be
--- /dev/null
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueRepairer.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.tools;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.management.OrganizationInfo;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.field.StringField;
+import org.apache.usergrid.utils.ConversionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.google.common.base.Optional;
+import com.google.common.collect.BiMap;
+
+import rx.Observable;
+import rx.observables.ConnectableObservable;
+import rx.schedulers.Schedulers;
+
+/**
+ * Command line tool that walks the entities of the requested collections and checks that each
+ * entity's unique value can be resolved back to an entity.
+ *
+ * <p>The value checked is the entity {@code name}, or the value returned by the public
+ * {@code getXxx()} getter matching the inherited {@code fieldType} setting. With
+ * {@code -findMissingUniqueValues}, entities whose value does not resolve through
+ * {@code EntityManager.getUniqueEntityFromAlias} are logged. With {@code -fixUniqueValues} as
+ * well, the unique value row currently loaded for that field (if any) is deleted and a new row is
+ * written pointing at the id and version loaded from the MVCC entity store.
+ *
+ * <p>Work flows through three executors: a single-thread edge scope fetcher per collection, a
+ * shared entity fetcher pool and a shared unique value checker pool.
+ * {@link #runTool(CommandLine)} returns once all of them have drained.
+ */
+public class UniqueValueRepairer extends ExportingToolBase {
+
+    static final Logger logger = LoggerFactory.getLogger(UniqueValueRepairer.class);
+
+    // Not used within this class.
+    JsonFactory jsonFactory = new JsonFactory();
+    public static final String LAST_ID = "lastId";
+
+    public static final String FIND_MISSING_UNIQUE_VALUES = "findMissingUniqueValues";
+    public static final String FIX_MISSING_VALUES = "fixUniqueValues";
+
+    /** Number of collection edges grouped into one batch of entity ids. */
+    private static final int EDGE_BATCH_SIZE = 1000;
+
+    /** How often, in seconds, a pending executor shutdown is re-checked and logged. */
+    private static final long AWAIT_INTERVAL_SECONDS = 10;
+
+    private boolean findMissingUniqueValues = false;
+    private boolean fixMissingValue = false;
+
+    private AllEntityIdsObservable allEntityIdsObs;
+    // Never assigned in this class, so getEdgesToEntities is always called with an absent last edge.
+    private SimpleEdge lastEdge = null;
+
+    private final ExecutorService entityFetcher = Executors.newFixedThreadPool(10);
+    private final ExecutorService uniqueValueChecker = Executors.newFixedThreadPool(50);
+
+    private Session session;
+    private UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+    private MvccEntitySerializationStrategy mvccEntitySerializationStrategy;
+
+    /**
+     * Adds the {@code -findMissingUniqueValues} and {@code -fixUniqueValues} flags to the options
+     * provided by the parent tool.
+     */
+    @Override
+    @SuppressWarnings("static-access")
+    public Options createOptions() {
+
+        Options options = super.createOptions();
+
+        Option findMissingUniqueValues = OptionBuilder
+                .withDescription("Find entities with missing unique value entry -findMissingUniqueValues")
+                .create(FIND_MISSING_UNIQUE_VALUES);
+        Option fixMissingUniqueValueEntries = OptionBuilder
+                .withDescription("Fix entities with missing unique value entry -fixUniqueValues")
+                .create(FIX_MISSING_VALUES);
+
+        options.addOption(findMissingUniqueValues);
+        options.addOption(fixMissingUniqueValueEntries);
+
+        return options;
+    }
+
+    /**
+     * Resolves the serialization strategies and Cassandra session from the injector, runs the
+     * scan, and then waits for all queued entity fetches and unique value checks before returning.
+     */
+    @Override
+    public void runTool(CommandLine line) throws Exception {
+
+        startSpring();
+        setVerbose(line);
+
+        this.allEntityIdsObs = injector.getInstance(AllEntityIdsObservable.class);
+        applyInputParams(line);
+
+        mvccEntitySerializationStrategy = injector.getInstance(MvccEntitySerializationStrategy.class);
+        uniqueValueSerializationStrategy = injector.getInstance(UniqueValueSerializationStrategy.class);
+        session = injector.getInstance(Session.class);
+
+        try {
+            startEntityScan();
+            logger.info("Finished checking entities. Waiting for threads to complete execution.");
+        } finally {
+            // The scan only returns after every edge batch has been handed to the entity fetcher,
+            // and each entity fetcher task waits for its own unique value checks. Draining the
+            // pools in this order therefore waits for all outstanding work.
+            shutdownAndAwait(entityFetcher, "entity fetcher");
+            shutdownAndAwait(uniqueValueChecker, "unique value checker");
+        }
+        logger.info("All unique value checks have completed.");
+    }
+
+    /**
+     * Shuts down {@code executor} and blocks until every task already queued on it has run.
+     * If interrupted, the interrupt flag is restored and the method returns early.
+     *
+     * @param executor the pool to drain
+     * @param name human readable pool name used in log messages
+     */
+    private static void shutdownAndAwait(ExecutorService executor, String name) {
+        executor.shutdown();
+        try {
+            while (!executor.awaitTermination(AWAIT_INTERVAL_SECONDS, TimeUnit.SECONDS)) {
+                logger.info("Waiting for {} to complete outstanding tasks.", name);
+            }
+        } catch (InterruptedException e) {
+            logger.error("Interrupted while waiting for {} to terminate.", name, e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Scans every selected organization except the one named by the
+     * {@code usergrid.test-account.organization} property.
+     */
+    private void startEntityScan() throws Exception {
+
+        String testOrganization = properties.getProperty("usergrid.test-account.organization");
+
+        for (Entry<UUID, String> organization : getOrgs().entrySet()) {
+
+            // Let's skip the test entities. Compare the organization name, not the map entry.
+            if (testOrganization != null && testOrganization.equals(organization.getValue())) {
+                logger.info("Skipping test organization {}", organization.getValue());
+                continue;
+            }
+            fetchApplicationsForOrgs(organization.getKey(), organization.getValue());
+        }
+    }
+
+    /**
+     * Returns the organizations to scan, keyed by UUID with the organization name as value.
+     * Returns all organizations when neither an organization id nor a name was given.
+     * Exits the JVM with status 1 when the selected organization cannot be found.
+     */
+    private Map<UUID, String> getOrgs() throws Exception {
+        // Loop through the organizations
+        Map<UUID, String> organizationNames = null;
+
+        if (orgId == null && (orgName == null || orgName.trim().equals(""))) {
+            organizationNames = managementService.getOrganizations();
+        } else {
+            OrganizationInfo info = null;
+
+            if (orgId != null) {
+                info = managementService.getOrganizationByUuid(orgId);
+            } else {
+                info = managementService.getOrganizationByName(orgName);
+            }
+
+            if (info == null) {
+                logger.error("Organization info is null!");
+                System.exit(1);
+            }
+
+            organizationNames = new HashMap<UUID, String>();
+            organizationNames.put(info.getUuid(), info.getName());
+        }
+
+        return organizationNames;
+    }
+
+    /**
+     * Scans the applications of one organization. Scans all of them when no application was
+     * selected; otherwise scans only the selected application, and only if it belongs to this
+     * organization.
+     */
+    private void fetchApplicationsForOrgs(UUID orgId, String orgName) throws Exception {
+
+        logger.info("Fetch applications for {} : {} ", orgId, orgName);
+
+        // Values are looked up as qualified "org/app" application names.
+        BiMap<UUID, String> applications = managementService.getApplicationsForOrganization(orgId);
+
+        if (applications == null || applications.isEmpty()) {
+            logger.info("No applications found for organization {} : {}", orgId, orgName);
+            return;
+        }
+
+        if (applicationId == null && (applicationName == null || applicationName.trim().equals(""))) {
+            // export all apps as appId or name is not provided
+            for (Entry<UUID, String> appEntry : applications.entrySet()) {
+                fetchApplicationLoggingErrors(appEntry.getKey(), unqualifiedAppName(appEntry.getValue()));
+            }
+            return;
+        }
+
+        UUID appId = applicationId;
+        String appName = applicationName;
+
+        if (applicationId != null) {
+            appName = unqualifiedAppName(applications.get(appId));
+        } else {
+            appId = applications.inverse().get(orgName + '/' + appName);
+        }
+
+        // When no organization was selected, this runs once per organization. Only the owning
+        // organization processes the application, so it is not scanned repeatedly and a null
+        // id is never passed on.
+        if (appId == null || !applications.containsKey(appId)) {
+            logger.info("Application {} not found in organization {}; skipping",
+                    applicationId != null ? applicationId : applicationName, orgName);
+            return;
+        }
+
+        fetchApplicationLoggingErrors(appId, appName);
+    }
+
+    /** Calls {@link #fetchApplications(UUID, String)}, logging rather than propagating any failure. */
+    private void fetchApplicationLoggingErrors(UUID appId, String appName) {
+        try {
+            fetchApplications(appId, appName);
+        } catch (Exception e) {
+            logger.error("There was an exception fetching application {} : {}", appName, appId, e);
+        }
+    }
+
+    /**
+     * Strips the organization prefix from an "org/app" application name. Names without a slash,
+     * and {@code null}, are returned unchanged.
+     */
+    private static String unqualifiedAppName(String qualifiedName) {
+        if (qualifiedName == null) {
+            return null;
+        }
+        int slash = qualifiedName.indexOf('/');
+        return slash < 0 ? qualifiedName : qualifiedName.substring(slash + 1);
+    }
+
+    /**
+     * Scans each collection named on the command line that exists in the application. Only
+     * logs a hint when no collection names were supplied.
+     */
+    private void fetchApplications(UUID appId, String appName) throws Exception {
+
+        logger.info("Fetching application for {} : {} ", appName, appId);
+
+        EntityManager em = emf.getEntityManager(appId);
+
+        Set<String> collections = em.getApplicationCollections();
+
+        if (collNames == null || collNames.length <= 0) {
+            logger.info("Please pass collection name ( -collectionName testCollection ) ");
+            return;
+        }
+
+        for (String collectionName : collNames) {
+            if (collections.contains(collectionName)) {
+                fetchCollections(appId, collectionName, em);
+            } else {
+                logger.info("Collection {} not found in application {} : {}", collectionName, appName, appId);
+            }
+        }
+    }
+
+    /** Scans one collection of the application; {@code em} is currently unused. */
+    private void fetchCollections(UUID appId, String collectionName, EntityManager em) {
+        extractEntitiesForCollection(appId, collectionName);
+    }
+
+    /**
+     * Walks the edges of one collection in batches of {@value #EDGE_BATCH_SIZE} and hands each
+     * batch to a dedicated single-thread edge scope fetcher. Blocks until the edge stream is
+     * exhausted and that fetcher has processed every batch. On return, therefore, every entity
+     * lookup for this collection has been queued on the entity fetcher pool.
+     */
+    private void extractEntitiesForCollection(UUID applicationId, String collectionName) {
+
+        AtomicInteger batch = new AtomicInteger(1);
+
+        final EntityManager rootEm = emf.getEntityManager(applicationId);
+        final ApplicationScope applicationScope =
+                new ApplicationScopeImpl(new SimpleId(applicationId, "application"));
+
+        ExecutorService edgeScopeFetcher = Executors.newFixedThreadPool(1);
+        try {
+            allEntityIdsObs
+                    .getEdgesToEntities(Observable.just(CpNamingUtils.getApplicationScope(applicationId)),
+                            Optional.fromNullable(
+                                    CpNamingUtils.getEdgeTypeFromCollectionName(collectionName.toLowerCase())),
+                            (lastEdge == null ? Optional.absent() : Optional.fromNullable(lastEdge)))
+                    .buffer(EDGE_BATCH_SIZE)
+                    // Block until the edge stream completes, so the caller knows when the walk is over.
+                    .toBlocking()
+                    .forEach(edges -> {
+                        logger.info("For collection {}", collectionName);
+                        int batchId = batch.getAndIncrement();
+                        logger.info("Started fetching details for collection {} batch {} ", collectionName, batchId);
+                        edgeScopeFetcher.execute(() ->
+                                processEdgeBatch(edges, rootEm, applicationScope, collectionName, batchId));
+                        logger.info("Finished entity walk for collection {} for batch {}", collectionName, batchId);
+                    });
+        } finally {
+            logger.info("Finished fetching entity ids for {}. Shutting down entity edge scope fetcher ",
+                    collectionName);
+            shutdownAndAwait(edgeScopeFetcher, "entity edge scope fetcher for " + collectionName);
+            logger.info("Entity edge scope fetcher terminated after shutdown for {}", collectionName);
+        }
+        logger.info("Exiting extractEntitiesForCollection() method.");
+    }
+
+    /**
+     * Extracts the target entity ids from one batch of collection edges and queues them for
+     * loading and checking on the entity fetcher pool. Edges without a target node are skipped,
+     * and batches with no ids are dropped. The bulk-load type comes from the first edge with a
+     * target node, so the batch is assumed to hold a single entity type.
+     */
+    private void processEdgeBatch(List<EdgeScope> edgeScopes, EntityManager rootEm,
+            ApplicationScope applicationScope, String collectionName, int batchId) {
+
+        List<UUID> entityIds = new ArrayList<>(edgeScopes.size());
+        String type = null;
+
+        // Skip edges without a target instead of removing them from the list being iterated,
+        // which would throw ConcurrentModificationException.
+        for (EdgeScope edgeScope : edgeScopes) {
+            Id entityId = edgeScope.getEdge().getTargetNode();
+            if (entityId == null) {
+                continue;
+            }
+            entityIds.add(entityId.getUuid());
+            if (type == null) {
+                type = entityId.getType();
+            }
+        }
+
+        if (entityIds.isEmpty()) {
+            logger.info("No entity ids found in batch {} for collection {}", batchId, collectionName);
+            return;
+        }
+
+        final String entityType = type;
+        try {
+            entityFetcher.execute(() ->
+                    fetchAndCheckEntities(entityIds, entityType, rootEm, applicationScope, batchId, collectionName));
+        } catch (Exception e) {
+            logger.error("Exception while traversing entities for collection {} batch {}",
+                    collectionName, batchId, e);
+        }
+    }
+
+    /**
+     * Loads one batch of entities and checks each of them in parallel on the unique value
+     * checker pool. Failures are logged, not propagated.
+     */
+    private void fetchAndCheckEntities(List<UUID> entityIds, String type, EntityManager rootEm,
+            ApplicationScope applicationScope, int batchId, String collectionName) {
+
+        logger.info("Fetched {} entity id's of type {} for batch ID {}", entityIds.size(), type, batchId);
+        try {
+            Results entities = rootEm.getEntities(entityIds, type);
+            logger.info("Fetched {} entities of type {} for batch ID {}", entities.size(), type, batchId);
+
+            List<Entity> loaded = entities.getEntities();
+            if (loaded == null || loaded.isEmpty()) {
+                return;
+            }
+
+            List<Callable<Void>> checks = new ArrayList<>(loaded.size());
+            for (Entity entity : loaded) {
+                checks.add(() -> {
+                    checkEntityUniqueValue(entity, rootEm, applicationScope, batchId);
+                    return null;
+                });
+            }
+            // Waiting for this batch's checks bounds memory to one loaded batch per entity fetcher
+            // thread. It also means the entity fetcher pool terminates only after every check has run.
+            uniqueValueChecker.invokeAll(checks);
+        } catch (InterruptedException e) {
+            logger.error("Interrupted while checking unique values for batch id : {} for collection {}",
+                    batchId, collectionName, e);
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            logger.error("Error while checking unique values for batch id : {} for collection {}",
+                    batchId, collectionName, e);
+        }
+    }
+
+    /**
+     * When {@code -findMissingUniqueValues} is set, checks whether the entity's unique value
+     * resolves to an entity. If it does not, and {@code -fixUniqueValues} is set, repairs the
+     * mapping. Failures are logged and not propagated.
+     */
+    private void checkEntityUniqueValue(Entity entity, EntityManager rootEm,
+            ApplicationScope applicationScope, int batchId) {
+
+        logger.info("Fetched entity with UUID : {}", entity.getUuid());
+        if (!findMissingUniqueValues) {
+            return;
+        }
+
+        // Resolve into a local instead of writing the shared field: this runs on many threads.
+        String field = (fieldType == null || fieldType.equals("")) ? "name" : fieldType;
+        String fieldValue = resolveFieldValue(entity, field, batchId);
+
+        try {
+            if (fieldValue == null) {
+                logger.info("No value found for field {} for entity {}", field, entity.getUuid());
+                return;
+            }
+
+            Entity existing = rootEm.getUniqueEntityFromAlias(entity.getType(), fieldValue, false);
+            if (existing != null) {
+                logger.info("Found entity {} for field type {} and field value {}",
+                        existing.getUuid(), field, fieldValue);
+                return;
+            }
+
+            logger.info("No entity found for field type {} and field value {} but exists for UUID {}",
+                    field, fieldValue, entity.getUuid());
+            if (fixMissingValue) {
+                repairUniqueValue(entity, field, fieldValue, applicationScope);
+            }
+        } catch (Exception e) {
+            logger.error("Error while checking unique values for batch id : {} for entity {}",
+                    batchId, entity.getUuid(), e);
+        }
+    }
+
+    /**
+     * Returns the value of {@code field} on the entity: {@code getName()} for {@code "name"},
+     * otherwise the result of the matching public {@code getXxx()} getter. Returns {@code null}
+     * when the getter returns null. Also returns {@code null}, after logging, when the getter is
+     * missing, throws, or does not return a String.
+     */
+    private String resolveFieldValue(Entity entity, String field, int batchId) {
+        if ("name".equals(field)) {
+            return entity.getName();
+        }
+        try {
+            // Locale.ROOT keeps the getter name independent of the JVM default locale.
+            Method getter = entity.getClass().getMethod(
+                    "get" + field.substring(0, 1).toUpperCase(Locale.ROOT) + field.substring(1));
+            return (String) getter.invoke(entity);
+        } catch (Exception e) {
+            logger.error("Exception while trying to fetch field value of type {} for entity {} batch {}",
+                    field, entity.getUuid(), batchId, e);
+            return null;
+        }
+    }
+
+    /**
+     * Deletes the unique value row currently loaded for {@code field}, if any. Then writes a new
+     * row for ({@code field}, {@code fieldValue}) pointing at the entity id and version returned
+     * by the MVCC entity store. A failed delete is logged and the write still proceeds.
+     *
+     * @throws IllegalStateException if the MVCC store has no entity (or entity body) for the id
+     */
+    private void repairUniqueValue(Entity entity, String field, String fieldValue,
+            ApplicationScope applicationScope) throws Exception {
+
+        logger.info("Trying to repair unique value mapping for {} ", entity.getUuid());
+
+        UniqueValueSet uniqueValueSet = uniqueValueSerializationStrategy.load(applicationScope,
+                ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "LOCAL_QUORUM")),
+                entity.getType(),
+                Collections.singletonList(new StringField(field, fieldValue)),
+                false);
+
+        Optional<MvccEntity> mvccEntity = mvccEntitySerializationStrategy
+                .load(applicationScope, new SimpleId(entity.getUuid(), entity.getType()));
+
+        if (!mvccEntity.isPresent() || !mvccEntity.get().getEntity().isPresent()) {
+            throw new IllegalStateException("Unable to update unique value index because supplied UUID "
+                    + entity.getUuid() + " does not exist");
+        }
+
+        UniqueValue existingValue = uniqueValueSet.getValue(field);
+        logger.info("Delete unique value: {}", existingValue);
+        if (existingValue != null) {
+            try {
+                session.execute(uniqueValueSerializationStrategy.deleteCQL(applicationScope, existingValue));
+            } catch (Exception ex) {
+                logger.error(
+                        "Exception while trying to delete the Unique value for {}. Will proceed with creating new entry",
+                        entity.getUuid(), ex);
+            }
+        }
+
+        UniqueValue newUniqueValue = new UniqueValueImpl(new StringField(field, fieldValue),
+                mvccEntity.get().getId(), mvccEntity.get().getVersion());
+        logger.info("Writing new unique value: {}", newUniqueValue);
+        session.execute(uniqueValueSerializationStrategy.writeCQL(applicationScope, newUniqueValue, -1));
+    }
+
+    /**
+     * Copies the organization, application and collection selectors and the find/fix flags from
+     * the command line. A single {@code COLLECTION_NAME} overrides a {@code COLL_NAMES} list.
+     */
+    protected void applyInputParams(CommandLine line) {
+
+        if (line.hasOption(ORG_ID)) {
+            orgId = ConversionUtils.uuid(line.getOptionValue(ORG_ID));
+        } else if (line.hasOption(ORG_NAME)) {
+            orgName = line.getOptionValue(ORG_NAME);
+        }
+
+        if (line.hasOption(APP_ID)) {
+            applicationId = ConversionUtils.uuid(line.getOptionValue(APP_ID));
+        } else if (line.hasOption(APP_NAME)) {
+            applicationName = line.getOptionValue(APP_NAME);
+        }
+        if (line.hasOption(COLL_NAMES)) {
+            collNames = line.getOptionValue(COLL_NAMES).split(",");
+        }
+        if (line.hasOption(COLLECTION_NAME)) {
+            collNames = new String[] { line.getOptionValue(COLLECTION_NAME) };
+        }
+        findMissingUniqueValues = line.hasOption(FIND_MISSING_UNIQUE_VALUES);
+        fixMissingValue = line.hasOption(FIX_MISSING_VALUES);
+
+    }
+}