blob: 04915be19d4a4ea3f5a17209a4cdf0b67e145b4f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.usergrid.tools;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.management.OrganizationInfo;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.model.field.StringField;
import org.apache.usergrid.utils.ConversionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Session;
import com.fasterxml.jackson.core.JsonFactory;
import com.google.common.base.Optional;
import com.google.common.collect.BiMap;
import rx.Observable;
import rx.observables.ConnectableObservable;
import rx.schedulers.Schedulers;
/**
 * Command-line tool that scans the entities of the requested collections and checks that each
 * entity's unique-value entry (the {@code name} field by default, or the field named by
 * {@code fieldType}) still resolves back to an entity.
 *
 * <p>Options added by this tool:
 * <ul>
 * <li>{@code -findMissingUniqueValues}: look up every scanned entity by its unique field value
 * via {@code EntityManager.getUniqueEntityFromAlias} and log the ones for which nothing is
 * found.</li>
 * <li>{@code -fixUniqueValues}: only effective together with {@code -findMissingUniqueValues};
 * for every entity reported missing, deletes the existing unique-value row (if one was loaded)
 * and writes a new one pointing at the entity id/version returned by
 * {@code MvccEntitySerializationStrategy.load}.</li>
 * </ul>
 *
 * <p>Threading: edges are buffered {@value #EDGE_BATCH_SIZE} at a time; each buffer is unpacked
 * on a per-collection single-thread executor, entities are loaded on {@code entityFetcher}
 * (10 threads) and the unique-value checks run on {@code uniqueValueChecker} (50 threads).
 * Because that work is asynchronous and its completion is not tracked, {@link #runTool} keeps
 * the calling thread alive until it is interrupted.
 */
public class UniqueValueRepairer extends ExportingToolBase {

    static final Logger logger = LoggerFactory.getLogger(UniqueValueRepairer.class);

    JsonFactory jsonFactory = new JsonFactory();

    public static final String LAST_ID = "lastId";

    /** Command-line option enabling the missing-unique-value check. */
    public static final String FIND_MISSING_UNIQUE_VALUES = "findMissingUniqueValues";

    /** Command-line option enabling repair of the entries reported by the check. */
    public static final String FIX_MISSING_VALUES = "fixUniqueValues";

    /** Unique field checked when {@code fieldType} is not supplied. */
    private static final String DEFAULT_UNIQUE_FIELD = "name";

    /** Number of collection edges grouped into one processing batch. */
    private static final int EDGE_BATCH_SIZE = 1000;

    private boolean findMissingUniqueValues = false;

    private boolean fixMissingValue = false;

    private AllEntityIdsObservable allEntityIdsObs;

    // Never reassigned in this class, so every collection scan starts from the beginning.
    private SimpleEdge lastEdge = null;

    private ExecutorService entityFetcher = Executors.newFixedThreadPool(10);

    private ExecutorService uniqueValueChecker = Executors.newFixedThreadPool(50);

    private Session session;

    private UniqueValueSerializationStrategy uniqueValueSerializationStrategy;

    private MvccEntitySerializationStrategy mvccEntitySerializationStrategy;

    /**
     * Adds the {@code -findMissingUniqueValues} and {@code -fixUniqueValues} flags to the options
     * inherited from the base tool.
     */
    @Override
    @SuppressWarnings("static-access")
    public Options createOptions() {
        Options options = super.createOptions();
        Option findMissingUniqueValues = OptionBuilder
            .withDescription("Find entities with missing unique value entry -findMissingUniqueValues")
            .create(FIND_MISSING_UNIQUE_VALUES);
        Option fixMissingUniqueValueEntries = OptionBuilder
            .withDescription("Fix entities with missing unique value entry -fixUniqueValues")
            .create(FIX_MISSING_VALUES);
        options.addOption(findMissingUniqueValues);
        options.addOption(fixMissingUniqueValueEntries);
        return options;
    }

