Warn about unqualified prepared statement only if it is select or modification statement

patch by Stefan Miklosovic; reviewed by Benjamin Lerer for CASSANDRA-18322
diff --git a/CHANGES.txt b/CHANGES.txt
index 5c390dd..8cd0c29 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Warn about unqualified prepared statement only if it is select or modification statement (CASSANDRA-18322)
  * Update legacy peers tables during node replacement (CASSANDRA-19782)
  * Refactor ColumnCondition (CASSANDRA-19620)
  * Allow configuring log format for Audit Logs (CASSANDRA-19792)
diff --git a/src/java/org/apache/cassandra/cql3/CQLStatement.java b/src/java/org/apache/cassandra/cql3/CQLStatement.java
index 1ac36be..badf9c3 100644
--- a/src/java/org/apache/cassandra/cql3/CQLStatement.java
+++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java
@@ -48,7 +48,7 @@
     }
 
     /**
-     * Return an Iterable over all of the functions (both native and user-defined) used by any component of the statement
+     * Return an Iterable over all the functions (both native and user-defined) used by any component of the statement
      *
      * @return functions all functions found (may contain duplicates)
      */
@@ -62,14 +62,14 @@
      *
      * @param state the current client state
      */
-    public void authorize(ClientState state);
+    void authorize(ClientState state);
 
     /**
      * Perform additional validation required by the statment. To be overriden by subclasses if needed.
      *
      * @param state the current client state
      */
-    public void validate(ClientState state);
+    void validate(ClientState state);
 
     /**
      * Execute the statement and return the resulting result or null if there is no result.
@@ -78,14 +78,14 @@
      * @param options options for this query (consistency, variables, pageSize, ...)
      * @param requestTime request enqueue / and start times;
      */
-    public ResultMessage execute(QueryState state, QueryOptions options, Dispatcher.RequestTime requestTime);
+    ResultMessage execute(QueryState state, QueryOptions options, Dispatcher.RequestTime requestTime);
 
     /**
      * Variant of execute used for internal query against the system tables, and thus only query the local node.
      *
      * @param state the current query state
      */
-    public ResultMessage executeLocally(QueryState state, QueryOptions options);
+    ResultMessage executeLocally(QueryState state, QueryOptions options);
 
     /**
      * Provides the context needed for audit logging statements.
@@ -93,14 +93,29 @@
     AuditLogContext getAuditLogContext();
 
     /**
-     * Whether or not this CQL Statement has LWT conditions
+     * Whether this CQL Statement has LWT conditions
      */
-    default public boolean hasConditions()
+    default boolean hasConditions()
     {
         return false;
     }
 
-    public static abstract class Raw
+    /**
+     * If this CQL statement is not fully qualified and this method returns true,
+     * then the warning will be emitted to the client if the statement is executed on
+     * a keyspace it was not prepared on.
+     * <p>
+     * A warning is also emitted if a prepare statement is used for other than
+     * modifications statements.
+     *
+     * @return true if this statement is eligible to be a prepared statement, false otherwise.
+     */
+    default boolean eligibleAsPreparedStatement()
+    {
+        return false;
+    }
+
+    abstract class Raw
     {
         protected VariableSpecifications bindVariables;
 
@@ -112,8 +127,8 @@
         public abstract CQLStatement prepare(ClientState state);
     }
 
-    public static interface SingleKeyspaceCqlStatement extends CQLStatement
+    interface SingleKeyspaceCqlStatement extends CQLStatement
     {
-        public String keyspace();
+        String keyspace();
     }
 }
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 8c8caf7..5659f97 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -741,6 +741,9 @@
         Prepared prepared = parseAndPrepare(queryString, clientState, false);
         CQLStatement statement = prepared.statement;
 
+        if (!statement.eligibleAsPreparedStatement())
+            clientState.warnAboutUneligiblePreparedStatement(hashWithKeyspace);
+
         int boundTerms = statement.getBindVariables().size();
         if (boundTerms > FBUtilities.MAX_UNSIGNED_SHORT)
             throw new InvalidRequestException(String.format("Too many markers(?). %d markers exceed the allowed maximum of %d", boundTerms, FBUtilities.MAX_UNSIGNED_SHORT));
@@ -759,7 +762,8 @@
         }
         else
         {
-            clientState.warnAboutUseWithPreparedStatements(hashWithKeyspace, clientState.getRawKeyspace());
+            if (prepared.statement.eligibleAsPreparedStatement())
+                clientState.warnAboutUseWithPreparedStatements(hashWithKeyspace, clientState.getRawKeyspace());
 
             ResultMessage.Prepared nonQualifiedWithKeyspace = storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared);
             ResultMessage.Prepared nonQualifiedWithNullKeyspace = storePreparedStatement(queryString, null, prepared);
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index a70a889..e510437 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -164,6 +164,12 @@
         return functions;
     }
 
