blob: bfe5edffb955cc6ec060efdd3b67416db0ae0ec9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.usergrid.tools;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import com.google.common.base.Optional;
import com.netflix.astyanax.MutationBatch;
import org.apache.usergrid.corepersistence.service.CollectionService;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.*;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.graph.*;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
import org.apache.usergrid.persistence.index.utils.UUIDUtils;
import org.apache.usergrid.persistence.model.entity.*;
import org.apache.usergrid.persistence.schema.CollectionInfo;
import org.apache.usergrid.utils.InflectionUtils;
import static org.apache.commons.lang.StringUtils.isBlank;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
/**
 * Command-line tool that walks every edge of type "collection" from an application to its members
 * (loaded through the {@link GraphManager}) and logs, per edge, whether the target entity can still be
 * loaded through the {@link EntityManager}.
 * <p>
 * Optionally (each enabled by passing the value {@code yes}, case-insensitive) it can:
 * <ul>
 *   <li>remove duplicate edges ({@code -removeDuplicateConnections}): an edge whose target UUID was already
 *       seen earlier in the iteration and whose target entity exists;</li>
 *   <li>remove orphaned edges ({@code -removeOrphanConnections}): an edge whose target entity cannot be
 *       loaded, restricted to target UUIDs whose embedded timestamp (epoch millis) lies in
 *       [{@code earliestTimestamp}, {@code latestTimestamp}].</li>
 * </ul>
 * The lower bound defaults to 0. The upper bound defaults to one hour before now; it can be set explicitly
 * with {@code -latestTimestamp} or relative to now with {@code -secondsInPast} (mutually exclusive).
 * Edge deletions are written directly through {@link EdgeSerialization}.
 */
public class CollectionIterator extends ToolBase {

    private static final Logger logger = LoggerFactory.getLogger( CollectionIterator.class );

    private static final String APPLICATION_ARG = "app";
    private static final String ENTITY_TYPE_ARG = "entityType";
    private static final String REMOVE_DUPLICATE_CONNECTIONS_ARG = "removeDuplicateConnections";
    private static final String REMOVE_ORPHAN_CONNECTIONS_ARG = "removeOrphanConnections";
    private static final String LATEST_TIMESTAMP_ARG = "latestTimestamp";
    private static final String EARLIEST_TIMESTAMP_ARG = "earliestTimestamp";
    private static final String SECONDS_IN_PAST_ARG = "secondsInPast";

    /** Default age (in seconds) an orphan must have before it may be deleted: one hour. */
    private static final long DEFAULT_SECONDS_IN_PAST = 60L * 60L;

    private EntityManager em;

    /**
     * Adds this tool's options to the base options: the required {@code app} (application UUID) and
     * {@code entityType} (singular collection name), plus the optional removal flags and the timestamp
     * window options.
     */
    @Override
    @SuppressWarnings( "static-access" )
    public Options createOptions() {

        Options options = super.createOptions();

        Option appOption = OptionBuilder.withArgName( APPLICATION_ARG ).hasArg().isRequired( true )
            .withDescription( "application id" ).create( APPLICATION_ARG );
        options.addOption( appOption );

        Option collectionOption = OptionBuilder.withArgName( ENTITY_TYPE_ARG ).hasArg().isRequired( true )
            .withDescription( "singular collection name" ).create( ENTITY_TYPE_ARG );
        options.addOption( collectionOption );

        Option removeOrphanConnectionsOption =
            OptionBuilder.withArgName( REMOVE_ORPHAN_CONNECTIONS_ARG ).hasArg().isRequired( false )
                .withDescription( "remove orphaned connections" ).create( REMOVE_ORPHAN_CONNECTIONS_ARG );
        options.addOption( removeOrphanConnectionsOption );

        Option removeDuplicateConnectionsOption =
            OptionBuilder.withArgName( REMOVE_DUPLICATE_CONNECTIONS_ARG ).hasArg().isRequired( false )
                .withDescription( "remove duplicate connections" ).create( REMOVE_DUPLICATE_CONNECTIONS_ARG );
        options.addOption( removeDuplicateConnectionsOption );

        Option earliestTimestampOption =
            OptionBuilder.withArgName( EARLIEST_TIMESTAMP_ARG ).hasArg().isRequired( false )
                .withDescription( "earliest timestamp to delete" ).create( EARLIEST_TIMESTAMP_ARG );
        options.addOption( earliestTimestampOption );

        Option latestTimestampOption =
            OptionBuilder.withArgName( LATEST_TIMESTAMP_ARG ).hasArg().isRequired( false )
                .withDescription( "latest timestamp to delete" ).create( LATEST_TIMESTAMP_ARG );
        options.addOption( latestTimestampOption );

        Option secondsInPastOption =
            OptionBuilder.withArgName( SECONDS_IN_PAST_ARG ).hasArg().isRequired( false )
                .withDescription( "how many seconds old orphan must be to be deleted" ).create( SECONDS_IN_PAST_ARG );
        options.addOption( secondsInPastOption );

        return options;
    }