    /**
     * Wires the required services, starts the scan and then blocks the calling thread.
     *
     * <p>The entity loads and unique-value checks are scheduled asynchronously and nothing tracks
     * their completion, so this method sleeps in a loop until the thread is interrupted.
     */
    @Override
    public void runTool(CommandLine line) throws Exception {
        startSpring();
        setVerbose(line);
        this.allEntityIdsObs = injector.getInstance(AllEntityIdsObservable.class);
        applyInputParams(line);
        mvccEntitySerializationStrategy = injector.getInstance(MvccEntitySerializationStrategy.class);
        uniqueValueSerializationStrategy = injector.getInstance(UniqueValueSerializationStrategy.class);
        session = injector.getInstance(Session.class);
        startEntityScan();
        logger.info("Finished checking entities. Waiting for threads to complete execution.");
        // TODO: replace this spin with a countdown latch or task tracker.
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                logger.error("Exception while waiting for unique check to complete.", e);
                // Restore the flag so the loop ends and callers can observe the interrupt,
                // instead of ignoring the stop request forever.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Scans the applications of every selected organization, skipping the organization named by
     * the {@code usergrid.test-account.organization} property.
     */
    private void startEntityScan() throws Exception {
        String testOrganization = properties.getProperty("usergrid.test-account.organization");
        for (Entry<UUID, String> organization : getOrgs().entrySet()) {
            // Compare against the organization *name*; comparing the Map.Entry itself with a
            // String can never be true.
            if (testOrganization != null && testOrganization.equals(organization.getValue())) {
                continue;
            }
            fetchApplicationsForOrgs(organization.getKey(), organization.getValue());
        }
    }

    /**
     * Resolves the organizations to scan: all of them when neither {@code orgId} nor
     * {@code orgName} is set, otherwise the single requested one.
     *
     * @return organization UUID to organization name
     */
    private Map<UUID, String> getOrgs() throws Exception {
        if (orgId == null && (orgName == null || orgName.trim().isEmpty())) {
            return managementService.getOrganizations();
        }
        OrganizationInfo info = (orgId != null)
            ? managementService.getOrganizationByUuid(orgId)
            : managementService.getOrganizationByName(orgName);
        if (info == null) {
            logger.error("Organization info is null!");
            System.exit(1);
        }
        Map<UUID, String> organizationNames = new HashMap<>();
        organizationNames.put(info.getUuid(), info.getName());
        return organizationNames;
    }

    /**
     * Scans either every application of the organization or, when {@code applicationId} /
     * {@code applicationName} is set, only that application if it belongs to this organization.
     */
    private void fetchApplicationsForOrgs(UUID orgId, String orgName) throws Exception {
        logger.info("Fetch applications for {} : {} ", orgId, orgName);
        // Values are "<org>/<app>" qualified names (see the inverse lookup below).
        BiMap<UUID, String> applications = managementService.getApplicationsForOrganization(orgId);

        if (applicationId == null && (applicationName == null || applicationName.trim().isEmpty())) {
            // No application filter: scan every application of the organization.
            for (Entry<UUID, String> appEntry : applications.entrySet()) {
                fetchApplicationSafely(appEntry.getKey(), shortAppName(appEntry.getValue()));
            }
            return;
        }

        UUID appId = applicationId;
        String appName = applicationName;
        if (appId != null) {
            String qualifiedName = applications.get(appId);
            if (qualifiedName == null) {
                // Without this guard the same application would be re-scanned once per org.
                logger.info("Application {} does not belong to organization {} : {}", appId, orgName, orgId);
                return;
            }
            appName = shortAppName(qualifiedName);
        } else {
            appId = applications.inverse().get(orgName + '/' + appName);
            if (appId == null) {
                logger.info("Application {} not found in organization {} : {}", appName, orgName, orgId);
                return;
            }
        }
        fetchApplicationSafely(appId, appName);
    }

    /** Runs {@link #fetchApplications}, logging instead of propagating any failure. */
    private void fetchApplicationSafely(UUID appId, String appName) {
        try {
            fetchApplications(appId, appName);
        } catch (Exception e) {
            logger.error("There was an exception fetching application {} : {}", appName, appId, e);
        }
    }

    /**
     * Strips the organization prefix from an {@code "<org>/<app>"} name; returns the input
     * unchanged when it contains no {@code '/'}.
     */
    private static String shortAppName(String qualifiedName) {
        String[] parts = qualifiedName.split("/");
        return parts.length > 1 ? parts[1] : qualifiedName;
    }

    /**
     * Scans each requested collection ({@code collNames}) that exists in the application.
     * Nothing is scanned when no collection name was supplied.
     */
    private void fetchApplications(UUID appId, String appName) throws Exception {
        logger.info("Fetching application for {} : {} ", appName, appId);
        if (collNames == null || collNames.length == 0) {
            logger.info("Please pass collection name ( -collectionName testCollection ) ");
            return;
        }
        EntityManager em = emf.getEntityManager(appId);
        Set<String> collections = em.getApplicationCollections();
        for (String collectionName : collNames) {
            if (collections.contains(collectionName)) {
                extractEntitiesForCollection(appId, collectionName);
            } else {
                logger.info("Collection {} not found in application {} : {}", collectionName, appName, appId);
            }
        }
    }

    /**
     * Streams the edges of one collection in batches of {@value #EDGE_BATCH_SIZE} and hands each
     * batch to a dedicated single-thread executor; that executor is shut down and awaited once
     * the edge stream terminates.
     */
    private void extractEntitiesForCollection(UUID applicationId, String collectionName) {
        AtomicInteger batch = new AtomicInteger(1);
        final EntityManager rootEm = emf.getEntityManager(applicationId);
        ExecutorService edgeScopeFetcher = Executors.newFixedThreadPool(1);
        allEntityIdsObs
            .getEdgesToEntities(Observable.just(CpNamingUtils.getApplicationScope(applicationId)),
                Optional.fromNullable(
                    CpNamingUtils.getEdgeTypeFromCollectionName(collectionName.toLowerCase())),
                (lastEdge == null ? Optional.absent() : Optional.fromNullable(lastEdge)))
            .buffer(EDGE_BATCH_SIZE)
            .finallyDo(() -> shutdownAndAwait(edgeScopeFetcher, collectionName))
            .subscribe(edges -> {
                logger.info("For collection {}", collectionName);
                int batchId = batch.getAndIncrement();
                logger.info("Started fetching details for collection {} batch {} ", collectionName, batchId);
                Observable.just(edges)
                    .subscribeOn(Schedulers.from(edgeScopeFetcher))
                    .subscribe(edgeScopes ->
                        processEdgeBatch(applicationId, collectionName, rootEm, edgeScopes, batchId));
                logger.info("Finished entity walk for collection {} for batch {}", collectionName, batchId);
            });
        logger.info("Exiting extractEntitiesForCollection() method.");
    }

    /**
     * Shuts down the per-collection edge executor and waits for its queued batches to finish.
     * If interrupted, restores the interrupt flag and stops waiting.
     */
    private void shutdownAndAwait(ExecutorService edgeScopeFetcher, String collectionName) {
        edgeScopeFetcher.shutdown();
        logger.info("Finished fetching entity ids for {}. Shutting down entity edge scope fetcher ",
            collectionName);
        try {
            while (!edgeScopeFetcher.awaitTermination(10, TimeUnit.SECONDS)) {
                logger.debug("Still waiting for entity edge scope fetcher of {} to terminate", collectionName);
            }
            logger.info("Entity edge scope fetcher terminated after shutdown for {}", collectionName);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while waiting for entity edge scope fetcher of {} to terminate",
                collectionName, e);
        }
    }