+    @Override
+    public boolean eligibleAsPreparedStatement()
+    {
+        return true;
+    }
+
     public void authorize(ClientState state) throws InvalidRequestException, UnauthorizedException
     {
         for (ModificationStatement statement : statements)
@@ -643,7 +649,27 @@
         @Override
         public String keyspace()
         {
-            return null;
+            if (parsedStatements.isEmpty())
+                return null;
+
+            String currentKeyspace = null;
+            for (ModificationStatement.Parsed statement : parsedStatements)
+            {
+                String keyspace = statement.keyspace();
+                if (keyspace == null && currentKeyspace != null)
+                    return null;
+
+                if (keyspace != null && currentKeyspace == null)
+                {
+                    currentKeyspace = keyspace;
+                    continue;
+                }
+
+                if (currentKeyspace != null && !currentKeyspace.equals(keyspace))
+                    return null;
+            }
+
+            return currentKeyspace;
         }
 
         public BatchStatement prepare(ClientState state)
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 9de5dc1..e3662a6 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -177,6 +177,12 @@
         return functions;
     }
 
+    @Override
+    public boolean eligibleAsPreparedStatement()
+    {
+        return true;
+    }
+
     public void addFunctionsTo(List<Function> functions)
     {
         attrs.addFunctionsTo(functions);
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 899dd95..aebfffd 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -194,6 +194,12 @@
         return functions;
     }
 
+    @Override
+    public boolean eligibleAsPreparedStatement()
+    {
+        return true;
+    }
+
     private void addFunctionsTo(List<Function> functions)
     {
         selection.addFunctionsTo(functions);
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index 10940c8..0751d96 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -97,6 +97,7 @@
     private volatile AuthenticatedUser user;
     private volatile String keyspace;
     private volatile boolean issuedPreparedStatementsUseWarning;
+    private volatile boolean issuedWarningForUneligiblePreparedStatements;
 
     private static final QueryHandler cqlQueryHandler;
     static
@@ -615,6 +616,15 @@
         }
     }
 
