blob: 2061e7f8f25d2ed2d6a0889d63256481c474e839 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.step.Mutating;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.event.Event;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.event.EventCallback;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.event.MutationListener;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.Transaction;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedProperty;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory;
import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceProperty;
import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertexProperty;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
/**
* A strategy that raises events when {@link Mutating} steps are encountered and successfully executed.
* <p>
* Note that this implementation requires a {@link Graph} on the {@link Traversal} instance. If that is not present
* an {@code IllegalStateException} will be thrown.
*
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
public final class EventStrategy extends AbstractTraversalStrategy<TraversalStrategy.DecorationStrategy> implements TraversalStrategy.DecorationStrategy {
private final EventQueue eventQueue;
private final Detachment detachment;
/**
 * Creates the strategy from the state accumulated in the supplied {@link Builder}.
 * <p>
 * The builder's {@link EventQueue} is given a snapshot of the builder's listeners rather than the builder's own
 * mutable list, so a later call to {@link Builder#addListener(MutationListener)} cannot silently alter the list
 * that was already handed to the queue.
 *
 * @param builder the builder holding the event queue, listeners and detachment option to use
 */
private EventStrategy(final Builder builder) {
    this.eventQueue = builder.eventQueue;
    // defensive copy: the builder stays mutable after create(), the list given to the queue must not
    this.eventQueue.setListeners(new ArrayList<>(builder.listeners));
    this.detachment = builder.detachment;
}
/**
 * Gets the {@link Detachment} option this strategy was created with.
 *
 * @return the configured detachment option
 */
public Detachment getDetachment() {
    return detachment;
}
/**
 * Passes the given object through this strategy's configured {@link Detachment} so that elements raised in
 * mutation events are detached in the way that was requested.
 *
 * @param attached the object to detach
 * @param <R> the type the caller expects back; the result of the detachment is cast to it unchecked
 * @return whatever the configured {@link Detachment} produced for {@code attached}
 */
public <R> R detach(final R attached) {
    final Object detached = detachment.detach(attached);
    return (R) detached;
}
/**
 * Creates one {@link EventStrategyCallback} bound to this strategy's event queue and adds it to the mutating
 * callback registry of each {@link Mutating} step that {@link TraversalHelper} returns for the traversal.
 *
 * @param traversal the traversal whose mutating steps receive the callback
 */
@Override
public void apply(final Traversal.Admin<?, ?> traversal) {
    final EventStrategyCallback callback = new EventStrategyCallback(eventQueue);
    // the same callback instance is shared by all of the steps
    for (final Mutating mutatingStep : TraversalHelper.getStepsOfAssignableClass(Mutating.class, traversal)) {
        mutatingStep.getMutatingCallbackRegistry().addCallback(callback);
    }
}
/**
 * Starts a new {@link Builder} for configuring and creating an {@link EventStrategy}.
 *
 * @return a fresh builder holding the default settings
 */
public static Builder build() {
    return new EventStrategy.Builder();
}
/**
 * The callback that this strategy adds to mutating steps. Every {@link Event} it receives is forwarded to the
 * {@link EventQueue} it was constructed with.
 * <p>
 * NOTE(review): this is a non-static inner class, so each instance carries an implicit reference to the
 * enclosing {@link EventStrategy}; confirm that is intended given that the class is {@link Serializable}.
 */
public class EventStrategyCallback implements EventCallback<Event>, Serializable {

    // field name kept as-is: it is part of the default serialized form of this class
    private final EventQueue eventQueue;

    /**
     * @param eventQueue the queue that accepted events are added to
     */
    public EventStrategyCallback(final EventQueue eventQueue) {
        this.eventQueue = eventQueue;
    }

    /**
     * Adds the event to the queue given at construction.
     *
     * @param event the event raised by a step
     */
    @Override
    public void accept(final Event event) {
        this.eventQueue.addEvent(event);
    }
}
/**
 * Fluent builder for {@link EventStrategy}. Unless overridden, the strategy is created with a
 * {@link DefaultEventQueue}, {@link Detachment#DETACHED_WITH_PROPERTIES} and no listeners.
 */
public static final class Builder {

    private final List<MutationListener> listeners = new ArrayList<>();

    private EventQueue eventQueue = new DefaultEventQueue();

    private Detachment detachment = Detachment.DETACHED_WITH_PROPERTIES;

    // instances are obtained through EventStrategy.build()
    Builder() {
    }

    /**
     * Appends a listener to those that will be given to the event queue.
     *
     * @param listener the listener to add
     * @return this builder
     */
    public Builder addListener(final MutationListener listener) {
        listeners.add(listener);
        return this;
    }

    /**
     * Replaces the {@link EventQueue} the strategy will use. The default is a {@link DefaultEventQueue}.
     *
     * @param eventQueue the queue that collects events and fires them to the listeners
     * @return this builder
     */
    public Builder eventQueue(final EventQueue eventQueue) {
        this.eventQueue = eventQueue;
        return this;
    }

    /**
     * Chooses how elements supplied in mutation callback events are detached. The default is
     * {@link Detachment#DETACHED_WITH_PROPERTIES}.
     *
     * @param detachment the detachment option to use
     * @return this builder
     */
    public Builder detach(final Detachment detachment) {
        this.detachment = detachment;
        return this;
    }

