blob: 9e47132ad1baa41271621fb1bbb2331e65604c16 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.indexing.pcj.fluo.app.observers;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.log4j.Logger;
import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.ConstructQueryResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.PeriodicQueryUpdater;
import org.apache.rya.indexing.pcj.fluo.app.ProjectionResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.QueryResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataCache;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.MetadataCacheSupplier;
import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
/**
 * Notified when the results of a node have been updated to include a new Binding
 * Set. This observer updates its parent if the new Binding Set affects the parent's
 * results.
 * <p>
 * Subclasses provide the observed column and the logic that turns a notified row
 * into an {@link Observation}. {@link #process(TransactionBase, Bytes, Column)}
 * then hands the observed Binding Set to the updater that matches the parent
 * node's {@link NodeType}.
 */
@DefaultAnnotation(NonNull.class)
public abstract class BindingSetUpdater extends AbstractObserver {
    private static final Logger log = Logger.getLogger(BindingSetUpdater.class);

    // DAO
    protected final FluoQueryMetadataCache queryDao = MetadataCacheSupplier.getOrCreateCache();

    // Updaters, one per parent node type handled by process().
    private final JoinResultUpdater joinUpdater = new JoinResultUpdater();
    private final FilterResultUpdater filterUpdater = new FilterResultUpdater();
    private final QueryResultUpdater queryUpdater = new QueryResultUpdater();
    private final AggregationResultUpdater aggregationUpdater = new AggregationResultUpdater();
    private final ConstructQueryResultUpdater constructUpdater = new ConstructQueryResultUpdater();
    private final ProjectionResultUpdater projectionUpdater = new ProjectionResultUpdater();
    private final PeriodicQueryUpdater periodicQueryUpdater = new PeriodicQueryUpdater();

    /**
     * @return The column this observer is notified about. Left abstract so each
     *   concrete updater declares its own column.
     */
    @Override
    public abstract ObservedColumn getObservedColumn();

    /**
     * Create an {@link Observation} that defines the work that needs to be done.
     *
     * @param tx - The Fluo transaction being used for the observer notification. (not null)
     * @param row - The row that triggered the notification. (not null)
     * @return An {@link Observation} that defines the work that needs to be done.
     * @throws Exception A problem caused this method to fail.
     */
    public abstract Observation parseObservation(TransactionBase tx, Bytes row) throws Exception;

