blob: 38d230ffc0f200eeb2967a75e2ba07c5d58dc74d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.patches;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.pc.WorkItemConsumer;
import org.apache.atlas.pc.WorkItemManager;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasElement;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
public class ReIndexPatch extends AtlasPatchHandler {
private static final Logger LOG = LoggerFactory.getLogger(ReIndexPatch.class);
private static final String PATCH_ID = "JAVA_PATCH_0000_006";
private static final String PATCH_DESCRIPTION = "Performs reindex on all the indexes.";
private final PatchContext context;
public ReIndexPatch(PatchContext context) {
super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION);
this.context = context;
}
@Override
public void apply() throws AtlasBaseException {
if (AtlasConfiguration.REBUILD_INDEX.getBoolean() == false) {
LOG.info("ReIndexPatch: Skipped, since not enabled!");
return;
}
try {
LOG.info("ReIndexPatch: Starting...");
ReindexPatchProcessor reindexPatchProcessor = new ReindexPatchProcessor(context);
reindexPatchProcessor.repairVertices();
reindexPatchProcessor.repairEdges();
} catch (Exception exception) {
LOG.error("Error while reindexing.", exception);
} finally {
LOG.info("ReIndexPatch: Done!");
}
setStatus(UNKNOWN);
LOG.info("ReIndexPatch.apply(): patchId={}, status={}", getPatchId(), getStatus());
}
public static class ReindexPatchProcessor {
private static String[] vertexIndexNames = new String[]{ Constants.VERTEX_INDEX, Constants.FULLTEXT_INDEX };
private static String[] edgeIndexNames = new String[]{ Constants.EDGE_INDEX };
private static String WORKER_PREFIX = "reindex";
private PatchContext context;
public ReindexPatchProcessor(PatchContext context) {
this.context = context;
}
public void repairVertices() {
repairElements(ReindexPatchProcessor::vertices, vertexIndexNames);
}
public void repairEdges() {
repairElements(ReindexPatchProcessor::edges, edgeIndexNames);
}
private void repairElements(BiConsumer<WorkItemManager, AtlasGraph> action, String[] indexNames) {
WorkItemManager manager = new WorkItemManager(new ReindexConsumerBuilder(context.getGraph(), indexNames),
WORKER_PREFIX, ConcurrentPatchProcessor.BATCH_SIZE, ConcurrentPatchProcessor.NUM_WORKERS, false);
try {
LOG.info("repairElements.execute(): {}: Starting...", indexNames);
action.accept(manager, context.getGraph());
manager.drain();
} finally {
try {
manager.shutdown();
} catch (InterruptedException e) {
LOG.error("repairEdges.execute(): interrupted during WorkItemManager shutdown.", e);
}
LOG.info("repairElements.execute(): {}: Done!", indexNames);
}
}
private static void edges(WorkItemManager manager, AtlasGraph graph) {
Iterable<AtlasEdge> iterable = graph.getEdges();
for (Iterator<AtlasEdge> iter = iterable.iterator(); iter.hasNext(); ) {
manager.checkProduce(iter.next());
}
}
private static void vertices(WorkItemManager manager, AtlasGraph graph) {
Iterable<AtlasVertex> iterable = graph.getVertices();
for (Iterator<AtlasVertex> iter = iterable.iterator(); iter.hasNext(); ) {
AtlasVertex vertex = iter.next();
manager.checkProduce(vertex);
}
}
}
private static class ReindexConsumerBuilder implements WorkItemBuilder<ReindexConsumer, AtlasElement> {
private AtlasGraph graph;
private String[] indexNames;
public ReindexConsumerBuilder(AtlasGraph graph, String[] indexNames) {
this.graph = graph;
this.indexNames = indexNames;
}
@Override
public ReindexConsumer build(BlockingQueue queue) {
return new ReindexConsumer(queue, this.graph, this.indexNames);
}
}
private static class ReindexConsumer extends WorkItemConsumer<AtlasElement> {
private final List<AtlasElement> list = new ArrayList();
private final String[] indexNames;
private final AtlasGraph graph;
private final AtomicLong counter;
public ReindexConsumer(BlockingQueue queue, AtlasGraph graph, String[] indexNames) {
super(queue);
this.graph = graph;
this.indexNames = indexNames;
this.counter = new AtomicLong(0);
}
@Override
protected void doCommit() {
if (list.size() >= ConcurrentPatchProcessor.BATCH_SIZE) {
attemptCommit();
}
}
@Override
protected void commitDirty() {
attemptCommit();
LOG.info("Total: Commit: {}", counter.get());
super.commitDirty();
}
private void attemptCommit() {
for (String indexName : indexNames) {
try {
this.graph.getManagementSystem().reindex(indexName, list);
}
catch (IllegalStateException e) {
LOG.error("IllegalStateException: Exception", e);
return;
}
catch (Exception exception) {
LOG.error("Exception: {}", indexName, exception);
}
}
list.clear();
LOG.info("Processed: {}", counter.get());
}
@Override
protected void processItem(AtlasElement item) {
counter.incrementAndGet();
list.add(item);
commit();
}
}
}