    /**
     * Collects the target entity ids of one edge batch (skipping edges without a target node)
     * and schedules their loading on {@code entityFetcher}.
     *
     * <p>The entity type is taken from the first edge that has a target node, exactly as the
     * whole batch is then loaded with that single type.
     */
    private void processEdgeBatch(UUID applicationId, String collectionName, EntityManager rootEm,
            List<EdgeScope> edgeScopes, int batchId) {
        // Build a filtered list rather than removing from edgeScopes while iterating it, which
        // would throw ConcurrentModificationException.
        List<UUID> entityIds = new ArrayList<>(edgeScopes.size());
        String type = null;
        for (EdgeScope edgeScope : edgeScopes) {
            Id target = edgeScope.getEdge().getTargetNode();
            if (target == null) {
                continue;
            }
            entityIds.add(target.getUuid());
            if (type == null) {
                type = target.getType();
            }
        }
        if (entityIds.isEmpty()) {
            logger.warn("No entity ids found in batch {} for collection {}", batchId, collectionName);
            return;
        }

        final String entityType = type;
        try {
            Observable.just(entityIds)
                .subscribeOn(Schedulers.from(entityFetcher))
                .subscribe(ids -> checkEntities(applicationId, collectionName, rootEm, ids, entityType, batchId));
        } catch (Exception e) {
            logger.error("Exception while traversing entities {}", edgeScopes.get(0).getEdge(), e);
            // Failing to schedule work is fatal for the tool; exit with a failure status.
            System.exit(1);
        }
    }