    /**
     * Creates the {@link EventStrategy} from the current state of this builder.
     *
     * @return the new strategy
     */
    public EventStrategy create() {
        return new EventStrategy(this);
    }
}
/**
 * Contract shared by everything that can turn an object into its detached form.
 */
public interface Detacher {

    /**
     * Produces the detached form of the supplied object.
     *
     * @param object the object to detach
     * @return the detached representation, as decided by the implementation
     */
    Object detach(Object object);
}
/**
 * The available ways of detaching elements from the graph when events are raised.
 */
public enum Detachment implements Detacher {

    /**
     * Leaves the element attached to the graph by handing back the object that was passed in. Be aware that with
     * transactional graphs, touching such elements after a {@code commit()} may open new transactions.
     */
    NONE {
        @Override
        public Object detach(final Object object) {
            // identity: nothing is detached
            return object;
        }
    },

    /**
     * Detaches through {@link DetachedFactory}, keeping the properties of elements that have them.
     */
    DETACHED_WITH_PROPERTIES {
        @Override
        public Object detach(final Object object) {
            final boolean withProperties = true;
            return DetachedFactory.detach(object, withProperties);
        }
    },

    /**
     * Detaches through {@link DetachedFactory}, leaving out the properties of elements that have them.
     */
    DETACHED_NO_PROPERTIES {
        @Override
        public Object detach(final Object object) {
            final boolean withProperties = false;
            return DetachedFactory.detach(object, withProperties);
        }
    },

    /**
     * Detaches through {@link ReferenceFactory}, which retains only the id and label of elements.
     */
    REFERENCE {
        @Override
        public Object detach(final Object object) {
            return ReferenceFactory.detach(object);
        }
    }
}
/**
 * Collects the events reported by callbacks and fires them to listeners. The point in time at which an event
 * reaches the listeners is left to the implementation.
 */
public interface EventQueue {

    /**
     * Supplies the queue with the listeners that were given to the {@link EventStrategy} on construction.
     *
     * @param listeners the listeners that events should be fired to
     */
    void setListeners(List<MutationListener> listeners);

    /**
     * Adds an event to this queue.
     *
     * @param evt the event to add
     */
    void addEvent(Event evt);
}
/**
 * An {@link EventQueue} that does no buffering: each event is fired to the current listeners as soon as it is
 * added.
 */
public static class DefaultEventQueue implements EventQueue {

    // empty until setListeners is called
    private List<MutationListener> listeners = Collections.emptyList();

    /**
     * Fires the event to the listeners straight away.
     *
     * @param evt the event to fire
     */
    @Override
    public void addEvent(final Event evt) {
        evt.fireEvent(this.listeners.iterator());
    }

    /**
     * Stores the given list as the listeners to notify; the list is kept by reference.
     *
     * @param listeners the listeners to notify of events
     */
    @Override
    public void setListeners(final List<MutationListener> listeners) {
        this.listeners = listeners;
    }
}
/**
 * Stores events in a per-thread queue that builds up until the transaction is committed, at which point they are
 * fired in the order they were received. A rollback discards the events queued on the calling thread.
 */
public static class TransactionalEventQueue implements EventQueue {

    // each thread gets its own deque, created lazily on first access
    private final ThreadLocal<Deque<Event>> eventQueue = ThreadLocal.withInitial(ArrayDeque::new);

    // empty until setListeners is called
    private List<MutationListener> listeners = Collections.emptyList();

    /**
     * Registers a transaction listener on the graph that fires the queued events on commit and drops them on
     * rollback.
     *
     * @param graph the graph whose transaction drives this queue
     * @throws IllegalStateException if the graph's features report that transactions are not supported
     */
    public TransactionalEventQueue(final Graph graph) {
        if (!graph.features().graph().supportsTransactions())
            throw new IllegalStateException(String.format("%s requires the graph to support transactions", EventStrategy.class.getName()));

        // since this is a transactional graph events are enqueued so the events should be fired/reset only after
        // transaction is committed/rolled back as tied to a graph transaction
        graph.tx().addTransactionListener(status -> {
            if (status == Transaction.Status.COMMIT)
                fireEventQueue();
            else if (status == Transaction.Status.ROLLBACK)
                resetEventQueue();
            else
                throw new RuntimeException(String.format("The %s is not aware of this status: %s", EventQueue.class.getName(), status));
        });
    }

    /**
     * Appends the event to the calling thread's queue; nothing is fired at this point.
     *
     * @param evt the event to hold until the transaction completes
     */
    @Override
    public void addEvent(final Event evt) {
        eventQueue.get().add(evt);
    }

    /**
     * Stores the given list as the listeners to notify; the list is kept by reference.
     *
     * @param listeners the listeners to notify of events
     */
    @Override
    public void setListeners(final List<MutationListener> listeners) {
        this.listeners = listeners;
    }

    /**
     * Replaces the calling thread's queue with an empty one, discarding whatever was queued.
     */
    private void resetEventQueue() {
        eventQueue.set(new ArrayDeque<>());
    }

    /**
     * Drains the calling thread's queue from the head, firing each event to the listeners in arrival order.
     * If firing an event throws, the events not yet polled remain in this thread's queue.
     */
    private void fireEventQueue() {
        final Deque<Event> deque = eventQueue.get();
        for (Event event = deque.pollFirst(); event != null; event = deque.pollFirst()) {
            event.fireEvent(listeners.iterator());
        }
    }
}
}