+    public void warnAboutUneligiblePreparedStatement(MD5Digest statementId)
+    {
+        if (!issuedWarningForUneligiblePreparedStatements)
+        {
+            ClientWarn.instance.warn(String.format("Prepared statements for other than modification and selection statements should be avoided, statement id: %s", statementId));
+            issuedWarningForUneligiblePreparedStatements = true;
+        }
+    }
+
     private static void validateKeyspace(String keyspace)
     {
         if (keyspace == null)
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 6199d54..5bed932 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -29,7 +29,6 @@
 import org.apache.cassandra.cql3.QueryHandler;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.ResultSet;
-import org.apache.cassandra.cql3.statements.BatchStatement;
 import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
@@ -136,14 +135,11 @@
             if (prepared == null)
                 throw new PreparedQueryNotFoundException(statementId);
 
-            if (!prepared.fullyQualified
-                && !Objects.equals(state.getClientState().getRawKeyspace(), prepared.keyspace)
-                // We can not reliably detect inconsistencies for batches yet
-                && !(prepared.statement instanceof BatchStatement)
-            )
+            if (!prepared.fullyQualified && prepared.statement.eligibleAsPreparedStatement() && !Objects.equals(state.getClientState().getRawKeyspace(), prepared.keyspace))
             {
                 state.getClientState().warnAboutUseWithPreparedStatements(statementId, prepared.keyspace);
-                String msg = String.format("Tried to execute a prepared unqalified statement on a keyspace it was not prepared on. " +
+
+                String msg = String.format("Tried to execute a prepared unqualified statement on a keyspace it was not prepared on. " +
                                            " Executing the resulting prepared statement will return unexpected results: %s (on keyspace %s, previously prepared on %s)",
                                            statementId, state.getClientState().getRawKeyspace(), prepared.keyspace);
                 nospam.error(msg);
diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
index ffd7e25..39c641d 100644
--- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
@@ -18,7 +18,10 @@
 package org.apache.cassandra.cql3;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
 import java.util.stream.Collectors;
 
 import org.junit.Before;
@@ -34,11 +37,14 @@
 import org.apache.cassandra.index.StubIndex;
 import org.apache.cassandra.serializers.BooleanSerializer;
 import org.apache.cassandra.serializers.Int32Serializer;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.transport.SimpleClient;
 import org.apache.cassandra.transport.messages.ResultMessage;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -56,6 +62,97 @@
     }
 
     @Test
+    public void testUnqualifiedPreparedSelectOrModificationStatementsEmitWarning()
+    {
+        for (String query : new String[]
+                            {
+                            "SELECT id, v1, v2 FROM %s WHERE id = 1",
+                            "INSERT INTO %s (id, v1, v2) VALUES (1, 2, 3)",
+                            "UPDATE %s SET v1 = 2, v2 = 3 where id = 1"
+                            })
+        {
+            assertWarningsOnPreparedStatements(query, true, true, true);
+        }
+    }
+
+    @Test
+    public void testQualifiedPreparedSelectOrModificationStatementsDoNotEmitWarning()
+    {
+        for (String query : new String[]
+                            {
+                            "SELECT id, v1, v2 FROM %keyspace%.%s WHERE id = 1",
+                            "INSERT INTO %keyspace%.%s (id, v1, v2) VALUES (1, 2, 3)",
+                            "UPDATE %keyspace%.%s SET v1 = 2, v2 = 3 where id = 1"
+                            })
+        {
+            assertWarningsOnPreparedStatements(query, false, true, true);
+            assertWarningsOnPreparedStatements(query, false, true, false);
+        }
+    }
+
+    @Test
+    public void testSchemaTransformationPreparedStatementEmitsWaring()
+    {
+        assertWarningsOnPreparedStatements("ALTER TABLE %s ADD c3 int", true, false, true);
+        assertWarningsOnPreparedStatements("ALTER TABLE %keyspace%.%s ADD c3 int", true, false, false);
+    }
+
+    @Test
+    public void testBatchPreparedStatementsEmitWarnings()
+    {
+        assertWarningsOnPreparedStatements("BEGIN BATCH INSERT INTO %s (id, v1, v2) VALUES (1,2,3) APPLY BATCH", true, true, true);
+
+        // this will evaluate a statement as unqualified because not all are qualified
+        assertWarningsOnPreparedStatements("BEGIN BATCH" +
+                                           "  INSERT INTO %keyspace%.%s (id, v1, v2) VALUES (1,2,3); " +
+                                           "  INSERT INTO %s (id, v1, v2) VALUES (3, 4, 5) " +
+                                           "APPLY BATCH;", true, true, true);
+
+        assertWarningsOnPreparedStatements("BEGIN BATCH INSERT INTO %keyspace%.%s (id, v1, v2) VALUES (1,2,3) APPLY BATCH;", false, true, true);
+        assertWarningsOnPreparedStatements("BEGIN BATCH INSERT INTO %keyspace%.%s (id, v1, v2) VALUES (1,2,3) APPLY BATCH;", false, true, false);
+    }
+
+    private void assertWarningsOnPreparedStatements(String query, boolean expectWarn, boolean forModificationOrSelectStatement, boolean useUse)
+    {
+        try
+        {
+            createKeyspace("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
+            createTable(currentKeyspace(),"CREATE TABLE %s (id int, v1 int, v2 int, primary key (id))");
+
+            ClientState clientState = ClientState.forInternalCalls();
+            if (useUse)
+                clientState.setKeyspace(currentKeyspace());
+
+            ClientWarn.instance.captureWarnings();
+
+            String maybeQueryWithKeyspace = query.replaceAll("%keyspace%", currentKeyspace());
+            String queryWithTable = maybeQueryWithKeyspace.replaceAll("%s", currentTable());
+
+            // two times is not a mistake, a warning is emitted just once
+            QueryProcessor.instance.prepare(queryWithTable, clientState);
+            QueryProcessor.instance.prepare(queryWithTable, clientState);
+
+            List<String> warnings = ClientWarn.instance.getWarnings();
+
+            if (expectWarn && forModificationOrSelectStatement)
+                assertTrue(warnings != null &&
+                           warnings.size() == 1 &&
+                           warnings.get(0).startsWith("`USE <keyspace>` with prepared statements is considered to be an anti-pattern"));
+            else if (expectWarn)
+                assertTrue(warnings != null &&
+                           warnings.size() == 1 &&
+                           warnings.get(0).startsWith("Prepared statements for other than modification and selection statements should be avoided,"));
+            else
+                assertNull(warnings);
+        }
+        finally
+        {
+            execute("DROP KEYSPACE " + currentKeyspace());
+            ClientWarn.instance.resetWarnings();
+        }
+    }
+
+    @Test
     public void testInvalidatePreparedStatementsOnDrop()
     {
         Session session = sessionNet(ProtocolVersion.V5);
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
index 1460a90..5fe3b9d 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -477,7 +477,7 @@
         List<ColumnMetadata> allColumns = com.google.common.collect.Lists.newArrayList(metadata.allColumnsInSelectOrder());
 
         StringBuilder sb = new StringBuilder();
-        sb.append("INSERT INTO ").append(quoteIdentifier(keyspaceName)).append(".").append(quoteIdentifier(tableName)).append(" (");
+        sb.append("INSERT INTO ").append(keyspaceName).append('.').append(quoteIdentifier(tableName)).append(" (");
         StringBuilder value = new StringBuilder();
         for (ColumnMetadata c : allColumns)
         {
@@ -542,7 +542,7 @@
                     StringBuilder sb = new StringBuilder();
                     if (!isKeyOnlyTable)
                     {
-                        sb.append("UPDATE ").append(quoteIdentifier(tableName)).append(" SET ");
+                        sb.append("UPDATE ").append(keyspaceName).append('.').append(quoteIdentifier(tableName)).append(" SET ");
                         //PK Columns
                         StringBuilder pred = new StringBuilder();
                         pred.append(" WHERE ");
@@ -595,7 +595,7 @@
                     }
                     else
                     {
-                        sb.append("INSERT INTO ").append(quoteIdentifier(tableName)).append(" (");
+                        sb.append("INSERT INTO ").append(keyspaceName).append('.').append(quoteIdentifier(tableName)).append(" (");
                         StringBuilder value = new StringBuilder();
                         for (com.datastax.driver.core.ColumnMetadata c : tableMetaData.getPrimaryKey())
                         {
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
index a1e90d0..cd4f78a 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
@@ -46,7 +46,10 @@
     @Override
     protected String buildQuery()
     {
-        StringBuilder query = new StringBuilder("UPDATE counter1 SET ");
+        StringBuilder query = new StringBuilder("UPDATE ")
+        .append(settings.schema.keyspace)
+        .append('.')
+        .append("counter1 SET ");
 
         // TODO : increment distribution subset of columns
         for (int i = 0; i < settings.columns.maxColumnsPerKey; i++)
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
index 5ba02b3..dda811f 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
@@ -48,7 +48,7 @@
     @Override
     protected String buildQuery()
     {
-        return "SELECT * FROM " + wrapInQuotes(type.table) + " WHERE KEY=?";
+        return "SELECT * FROM " + settings.schema.keyspace + '.' + wrapInQuotes(type.table) + " WHERE KEY=?";
     }
 
     @Override
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
index 255cf75..501c520 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
@@ -42,7 +42,11 @@
     @Override
     protected String buildQuery()
     {
-        StringBuilder query = new StringBuilder("UPDATE ").append(wrapInQuotes(type.table));
+        StringBuilder query = new StringBuilder("UPDATE ")
+        .append(settings.schema.keyspace)
+        .append('.')
+        .append(wrapInQuotes(type.table));
+
         if (settings.columns.timestamp != null)
             query.append(" USING TIMESTAMP ").append(settings.columns.timestamp);
 
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
index 8884919..3f60ad1 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
@@ -59,7 +59,7 @@
             }
         }
 
-        query.append(" FROM ").append(wrapInQuotes(type.table));
+        query.append(" FROM ").append(settings.schema.keyspace).append('.').append(wrapInQuotes(type.table));
         query.append(" WHERE KEY=?");
         return query.toString();
     }
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java
index 9313b0f..0eaa41e 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java
@@ -70,8 +70,6 @@
             //Keyspace
             client.execute(createKeyspaceStatementCQL3(), org.apache.cassandra.db.ConsistencyLevel.LOCAL_ONE);
 
-            client.execute("USE \""+keyspace+"\"", org.apache.cassandra.db.ConsistencyLevel.LOCAL_ONE);
-
             //Add standard1 and counter1
             client.execute(createStandard1StatementCQL3(settings), org.apache.cassandra.db.ConsistencyLevel.LOCAL_ONE);
             client.execute(createCounter1StatementCQL3(settings), org.apache.cassandra.db.ConsistencyLevel.LOCAL_ONE);
@@ -125,7 +123,8 @@
         StringBuilder b = new StringBuilder();
 
         b.append("CREATE TABLE IF NOT EXISTS ")
-         .append("standard1 (key blob PRIMARY KEY ");
+         .append(keyspace)
+         .append(".standard1 (key blob PRIMARY KEY ");
 
         try
         {
@@ -166,7 +165,8 @@
         StringBuilder b = new StringBuilder();
 
         b.append("CREATE TABLE IF NOT EXISTS ")
-         .append("counter1 (key blob PRIMARY KEY,");
+         .append(keyspace)
+         .append(".counter1 (key blob PRIMARY KEY,");
 
         try
         {