    /**
     * Loads one batch of entities and schedules the unique-value check of each of them on
     * {@code uniqueValueChecker}.
     */
    private void checkEntities(UUID applicationId, String collectionName, EntityManager rootEm,
            List<UUID> entityIds, String type, int batchId) {
        try {
            logger.info("Fetched {} entity id's of type {} for batch ID {}", entityIds.size(), type, batchId);
            Results entities = rootEm.getEntities(entityIds, type);
            logger.info("Fetched {} entities of type {} for batch ID {}", entities.size(), type, batchId);
            // subscribeOn must be part of the chain: calling it and discarding the result has
            // no effect, which previously left uniqueValueChecker unused.
            Observable.from(entities.getEntities())
                .subscribeOn(Schedulers.from(uniqueValueChecker))
                .subscribe(entity -> checkEntity(applicationId, rootEm, entity, batchId),
                    error -> logger.error(
                        "Error while checking unique values for batch id : {} for collection {}",
                        batchId, collectionName, error));
        } catch (Exception e) {
            logger.error("Error while checking unique values for batch id : {} for collection {}",
                batchId, collectionName, e);
        }
    }

    /**
     * Checks (and, with {@code -fixUniqueValues}, repairs) the unique-value entry of one entity.
     * Does nothing beyond logging unless {@code -findMissingUniqueValues} was given. All failures
     * are logged, never propagated.
     */
    private void checkEntity(UUID applicationId, EntityManager rootEm, Entity entity, int batchId) {
        logger.info("Fetched entity with UUID : {}", entity.getUuid());
        if (!findMissingUniqueValues) {
            return;
        }
        // Resolved locally instead of mutating the shared fieldType field from many threads.
        String field = resolveUniqueFieldName();
        String fieldValue = readFieldValue(entity, field, batchId);
        try {
            if (fieldValue == null) {
                logger.info("No value found for field {} for entity {}", field, entity.getUuid());
                return;
            }
            Entity byAlias = rootEm.getUniqueEntityFromAlias(entity.getType(), fieldValue, false);
            if (byAlias != null) {
                logger.info("Found entity {} for field type {} and field value {}",
                    byAlias.getUuid(), field, fieldValue);
                return;
            }
            logger.info("No entity found for field type {} and field value {} but exists for UUID {}",
                field, fieldValue, entity.getUuid());
            if (fixMissingValue) {
                repairUniqueValue(applicationId, entity, field, fieldValue);
            }
        } catch (Exception e) {
            logger.error("Error while checking unique values for batch id : {} for entity {}",
                batchId, entity.getUuid(), e);
        }
    }

    /** Returns the unique field to check: {@code fieldType}, or {@code "name"} when it is unset/empty. */
    private String resolveUniqueFieldName() {
        String configured = fieldType;
        return (configured == null || configured.isEmpty()) ? DEFAULT_UNIQUE_FIELD : configured;
    }