    /**
     * Parses the notification into an {@link Observation} and forwards the observed
     * Binding Set to the updater that handles the parent node's type.
     * <p>
     * If the {@link Observation} can not be parsed, the problem is logged and the
     * notification is skipped without throwing.
     *
     * @param tx - The Fluo transaction being used for the observer notification. (not null)
     * @param row - The row that triggered the notification. (not null)
     * @param col - The column that triggered the notification. (not null)
     * @throws RuntimeException The updater for the parent node failed; the original
     *   exception is attached as the cause.
     * @throws IllegalArgumentException The parent node's type is not one of the types
     *   handled by this observer.
     */
    @Override
    public final void process(final TransactionBase tx, final Bytes row, final Column col) {
        checkNotNull(tx);
        checkNotNull(row);
        checkNotNull(col);

        final Observation observation;
        try {
            observation = parseObservation(tx, row);
        } catch (final Exception e) {
            // Deliberate best effort: a row that can not be parsed is logged and dropped.
            log.error("Unable to parse an Observation from a Row and Column pair, so this notification will be skipped. " +
                    "Row: " + row + " Column: " + col, e);
            return;
        }

        final String observedNodeId = observation.getObservedNodeId();
        final VisibilityBindingSet observedBindingSet = observation.getObservedBindingSet();
        final String parentNodeId = observation.getParentId();

        // Figure out which node needs to handle the new Binding Set.
        // NOTE(review): get() is called without a presence check; presumably it throws
        // when the parent ID does not map to any NodeType - confirm against NodeType.
        final NodeType parentNodeType = NodeType.fromNodeId(parentNodeId).get();
        switch (parentNodeType) {
            case QUERY:
                final QueryMetadata parentQuery = queryDao.readQueryMetadata(tx, parentNodeId);
                try {
                    queryUpdater.updateQueryResults(tx, observedBindingSet, parentQuery);
                } catch (final Exception e) {
                    throw new RuntimeException("Could not process a Query node.", e);
                }
                break;

            case PROJECTION:
                final ProjectionMetadata projectionQuery = queryDao.readProjectionMetadata(tx, parentNodeId);
                try {
                    projectionUpdater.updateProjectionResults(tx, observedBindingSet, projectionQuery);
                } catch (final Exception e) {
                    throw new RuntimeException("Could not process a Projection node.", e);
                }
                break;

            case CONSTRUCT:
                final ConstructQueryMetadata constructQuery = queryDao.readConstructQueryMetadata(tx, parentNodeId);
                try {
                    constructUpdater.updateConstructQueryResults(tx, observedBindingSet, constructQuery);
                } catch (final Exception e) {
                    throw new RuntimeException("Could not process a ConstructQuery node.", e);
                }
                break;

            case FILTER:
                final FilterMetadata parentFilter = queryDao.readFilterMetadata(tx, parentNodeId);
                try {
                    filterUpdater.updateFilterResults(tx, observedBindingSet, parentFilter);
                } catch (final Exception e) {
                    throw new RuntimeException("Could not process a Filter node.", e);
                }
                break;

            case JOIN:
                final JoinMetadata parentJoin = queryDao.readJoinMetadata(tx, parentNodeId);
                try {
                    // The join updater is the only one that also needs the ID of the emitting child.
                    joinUpdater.updateJoinResults(tx, observedNodeId, observedBindingSet, parentJoin);
                } catch (final Exception e) {
                    throw new RuntimeException("Could not process a Join node.", e);
                }
                break;

            case PERIODIC_QUERY:
                final PeriodicQueryMetadata parentPeriodicQuery = queryDao.readPeriodicQueryMetadata(tx, parentNodeId);
                try {
                    periodicQueryUpdater.updatePeriodicBinResults(tx, observedBindingSet, parentPeriodicQuery);
                } catch (final Exception e) {
                    throw new RuntimeException("Could not process PeriodicBin node.", e);
                }
                break;

            case AGGREGATION:
                final AggregationMetadata parentAggregation = queryDao.readAggregationMetadata(tx, parentNodeId);
                try {
                    aggregationUpdater.updateAggregateResults(tx, observedBindingSet, parentAggregation);
                } catch (final Exception e) {
                    throw new RuntimeException("Could not process an Aggregation node.", e);
                }
                break;

            default:
                throw new IllegalArgumentException("The parent node's NodeType must be of type Aggregation, Projection, ConstructQuery, Filter, Join, PeriodicBin or Query, but was " + parentNodeType);
        }
    }

    /**
     * Defines who just emitted a new Binding Set result, the Binding Set itself,
     * and which node must now handle it.
     */
    public static final class Observation {
        private final String observedNodeId;
        private final VisibilityBindingSet observedBindingSet;
        private final String parentNodeId;

        /**
         * Creates an instance of {@link Observation}.
         *
         * @param observedNodeId - The Node ID that just emitted a new Binding Set. (not null)
         * @param observedBindingSet - A Binding Set that was just emitted. (not null)
         * @param parentNodeId - The Node ID of the node that must handle the new Binding Set input. (not null)
         */
        public Observation(
                final String observedNodeId,
                final VisibilityBindingSet observedBindingSet,
                final String parentNodeId) {
            this.observedNodeId = checkNotNull(observedNodeId);
            this.observedBindingSet = checkNotNull(observedBindingSet);
            this.parentNodeId = checkNotNull(parentNodeId);
        }

        /**
         * @return The Node ID that just emitted a new Binding Set.
         */
        public String getObservedNodeId() {
            return observedNodeId;
        }

        /**
         * @return A Binding Set that was just emitted.
         */
        public VisibilityBindingSet getObservedBindingSet() {
            return observedBindingSet;
        }

        /**
         * @return The Node ID of the node that must handle the new Binding Set input.
         */
        public String getParentId() {
            return parentNodeId;
        }
    }
}