    /**
     * Parses the command line, then iterates the collection's edges, logging each one and removing
     * duplicates / orphans when requested.
     *
     * @throws RuntimeException if a required option is missing, both {@code latestTimestamp} and
     *         {@code secondsInPast} are given, or a numeric option cannot be parsed
     * @throws IllegalArgumentException if the application id is not a valid UUID
     *
     * @see org.apache.usergrid.tools.ToolBase#runTool(org.apache.commons.cli.CommandLine)
     */
    @Override
    public void runTool( CommandLine line ) throws Exception {

        startSpring();

        String applicationOption = line.getOptionValue( APPLICATION_ARG );
        String entityTypeOption = line.getOptionValue( ENTITY_TYPE_ARG );
        String removeOrphanConnectionsOption = line.getOptionValue( REMOVE_ORPHAN_CONNECTIONS_ARG );
        String removeDuplicateConnectionsOption = line.getOptionValue( REMOVE_DUPLICATE_CONNECTIONS_ARG );
        String earliestTimestampOption = line.getOptionValue( EARLIEST_TIMESTAMP_ARG );
        String latestTimestampOption = line.getOptionValue( LATEST_TIMESTAMP_ARG );
        String secondsInPastOption = line.getOptionValue( SECONDS_IN_PAST_ARG );

        if ( isBlank( applicationOption ) ) {
            throw new RuntimeException( "Application ID not provided." );
        }
        final UUID app;
        try {
            app = UUID.fromString( applicationOption );
        }
        catch ( IllegalArgumentException e ) {
            throw new IllegalArgumentException( "Application ID is not a valid UUID: " + applicationOption, e );
        }

        if ( isBlank( entityTypeOption ) ) {
            throw new RuntimeException( "Entity type (singular collection name) not provided." );
        }
        final String entityType = entityTypeOption;

        final boolean removeOrphans = isYes( removeOrphanConnectionsOption );
        final boolean removeDuplicates = isYes( removeDuplicateConnectionsOption );

        if ( !isBlank( secondsInPastOption ) && !isBlank( latestTimestampOption ) ) {
            throw new RuntimeException( "Can't specify both latest timestamp and seconds in past options." );
        }

        final long earliestTimestamp = isBlank( earliestTimestampOption )
            ? 0L
            : parseLongOption( earliestTimestampOption, "earliest timestamp" );

        final long currentTimestamp = System.currentTimeMillis();
        final long latestTimestamp;
        if ( !isBlank( latestTimestampOption ) ) {
            latestTimestamp = parseLongOption( latestTimestampOption, "latest timestamp" );
        }
        else if ( !isBlank( secondsInPastOption ) ) {
            long secondsInPast = parseLongOption( secondsInPastOption, "seconds in past" );
            latestTimestamp = currentTimestamp - ( secondsInPast * 1000L );
        }
        else {
            latestTimestamp = currentTimestamp - ( DEFAULT_SECONDS_IN_PAST * 1000L );
        }

        logger.info( "Starting Tool: CollectionIterator" );
        logger.info( "Orphans {} be deleted", removeOrphans ? "WILL" : "will not" );
        logger.info( "Duplicates {} be deleted", removeDuplicates ? "WILL" : "will not" );
        logger.info( "Timestamp range {} to {}", Long.toString( earliestTimestamp ), Long.toString( latestTimestamp ) );
        if ( removeOrphans && earliestTimestamp > latestTimestamp ) {
            logger.warn( "Earliest timestamp {} is after latest timestamp {}; no orphans will be deleted",
                earliestTimestamp, latestTimestamp );
        }
        logger.info( "Using Cassandra consistency level: {}", System.getProperty( "usergrid.read.cl", "CL_LOCAL_QUORUM" ) );

        em = emf.getEntityManager( app );

        String collectionName = InflectionUtils.pluralize( entityType );
        String simpleEdgeType = CpNamingUtils.getEdgeTypeFromCollectionName( collectionName );
        logger.info( "simpleEdgeType: {}", simpleEdgeType );

        ApplicationScope applicationScope = new ApplicationScopeImpl( new SimpleId( app, "application" ) );
        Id applicationScopeId = applicationScope.getApplication();
        logger.info( "applicationScope.getApplication(): {}", applicationScopeId );

        EdgeSerialization edgeSerialization = injector.getInstance( EdgeSerialization.class );

        GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class );
        GraphManager gm = gmf.createEdgeManager( applicationScope );

        final SimpleSearchByEdgeType search =
            new SimpleSearchByEdgeType( applicationScopeId, simpleEdgeType, Long.MAX_VALUE,
                SearchByEdgeType.Order.DESCENDING, Optional.absent(), false );

        // Target UUIDs already visited. Edges are requested in DESCENDING order, so the first edge seen
        // for a target is presumably the newest and later ones are treated as duplicates.
        final Set<UUID> seenTargets = new HashSet<>();

