/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.filter;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.base.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.context.*;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.partitions.FilteredPartition;
import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;

import static org.apache.cassandra.cql3.statements.RequestValidations.checkBindValueSet;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;

/**
 * A filter on which rows a given query should include or exclude.
 * <p>
 * This corresponds to the restrictions on rows that are not handled by the query
 * {@link ClusteringIndexFilter}. Some of the expressions of this filter may
 * be handled by a 2ndary index, and the rest is simply filtered out from the
 * result set (the later can only happen if the query was using ALLOW FILTERING).
 */
public abstract class RowFilter implements Iterable<RowFilter.Expression>
{
    private static final Logger logger = LoggerFactory.getLogger(RowFilter.class);

    public static final Serializer serializer = new Serializer();
    public static final RowFilter NONE = new CQLFilter(Collections.emptyList());

    protected final List<Expression> expressions;

    protected RowFilter(List<Expression> expressions)
    {
        this.expressions = expressions;
    }

    public static RowFilter create()
    {
        return new CQLFilter(new ArrayList<>());
    }

    public static RowFilter create(int capacity)
    {
        return new CQLFilter(new ArrayList<>(capacity));
    }

    public static RowFilter forThrift(int capacity)
    {
        return new ThriftFilter(new ArrayList<>(capacity));
    }

    public SimpleExpression add(ColumnDefinition def, Operator op, ByteBuffer value)
    {
        SimpleExpression expression = new SimpleExpression(def, op, value);
        add(expression);
        return expression;
    }

    public void addMapEquality(ColumnDefinition def, ByteBuffer key, Operator op, ByteBuffer value)
    {
        add(new MapEqualityExpression(def, key, op, value));
    }

    public void addThriftExpression(CFMetaData metadata, ByteBuffer name, Operator op, ByteBuffer value)
    {
        assert (this instanceof ThriftFilter);
        add(new ThriftExpression(metadata, name, op, value));
    }

    public void addCustomIndexExpression(CFMetaData cfm, IndexMetadata targetIndex, ByteBuffer value)
    {
        add(new CustomExpression(cfm, targetIndex, value));
    }

    private void add(Expression expression)
    {
        expression.validate();
        expressions.add(expression);
    }

    public void addUserExpression(UserExpression e)
    {
        expressions.add(e);
    }

    public List<Expression> getExpressions()
    {
        return expressions;
    }

    /**
     * Checks if some of the expressions apply to clustering or regular columns.
     * @return {@code true} if some of the expressions apply to clustering or regular columns, {@code false} otherwise.
     */
    public boolean hasExpressionOnClusteringOrRegularColumns()
    {
        for (Expression expression : expressions)
        {
            ColumnDefinition column = expression.column();
            if (column.isClusteringColumn() || column.isRegular())
                return true;
        }
        return false;
    }

    protected abstract Transformation<BaseRowIterator<?>> filter(CFMetaData metadata, int nowInSec);

    /**
     * Filters the provided iterator so that only the row satisfying the expression of this filter
     * are included in the resulting iterator.
     *
     * @param iter the iterator to filter
     * @param nowInSec the time of query in seconds.
     * @return the filtered iterator.
     */
    public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
    {
        return expressions.isEmpty() ? iter : Transformation.apply(iter, filter(iter.metadata(), nowInSec));
    }

    /**
     * Filters the provided iterator so that only the row satisfying the expression of this filter
     * are included in the resulting iterator.
     *
     * @param iter the iterator to filter
     * @param nowInSec the time of query in seconds.
     * @return the filtered iterator.
     */
    public PartitionIterator filter(PartitionIterator iter, CFMetaData metadata, int nowInSec)
    {
        return expressions.isEmpty() ? iter : Transformation.apply(iter, filter(metadata, nowInSec));
    }

