PHOENIX-6506 : Tenant Connection is not able to access/validate Global Sequences (#1261)

Signed-off-by: Viraj Jasani <vjasani@apache.org>
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
index 8365ca0..1d53b16 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.end2end;
 
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.apache.phoenix.util.TestUtil.closeStatement;
 import static org.apache.phoenix.util.TestUtil.closeStmtAndConn;
 import static org.junit.Assert.assertEquals;
@@ -44,11 +45,14 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.SequenceNotFoundException;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Assert;
 import org.junit.Test;
 
 
@@ -670,5 +674,146 @@
             assertTrue(next.containsColumn(Bytes.toBytes("CF2"), PInteger.INSTANCE.toBytes(3)));
         }
     }
-    
+
+    @Test
+    public void testUpsertValueWithDiffSequenceAndConnections() throws Exception {
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(true);
+            PreparedStatement createTableStatement = conn.prepareStatement(String.format("CREATE TABLE IF NOT EXISTS " +
+                    "%s (SERVICE VARCHAR NOT NULL, SEQUENCE_NUMBER BIGINT NOT NULL , " +
+                    "CONSTRAINT PK PRIMARY KEY (SERVICE, SEQUENCE_NUMBER)) MULTI_TENANT = TRUE", tableName));
+            createTableStatement.execute();
+        }
+
+        testGlobalSequenceUpsertWithTenantConnection(tableName);
+        testGlobalSequenceUpsertWithGlobalConnection(tableName);
+        testTenantSequenceUpsertWithSameTenantConnection(tableName);
+        testTenantSequenceUpsertWithDifferentTenantConnection(tableName);
+        testTenantSequenceUpsertWithGlobalConnection(tableName);
+
+    }
+
+    private void testTenantSequenceUpsertWithGlobalConnection(String tableName) throws Exception {
+        String sequenceName = generateUniqueSequenceName();
+
+        try (Connection conn = getTenantConnection("PHOENIX")) {
+            conn.setAutoCommit(true);
+            PreparedStatement createSequenceStatement = conn.prepareStatement(String.format("CREATE SEQUENCE " +
+                    "IF NOT EXISTS %s", sequenceName));
+            createSequenceStatement.execute();
+        }
+
+        try (Connection tenantConn = DriverManager.getConnection(getUrl())) {
+            tenantConn.setAutoCommit(true);
+            Statement executeUpdateStatement = tenantConn.createStatement();
+            try {
+                executeUpdateStatement.execute(String.format("UPSERT INTO %s ( SERVICE, SEQUENCE_NUMBER) VALUES " +
+                        "( 'PHOENIX', NEXT VALUE FOR %s)", tableName, sequenceName));
+                Assert.fail();
+            } catch (SequenceNotFoundException e) {
+                assertTrue(true);
+            } catch (Exception e) {
+                Assert.fail();
+            }
+        }
+    }
+
+    private void testTenantSequenceUpsertWithDifferentTenantConnection(String tableName) throws Exception {
+        String sequenceName = generateUniqueSequenceName();
+
+        try (Connection conn = getTenantConnection("PHOENIX")) {
+            conn.setAutoCommit(true);
+            PreparedStatement createSequenceStatement = conn.prepareStatement(String.format("CREATE SEQUENCE " +
+                    "IF NOT EXISTS %s", sequenceName));
+            createSequenceStatement.execute();
+        }
+
+        try (Connection tenantConn = getTenantConnection("HBASE")) {
+            tenantConn.setAutoCommit(true);
+
+            Statement executeUpdateStatement = tenantConn.createStatement();
+            try {
+                executeUpdateStatement.execute(String.format("UPSERT INTO %s ( SEQUENCE_NUMBER) VALUES " +
+                        "( NEXT VALUE FOR %s)", tableName, sequenceName));
+                Assert.fail();
+            } catch (SequenceNotFoundException e) {
+                assertTrue(true);
+            } catch (Exception e) {
+                Assert.fail();
+            }
+        }
+    }
+
+    private void testTenantSequenceUpsertWithSameTenantConnection(String tableName) throws Exception {
+        String sequenceName = generateUniqueSequenceName();
+
+        try (Connection conn = getTenantConnection("ZOOKEEPER")) {
+            conn.setAutoCommit(true);
+            PreparedStatement createSequenceStatement = conn.prepareStatement(String.format("CREATE SEQUENCE " +
+                    "IF NOT EXISTS %s", sequenceName));
+            createSequenceStatement.execute();
+            Statement executeUpdateStatement = conn.createStatement();
+            executeUpdateStatement.execute(String.format("UPSERT INTO %s ( SEQUENCE_NUMBER) VALUES " +
+                    "( NEXT VALUE FOR %s)", tableName, sequenceName));
+            ResultSet rs = executeUpdateStatement.executeQuery("select * from " + tableName);
+            assertTrue(rs.next());
+            assertEquals("1", rs.getString(1));
+            assertFalse(rs.next());
+        }
+
+    }
+
+    private void testGlobalSequenceUpsertWithGlobalConnection(String tableName) throws Exception {
+        String sequenceName = generateUniqueSequenceName();
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(true);
+            PreparedStatement createSequenceStatement = conn.prepareStatement(String.format("CREATE SEQUENCE " +
+                    "IF NOT EXISTS %s", sequenceName));
+            createSequenceStatement.execute();
+            Statement executeUpdateStatement = conn.createStatement();
+            executeUpdateStatement.execute(String.format("UPSERT INTO %s ( SERVICE, SEQUENCE_NUMBER) VALUES " +
+                    "( 'PHOENIX', NEXT VALUE FOR %s)", tableName, sequenceName));
+            ResultSet rs = executeUpdateStatement.executeQuery("select * from " + tableName);
+            assertTrue(rs.next());
+            assertEquals("HBASE", rs.getString(1));
+            assertEquals("1", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("PHOENIX", rs.getString(1));
+            assertEquals("1", rs.getString(2));
+            assertFalse(rs.next());
+        }
+    }
+
+    private void testGlobalSequenceUpsertWithTenantConnection(String tableName) throws Exception {
+        String sequenceName = generateUniqueSequenceName();
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(true);
+            PreparedStatement createSequenceStatement = conn.prepareStatement(String.format("CREATE SEQUENCE " +
+                    "IF NOT EXISTS %s", sequenceName));
+            createSequenceStatement.execute();
+        }
+
+        try (Connection tenantConn = getTenantConnection("HBASE")) {
+            tenantConn.setAutoCommit(true);
+
+            Statement executeUpdateStatement = tenantConn.createStatement();
+            executeUpdateStatement.execute(String.format("UPSERT INTO %s ( SEQUENCE_NUMBER) VALUES " +
+                    "( NEXT VALUE FOR %s)", tableName, sequenceName));
+
+            ResultSet rs = executeUpdateStatement.executeQuery("select * from " + tableName);
+            assertTrue(rs.next());
+            assertEquals("1", rs.getString(1));
+            assertFalse(rs.next());
+
+        }
+    }
+
+    private static Connection getTenantConnection(String tenantId) throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        props.setProperty(TENANT_ID_ATTRIB, tenantId);
+        return DriverManager.getConnection(getUrl(), props);
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index c9d1eee..a667d67 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -5006,7 +5006,7 @@
         for (SequenceAllocation sequenceAllocation : sequenceAllocations) {
             SequenceKey key = sequenceAllocation.getSequenceKey();
             Sequence newSequences = new Sequence(key);
-            Sequence sequence = sequenceMap.putIfAbsent(key, newSequences);
+            Sequence sequence = getSequence(sequenceAllocation);
             if (sequence == null) {
                 sequence = newSequences;
             }
@@ -5079,6 +5079,43 @@
         }
     }
 