        gm.loadEdgesFromSource( search ).map( markedEdge -> {

            UUID uuid = markedEdge.getTargetNode().getUuid();
            long edgeTimestamp = markedEdge.getTimestamp();
            String edgeType = markedEdge.getType();

            // Set.add returns false when the UUID was already present.
            boolean duplicate = !seenTargets.add( uuid );

            try {
                org.apache.usergrid.persistence.Entity retrieved =
                    em.get( new SimpleEntityRef( entityType, uuid ) );

                // Creation time embedded in a time-based UUID; stays 0 for other UUID versions.
                // NOTE(review): with the default earliest timestamp of 0, orphans with non-time-based
                // UUIDs always fall inside the deletion window - confirm this is intended.
                long timestamp = 0;
                String dateString = "NOT TIME-BASED";
                if ( UUIDUtils.isTimeBased( uuid ) ) {
                    timestamp = UUIDUtils.getTimestampInMillis( uuid );
                    dateString = formatGmt( timestamp );
                }
                String edgeDateString =
                    formatGmt( UUIDUtils.getUnixTimestampInMillisFromUUIDTimestamp( edgeTimestamp ) );

                if ( retrieved != null ) {
                    if ( !duplicate ) {
                        logger.info( "ENTITY: uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{}",
                            uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString );
                    }
                    else if ( removeDuplicates ) {
                        logger.info( "DUPLICATE ENTITY (REMOVING): uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{}",
                            uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString );
                        removeEdge( edgeSerialization, applicationScope, markedEdge, uuid, "duplicate" );
                    }
                    else {
                        logger.info( "DUPLICATE ENTITY (WON'T REMOVE): uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{}",
                            uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString );
                    }
                }
                else if ( !removeOrphans ) {
                    logger.info( "NOT FOUND: uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{} - isDeleted:{} isSourceNodeDeleted:{} isTargetNodeDeleted:{}",
                        uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString,
                        markedEdge.isDeleted(), markedEdge.isSourceNodeDelete(), markedEdge.isTargetNodeDeleted() );
                }
                else if ( timestamp >= earliestTimestamp && timestamp <= latestTimestamp ) {
                    logger.info( "NOT FOUND (REMOVING): uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{} - isDeleted:{} isSourceNodeDeleted:{} isTargetNodeDeleted:{}",
                        uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString,
                        markedEdge.isDeleted(), markedEdge.isSourceNodeDelete(), markedEdge.isTargetNodeDeleted() );
                    removeEdge( edgeSerialization, applicationScope, markedEdge, uuid, "orphaned" );
                }
                else {
                    logger.info( "NOT FOUND (TIMESTAMP OUT OF RANGE): uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{} - isDeleted:{} isSourceNodeDeleted:{} isTargetNodeDeleted:{}",
                        uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString,
                        markedEdge.isDeleted(), markedEdge.isSourceNodeDelete(), markedEdge.isTargetNodeDeleted() );
                }
            }
            catch ( Exception e ) {
                // Best effort: log (with stack trace) and keep iterating the remaining edges.
                logger.error( "{} - exception while trying to load entity data, {} ", uuid, e.getMessage(), e );
            }
            return markedEdge;
        } ).toBlocking().lastOrDefault( null );
    }

    /**
     * Deletes one edge through {@link EdgeSerialization}. A failure is logged (with its stack trace)
     * rather than propagated, so the iteration over the remaining edges continues.
     *
     * @param edgeSerialization edge store used for the delete
     * @param applicationScope scope the edge belongs to
     * @param edge the edge to delete
     * @param targetUuid UUID of the edge's target node, used only for logging
     * @param reason why the edge is removed ("duplicate" or "orphaned"), used only for logging
     */
    private static void removeEdge( EdgeSerialization edgeSerialization, ApplicationScope applicationScope,
                                    MarkedEdge edge, UUID targetUuid, String reason ) {
        try {
            MutationBatch batch = edgeSerialization.deleteEdge( applicationScope, edge, UUIDUtils.newTimeUUID() );
            logger.info( "BATCH: {}", batch );
            batch.execute();
        }
        catch ( Exception e ) {
            logger.error( "{} - exception while trying to remove {} connection, {}",
                targetUuid, reason, e.getMessage(), e );
        }
    }

    /**
     * Formats epoch milliseconds with the default {@link SimpleDateFormat} pattern in GMT and appends
     * " GMT". A new formatter is created per call because {@code SimpleDateFormat} is not thread-safe.
     */
    private static String formatGmt( long epochMillis ) {
        DateFormat df = new SimpleDateFormat();
        df.setTimeZone( TimeZone.getTimeZone( "GMT" ) );
        return df.format( new Date( epochMillis ) ) + " GMT";
    }

    /** Returns true when a flag option's value is "yes", ignoring case; null or blank yields false. */
    private static boolean isYes( String value ) {
        return "yes".equalsIgnoreCase( value );
    }

    /**
     * Parses a numeric option value.
     *
     * @param value non-blank option text
     * @param description human-readable option name used in the error message
     * @throws RuntimeException (with the parse failure as its cause) if {@code value} is not a valid long
     */
    private static long parseLongOption( String value, String description ) {
        try {
            return Long.parseLong( value );
        }
        catch ( NumberFormatException e ) {
            throw new RuntimeException( "Cannot convert " + description + " to long: " + value, e );
        }
    }
}