now() is being rejected in INSERTs when inside collections

patch by slebresne; reviewed by iamaleksey for CASSANDRA-5795
diff --git a/CHANGES.txt b/CHANGES.txt
index aaeb10b..71a956b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,7 @@
    after TTL expires (CASSANDRA-5762)
  * Sort nodetool help output (CASSANDRA-5776)
  * Fix column expiring during 2 phases compaction (CASSANDRA-5799)
+ * Fix now() being rejected in INSERTs when inside collections (CASSANDRA-5795)
 
 
 1.2.6
diff --git a/src/java/org/apache/cassandra/cql3/AbstractMarker.java b/src/java/org/apache/cassandra/cql3/AbstractMarker.java
index d8353a1..b4a4143 100644
--- a/src/java/org/apache/cassandra/cql3/AbstractMarker.java
+++ b/src/java/org/apache/cassandra/cql3/AbstractMarker.java
@@ -40,6 +40,11 @@
         boundNames[bindIndex] = receiver;
     }
 
+    public boolean containsBindMarker()
+    {
+        return true;
+    }
+
     /**
      * A parsed, but non prepared, bind marker.
      */
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java b/src/java/org/apache/cassandra/cql3/Lists.java
index b579b8c..ae95dca 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -19,6 +19,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -60,34 +61,27 @@
             this.elements = elements;
         }
 
-        public Value prepare(ColumnSpecification receiver) throws InvalidRequestException
+        public Term prepare(ColumnSpecification receiver) throws InvalidRequestException
         {
             validateAssignableTo(receiver);
 
             ColumnSpecification valueSpec = Lists.valueSpecOf(receiver);
-            List<ByteBuffer> values = new ArrayList<ByteBuffer>(elements.size());
+            List<Term> values = new ArrayList<Term>(elements.size());
+            boolean allTerminal = true;
             for (Term.Raw rt : elements)
             {
                 Term t = rt.prepare(valueSpec);
 
-                if (t instanceof Term.NonTerminal)
+                if (t.containsBindMarker())
                     throw new InvalidRequestException(String.format("Invalid list literal for %s: bind variables are not supported inside collection literals", receiver));
 
-                // We don't allow prepared marker in collections, nor nested collections (for the later, prepare will throw an exception)
-                assert t instanceof Constants.Value;
-                ByteBuffer bytes = ((Constants.Value)t).bytes;
-                if (bytes == null)
-                    throw new InvalidRequestException("null is not supported inside collections");
+                if (t instanceof Term.NonTerminal)
+                    allTerminal = false;
 
-                // We don't support value > 64K because the serialization format encode the length as an unsigned short.
-                if (bytes.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
-                    throw new InvalidRequestException(String.format("List value is too long. List values are limited to %d bytes but %d bytes value provided",
-                                                                    FBUtilities.MAX_UNSIGNED_SHORT,
-                                                                    bytes.remaining()));
-
-                values.add(bytes);
+                values.add(t);
             }
-            return new Value(values);
+            DelayedValue value = new DelayedValue(values);
+            return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
         }
 
         private void validateAssignableTo(ColumnSpecification receiver) throws InvalidRequestException
@@ -156,6 +150,56 @@
         }
     }
 
