blob: 973ad8b92679d91e5af8abdcb58a58a5fe58b9db [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.triggers;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.cassandra.config.TriggerDefinition;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
/**
 * Executes the triggers configured on a table and folds the mutations they
 * generate into the write that caused them.
 *
 * Trigger instances are cached by class name in a concurrent map. The class
 * loader used to load trigger classes can be replaced at runtime through
 * {@link #reloadClasses()}, which also empties that cache.
 */
public class TriggerExecutor
{
    public static final TriggerExecutor instance = new TriggerExecutor();

    // Trigger instances keyed by trigger class name; emptied whenever the class loader is replaced.
    private final Map<String, ITrigger> cachedTriggers = Maps.newConcurrentMap();
    // Context class loader of the thread that constructed this executor; parent of every custom loader.
    private final ClassLoader parent = Thread.currentThread().getContextClassLoader();
    // Loader built over the trigger directory; remains null while that directory cannot be resolved.
    private volatile ClassLoader customClassLoader;

    private TriggerExecutor()
    {
        reloadClasses();
    }

    /**
     * Reload the triggers which are already loaded. Invoking this installs a fresh
     * class loader over the trigger directory, so new jars can be loaded, and drops
     * every cached trigger instance.
     *
     * Does nothing when the trigger directory cannot be resolved: the current class
     * loader (if any) and the cache are both left as they are.
     */
    public void reloadClasses()
    {
        File triggerDirectory = FBUtilities.cassandraTriggerDir();
        if (triggerDirectory == null)
            return;
        customClassLoader = new CustomClassLoader(parent, triggerDirectory);
        cachedTriggers.clear();
    }

    /**
     * Augment a partition update by executing triggers to generate an intermediate
     * set of mutations, then merging the ColumnFamily from each mutation with those
     * supplied. This is called from {@link org.apache.cassandra.service.StorageProxy#cas}
     * which is scoped for a single partition. For that reason, any mutations generated
     * by triggers are checked to ensure that they are for the same table and partition
     * key as the primary update; if not, InvalidRequestException is thrown. If no
     * additional mutations are generated, the original updates are returned unmodified.
     *
     * @param key partition key for the update
     * @param updates partition update to be applied; the trigger-generated cells are
     *                merged into this same object, which is then returned
     * @return the final update to be applied, the original update merged with any
     *         additional mutations generated by configured triggers
     * @throws InvalidRequestException if any mutation generated by a trigger does not
     *         apply to the exact same partition as the initial update
     */
    public ColumnFamily execute(ByteBuffer key, ColumnFamily updates) throws InvalidRequestException
    {
        List<Mutation> intermediate = executeInternal(key, updates);
        if (intermediate == null || intermediate.isEmpty())
            return updates;

        validateForSinglePartition(updates.metadata().getKeyValidator(), updates.id(), key, intermediate);

        for (Mutation mutation : intermediate)
        {
            for (ColumnFamily cf : mutation.getColumnFamilies())
            {
                updates.addAll(cf);
            }
        }
        return updates;
    }

    /**
     * Takes a collection of mutations and possibly augments it by adding extra mutations
     * generated by configured triggers. If no additional mutations are created
     * this returns null, signalling to the caller that only the initial set of
     * mutations should be applied. If additional mutations <i>are</i> generated,
     * the total set (i.e. the original plus the additional mutations) is returned,
     * merged per keyspace and partition key, so that it can be applied together.
     * Should this not be possible because the initial mutations contain counter
     * updates, InvalidRequestException is thrown.
     *
     * @param mutations initial collection of mutations
     * @return augmented mutations. Either the union of the initial and additional
     *         mutations or null if no additional mutations were generated
     * @throws InvalidRequestException if additional mutations were generated, but
     *         the initial mutations contains counter updates
     */
    public Collection<Mutation> execute(Collection<? extends IMutation> mutations) throws InvalidRequestException
    {
        boolean hasCounters = false;
        List<Mutation> augmentedMutations = null;

        for (IMutation mutation : mutations)
        {
            if (mutation instanceof CounterMutation)
                hasCounters = true;

            for (ColumnFamily cf : mutation.getColumnFamilies())
            {
                List<Mutation> augmentations = executeInternal(mutation.key(), cf);
                if (augmentations == null || augmentations.isEmpty())
                    continue;

                validate(augmentations);

                // allocated lazily so that "no trigger output" can be reported as null
                if (augmentedMutations == null)
                    augmentedMutations = new ArrayList<>();
                augmentedMutations.addAll(augmentations);
            }
        }

        if (augmentedMutations == null)
            return null;

        if (hasCounters)
            throw new InvalidRequestException("Counter mutations and trigger mutations cannot be applied together atomically.");

        // No CounterMutation is present at this point; the cast assumes every remaining
        // IMutation is a Mutation.
        @SuppressWarnings("unchecked")
        Collection<Mutation> originalMutations = (Collection<Mutation>) mutations;

        return mergeMutations(Iterables.concat(originalMutations, augmentedMutations));
    }

    /**
     * Collapses the supplied mutations so that there is a single mutation per
     * (keyspace name, partition key) pair.
     *
     * @param mutations mutations to merge
     * @return one mutation per distinct keyspace/key pair
     */
    private Collection<Mutation> mergeMutations(Iterable<Mutation> mutations)
    {
        Map<Pair<String, ByteBuffer>, Mutation> groupedMutations = new HashMap<>();

        for (Mutation mutation : mutations)
        {
            Pair<String, ByteBuffer> key = Pair.create(mutation.getKeyspaceName(), mutation.key());
            Mutation current = groupedMutations.get(key);
            if (current == null)
            {
                // copy in case the mutation's modifications map is backed by an immutable Collections#singletonMap().
                groupedMutations.put(key, mutation.copy());
            }
            else
            {
                current.addAll(mutation);
            }
        }

        return groupedMutations.values();
    }

    /**
     * Checks that every trigger-generated mutation targets the given partition key
     * and table, then runs the general {@link #validate(Collection)} checks.
     *
     * @param keyValidator comparator used to compare partition keys
     * @param cfId id of the table the primary update applies to
     * @param key partition key of the primary update
     * @param tmutations mutations generated by triggers
     * @throws InvalidRequestException if a mutation is for another partition key or
     *         table, or fails general validation
     */
    private void validateForSinglePartition(AbstractType<?> keyValidator,
                                            UUID cfId,
                                            ByteBuffer key,
                                            Collection<Mutation> tmutations)
    throws InvalidRequestException
    {
        for (Mutation mutation : tmutations)
        {
            if (keyValidator.compare(mutation.key(), key) != 0)
                throw new InvalidRequestException("Partition key of additional mutation does not match primary update key");

            for (ColumnFamily cf : mutation.getColumnFamilies())
            {
                if (! cf.id().equals(cfId))
                    throw new InvalidRequestException("table of additional mutation does not match primary update table");
            }
        }
        validate(tmutations);
    }

    /**
     * Validates the partition key of every mutation and the fields of every cell
     * it contains against the metadata of the owning table.
     *
     * @param tmutations mutations generated by triggers
     * @throws InvalidRequestException if a key or a cell is invalid
     */
    private void validate(Collection<Mutation> tmutations) throws InvalidRequestException
    {
        for (Mutation mutation : tmutations)
        {
            QueryProcessor.validateKey(mutation.key());
            for (ColumnFamily tcf : mutation.getColumnFamilies())
                for (Cell cell : tcf)
                    cell.validateFields(tcf.metadata());
        }
    }

    /**
     * Runs every trigger defined on the table of the given update and collects the
     * mutations they produce. The custom class loader is installed as the thread's
     * context class loader for the duration of the call.
     *
     * @param key partition key of the update
     * @param columnFamily the update handed to each trigger
     * @return null if the table has no triggers, otherwise the (possibly empty) list
     *         of mutations the triggers generated
     * @throws RuntimeException wrapping any exception raised while loading or
     *         running a trigger
     */
    private List<Mutation> executeInternal(ByteBuffer key, ColumnFamily columnFamily)
    {
        Map<String, TriggerDefinition> triggers = columnFamily.metadata().getTriggers();
        if (triggers.isEmpty())
            return null;

        List<Mutation> tmutations = new ArrayList<>();

        // Remember the loader this thread came in with, so it is handed back unchanged
        // rather than being overwritten with the loader of the constructing thread.
        Thread currentThread = Thread.currentThread();
        ClassLoader previousLoader = currentThread.getContextClassLoader();
        currentThread.setContextClassLoader(customClassLoader);
        try
        {
            for (TriggerDefinition td : triggers.values())
            {
                ITrigger trigger = cachedTriggers.get(td.classOption);
                if (trigger == null)
                {
                    trigger = loadTriggerInstance(td.classOption);
                    cachedTriggers.put(td.classOption, trigger);
                }
                Collection<Mutation> temp = trigger.augment(key, columnFamily);
                if (temp != null)
                    tmutations.addAll(temp);
            }
            return tmutations;
        }
        catch (Exception ex)
        {
            // covers both failures to load a trigger and failures inside ITrigger#augment
            throw new RuntimeException(String.format("Exception while executing trigger on table with ID: %s", columnFamily.id()), ex);
        }
        finally
        {
            currentThread.setContextClassLoader(previousLoader);
        }
    }

    /**
     * Returns the cached instance of the named trigger if there is one, otherwise
     * loads the class through the custom class loader and creates a new instance
     * with its no-argument constructor. The new instance is not added to the cache
     * by this method.
     *
     * @param triggerName fully qualified class name of the trigger
     * @return a trigger instance, never null
     * @throws IllegalStateException if no custom class loader has been installed,
     *         which is the case while the trigger directory cannot be resolved
     * @throws Exception if the class cannot be loaded or instantiated
     */
    public synchronized ITrigger loadTriggerInstance(String triggerName) throws Exception
    {
        // double check. Read the cache only once: a concurrent reloadClasses() may clear it
        // between two reads, which would otherwise let a null escape to the caller.
        ITrigger cached = cachedTriggers.get(triggerName);
        if (cached != null)
            return cached;

        // single read of the volatile field so the null check and the use see the same loader
        ClassLoader loader = customClassLoader;
        if (loader == null)
            throw new IllegalStateException("Cannot load trigger " + triggerName + ": trigger class loader is not initialized");

        return (ITrigger) loader.loadClass(triggerName).getConstructor().newInstance();
    }
}