    /**
     * Reads the value of {@code field} from the entity: {@code getName()} for {@code "name"},
     * otherwise the matching no-arg {@code getXxx()} getter via reflection.
     *
     * @return the field value, or {@code null} if it is unset or the getter could not be
     *     invoked / did not return a String (the failure is logged)
     */
    private String readFieldValue(Entity entity, String field, int batchId) {
        if (DEFAULT_UNIQUE_FIELD.equals(field)) {
            return entity.getName();
        }
        // Character.toUpperCase is locale-independent, unlike String.toUpperCase().
        String getterName = "get" + Character.toUpperCase(field.charAt(0)) + field.substring(1);
        try {
            Method getter = entity.getClass().getMethod(getterName);
            return (String) getter.invoke(entity);
        } catch (Exception e) {
            logger.error("Exception while trying to fetch field value of type {} for entity {} batch {}",
                field, entity.getUuid(), batchId, e);
            return null;
        }
    }

    /**
     * Replaces the unique-value entry for {@code field=fieldValue} with one pointing at the
     * entity id/version loaded from the MVCC entity store.
     *
     * @throws IllegalStateException if no stored version of the entity can be loaded
     * @throws Exception propagated from the serialization strategies / session
     */
    private void repairUniqueValue(UUID applicationId, Entity entity, String field, String fieldValue)
            throws Exception {
        logger.info("Trying to repair unique value mapping for {} ", entity.getUuid());
        ApplicationScope applicationScope =
            new ApplicationScopeImpl(new SimpleId(applicationId, "application"));

        UniqueValueSet uniqueValueSet = uniqueValueSerializationStrategy.load(applicationScope,
            ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "LOCAL_QUORUM")),
            entity.getType(),
            Collections.singletonList(new StringField(field, fieldValue)),
            false);

        Optional<MvccEntity> mvccEntity = mvccEntitySerializationStrategy.load(applicationScope,
            new SimpleId(entity.getUuid(), entity.getType()));
        if (!mvccEntity.isPresent() || !mvccEntity.get().getEntity().isPresent()) {
            throw new IllegalStateException("Unable to update unique value index because supplied UUID "
                + entity.getUuid() + " does not exist");
        }

        UniqueValue staleValue = uniqueValueSet.getValue(field);
        if (staleValue == null) {
            logger.info("No existing unique value {} to delete for {}", field, entity.getUuid());
        } else {
            logger.info("Delete unique value: {}", staleValue);
            try {
                session.execute(uniqueValueSerializationStrategy.deleteCQL(applicationScope, staleValue));
            } catch (Exception ex) {
                logger.error(
                    "Exception while trying to delete the Unique value for {}. Will proceed with creating new entry",
                    entity.getUuid(), ex);
            }
        }

        UniqueValue newUniqueValue = new UniqueValueImpl(new StringField(field, fieldValue),
            mvccEntity.get().getId(), mvccEntity.get().getVersion());
        logger.info("Writing new unique value: {}", newUniqueValue);
        session.execute(uniqueValueSerializationStrategy.writeCQL(applicationScope, newUniqueValue, -1));
    }

    /**
     * Copies organization/application/collection filters and the find/fix flags from the parsed
     * command line. {@code COLLECTION_NAME}, when present, overrides {@code COLL_NAMES}.
     */
    protected void applyInputParams(CommandLine line) {
        if (line.hasOption(ORG_ID)) {
            orgId = ConversionUtils.uuid(line.getOptionValue(ORG_ID));
        } else if (line.hasOption(ORG_NAME)) {
            orgName = line.getOptionValue(ORG_NAME);
        }
        if (line.hasOption(APP_ID)) {
            applicationId = ConversionUtils.uuid(line.getOptionValue(APP_ID));
        } else if (line.hasOption(APP_NAME)) {
            applicationName = line.getOptionValue(APP_NAME);
        }
        if (line.hasOption(COLL_NAMES)) {
            collNames = line.getOptionValue(COLL_NAMES).split(",");
        }
        if (line.hasOption(COLLECTION_NAME)) {
            collNames = new String[] { line.getOptionValue(COLLECTION_NAME) };
        }
        findMissingUniqueValues = line.hasOption(FIND_MISSING_UNIQUE_VALUES);
        fixMissingValue = line.hasOption(FIX_MISSING_VALUES);
    }
}