// blob: 7e047a0632d6f7b2dc9cc91bfd1a30f2a22e53df [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.job.schema;
import java.util.Collection;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.SchemaStatus;
import org.apache.hugegraph.util.LockUtil;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.hugegraph.schema.EdgeLabel;
import org.apache.hugegraph.schema.IndexLabel;
import org.apache.hugegraph.schema.SchemaElement;
import org.apache.hugegraph.schema.SchemaLabel;
import org.apache.hugegraph.schema.VertexLabel;
import org.apache.hugegraph.structure.HugeElement;
import com.google.common.collect.ImmutableSet;
/**
 * Schema job that rebuilds index data, either for one index label or for
 * all index labels attached to a vertex label / edge label.
 */
public class IndexLabelRebuildJob extends SchemaJob {

    /**
     * @return the {@code REBUILD_INDEX} job type
     */
    @Override
    public String type() {
        return REBUILD_INDEX;
    }

    /**
     * Resolves the schema element this job targets and rebuilds its
     * index(es).
     *
     * @return always {@code null}
     */
    @Override
    public Object execute() {
        SchemaElement target = this.schemaElement();
        if (target == null) {
            // The schema does not exist: there is nothing to rebuild
            return null;
        }
        this.rebuildIndex(target);
        return null;
    }

    /**
     * Dispatches on the schema type: an index label is rebuilt alone, while
     * a vertex/edge label gets every index label it reports rebuilt.
     *
     * @param schema the schema element to rebuild index(es) for
     * @throws AssertionError if the schema type can't own or be an index
     */
    private void rebuildIndex(SchemaElement schema) {
        switch (schema.type()) {
            case INDEX_LABEL: {
                IndexLabel indexLabel = (IndexLabel) schema;
                SchemaLabel owner = this.findBaseLabel(indexLabel);
                assert owner != null;
                this.rebuildIndex(owner, ImmutableSet.of(indexLabel.id()));
                break;
            }
            case VERTEX_LABEL:
            case EDGE_LABEL: {
                SchemaLabel owner = (SchemaLabel) schema;
                this.rebuildIndex(owner, owner.indexLabels());
                break;
            }
            default:
                assert schema.type() == HugeType.PROPERTY_KEY;
                throw new AssertionError(String.format(
                          "The %s can't rebuild index", schema.type()));
        }
    }

    /**
     * Fetches the vertex label or edge label that the index label is
     * based on, according to the index label's base type.
     */
    private SchemaLabel findBaseLabel(IndexLabel indexLabel) {
        if (indexLabel.baseType() == HugeType.VERTEX_LABEL) {
            return this.graph().vertexLabel(indexLabel.baseValue());
        }
        assert indexLabel.baseType() == HugeType.EDGE_LABEL;
        return this.graph().edgeLabel(indexLabel.baseValue());
    }

    /**
     * Rebuilds the given index labels of a vertex/edge label while holding
     * the INDEX_LABEL_REBUILD write locks: mark the index labels as
     * REBUILDING (except those still CREATING), remove the old index data,
     * commit, then traverse all elements of the label to write the index
     * entries again. On success every index label is marked CREATED; if the
     * traversal or its commit fails they are marked INVALID and the failure
     * is rethrown.
     *
     * @param label         the vertex label or edge label owning the indexes
     * @param indexLabelIds ids of the index labels to rebuild
     */
    private void rebuildIndex(SchemaLabel label, Collection<Id> indexLabelIds) {
        SchemaTransaction schemaTx = this.params().schemaTransaction();
        GraphTransaction graphTx = this.params().graphTransaction();

        LockUtil.Locks locks = new LockUtil.Locks(schemaTx.graphName());
        try {
            locks.lockWrites(LockUtil.INDEX_LABEL_REBUILD, indexLabelIds);

            Set<IndexLabel> indexLabels = indexLabelIds.stream()
                                          .map(this.graph()::indexLabel)
                                          .collect(Collectors.toSet());
            for (IndexLabel indexLabel : indexLabels) {
                // Index labels that are still being created keep their status
                if (indexLabel.status() != SchemaStatus.CREATING) {
                    schemaTx.updateSchemaStatus(indexLabel,
                                                SchemaStatus.REBUILDING);
                }
            }

            this.removeIndex(indexLabelIds);
            /*
             * Note: the index removal must be committed before re-adding.
             * Removing an index is converted to (id like <?>:personByCity):
             * `delete from index table where label = ?`,
             * while appending an index is converted to
             * (id like Beijing:personByCity):
             * `update index element_ids += xxx where field_value = ?
             * and index_label_name = ?`.
             * The two use different ids, so they can't be compared and
             * optimized within one transaction.
             */
            graphTx.commit();

            try {
                if (label.type() == HugeType.VERTEX_LABEL) {
                    Consumer<Vertex> updater = newIndexUpdater(graphTx,
                                                               indexLabelIds);
                    graphTx.traverseVerticesByLabel((VertexLabel) label,
                                                    updater, false);
                } else {
                    assert label.type() == HugeType.EDGE_LABEL;
                    Consumer<Edge> updater = newIndexUpdater(graphTx,
                                                             indexLabelIds);
                    graphTx.traverseEdgesByLabel((EdgeLabel) label,
                                                 updater, false);
                }
                graphTx.commit();
            } catch (Throwable e) {
                markAll(schemaTx, indexLabels, SchemaStatus.INVALID);
                throw e;
            }

            markAll(schemaTx, indexLabels, SchemaStatus.CREATED);
        } finally {
            locks.unlock();
        }
    }

