PHOENIX-6506 : Tenant Connection is not able to access/validate Global Sequences (#1261)
Signed-off-by: Viraj Jasani <vjasani@apache.org>
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
index 8365ca0..1d53b16 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.end2end;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
import static org.apache.phoenix.util.TestUtil.closeStatement;
import static org.apache.phoenix.util.TestUtil.closeStmtAndConn;
import static org.junit.Assert.assertEquals;
@@ -44,11 +45,14 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.SequenceNotFoundException;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Assert;
import org.junit.Test;
@@ -670,5 +674,146 @@
assertTrue(next.containsColumn(Bytes.toBytes("CF2"), PInteger.INSTANCE.toBytes(3)));
}
}
-
+
+ @Test
+ public void testUpsertValueWithDiffSequenceAndConnections() throws Exception {
+ String tableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.setAutoCommit(true);
+ PreparedStatement createTableStatement = conn.prepareStatement(String.format("CREATE TABLE IF NOT EXISTS " +
+ "%s (SERVICE VARCHAR NOT NULL, SEQUENCE_NUMBER BIGINT NOT NULL , " +
+ "CONSTRAINT PK PRIMARY KEY (SERVICE, SEQUENCE_NUMBER)) MULTI_TENANT = TRUE", tableName));
+ createTableStatement.execute();
+ }
+
+ testGlobalSequenceUpsertWithTenantConnection(tableName);
+ testGlobalSequenceUpsertWithGlobalConnection(tableName);
+ testTenantSequenceUpsertWithSameTenantConnection(tableName);
+ testTenantSequenceUpsertWithDifferentTenantConnection(tableName);
+ testTenantSequenceUpsertWithGlobalConnection(tableName);
+
+ }
+
+ private void testTenantSequenceUpsertWithGlobalConnection(String tableName) throws Exception {
+ String sequenceName = generateUniqueSequenceName();
+
+ try (Connection conn = getTenantConnection("PHOENIX")) {
+ conn.setAutoCommit(true);
+ PreparedStatement createSequenceStatement = conn.prepareStatement(String.format("CREATE SEQUENCE " +
+ "IF NOT EXISTS %s", sequenceName));
+ createSequenceStatement.execute();
+ }
+
+ try (Connection tenantConn = DriverManager.getConnection(getUrl())) {
+ tenantConn.setAutoCommit(true);
+ Statement executeUpdateStatement = tenantConn.createStatement();
+ try {
+ executeUpdateStatement.execute(String.format("UPSERT INTO %s ( SERVICE, SEQUENCE_NUMBER) VALUES " +
+ "( 'PHOENIX', NEXT VALUE FOR %s)", tableName, sequenceName));
+ Assert.fail();
+ } catch (SequenceNotFoundException e) {
+ assertTrue(true);
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+ }
+
+ private void testTenantSequenceUpsertWithDifferentTenantConnection(String tableName) throws Exception {
+ String sequenceName = generateUniqueSequenceName();
+
+ try (Connection conn = getTenantConnection("PHOENIX")) {
+ conn.setAutoCommit(true);
+ PreparedStatement createSequenceStatement = conn.prepareStatement(String.format("CREATE SEQUENCE " +
+ "IF NOT EXISTS %s", sequenceName));
+ createSequenceStatement.execute();
+ }
+
+ try (Connection tenantConn = getTenantConnection("HBASE")) {
+ tenantConn.setAutoCommit(true);
+
+ Statement executeUpdateStatement = tenantConn.createStatement();
+ try {
+ executeUpdateStatement.execute(String.format("UPSERT INTO %s ( SEQUENCE_NUMBER) VALUES " +
+ "( NEXT VALUE FOR %s)", tableName, sequenceName));
+ Assert.fail();
+ } catch (SequenceNotFoundException e) {
+ assertTrue(true);
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+ }
+
+ private void testTenantSequenceUpsertWithSameTenantConnection(String tableName) throws Exception {
+ String sequenceName = generateUniqueSequenceName();
+
+ try (Connection conn = getTenantConnection("ZOOKEEPER")) {
+ conn.setAutoCommit(true);
+ PreparedStatement createSequenceStatement = conn.prepareStatement(String.format("CREATE SEQUENCE " +
+ "IF NOT EXISTS %s", sequenceName));
+ createSequenceStatement.execute();
+ Statement executeUpdateStatement = conn.createStatement();
+ executeUpdateStatement.execute(String.format("UPSERT INTO %s ( SEQUENCE_NUMBER) VALUES " +
+ "( NEXT VALUE FOR %s)", tableName, sequenceName));
+ ResultSet rs = executeUpdateStatement.executeQuery("select * from " + tableName);
+ assertTrue(rs.next());
+ assertEquals("1", rs.getString(1));
+ assertFalse(rs.next());
+ }
+
+ }
+
+ private void testGlobalSequenceUpsertWithGlobalConnection(String tableName) throws Exception {
+ String sequenceName = generateUniqueSequenceName();
+
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.setAutoCommit(true);
+ PreparedStatement createSequenceStatement = conn.prepareStatement(String.format("CREATE SEQUENCE " +
+ "IF NOT EXISTS %s", sequenceName));
+ createSequenceStatement.execute();
+ Statement executeUpdateStatement = conn.createStatement();
+ executeUpdateStatement.execute(String.format("UPSERT INTO %s ( SERVICE, SEQUENCE_NUMBER) VALUES " +
+ "( 'PHOENIX', NEXT VALUE FOR %s)", tableName, sequenceName));
+ ResultSet rs = executeUpdateStatement.executeQuery("select * from " + tableName);
+ assertTrue(rs.next());
+ assertEquals("HBASE", rs.getString(1));
+ assertEquals("1", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("PHOENIX", rs.getString(1));
+ assertEquals("1", rs.getString(2));
+ assertFalse(rs.next());
+ }
+ }
+
+ private void testGlobalSequenceUpsertWithTenantConnection(String tableName) throws Exception {
+ String sequenceName = generateUniqueSequenceName();
+
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.setAutoCommit(true);
+ PreparedStatement createSequenceStatement = conn.prepareStatement(String.format("CREATE SEQUENCE " +
+ "IF NOT EXISTS %s", sequenceName));
+ createSequenceStatement.execute();
+ }
+
+ try (Connection tenantConn = getTenantConnection("HBASE")) {
+ tenantConn.setAutoCommit(true);
+
+ Statement executeUpdateStatement = tenantConn.createStatement();
+ executeUpdateStatement.execute(String.format("UPSERT INTO %s ( SEQUENCE_NUMBER) VALUES " +
+ "( NEXT VALUE FOR %s)", tableName, sequenceName));
+
+ ResultSet rs = executeUpdateStatement.executeQuery("select * from " + tableName);
+ assertTrue(rs.next());
+ assertEquals("1", rs.getString(1));
+ assertFalse(rs.next());
+
+ }
+ }
+
+ private static Connection getTenantConnection(String tenantId) throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ props.setProperty(TENANT_ID_ATTRIB, tenantId);
+ return DriverManager.getConnection(getUrl(), props);
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index c9d1eee..a667d67 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -5006,7 +5006,7 @@
for (SequenceAllocation sequenceAllocation : sequenceAllocations) {
SequenceKey key = sequenceAllocation.getSequenceKey();
Sequence newSequences = new Sequence(key);
- Sequence sequence = sequenceMap.putIfAbsent(key, newSequences);
+ Sequence sequence = getSequence(sequenceAllocation);
if (sequence == null) {
sequence = newSequences;
}
@@ -5079,6 +5079,43 @@
}
}
+ /**
+ * checks if sequenceAllocation's sequence there in sequenceMap, also returns Global Sequences
+ * from Tenant sequenceAllocations
+ * @param sequenceAllocation
+ * @return
+ */
+
+ private Sequence getSequence(SequenceAllocation sequenceAllocation) {
+ SequenceKey key = sequenceAllocation.getSequenceKey();
+ if (key.getTenantId() == null) {
+ return sequenceMap.putIfAbsent(key, new Sequence(key));
+ } else {
+ Sequence sequence = sequenceMap.get(key);
+ if (sequence == null) {
+ return sequenceMap.entrySet().stream()
+ .filter(entry -> compareSequenceKeysWithoutTenant(key, entry.getKey()))
+ .findFirst()
+ .map(Entry::getValue)
+ .orElse(null);
+ } else {
+ return sequence;
+ }
+ }
+ }
+
+ private boolean compareSequenceKeysWithoutTenant(SequenceKey keyToCompare, SequenceKey availableKey) {
+ if (availableKey.getTenantId() != null) {
+ return false;
+ }
+ boolean sameSchema = keyToCompare.getSchemaName() == null ? availableKey.getSchemaName() == null :
+ keyToCompare.getSchemaName().equals(availableKey.getSchemaName());
+ if (!sameSchema) {
+ return false;
+ }
+ return keyToCompare.getSequenceName().equals(availableKey.getSequenceName());
+ }
+
@Override
public void clearTableFromCache(final byte[] tenantId, final byte[] schemaName, final byte[] tableName,
final long clientTS) throws SQLException {