    /**
     * Whether the provided row in the provided partition satisfies this filter.
     *
     * @param metadata the table metadata.
     * @param partitionKey the partition key for partition to test.
     * @param row the row to test.
     * @param nowInSec the current time in seconds (to know what is live and what isn't).
     * @return {@code true} if {@code row} in partition {@code partitionKey} satisfies this row filter.
     */
    public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row, int nowInSec)
    {
        // We purge all tombstones as the expressions isSatisfiedBy methods expects it
        Row purged = row.purge(DeletionPurger.PURGE_ALL, nowInSec, metadata.enforceStrictLiveness());
        if (purged == null)
            return expressions.isEmpty();

        for (Expression e : expressions)
        {
            if (!e.isSatisfiedBy(metadata, partitionKey, purged))
                return false;
        }
        return true;
    }

    /**
     * Returns true if all of the expressions within this filter that apply to the partition key are satisfied by
     * the given key, false otherwise.
     */
    public boolean partitionKeyRestrictionsAreSatisfiedBy(DecoratedKey key, AbstractType<?> keyValidator)
    {
        for (Expression e : expressions)
        {
            if (!e.column.isPartitionKey())
                continue;

            ByteBuffer value = keyValidator instanceof CompositeType
                             ? ((CompositeType) keyValidator).split(key.getKey())[e.column.position()]
                             : key.getKey();
            if (!e.operator().isSatisfiedBy(e.column.type, value, e.value))
                return false;
        }
        return true;
    }

    /**
     * Returns true if all of the expressions within this filter that apply to the clustering key are satisfied by
     * the given Clustering, false otherwise.
     */
    public boolean clusteringKeyRestrictionsAreSatisfiedBy(Clustering clustering)
    {
        for (Expression e : expressions)
        {
            if (!e.column.isClusteringColumn())
                continue;

            if (!e.operator().isSatisfiedBy(e.column.type, clustering.get(e.column.position()), e.value))
            {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns this filter but without the provided expression. This method
     * *assumes* that the filter contains the provided expression.
     */
    public RowFilter without(Expression expression)
    {
        assert expressions.contains(expression);
        if (expressions.size() == 1)
            return RowFilter.NONE;

        List<Expression> newExpressions = new ArrayList<>(expressions.size() - 1);
        for (Expression e : expressions)
            if (!e.equals(expression))
                newExpressions.add(e);

        return withNewExpressions(newExpressions);
    }

    public RowFilter withoutExpressions()
    {
        return withNewExpressions(Collections.emptyList());
    }

    protected abstract RowFilter withNewExpressions(List<Expression> expressions);

    public boolean isEmpty()
    {
        return expressions.isEmpty();
    }

    public Iterator<Expression> iterator()
    {
        return expressions.iterator();
    }

    private static Clustering makeCompactClustering(CFMetaData metadata, ByteBuffer name)
    {
        assert metadata.isCompactTable();
        if (metadata.isCompound())
        {
            List<ByteBuffer> values = CompositeType.splitName(name);
            return Clustering.make(values.toArray(new ByteBuffer[metadata.comparator.size()]));
        }
        else
        {
            return Clustering.make(name);
        }
    }

    @Override
    public String toString()
    {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < expressions.size(); i++)
        {
            if (i > 0)
                sb.append(" AND ");
            sb.append(expressions.get(i));
        }
        return sb.toString();
    }

    private static class CQLFilter extends RowFilter
    {
        private CQLFilter(List<Expression> expressions)
        {
            super(expressions);
        }

        protected Transformation<BaseRowIterator<?>> filter(CFMetaData metadata, int nowInSec)
        {
            List<Expression> partitionLevelExpressions = new ArrayList<>();
            List<Expression> rowLevelExpressions = new ArrayList<>();
            for (Expression e: expressions)
            {
                if (e.column.isStatic() || e.column.isPartitionKey())
                    partitionLevelExpressions.add(e);
                else
                    rowLevelExpressions.add(e);
            }

            long numberOfRegularColumnExpressions = rowLevelExpressions.size();
            final boolean filterNonStaticColumns = numberOfRegularColumnExpressions > 0;

            return new Transformation<BaseRowIterator<?>>()
            {
                DecoratedKey pk;
                protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
                {
                    pk = partition.partitionKey();

                    // Short-circuit all partitions that won't match based on static and partition keys
                    for (Expression e : partitionLevelExpressions)
                        if (!e.isSatisfiedBy(metadata, partition.partitionKey(), partition.staticRow()))
                        {
                            partition.close();
                            return null;
                        }

                    BaseRowIterator<?> iterator = partition instanceof UnfilteredRowIterator
                                                  ? Transformation.apply((UnfilteredRowIterator) partition, this)
                                                  : Transformation.apply((RowIterator) partition, this);

                    if (filterNonStaticColumns && !iterator.hasNext())
                    {
                        iterator.close();
                        return null;
                    }

                    return iterator;
                }

                public Row applyToRow(Row row)
                {
                    Row purged = row.purge(DeletionPurger.PURGE_ALL, nowInSec, metadata.enforceStrictLiveness());
                    if (purged == null)
                        return null;

                    for (Expression e : rowLevelExpressions)
                        if (!e.isSatisfiedBy(metadata, pk, purged))
                            return null;

                    return row;
                }
            };
        }

        protected RowFilter withNewExpressions(List<Expression> expressions)
        {
            return new CQLFilter(expressions);
        }
    }

    private static class ThriftFilter extends RowFilter
    {
        private ThriftFilter(List<Expression> expressions)
        {
            super(expressions);
        }

        protected Transformation<BaseRowIterator<?>> filter(CFMetaData metadata, int nowInSec)
        {
            // Thrift does not filter rows, it filters entire partition if any of the expression is not
            // satisfied, which forces us to materialize the result (in theory we could materialize only
            // what we need which might or might not be everything, but we keep it simple since in practice
            // it's not worth that it has ever been).
            return new Transformation<BaseRowIterator<?>>()
            {
                protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
                {
                    return partition instanceof UnfilteredRowIterator ? applyTo((UnfilteredRowIterator) partition)
                                                                      : applyTo((RowIterator) partition);
                }

                private UnfilteredRowIterator applyTo(UnfilteredRowIterator partition)
                {
                    ImmutableBTreePartition result = ImmutableBTreePartition.create(partition);
                    partition.close();
                    return accepts(result) ? result.unfilteredIterator() : null;
                }

                private RowIterator applyTo(RowIterator partition)
                {
                    FilteredPartition result = FilteredPartition.create(partition);
                    return accepts(result) ? result.rowIterator() : null;
                }

                private boolean accepts(ImmutableBTreePartition result)
                {
                    // The partition needs to have a row for every expression, and the expression needs to be valid.
                    for (Expression expr : expressions)
                    {
                        assert expr instanceof ThriftExpression;
                        Row row = result.getRow(makeCompactClustering(metadata, expr.column().name.bytes));
                        if (row == null || !expr.isSatisfiedBy(metadata, result.partitionKey(), row))
                            return false;
                    }
                    // If we get there, it means all expressions where satisfied, so return the original result
                    return true;
                }
            };
        }

        protected RowFilter withNewExpressions(List<Expression> expressions)
        {
            return new ThriftFilter(expressions);
        }
    }

    public static abstract class Expression
    {
        private static final Serializer serializer = new Serializer();

        // Note: the order of this enum matter, it's used for serialization
        protected enum Kind { SIMPLE, MAP_EQUALITY, THRIFT_DYN_EXPR, CUSTOM, USER }

        protected abstract Kind kind();
        protected final ColumnDefinition column;
        protected final Operator operator;
        protected final ByteBuffer value;

        protected Expression(ColumnDefinition column, Operator operator, ByteBuffer value)
        {
            this.column = column;
            this.operator = operator;
            this.value = value;
        }

        public boolean isCustom()
        {
            return kind() == Kind.CUSTOM;
        }

        public boolean isUserDefined()
        {
            return kind() == Kind.USER;
        }

        public ColumnDefinition column()
        {
            return column;
        }

        public Operator operator()
        {
            return operator;
        }

        /**
         * Checks if the operator of this <code>IndexExpression</code> is a <code>CONTAINS</code> operator.
         *
         * @return <code>true</code> if the operator of this <code>IndexExpression</code> is a <code>CONTAINS</code>
         * operator, <code>false</code> otherwise.
         */
        public boolean isContains()
        {
            return Operator.CONTAINS == operator;
        }

        /**
         * Checks if the operator of this <code>IndexExpression</code> is a <code>CONTAINS_KEY</code> operator.
         *
         * @return <code>true</code> if the operator of this <code>IndexExpression</code> is a <code>CONTAINS_KEY</code>
         * operator, <code>false</code> otherwise.
         */
        public boolean isContainsKey()
        {
            return Operator.CONTAINS_KEY == operator;
        }

        /**
         * If this expression is used to query an index, the value to use as
         * partition key for that index query.
         */
        public ByteBuffer getIndexValue()
        {
            return value;
        }

        public void validate()
        {
            checkNotNull(value, "Unsupported null value for column %s", column.name);
            checkBindValueSet(value, "Unsupported unset value for column %s", column.name);
        }

        @Deprecated
        public void validateForIndexing()
        {
            checkFalse(value.remaining() > FBUtilities.MAX_UNSIGNED_SHORT,
                       "Index expression values may not be larger than 64K");
        }

        /**
         * Returns whether the provided row satisfied this expression or not.
         *
         * @param partitionKey the partition key for row to check.
         * @param row the row to check. It should *not* contain deleted cells
         * (i.e. it should come from a RowIterator).
         * @return whether the row is satisfied by this expression.
         */
        public abstract boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row);

        protected ByteBuffer getValue(CFMetaData metadata, DecoratedKey partitionKey, Row row)
        {
            switch (column.kind)
            {
                case PARTITION_KEY:
                    return metadata.getKeyValidator() instanceof CompositeType
                         ? CompositeType.extractComponent(partitionKey.getKey(), column.position())
                         : partitionKey.getKey();
                case CLUSTERING:
                    return row.clustering().get(column.position());
                default:
                    Cell cell = row.getCell(column);
                    return cell == null ? null : cell.value();
            }
        }

        @Override
        public boolean equals(Object o)
        {
            if (this == o)
                return true;

            if (!(o instanceof Expression))
                return false;

            Expression that = (Expression)o;

            return Objects.equal(this.kind(), that.kind())
                && Objects.equal(this.column.name, that.column.name)
                && Objects.equal(this.operator, that.operator)
                && Objects.equal(this.value, that.value);
        }

        @Override
        public int hashCode()
        {
            return Objects.hashCode(column.name, operator, value);
        }

        private static class Serializer
        {
            public void serialize(Expression expression, DataOutputPlus out, int version) throws IOException
            {
                if (version >= MessagingService.VERSION_30)
                    out.writeByte(expression.kind().ordinal());

                // Custom expressions include neither a column or operator, but all
                // other expressions do. Also, custom expressions are 3.0+ only, so
                // the column & operator will always be the first things written for
                // any pre-3.0 version
                if (expression.kind() == Kind.CUSTOM)
                {
                    assert version >= MessagingService.VERSION_30;
                    IndexMetadata.serializer.serialize(((CustomExpression)expression).targetIndex, out, version);
                    ByteBufferUtil.writeWithShortLength(expression.value, out);
                    return;
                }

                if (expression.kind() == Kind.USER)
                {
                    assert version >= MessagingService.VERSION_30;
                    UserExpression.serialize((UserExpression)expression, out, version);
                    return;
                }

                ByteBufferUtil.writeWithShortLength(expression.column.name.bytes, out);
                expression.operator.writeTo(out);

                switch (expression.kind())
                {
                    case SIMPLE:
                        ByteBufferUtil.writeWithShortLength(((SimpleExpression)expression).value, out);
                        break;
                    case MAP_EQUALITY:
                        MapEqualityExpression mexpr = (MapEqualityExpression)expression;
                        if (version < MessagingService.VERSION_30)
                        {
                            ByteBufferUtil.writeWithShortLength(mexpr.getIndexValue(), out);
                        }
                        else
                        {
                            ByteBufferUtil.writeWithShortLength(mexpr.key, out);
                            ByteBufferUtil.writeWithShortLength(mexpr.value, out);
                        }
                        break;
                    case THRIFT_DYN_EXPR:
                        ByteBufferUtil.writeWithShortLength(((ThriftExpression)expression).value, out);
                        break;
                }
            }

            public Expression deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
            {
                Kind kind = null;
                ByteBuffer name;
                Operator operator;
                ColumnDefinition column;

                if (version >= MessagingService.VERSION_30)
                {
                    kind = Kind.values()[in.readByte()];
                    // custom expressions (3.0+ only) do not contain a column or operator, only a value
                    if (kind == Kind.CUSTOM)
                    {
                        return new CustomExpression(metadata,
                                                    IndexMetadata.serializer.deserialize(in, version, metadata),
                                                    ByteBufferUtil.readWithShortLength(in));
                    }

                    if (kind == Kind.USER)
                    {
                        return UserExpression.deserialize(in, version, metadata);
                    }
                }

                name = ByteBufferUtil.readWithShortLength(in);
                operator = Operator.readFrom(in);
                column = metadata.getColumnDefinition(name);
                if (!metadata.isCompactTable() && column == null)
                    throw new RuntimeException("Unknown (or dropped) column " + UTF8Type.instance.getString(name) + " during deserialization");

                if (version < MessagingService.VERSION_30)
                {
                    if (column == null)
                        kind = Kind.THRIFT_DYN_EXPR;
                    else if (column.type instanceof MapType && operator == Operator.EQ)
                        kind = Kind.MAP_EQUALITY;
                    else
                        kind = Kind.SIMPLE;
                }

                assert kind != null;
                switch (kind)
                {
                    case SIMPLE:
                        return new SimpleExpression(column, operator, ByteBufferUtil.readWithShortLength(in));
                    case MAP_EQUALITY:
                        ByteBuffer key, value;
                        if (version < MessagingService.VERSION_30)
                        {
                            ByteBuffer composite = ByteBufferUtil.readWithShortLength(in);
                            key = CompositeType.extractComponent(composite, 0);
                            value = CompositeType.extractComponent(composite, 0);
                        }
                        else
                        {
                            key = ByteBufferUtil.readWithShortLength(in);
                            value = ByteBufferUtil.readWithShortLength(in);
                        }
                        return new MapEqualityExpression(column, key, operator, value);
                    case THRIFT_DYN_EXPR:
                        return new ThriftExpression(metadata, name, operator, ByteBufferUtil.readWithShortLength(in));
                }
                throw new AssertionError();
            }


            public long serializedSize(Expression expression, int version)
            {
                // version 3.0+ includes a byte for Kind
                long size = version >= MessagingService.VERSION_30 ? 1 : 0;

                // Custom expressions include neither a column or operator, but all
                // other expressions do. Also, custom expressions are 3.0+ only, so
                // the column & operator will always be the first things written for
                // any pre-3.0 version
                if (expression.kind() != Kind.CUSTOM && expression.kind() != Kind.USER)
                    size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes)
                            + expression.operator.serializedSize();

                switch (expression.kind())
                {
                    case SIMPLE:
                        size += ByteBufferUtil.serializedSizeWithShortLength(((SimpleExpression)expression).value);
                        break;
                    case MAP_EQUALITY:
                        MapEqualityExpression mexpr = (MapEqualityExpression)expression;
                        if (version < MessagingService.VERSION_30)
                            size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.getIndexValue());
                        else
                            size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.key)
                                  + ByteBufferUtil.serializedSizeWithShortLength(mexpr.value);
                        break;
                    case THRIFT_DYN_EXPR:
                        size += ByteBufferUtil.serializedSizeWithShortLength(((ThriftExpression)expression).value);
                        break;
                    case CUSTOM:
                        if (version >= MessagingService.VERSION_30)
                            size += IndexMetadata.serializer.serializedSize(((CustomExpression)expression).targetIndex, version)
                                   + ByteBufferUtil.serializedSizeWithShortLength(expression.value);
                        break;
                    case USER:
                        if (version >= MessagingService.VERSION_30)
                            size += UserExpression.serializedSize((UserExpression)expression, version);
                }
                return size;
            }
        }
    }

    /**
     * An expression of the form 'column' 'op' 'value'.
     */
    public static class SimpleExpression extends Expression
    {
        SimpleExpression(ColumnDefinition column, Operator operator, ByteBuffer value)
        {
            super(column, operator, value);
        }

        public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row)
        {
            // We support null conditions for LWT (in ColumnCondition) but not for RowFilter.
            // TODO: we should try to merge both code someday.
            assert value != null;

            switch (operator)
            {
                case EQ:
                case LT:
                case LTE:
                case GTE:
                case GT:
                    {
                        assert !column.isComplex() : "Only CONTAINS and CONTAINS_KEY are supported for 'complex' types";

                        // In order to support operators on Counter types, their value has to be extracted from internal
                        // representation. See CASSANDRA-11629
                        if (column.type.isCounter())
                        {
                            ByteBuffer foundValue = getValue(metadata, partitionKey, row);
                            if (foundValue == null)
                                return false;

                            ByteBuffer counterValue = LongType.instance.decompose(CounterContext.instance().total(foundValue));
                            return operator.isSatisfiedBy(LongType.instance, counterValue, value);
                        }
                        else
                        {
                            // Note that CQL expression are always of the form 'x < 4', i.e. the tested value is on the left.
                            ByteBuffer foundValue = getValue(metadata, partitionKey, row);
                            return foundValue != null && operator.isSatisfiedBy(column.type, foundValue, value);
                        }
                    }
                case NEQ:
                case LIKE_PREFIX:
                case LIKE_SUFFIX:
                case LIKE_CONTAINS:
                case LIKE_MATCHES:
                    {
                        assert !column.isComplex() : "Only CONTAINS and CONTAINS_KEY are supported for 'complex' types";
                        ByteBuffer foundValue = getValue(metadata, partitionKey, row);
                        // Note that CQL expression are always of the form 'x < 4', i.e. the tested value is on the left.
                        return foundValue != null && operator.isSatisfiedBy(column.type, foundValue, value);
                    }
                case CONTAINS:
                    assert column.type.isCollection();
                    CollectionType<?> type = (CollectionType<?>)column.type;
                    if (column.isComplex())
                    {
                        ComplexColumnData complexData = row.getComplexColumnData(column);
                        if (complexData != null)
                        {
                            for (Cell cell : complexData)
                            {
                                if (type.kind == CollectionType.Kind.SET)
                                {
                                    if (type.nameComparator().compare(cell.path().get(0), value) == 0)
                                        return true;
                                }
                                else
                                {
                                    if (type.valueComparator().compare(cell.value(), value) == 0)
                                        return true;
                                }
                            }
                        }
                        return false;
                    }
                    else
                    {
                        ByteBuffer foundValue = getValue(metadata, partitionKey, row);
                        if (foundValue == null)
                            return false;

                        switch (type.kind)
                        {
                            case LIST:
                                ListType<?> listType = (ListType<?>)type;
                                return listType.compose(foundValue).contains(listType.getElementsType().compose(value));
                            case SET:
                                SetType<?> setType = (SetType<?>)type;
                                return setType.compose(foundValue).contains(setType.getElementsType().compose(value));
                            case MAP:
                                MapType<?,?> mapType = (MapType<?, ?>)type;
                                return mapType.compose(foundValue).containsValue(mapType.getValuesType().compose(value));
                        }
                        throw new AssertionError();
                    }
                case CONTAINS_KEY:
                    assert column.type.isCollection() && column.type instanceof MapType;
                    MapType<?, ?> mapType = (MapType<?, ?>)column.type;
                    if (column.isComplex())
                    {
                         return row.getCell(column, CellPath.create(value)) != null;
                    }
                    else
                    {
                        ByteBuffer foundValue = getValue(metadata, partitionKey, row);
                        return foundValue != null && mapType.getSerializer().getSerializedValue(foundValue, value, mapType.getKeysType()) != null;
                    }

                case IN:
                    // It wouldn't be terribly hard to support this (though doing so would imply supporting
                    // IN for 2ndary index) but currently we don't.
                    throw new AssertionError();
            }
            throw new AssertionError();
        }

        @Override
        public String toString()
        {
            AbstractType<?> type = column.type;
            switch (operator)
            {
                case CONTAINS:
                    assert type instanceof CollectionType;
                    CollectionType<?> ct = (CollectionType<?>)type;
                    type = ct.kind == CollectionType.Kind.SET ? ct.nameComparator() : ct.valueComparator();
                    break;
                case CONTAINS_KEY:
                    assert type instanceof MapType;
                    type = ((MapType<?, ?>)type).nameComparator();
                    break;
                case IN:
                    type = ListType.getInstance(type, false);
                    break;
                default:
                    break;
            }
            return String.format("%s %s %s", column.name, operator, type.getString(value));
        }

        @Override
        protected Kind kind()
        {
            return Kind.SIMPLE;
        }
    }

    /**
     * An expression of the form 'column' ['key'] = 'value' (which is only
     * supported when 'column' is a map).
     */
    private static class MapEqualityExpression extends Expression
    {
        private final ByteBuffer key;

        public MapEqualityExpression(ColumnDefinition column, ByteBuffer key, Operator operator, ByteBuffer value)
        {
            super(column, operator, value);
            assert column.type instanceof MapType && operator == Operator.EQ;
            this.key = key;
        }

        @Override
        public void validate() throws InvalidRequestException
        {
            checkNotNull(key, "Unsupported null map key for column %s", column.name);
            checkBindValueSet(key, "Unsupported unset map key for column %s", column.name);
            checkNotNull(value, "Unsupported null map value for column %s", column.name);
            checkBindValueSet(value, "Unsupported unset map value for column %s", column.name);
        }

        @Override
        public ByteBuffer getIndexValue()
        {
            return CompositeType.build(key, value);
        }

        public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row)
        {
            assert key != null;
            // We support null conditions for LWT (in ColumnCondition) but not for RowFilter.
            // TODO: we should try to merge both code someday.
            assert value != null;

            if (row.isStatic() != column.isStatic())
                return true;

            MapType<?, ?> mt = (MapType<?, ?>)column.type;
            if (column.isComplex())
            {
                Cell cell = row.getCell(column, CellPath.create(key));
                return cell != null && mt.valueComparator().compare(cell.value(), value) == 0;
            }
            else
            {
                ByteBuffer serializedMap = getValue(metadata, partitionKey, row);
                if (serializedMap == null)
                    return false;

                ByteBuffer foundValue = mt.getSerializer().getSerializedValue(serializedMap, key, mt.getKeysType());
                return foundValue != null && mt.valueComparator().compare(foundValue, value) == 0;
            }
        }

        @Override
        public String toString()
        {
            MapType<?, ?> mt = (MapType<?, ?>)column.type;
            return String.format("%s[%s] = %s", column.name, mt.nameComparator().getString(key), mt.valueComparator().getString(value));
        }

        @Override
        public boolean equals(Object o)
        {
            if (this == o)
                return true;

            if (!(o instanceof MapEqualityExpression))
                return false;

            MapEqualityExpression that = (MapEqualityExpression)o;

            return Objects.equal(this.column.name, that.column.name)
                && Objects.equal(this.operator, that.operator)
                && Objects.equal(this.key, that.key)
                && Objects.equal(this.value, that.value);
        }

        @Override
        public int hashCode()
        {
            return Objects.hashCode(column.name, operator, key, value);
        }

        @Override
        protected Kind kind()
        {
            return Kind.MAP_EQUALITY;
        }
    }

    /**
     * An expression of the form 'name' = 'value', but where 'name' is actually the
     * clustering value for a compact table. This is only for thrift.
     */
    private static class ThriftExpression extends Expression
    {
        public ThriftExpression(CFMetaData metadata, ByteBuffer name, Operator operator, ByteBuffer value)
        {
            super(makeDefinition(metadata, name), operator, value);
            assert metadata.isCompactTable();
        }

        private static ColumnDefinition makeDefinition(CFMetaData metadata, ByteBuffer name)
        {
            ColumnDefinition def = metadata.getColumnDefinition(name);
            if (def != null)
                return def;

            // In thrift, we actually allow expression on non-defined columns for the sake of filtering. To accomodate
            // this we create a "fake" definition. This is messy but it works so is probably good enough.
            return ColumnDefinition.regularDef(metadata, name, metadata.compactValueColumn().type);
        }

        public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row)
        {
            assert value != null;

            // On thrift queries, even if the column expression is a "static" one, we'll have convert it as a "dynamic"
            // one in ThriftResultsMerger, so we always expect it to be a dynamic one. Further, we expect this is only
            // called when the row clustering does match the column (see ThriftFilter above).
            assert row.clustering().equals(makeCompactClustering(metadata, column.name.bytes));
            Cell cell = row.getCell(metadata.compactValueColumn());
            return cell != null && operator.isSatisfiedBy(column.type, cell.value(), value);
        }

        @Override
        public String toString()
        {
            return String.format("%s %s %s", column.name, operator, column.type.getString(value));
        }

        @Override
        protected Kind kind()
        {
            return Kind.THRIFT_DYN_EXPR;
        }
    }

    /**
     * A custom index expression for use with 2i implementations which support custom syntax and which are not
     * necessarily linked to a single column in the base table.
     */
    public static final class CustomExpression extends Expression
    {
        private final IndexMetadata targetIndex;
        private final CFMetaData cfm;

        public CustomExpression(CFMetaData cfm, IndexMetadata targetIndex, ByteBuffer value)
        {
            // The operator is not relevant, but Expression requires it so for now we just hardcode EQ
            super(makeDefinition(cfm, targetIndex), Operator.EQ, value);
            this.targetIndex = targetIndex;
            this.cfm = cfm;
        }

        private static ColumnDefinition makeDefinition(CFMetaData cfm, IndexMetadata index)
        {
            // Similarly to how we handle non-defined columns in thift, we create a fake column definition to
            // represent the target index. This is definitely something that can be improved though.
            return ColumnDefinition.regularDef(cfm, ByteBuffer.wrap(index.name.getBytes()), BytesType.instance);
        }

        public IndexMetadata getTargetIndex()
        {
            return targetIndex;
        }

        public ByteBuffer getValue()
        {
            return value;
        }

        public String toString()
        {
            return String.format("expr(%s, %s)",
                                 targetIndex.name,
                                 Keyspace.openAndGetStore(cfm)
                                         .indexManager
                                         .getIndex(targetIndex)
                                         .customExpressionValueType());
        }

        protected Kind kind()
        {
            return Kind.CUSTOM;
        }

        // Filtering by custom expressions isn't supported yet, so just accept any row
        public boolean isSatisfiedBy(CFMetaData metadata, DecoratedKey partitionKey, Row row)
        {
            return true;
        }
    }

    /**
     * A user defined filtering expression. These may be added to RowFilter programmatically by a
     * QueryHandler implementation. No concrete implementations are provided and adding custom impls
     * to the classpath is a task for operators (needless to say, this is something of a power
     * user feature). Care must also be taken to register implementations, via the static register
     * method during system startup. An implementation and its corresponding Deserializer must be
     * registered before sending or receiving any messages containing expressions of that type.
     * Use of custom filtering expressions in a mixed version cluster should be handled with caution
     * as the order in which types are registered is significant: if continuity of use during upgrades
     * is important, new types should registered last and obsoleted types should still be registered (
     * or dummy implementations registered in their place) to preserve consistent identifiers across
     * the cluster).
     *
     * During serialization, the identifier for the Deserializer implementation is prepended to the
     * implementation specific payload. To deserialize, the identifier is read first to obtain the
     * Deserializer, which then provides the concrete expression instance.
     */
    public static abstract class UserExpression extends Expression
    {
        private static final DeserializerRegistry deserializers = new DeserializerRegistry();
        private static final class DeserializerRegistry
        {
            private final AtomicInteger counter = new AtomicInteger(0);
            private final ConcurrentMap<Integer, Deserializer> deserializers = new ConcurrentHashMap<>();
            private final ConcurrentMap<Class<? extends UserExpression>, Integer> registeredClasses = new ConcurrentHashMap<>();

            public void registerUserExpressionClass(Class<? extends UserExpression> expressionClass,
                                                    UserExpression.Deserializer deserializer)
            {
                int id = registeredClasses.computeIfAbsent(expressionClass, (cls) -> counter.getAndIncrement());
                deserializers.put(id, deserializer);

                logger.debug("Registered user defined expression type {} and serializer {} with identifier {}",
                             expressionClass.getName(), deserializer.getClass().getName(), id);
            }

            public Integer getId(UserExpression expression)
            {
                return registeredClasses.get(expression.getClass());
            }

            public Deserializer getDeserializer(int id)
            {
                return deserializers.get(id);
            }
        }

        protected static abstract class Deserializer
        {
            protected abstract UserExpression deserialize(DataInputPlus in,
                                                          int version,
                                                          CFMetaData metadata) throws IOException;
        }

        public static void register(Class<? extends UserExpression> expressionClass, Deserializer deserializer)
        {
            deserializers.registerUserExpressionClass(expressionClass, deserializer);
        }

        private static UserExpression deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
        {
            int id = in.readInt();
            Deserializer deserializer = deserializers.getDeserializer(id);
            assert deserializer != null : "No user defined expression type registered with id " + id;
            return deserializer.deserialize(in, version, metadata);
        }

        private static void serialize(UserExpression expression, DataOutputPlus out, int version) throws IOException
        {
            Integer id = deserializers.getId(expression);
            assert id != null : "User defined expression type " + expression.getClass().getName() + " is not registered";
            out.writeInt(id);
            expression.serialize(out, version);
        }

        private static long serializedSize(UserExpression expression, int version)
        {   // 4 bytes for the expression type id
            return 4 + expression.serializedSize(version);
        }

        protected UserExpression(ColumnDefinition column, Operator operator, ByteBuffer value)
        {
            super(column, operator, value);
        }

        protected Kind kind()
        {
            return Kind.USER;
        }

        protected abstract void serialize(DataOutputPlus out, int version) throws IOException;
        protected abstract long serializedSize(int version);
    }

    public static class Serializer
    {
        public void serialize(RowFilter filter, DataOutputPlus out, int version) throws IOException
        {
            out.writeBoolean(filter instanceof ThriftFilter);
            out.writeUnsignedVInt(filter.expressions.size());
            for (Expression expr : filter.expressions)
                Expression.serializer.serialize(expr, out, version);

        }

        public RowFilter deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
        {
            boolean forThrift = in.readBoolean();
            int size = (int)in.readUnsignedVInt();
            List<Expression> expressions = new ArrayList<>(size);
            for (int i = 0; i < size; i++)
                expressions.add(Expression.serializer.deserialize(in, version, metadata));

            return forThrift
                 ? new ThriftFilter(expressions)
                 : new CQLFilter(expressions);
        }

        public long serializedSize(RowFilter filter, int version)
        {
            long size = 1 // forThrift
                      + TypeSizes.sizeofUnsignedVInt(filter.expressions.size());
            for (Expression expr : filter.expressions)
                size += Expression.serializer.serializedSize(expr, version);
            return size;
        }
    }
}
