blob: e02073ed420af9e5b45090c19dd9ce9f9eb7ecd8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
class GroovyIndexRepair {
final long COMMIT_BATCH_SIZE = 200
final int MAX_TRIES_ON_FAILURE = 3
final int NUM_THREADS = Runtime.getRuntime().availableProcessors()
Graph g
def indexSerializer
long totalVertexCount
def setup(String configPath) {
display("Initializing graph..")
g = TitanFactory.open(configPath)
indexSerializer = g.getIndexSerializer() as com.thinkaurelius.titan.graphdb.database.IndexSerializer
totalVertexCount = g.V.count()
display("Graph initialized!")
}
def processIndex(String indexName) {
processBatch(indexName, 0, totalVertexCount, NUM_THREADS)
}
def processIndex(List<String> indexNames, String guid) {
for (String indexName : indexNames) {
restore(indexName, guid)
}
}
def processBatch(String indexName, long startIndex, long endIndex, long threadCount) {
Date startTime = new Date()
long batchSize = (endIndex - startIndex) / threadCount
if ((batchSize * threadCount) < endIndex) {
batchSize++
}
display("Processing index: " + indexName)
display(" Start Index : " + startIndex)
display(" End Index : " + endIndex)
display(" Chunk Count : " + threadCount)
display(" Batch Size : " + batchSize)
def threads = []
for (long batchStartIdx = startIndex; batchStartIdx < endIndex; batchStartIdx += batchSize) {
final String lIdxName = indexName
final long lStartIdx = batchStartIdx;
final long lEndIdx = (lStartIdx + batchSize) > endIndex ? endIndex : (lStartIdx + batchSize);
threads << new Thread({
restore(lIdxName, lStartIdx, lEndIdx)
})
}
threads.each { it.start() }
threads.each { it.join() }
Date endTime = new Date()
display("Restore complete: " + indexName + ". Time taken: " + (endTime.time - startTime.time) + "ms")
}
void restore(String indexName, String guid) {
Date startTime = new Date()
def mgmt = g.getManagementSystem()
def documentsPerStore = new java.util.HashMap()
def tx = mgmt.getWrappedTx()
def mutator = tx.getTxHandle()
def tIdx = mgmt.getGraphIndex(indexName)
def indexType = mgmt.getSchemaVertex(tIdx).asIndexType()
def mixedIndexType = indexType as com.thinkaurelius.titan.graphdb.types.indextype.MixedIndexTypeWrapper
def vertex = g.V().has("__guid", guid).next();
if (vertex == null) {
display(" Could not find: " + guid);
return;
}
for (int attemptCount = 1; attemptCount <= MAX_TRIES_ON_FAILURE; attemptCount++) {
try {
indexSerializer.reindexElement(vertex, mixedIndexType, documentsPerStore);
break;
} catch (all) {
display("Exception: " + all)
display("Pausing before retry..")
Thread.sleep(2000 * attemptCount)
}
}
display(" commit")
mutator.getIndexTransaction(mixedIndexType.getBackingIndexName()).restore(documentsPerStore)
def endTime = new Date()
display(" thread end. Time taken: " + (endTime.time - startTime.time) + "ms")
}
void restore(String indexName, long startIndex, long endIndex) {
Date startTime = new Date()
String batchId = "Batch Id: [" + startIndex + "-" + endIndex + "]"
display(batchId + " thread start")
def mgmt = g.getManagementSystem()
def documentsPerStore = new java.util.HashMap()
def tx = mgmt.getWrappedTx()
def mutator = tx.getTxHandle()
def tIdx = mgmt.getGraphIndex(indexName)
def indexType = mgmt.getSchemaVertex(tIdx).asIndexType()
def mixedIndexType = indexType as com.thinkaurelius.titan.graphdb.types.indextype.MixedIndexTypeWrapper
for (long batchStartIdx = startIndex; batchStartIdx < endIndex; batchStartIdx += COMMIT_BATCH_SIZE) {
long batchEndIdx = (batchStartIdx + COMMIT_BATCH_SIZE) > endIndex ? endIndex : (batchStartIdx + COMMIT_BATCH_SIZE)
String displayStr = batchId + ": " + "{" + batchStartIdx + "-" + batchEndIdx + "}"
for (def vertex : g.V[batchStartIdx..batchEndIdx]) {
for (int attemptCount = 1; attemptCount <= MAX_TRIES_ON_FAILURE; attemptCount++) {
try {
indexSerializer.reindexElement(vertex, mixedIndexType, documentsPerStore);
break;
} catch (all) {
display("Exception: " + all)
display("Pausing before retry..")
Thread.sleep(2000 * attemptCount)
}
}
}
display(displayStr + " commit")
mutator.getIndexTransaction(mixedIndexType.getBackingIndexName()).restore(documentsPerStore)
}
def endTime = new Date()
display(batchId + " thread end. Time taken: " + (endTime.time - startTime.time) + "ms")
}
def close() {
g.shutdown()
display("Graph shutdown!")
}
def display(String s) {
println(getTimestamp() + " " + s)
}
def getTimestamp() {
new Date().format("yyyy-MM-dd HH:mm:ss.SSS")
}
}
def repairAtlasIndex(String configPath) {
r = new GroovyIndexRepair()
r.setup(configPath)
r.processIndex("vertex_index")
r.processIndex("fulltext_index")
r.processIndex("edge_index")
r.close()
}
def repairAtlasEntity(String configPath, String guid) {
r = new GroovyIndexRepair()
r.setup(configPath)
r.processIndex(["vertex_index", "fulltext_index", "edge_index"], guid)
r.close()
}