    /**
     * Creates a consumer that, for each traversed element, updates the
     * index of every given index label through the graph transaction.
     */
    private static <T> Consumer<T> newIndexUpdater(
                                   GraphTransaction graphTx,
                                   Collection<Id> indexLabelIds) {
        return element -> {
            for (Id indexLabelId : indexLabelIds) {
                graphTx.updateIndex(indexLabelId, (HugeElement) element,
                                    false);
            }
        };
    }

    /**
     * Sets the same schema status on each of the given index labels.
     */
    private static void markAll(SchemaTransaction schemaTx,
                                Set<IndexLabel> indexLabels,
                                SchemaStatus status) {
        for (IndexLabel indexLabel : indexLabels) {
            schemaTx.updateSchemaStatus(indexLabel, status);
        }
    }

    /**
     * Removes the index data of each given index label, one at a time,
     * holding the INDEX_LABEL_DELETE write locks around each removal. If a
     * removal fails, that index label is marked INVALID and the failure is
     * rethrown.
     *
     * @param indexLabelIds ids of the index labels whose data is removed
     */
    private void removeIndex(Collection<Id> indexLabelIds) {
        SchemaTransaction schemaTx = this.params().schemaTransaction();
        GraphTransaction graphTx = this.params().graphTransaction();

        for (Id indexLabelId : indexLabelIds) {
            IndexLabel indexLabel = schemaTx.getIndexLabel(indexLabelId);
            /*
             * Missing index labels and those still CREATING are skipped.
             * TODO: How to deal with non-existent index name:
             * continue or throw exception?
             */
            if (indexLabel != null &&
                indexLabel.status() != SchemaStatus.CREATING) {
                LockUtil.Locks locks = new LockUtil.Locks(
                                       schemaTx.graphName());
                try {
                    locks.lockWrites(LockUtil.INDEX_LABEL_DELETE,
                                     indexLabelIds);
                    graphTx.removeIndex(indexLabel);
                } catch (Throwable e) {
                    schemaTx.updateSchemaStatus(indexLabel,
                                                SchemaStatus.INVALID);
                    throw e;
                } finally {
                    locks.unlock();
                }
            }
        }
    }

    /**
     * Looks up the schema element identified by this job's schema type and
     * schema id.
     *
     * @return the vertex label, edge label or index label fetched from the
     *         graph
     * @throws AssertionError if the schema type is none of those three
     */
    private SchemaElement schemaElement() {
        HugeType schemaType = this.schemaType();
        Id schemaId = this.schemaId();

        SchemaElement found;
        switch (schemaType) {
            case VERTEX_LABEL:
                found = this.graph().vertexLabel(schemaId);
                break;
            case EDGE_LABEL:
                found = this.graph().edgeLabel(schemaId);
                break;
            case INDEX_LABEL:
                found = this.graph().indexLabel(schemaId);
                break;
            default:
                throw new AssertionError(String.format(
                          "Invalid HugeType '%s' for rebuild", schemaType));
        }
        return found;
    }
}