Merge commit 'refs/pull/617/head' of github.com:apache/usergrid
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueRepairer.java b/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueRepairer.java
new file mode 100644
index 0000000..04915be
--- /dev/null
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueRepairer.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.tools;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.management.OrganizationInfo;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.field.StringField;
+import org.apache.usergrid.utils.ConversionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.google.common.base.Optional;
+import com.google.common.collect.BiMap;
+
+import rx.Observable;
+import rx.observables.ConnectableObservable;
+import rx.schedulers.Schedulers;
+
+public class UniqueValueRepairer extends ExportingToolBase {
+
+	static final Logger logger = LoggerFactory.getLogger(UniqueValueRepairer.class);
+	// jsonFactory and LAST_ID are not referenced anywhere else in this class.
+	JsonFactory jsonFactory = new JsonFactory();
+	public static final String LAST_ID = "lastId";
+	// Names of the two command-line flags registered in createOptions().
+	public static final String FIND_MISSING_UNIQUE_VALUES = "findMissingUniqueValues";
+	public static final String FIX_MISSING_VALUES = "fixUniqueValues";
+	// Flag state, read from the command line in applyInputParams().
+	private boolean findMissingUniqueValues = false;
+	private boolean fixMissingValue = false;
+	// allEntityIdsObs is injected in runTool(); lastEdge is never assigned in this class, so Optional.absent() is always passed on.
+	private AllEntityIdsObservable allEntityIdsObs;
+	private SimpleEdge lastEdge = null;
+	// Worker pools handed to RxJava through Schedulers.from() during the collection scan.
+	private ExecutorService entityFetcher = Executors.newFixedThreadPool(10);
+	private ExecutorService uniqueValueChecker = Executors.newFixedThreadPool(50);
+	// Cassandra driver session and the two serialization strategies; all three are injected in runTool().
+	private Session session;
+	private UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+	private MvccEntitySerializationStrategy mvccEntitySerializationStrategy;
+
+	/** Extends the inherited option set with the two argument-less flags understood by this tool. */
+	@Override
+	@SuppressWarnings("static-access")
+	public Options createOptions() {
+		final Options toolOptions = super.createOptions();
+
+		// OptionBuilder keeps the pending description in static state until create() is called,
+		// so each option is described, created and registered before the next one is started.
+		toolOptions.addOption(OptionBuilder
+				.withDescription("Find entities with missing unique value entry  -findMissingUniqueValues")
+				.create(FIND_MISSING_UNIQUE_VALUES));
+
+		toolOptions.addOption(OptionBuilder
+				.withDescription("Fix entities with missing unique value entry  -fixUniqueValues")
+				.create(FIX_MISSING_VALUES));
+
+		return toolOptions;
+	}
+
+	/** Wires up the injected collaborators, starts the entity scan, then blocks until interrupted. */
+	@Override
+	public void runTool(CommandLine line) throws Exception {
+		startSpring();
+		setVerbose(line);
+		this.allEntityIdsObs = injector.getInstance(AllEntityIdsObservable.class);
+		applyInputParams(line);
+		mvccEntitySerializationStrategy = injector.getInstance(MvccEntitySerializationStrategy.class);
+		uniqueValueSerializationStrategy = injector.getInstance(UniqueValueSerializationStrategy.class);
+		session = injector.getInstance(Session.class);
+
+		startEntityScan();
+		logger.info("Finished checking entities. Waiting for threads to complete execution.");
+
+		// The workers give no completion signal yet (a latch or task tracker is still needed), so the
+		// thread parks here. An interrupt now ends the wait instead of being logged and ignored.
+		try {
+			while (true) {
+				Thread.sleep(10000);
+			}
+		} catch (InterruptedException e) {
+			logger.error("Exception while waiting for unique check to complete.", e);
+			Thread.currentThread().interrupt();
+			entityFetcher.shutdownNow();
+			uniqueValueChecker.shutdownNow();
+		}
+	}
+
+	private void startEntityScan() throws Exception, UnsupportedEncodingException {
+
+		for (Entry<UUID, String> organizationName : getOrgs().entrySet()) {
+
+			// Let's skip the test entities.
+			if (organizationName.equals(properties.getProperty("usergrid.test-account.organization"))) {
+				continue;
+			}
+			fetchApplicationsForOrgs(organizationName.getKey(), organizationName.getValue());
+		}
+	}
+
+	private Map<UUID, String> getOrgs() throws Exception {
+		// Loop through the organizations
+		Map<UUID, String> organizationNames = null;
+
+		if (orgId == null && (orgName == null || orgName.trim().equals(""))) {
+			organizationNames = managementService.getOrganizations();
+		} else {
+			OrganizationInfo info = null;
+
+			if (orgId != null) {
+				info = managementService.getOrganizationByUuid(orgId);
+			} else {
+				info = managementService.getOrganizationByName(orgName);
+			}
+
+			if (info == null) {
+				logger.error("Organization info is null!");
+				System.exit(1);
+			}
+
+			organizationNames = new HashMap<UUID, String>();
+			organizationNames.put(info.getUuid(), info.getName());
+		}
+
+		return organizationNames;
+	}
+
+	/**
+	 * Scans the applications of one organization: all of them when no application filter is set,
+	 * otherwise only the selected one, and only if it belongs to this organization.
+	 */
+	private void fetchApplicationsForOrgs(UUID orgId, String orgName) throws Exception {
+		logger.info("Fetch applications for {} : {} ", orgId, orgName);
+		// Application UUID -> "orgName/appName".
+		BiMap<UUID, String> applications = managementService.getApplicationsForOrganization(orgId);
+		if (applicationId == null && (applicationName == null || applicationName.trim().equals(""))) {
+			for (Entry<UUID, String> appEntry : applications.entrySet()) {
+				// A value without the "org/" prefix is used whole instead of letting [1] throw.
+				String[] nameParts = appEntry.getValue().split("/");
+				String appName = nameParts.length > 1 ? nameParts[1] : appEntry.getValue();
+				try {
+					fetchApplications(appEntry.getKey(), appName);
+				} catch (Exception e) {
+					logger.error("There was an exception fetching application {} : {}", appName, appEntry.getKey(), e);
+				}
+			}
+			return;
+		}
+		UUID appId = applicationId;
+		String appName = applicationName;
+		if (applicationId != null) {
+			appName = applications.get(appId);
+		} else {
+			appId = applications.inverse().get(orgName + '/' + appName);
+		}
+		// Not in this organization: skip it instead of scanning it again for every organization visited.
+		if (appId == null || appName == null) {
+			logger.info("Application {} : {} is not in organization {}; skipping", appName, appId, orgName);
+			return;
+		}
+		try {
+			fetchApplications(appId, appName);
+		} catch (Exception e) {
+			logger.error("There was an exception fetching application {} : {}", appName, appId, e);
+		}
+	}
+
+	private void fetchApplications(UUID appId, String appName) throws Exception {
+
+		logger.info("Fetching application for {} : {} ", appName, appId);
+
+		EntityManager em = emf.getEntityManager(appId);
+
+		Set<String> collections = em.getApplicationCollections();
+
+		if (collNames == null || collNames.length <= 0) {
+			logger.info("Please pass collection name ( -collectionName testCollection ) ");
+		} else {
+			Observable.from(collNames).subscribe(collectionName -> {
+				if (collections.contains(collectionName)) {
+					fetchCollections(appId, collectionName, em);
+				}
+			});
+		}
+
+	}
+
+	private void fetchCollections(UUID appId, String collectionName, EntityManager em) { // thin wrapper; the em parameter is unused
+		extractEntitiesForCollection(appId, collectionName); // plain delegation for a single collection
+	}
+
+	/**
+	 * Walks the edges of one collection in batches of 1000 and schedules each batch for the unique
+	 * value check on the worker pools. When the edge stream terminates, the single-threaded edge
+	 * scope fetcher is shut down and awaited.
+	 */
+	private void extractEntitiesForCollection(UUID applicationId, String collectionName) {
+		AtomicInteger batch = new AtomicInteger(1);
+		final EntityManager rootEm = emf.getEntityManager(applicationId);
+		ExecutorService edgeScopeFetcher = Executors.newFixedThreadPool(1);
+		allEntityIdsObs
+				.getEdgesToEntities(Observable.just(CpNamingUtils.getApplicationScope(applicationId)),
+						Optional.fromNullable(
+								CpNamingUtils.getEdgeTypeFromCollectionName(collectionName.toLowerCase())),
+						(lastEdge == null ? Optional.absent() : Optional.fromNullable(lastEdge)))
+				.buffer(1000).finallyDo(() -> {
+					edgeScopeFetcher.shutdown();
+					logger.info("Finished fetching entity ids for {}. Shutting down entity edge scope fetcher ",
+							collectionName);
+					// Keep waiting through interrupts, but restore the flag afterwards instead of losing it.
+					boolean interrupted = false;
+					while (!edgeScopeFetcher.isTerminated()) {
+						try {
+							edgeScopeFetcher.awaitTermination(10, TimeUnit.SECONDS);
+						} catch (InterruptedException e) {
+							interrupted = true;
+						}
+					}
+					if (interrupted) {
+						Thread.currentThread().interrupt();
+					}
+					logger.info("Entity edge scope fetcher terminated after shutdown for {}", collectionName);
+				}).subscribe(edges -> {
+					logger.info("For collection {}", collectionName);
+					Integer batchId = batch.getAndIncrement();
+					logger.info("Started fetching details for collection {} batch {} ", collectionName, batchId);
+					Observable.just(edges).subscribeOn(Schedulers.from(edgeScopeFetcher))
+							.subscribe(edgeScopes -> processEdgeBatch(edgeScopes, rootEm, applicationId,
+									collectionName, batchId));
+					logger.info("Finished entity walk for collection {} for batch {}", collectionName, batchId);
+				});
+		logger.info("Exiting extractEntitiesForCollection() method.");
+	}
+
+	/**
+	 * Collects the target entity ids of one edge batch and schedules their check on
+	 * {@code entityFetcher}. Edges without a target node are skipped.
+	 */
+	private void processEdgeBatch(List<EdgeScope> edgeScopes, EntityManager rootEm, UUID applicationId,
+			String collectionName, Integer batchId) {
+		List<UUID> entityIds = new ArrayList<UUID>(edgeScopes.size());
+		String firstType = null;
+		// The batch is only read here. Removing elements inside the for-each loop, as was done
+		// before, breaks the iteration (ConcurrentModificationException on an ArrayList).
+		for (EdgeScope edgeScope : edgeScopes) {
+			Id entityId = edgeScope.getEdge().getTargetNode();
+			if (entityId == null) {
+				continue;
+			}
+			entityIds.add(entityId.getUuid());
+			if (firstType == null) {
+				firstType = entityId.getType();
+			}
+		}
+		if (entityIds.isEmpty()) {
+			logger.info("No entity ids to check for collection {} batch {}", collectionName, batchId);
+			return;
+		}
+		// All entities of a batch are loaded with the type of its first usable edge target.
+		final String type = firstType;
+		try {
+			Observable.just(entityIds).subscribeOn(Schedulers.from(entityFetcher))
+					.subscribe(entIds -> checkEntityBatch(entIds, type, rootEm, applicationId,
+							collectionName, batchId));
+		} catch (Exception e) {
+			logger.error("Exception while traversing entities " + edgeScopes.get(0).getEdge(), e);
+			System.exit(1); // still fails fast, but no longer reports success (status 0)
+		}
+	}
+
+	/** Loads the entities of one batch and runs the unique value check on each of them. */
+	private void checkEntityBatch(List<UUID> entIds, String type, EntityManager rootEm, UUID applicationId,
+			String collectionName, Integer batchId) {
+		logger.info("Fetched {} entity id's of type {} for batch ID {}", entIds.size(), type, batchId);
+		Results entities = rootEm.getEntities(entIds, type);
+		logger.info("Fetched {} entities of type {} for batch ID {}", entities.size(), type, batchId);
+		try {
+			ConnectableObservable<Entity> entityObs = Observable.from(entities.getEntities()).publish();
+			// NOTE(review): the Observable returned by subscribeOn() is discarded, so this does not
+			// move the checks onto uniqueValueChecker. Left as is; changing threads needs its own review.
+			entityObs.subscribeOn(Schedulers.from(uniqueValueChecker));
+			entityObs.subscribe(t -> checkUniqueValueOf(t, rootEm, applicationId, batchId));
+			entityObs.connect();
+		} catch (Exception e) {
+			logger.error("Error while checking unique values for batch id : {} for collection {}", batchId,
+					collectionName, e);
+		}
+	}
+
+	/**
+	 * Looks the entity up through its unique field when {@code findMissingUniqueValues} is set and,
+	 * if nothing is found and {@code fixMissingValue} is set, rewrites the unique value entry.
+	 */
+	private void checkUniqueValueOf(Entity t, EntityManager rootEm, UUID applicationId, Integer batchId) {
+		logger.info("Fetched entity with UUID : {}", t.getUuid());
+		if (!findMissingUniqueValues) {
+			return;
+		}
+		String fieldValue = readUniqueFieldValue(t, batchId);
+		try {
+			if (fieldValue == null) {
+				logger.info("No value found for field {} for entity {}", fieldType, t.getUuid());
+				return;
+			}
+			Entity found = rootEm.getUniqueEntityFromAlias(t.getType(), fieldValue, false);
+			if (found != null) {
+				logger.info("Found entity {} for field type {} and field value {}", found.getUuid(), fieldType,
+						fieldValue);
+				return;
+			}
+			logger.info("No entity found for field type {} and field value {} but exists for UUID {}", fieldType,
+					fieldValue, t.getUuid());
+			if (fixMissingValue) {
+				rewriteUniqueValue(t, fieldValue, applicationId);
+			}
+		} catch (Exception e) {
+			logger.error("Error while checking unique values for batch id : {} for entity {}", batchId,
+					t.getUuid(), e);
+		}
+	}
+
+	/**
+	 * Reads the unique field of the entity. A null, empty or "name" {@code fieldType} uses
+	 * {@code getName()}; any other field is read through its getter via reflection.
+	 * @return the field value, or null when the getter returned null or could not be invoked
+	 */
+	private String readUniqueFieldValue(Entity t, Integer batchId) {
+		if (fieldType == null || fieldType.equals("") || fieldType.equals("name")) {
+			fieldType = "name";
+			return t.getName();
+		}
+		try {
+			Method getter = t.getClass()
+					.getMethod("get" + fieldType.substring(0, 1).toUpperCase() + fieldType.substring(1));
+			return (String) getter.invoke(t);
+		} catch (Exception e1) {
+			logger.error("Exception while trying to fetch field value of type {} for entity {} batch {}",
+					fieldType, t.getUuid(), batchId, e1);
+			return null;
+		}
+	}
+
+	/**
+	 * Deletes the unique value entry currently stored for the field value (best effort) and writes a
+	 * new entry that carries the id and version returned by the entity load.
+	 */
+	private void rewriteUniqueValue(Entity t, String fieldValue, UUID applicationId) throws Exception {
+		logger.info("Trying to repair unique value mapping for {} ", t.getUuid());
+		ApplicationScope applicationScope = new ApplicationScopeImpl(new SimpleId(applicationId, "application"));
+		UniqueValueSet uniqueValueSet = uniqueValueSerializationStrategy.load(applicationScope,
+				ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "LOCAL_QUORUM")), t.getType(),
+				Collections.singletonList(new StringField(fieldType, fieldValue)), false);
+		com.google.common.base.Optional<MvccEntity> entity = mvccEntitySerializationStrategy
+				.load(applicationScope, new SimpleId(t.getUuid(), t.getType()));
+		if (!entity.isPresent() || !entity.get().getEntity().isPresent()) {
+			throw new RuntimeException("Unable to update unique value index because supplied UUID "
+					+ t.getUuid() + " does not exist");
+		}
+		logger.info("Delete unique value: {}", uniqueValueSet.getValue(fieldType));
+		try {
+			session.execute(
+					uniqueValueSerializationStrategy.deleteCQL(applicationScope, uniqueValueSet.getValue(fieldType)));
+		} catch (Exception ex) {
+			logger.error("Exception while trying to delete the Unique value for {}. Will proceed with creating new entry",
+					t.getUuid(), ex);
+		}
+		UniqueValue newUniqueValue = new UniqueValueImpl(new StringField(fieldType, fieldValue),
+				entity.get().getId(), entity.get().getVersion());
+		logger.info("Writing new unique value: {}", newUniqueValue);
+		session.execute(uniqueValueSerializationStrategy.writeCQL(applicationScope, newUniqueValue, -1));
+	}
+
+	protected void applyInputParams(CommandLine line) {
+
+		if (line.hasOption(ORG_ID)) {
+			orgId = ConversionUtils.uuid(line.getOptionValue(ORG_ID));
+		} else if (line.hasOption(ORG_NAME)) {
+			orgName = line.getOptionValue(ORG_NAME);
+		}
+
+		if (line.hasOption(APP_ID)) {
+			applicationId = ConversionUtils.uuid(line.getOptionValue(APP_ID));
+		} else if (line.hasOption(APP_NAME)) {
+			applicationName = line.getOptionValue(APP_NAME);
+		}
+		if (line.hasOption(COLL_NAMES)) {
+			collNames = line.getOptionValue(COLL_NAMES).split(",");
+		}
+		if (line.hasOption(COLLECTION_NAME)) {
+			collNames = new String[] { line.getOptionValue(COLLECTION_NAME) };
+		}
+		findMissingUniqueValues = line.hasOption(FIND_MISSING_UNIQUE_VALUES);
+		fixMissingValue = line.hasOption(FIX_MISSING_VALUES);
+
+	}
+}