+    /**
+     * checks if sequenceAllocation's sequence there in sequenceMap, also returns Global Sequences
+     * from Tenant sequenceAllocations
+     * @param sequenceAllocation
+     * @return
+     */
+
+    private Sequence getSequence(SequenceAllocation sequenceAllocation) {
+        SequenceKey key = sequenceAllocation.getSequenceKey();
+        if (key.getTenantId() == null) {
+            return sequenceMap.putIfAbsent(key, new Sequence(key));
+        } else {
+            Sequence sequence = sequenceMap.get(key);
+            if (sequence == null) {
+                return sequenceMap.entrySet().stream()
+                        .filter(entry -> compareSequenceKeysWithoutTenant(key, entry.getKey()))
+                        .findFirst()
+                        .map(Entry::getValue)
+                        .orElse(null);
+            } else {
+                return sequence;
+            }
+        }
+    }
+
+    private boolean compareSequenceKeysWithoutTenant(SequenceKey keyToCompare, SequenceKey availableKey) {
+        if (availableKey.getTenantId() != null) {
+            return false;
+        }
+        boolean sameSchema = keyToCompare.getSchemaName() == null ? availableKey.getSchemaName() == null :
+                keyToCompare.getSchemaName().equals(availableKey.getSchemaName());
+        if (!sameSchema) {
+            return false;
+        }
+        return keyToCompare.getSequenceName().equals(availableKey.getSequenceName());
+    }
+
     @Override
     public void clearTableFromCache(final byte[] tenantId, final byte[] schemaName, final byte[] tableName,
             final long clientTS) throws SQLException {