+    /*
+     * Basically similar to a Value, but containing calls to non-pure functions
+     * (which need to be evaluated at execution time).
+     *
+     * Note: this would also work for a list with bind markers, but we don't support
+     * that because 1) it's not especially useful and 2) we wouldn't have a good
+     * column name to return in the ColumnSpecification for those markers (not a
+     * blocker per se, but we don't bother because of 1)).
+     */
+    public static class DelayedValue extends Term.NonTerminal
+    {
+        private final List<Term> elements;
+
+        public DelayedValue(List<Term> elements)
+        {
+            this.elements = elements;
+        }
+
+        public boolean containsBindMarker()
+        {
+            // Always false: bind markers are not supported inside collection literals
+            return false;
+        }
+
+        public void collectMarkerSpecification(ColumnSpecification[] boundNames)
+        {
+        }
+
+        public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+        {
+            List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(elements.size());
+            for (Term t : elements)
+            {
+                ByteBuffer bytes = t.bindAndGet(values);
+
+                if (bytes == null)
+                    throw new InvalidRequestException("null is not supported inside collections");
+
+                // We don't support values > 64K because the serialization format encodes the length as an unsigned short.
+                if (bytes.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
+                    throw new InvalidRequestException(String.format("List value is too long. List values are limited to %d bytes but %d bytes value provided",
+                                                                    FBUtilities.MAX_UNSIGNED_SHORT,
+                                                                    bytes.remaining()));
+
+                buffers.add(bytes);
+            }
+            return new Value(buffers);
+        }
+    }
+
     public static class Marker extends AbstractMarker
     {
         protected Marker(int bindIndex, ColumnSpecification receiver)
diff --git a/src/java/org/apache/cassandra/cql3/Maps.java b/src/java/org/apache/cassandra/cql3/Maps.java
index e6d7c7e..e34e076 100644
--- a/src/java/org/apache/cassandra/cql3/Maps.java
+++ b/src/java/org/apache/cassandra/cql3/Maps.java
@@ -19,6 +19,9 @@
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -58,42 +61,29 @@
             this.entries = entries;
         }
 
-        public Value prepare(ColumnSpecification receiver) throws InvalidRequestException
+        public Term prepare(ColumnSpecification receiver) throws InvalidRequestException
         {
             validateAssignableTo(receiver);
 
             ColumnSpecification keySpec = Maps.keySpecOf(receiver);
             ColumnSpecification valueSpec = Maps.valueSpecOf(receiver);
-            Map<ByteBuffer, ByteBuffer> values = new TreeMap<ByteBuffer, ByteBuffer>(((MapType)receiver.type).keys);
+            Map<Term, Term> values = new HashMap<Term, Term>(entries.size());
+            boolean allTerminal = true;
             for (Pair<Term.Raw, Term.Raw> entry : entries)
             {
                 Term k = entry.left.prepare(keySpec);
                 Term v = entry.right.prepare(valueSpec);
 
-                if (k instanceof Term.NonTerminal || v instanceof Term.NonTerminal)
+                if (k.containsBindMarker() || v.containsBindMarker())
                     throw new InvalidRequestException(String.format("Invalid map literal for %s: bind variables are not supported inside collection literals", receiver));
 
-                // We don't support values > 64K because the serialization format encode the length as an unsigned short.
-                ByteBuffer keyBytes = ((Constants.Value)k).bytes;
-                if (keyBytes == null)
-                    throw new InvalidRequestException("null is not supported inside collections");
-                if (keyBytes.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
-                    throw new InvalidRequestException(String.format("Map key is too long. Map keys are limited to %d bytes but %d bytes keys provided",
-                                                                    FBUtilities.MAX_UNSIGNED_SHORT,
-                                                                    keyBytes.remaining()));
+                if (k instanceof Term.NonTerminal || v instanceof Term.NonTerminal)
+                    allTerminal = false;
 
-                ByteBuffer valueBytes = ((Constants.Value)v).bytes;
-                if (valueBytes == null)
-                    throw new InvalidRequestException("null is not supported inside collections");
-                if (valueBytes.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
-                    throw new InvalidRequestException(String.format("Map value is too long. Map values are limited to %d bytes but %d bytes value provided",
-                                                                    FBUtilities.MAX_UNSIGNED_SHORT,
-                                                                    valueBytes.remaining()));
-
-                if (values.put(keyBytes, valueBytes) != null)
-                    throw new InvalidRequestException(String.format("Invalid map literal: duplicate entry for key %s", entry.left));
+                values.put(k, v);
             }
-            return new Value(values);
+            DelayedValue value = new DelayedValue(((MapType)receiver.type).keys, values);
+            return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
         }
 
         private void validateAssignableTo(ColumnSpecification receiver) throws InvalidRequestException
@@ -179,6 +169,56 @@
         }
     }
 
+    // See Lists.DelayedValue
+    public static class DelayedValue extends Term.NonTerminal
+    {
+        private final Comparator<ByteBuffer> comparator;
+        private final Map<Term, Term> elements;
+
+        public DelayedValue(Comparator<ByteBuffer> comparator, Map<Term, Term> elements)
+        {
+            this.comparator = comparator;
+            this.elements = elements;
+        }
+
+        public boolean containsBindMarker()
+        {
+            // Always false: bind markers are not supported inside collection literals
+            return false;
+        }
+
+        public void collectMarkerSpecification(ColumnSpecification[] boundNames)
+        {
+        }
+
+        public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+        {
+            Map<ByteBuffer, ByteBuffer> buffers = new TreeMap<ByteBuffer, ByteBuffer>(comparator);
+            for (Map.Entry<Term, Term> entry : elements.entrySet())
+            {
+                // We don't support keys or values > 64K because the serialization format encodes the length as an unsigned short.
+                ByteBuffer keyBytes = entry.getKey().bindAndGet(values);
+                if (keyBytes == null)
+                    throw new InvalidRequestException("null is not supported inside collections");
+                if (keyBytes.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
+                    throw new InvalidRequestException(String.format("Map key is too long. Map keys are limited to %d bytes but %d bytes keys provided",
+                                                                    FBUtilities.MAX_UNSIGNED_SHORT,
+                                                                    keyBytes.remaining()));
+
+                ByteBuffer valueBytes = entry.getValue().bindAndGet(values);
+                if (valueBytes == null)
+                    throw new InvalidRequestException("null is not supported inside collections");
+                if (valueBytes.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
+                    throw new InvalidRequestException(String.format("Map value is too long. Map values are limited to %d bytes but %d bytes value provided",
+                                                                    FBUtilities.MAX_UNSIGNED_SHORT,
+                                                                    valueBytes.remaining()));
+
+                buffers.put(keyBytes, valueBytes);
+            }
+            return new Value(buffers);
+        }
+    }
+
     public static class Marker extends AbstractMarker
     {
         protected Marker(int bindIndex, ColumnSpecification receiver)
diff --git a/src/java/org/apache/cassandra/cql3/Sets.java b/src/java/org/apache/cassandra/cql3/Sets.java
index fed6625..748f269 100644
--- a/src/java/org/apache/cassandra/cql3/Sets.java
+++ b/src/java/org/apache/cassandra/cql3/Sets.java
@@ -20,6 +20,8 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
@@ -57,7 +59,7 @@
             this.elements = elements;
         }
 
-        public Term.Terminal prepare(ColumnSpecification receiver) throws InvalidRequestException
+        public Term prepare(ColumnSpecification receiver) throws InvalidRequestException
         {
             validateAssignableTo(receiver);
 
@@ -66,31 +68,24 @@
             if (receiver.type instanceof MapType && elements.isEmpty())
                 return new Maps.Value(Collections.<ByteBuffer, ByteBuffer>emptyMap());
 
+
             ColumnSpecification valueSpec = Sets.valueSpecOf(receiver);
-            Set<ByteBuffer> values = new TreeSet<ByteBuffer>(((SetType)receiver.type).elements);
+            Set<Term> values = new HashSet<Term>(elements.size());
+            boolean allTerminal = true;
             for (Term.Raw rt : elements)
             {
                 Term t = rt.prepare(valueSpec);
 
-                if (t instanceof Term.NonTerminal)
+                if (t.containsBindMarker())
                     throw new InvalidRequestException(String.format("Invalid set literal for %s: bind variables are not supported inside collection literals", receiver));
 
-                // We don't allow prepared marker in collections, nor nested collections (for the later, prepare will throw an exception)
-                assert t instanceof Constants.Value;
-                ByteBuffer bytes = ((Constants.Value)t).bytes;
-                if (bytes == null)
-                    throw new InvalidRequestException("null is not supported inside collections");
+                if (t instanceof Term.NonTerminal)
+                    allTerminal = false;
 
-                // We don't support value > 64K because the serialization format encode the length as an unsigned short.
-                if (bytes.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
-                    throw new InvalidRequestException(String.format("Set value is too long. Set values are limited to %d bytes but %d bytes value provided",
-                                                                    FBUtilities.MAX_UNSIGNED_SHORT,
-                                                                    bytes.remaining()));
-
-                if (!values.add(bytes))
-                    throw new InvalidRequestException(String.format("Invalid set literal: duplicate value %s", rt));
+                values.add(t);
             }
-            return new Value(values);
+            DelayedValue value = new DelayedValue(((SetType)receiver.type).elements, values);
+            return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
         }
 
         private void validateAssignableTo(ColumnSpecification receiver) throws InvalidRequestException
@@ -166,6 +161,50 @@
         }
     }
 
+    // See Lists.DelayedValue
+    public static class DelayedValue extends Term.NonTerminal
+    {
+        private final Comparator<ByteBuffer> comparator;
+        private final Set<Term> elements;
+
+        public DelayedValue(Comparator<ByteBuffer> comparator, Set<Term> elements)
+        {
+            this.comparator = comparator;
+            this.elements = elements;
+        }
+
+        public boolean containsBindMarker()
+        {
+            // Always false: bind markers are not supported inside collection literals
+            return false;
+        }
+
+        public void collectMarkerSpecification(ColumnSpecification[] boundNames)
+        {
+        }
+
+        public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+        {
+            Set<ByteBuffer> buffers = new TreeSet<ByteBuffer>(comparator);
+            for (Term t : elements)
+            {
+                ByteBuffer bytes = t.bindAndGet(values);
+
+                if (bytes == null)
+                    throw new InvalidRequestException("null is not supported inside collections");
+
+                // We don't support values > 64K because the serialization format encodes the length as an unsigned short.
+                if (bytes.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
+                    throw new InvalidRequestException(String.format("Set value is too long. Set values are limited to %d bytes but %d bytes value provided",
+                                                                    FBUtilities.MAX_UNSIGNED_SHORT,
+                                                                    bytes.remaining()));
+
+                buffers.add(bytes);
+            }
+            return new Value(buffers);
+        }
+    }
+
     public static class Marker extends AbstractMarker
     {
         protected Marker(int bindIndex, ColumnSpecification receiver)
diff --git a/src/java/org/apache/cassandra/cql3/Term.java b/src/java/org/apache/cassandra/cql3/Term.java
index 347a459..f2fd74e 100644
--- a/src/java/org/apache/cassandra/cql3/Term.java
+++ b/src/java/org/apache/cassandra/cql3/Term.java
@@ -59,6 +59,15 @@
     public ByteBuffer bindAndGet(List<ByteBuffer> values) throws InvalidRequestException;
 
     /**
+     * Whether or not that term contains at least one bind marker.
+     *
+     * Note that this is slightly different from whether or not the term is a
+     * NonTerminal, because calls to non-pure functions are NonTerminal (see #5616)
+     * even if they don't have bind markers.
+     */
+    public abstract boolean containsBindMarker();
+
+    /**
      * A parsed, non prepared (thus untyped) term.
      *
      * This can be one of:
@@ -83,7 +92,11 @@
     }
 
     /**
-     * A terminal term, i.e. one without any bind marker.
+     * A terminal term, one that can be reduced to a byte buffer directly.
+     *
+     * This includes most terms that don't have a bind marker (the exception
+     * being delayed calls to non-pure functions, which are NonTerminal even
+     * if they don't have bind markers).
      *
      * This can be only one of:
      *   - a constant value
@@ -97,6 +110,13 @@
         public void collectMarkerSpecification(ColumnSpecification[] boundNames) {}
         public Terminal bind(List<ByteBuffer> values) { return this; }
 
+        // While some NonTerminal terms may not have bind markers, no Terminal
+        // term can ever contain a bind marker.
+        public boolean containsBindMarker()
+        {
+            return false;
+        }
+
         /**
          * @return the serialized value of this terminal.
          */
@@ -109,12 +129,14 @@
     }
 
     /**
-     * A non terminal term, i.e. one that contains at least one bind marker.
+     * A non terminal term, i.e. a term that can only be reduced to a byte buffer
+     * at execution time.
      *
-     * We distinguish between the following type of NonTerminal:
+     * We have the following types of NonTerminal:
      *   - marker for a constant value
      *   - marker for a collection value (list, set, map)
      *   - a function having bind marker
+     *   - a non-pure function call (even if it doesn't have a bind marker - see #5616)
      */
     public abstract class NonTerminal implements Term
     {
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
index dcc3d8b..a6b86a2 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
@@ -72,6 +72,16 @@
         return fun.execute(buffers);
     }
 
+    public boolean containsBindMarker()
+    {
+        for (Term t : terms)
+        {
+            if (t.containsBindMarker())
+                return true;
+        }
+        return false;
+    }
+
     private static Term.Terminal makeTerminal(Function fun, ByteBuffer result) throws InvalidRequestException
     {
         if (!(fun.returnType() instanceof CollectionType))