HCAT-567 HCatClient must allow retrieval of multiple partitions using a partial partition spec.(for branch)

git-svn-id: https://svn.apache.org/repos/asf/incubator/hcatalog/branches/branch-0.4@1425202 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 894db52..f738e92 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -71,6 +71,8 @@
   OPTIMIZATIONS
 
   BUG FIXES
+  HCAT-567 HCatClient must allow retrieval of multiple partitions using a partial partition spec.(mithun via avandana)
+
   HCAT-554 Loading data using HCatLoader() from a table on non default namenode fails.(amalakar via avandana)
 
   HCAT-566 HCatTable doesn't report partition columns correctly. (mithun via traviscrawford)
diff --git a/webhcat/java-client/src/main/java/org/apache/hcatalog/api/HCatClient.java b/webhcat/java-client/src/main/java/org/apache/hcatalog/api/HCatClient.java
index b275294..c059ddf 100644
--- a/webhcat/java-client/src/main/java/org/apache/hcatalog/api/HCatClient.java
+++ b/webhcat/java-client/src/main/java/org/apache/hcatalog/api/HCatClient.java
@@ -204,11 +204,30 @@
             throws HCatException;
 
     /**
+     * Gets all the partitions that match the specified (and possibly partial) partition specification.
+     * A partial partition-specification is one where not all partition-keys have associated values. For example,
+     * for a table ('myDb.myTable') with 2 partition keys (dt string, region string),
+     * if for each dt ('20120101', '20120102', etc.) there can exist 3 regions ('us', 'uk', 'in'), then,
+     *  1. Complete partition spec: getPartitions('myDb', 'myTable', {dt='20120101', region='us'}) would return 1 partition.
+     *  2. Partial  partition spec: getPartitions('myDb', 'myTable', {dt='20120101'}) would return all 3 partitions,
+     *                              with dt='20120101' (i.e. region = 'us', 'uk' and 'in').
+     * @param dbName The name of the database.
+     * @param tableName The name of the table.
+     * @param partitionSpec The partition specification. (Need not include all partition keys.)
+     * @return A list of partitions.
+     * @throws HCatException
+     */
+    public abstract List<HCatPartition> getPartitions(String dbName, String tableName,
+                                               Map<String,String> partitionSpec) throws HCatException;
+
+
+    /**
      * Gets the partition.
      *
      * @param dbName The database name.
      * @param tableName The table name.
-     * @param partitionSpec The partition specification, {[col_name,value],[col_name2,value2]}.
+     * @param partitionSpec The partition specification, {[col_name,value],[col_name2,value2]}. All partition-key-values
+     *                      must be specified.
      * @return An instance of HCatPartitionInfo.
      * @throws HCatException
      */
@@ -235,15 +254,20 @@
             throws HCatException;
 
     /**
-     * Drops partition.
-     *
+     * Drops partition(s) that match the specified (and possibly partial) partition specification.
+     * A partial partition-specification is one where not all partition-keys have associated values. For example,
+     * for a table ('myDb.myTable') with 2 partition keys (dt string, region string),
+     * if for each dt ('20120101', '20120102', etc.) there can exist 3 regions ('us', 'uk', 'in'), then,
+     *  1. Complete partition spec: dropPartitions('myDb', 'myTable', {dt='20120101', region='us'}) would drop 1 partition.
+     *  2. Partial  partition spec: dropPartitions('myDb', 'myTable', {dt='20120101'}) would drop all 3 partitions,
+     *                              with dt='20120101' (i.e. region = 'us', 'uk' and 'in').
      * @param dbName The database name.
      * @param tableName The table name.
      * @param partitionSpec The partition specification, {[col_name,value],[col_name2,value2]}.
      * @param ifExists Hive returns an error if the partition specified does not exist, unless ifExists is set to true.
      * @throws HCatException
      */
-    public abstract void dropPartition(String dbName, String tableName,
+    public abstract void dropPartitions(String dbName, String tableName,
             Map<String, String> partitionSpec, boolean ifExists)
             throws HCatException;
 
diff --git a/webhcat/java-client/src/main/java/org/apache/hcatalog/api/HCatClientHMSImpl.java b/webhcat/java-client/src/main/java/org/apache/hcatalog/api/HCatClientHMSImpl.java
index db0b5f9..4a3b79b 100644
--- a/webhcat/java-client/src/main/java/org/apache/hcatalog/api/HCatClientHMSImpl.java
+++ b/webhcat/java-client/src/main/java/org/apache/hcatalog/api/HCatClientHMSImpl.java
@@ -327,12 +327,44 @@
     }
 
     @Override
+    public List<HCatPartition> getPartitions(String dbName, String tblName, Map<String, String> partitionSpec) throws HCatException {
+        return listPartitionsByFilter(dbName, tblName, getFilterString(partitionSpec));
+    }
+
+    private static String getFilterString(Map<String, String> partitionSpec) {
+        final String AND = " AND ";
+        StringBuilder filter = new StringBuilder();
+        for (Map.Entry<String, String> entry : partitionSpec.entrySet()) {
+            filter.append(entry.getKey()).append("=").append("\"").append(entry.getValue()).append("\"").append(AND);
+        }
+
+        int length = filter.toString().length();
+        if (length > 0)
+            filter.delete(length - AND.length(), length);
+
+        return filter.toString();
+    }
+
+    @Override
     public HCatPartition getPartition(String dbName, String tableName,
             Map<String, String> partitionSpec) throws HCatException {
         HCatPartition partition = null;
         try {
+            List<HCatFieldSchema> partitionColumns = getTable(checkDB(dbName), tableName).getPartCols();
+            if (partitionColumns.size() != partitionSpec.size()) {
+                throw new HCatException("Partition-spec doesn't have the right number of partition keys.");
+            }
+
             ArrayList<String> ptnValues = new ArrayList<String>();
-            ptnValues.addAll(partitionSpec.values());
+            for (HCatFieldSchema partitionColumn : partitionColumns) {
+                String partKey = partitionColumn.getName();
+                if (partitionSpec.containsKey(partKey)) {
+                    ptnValues.add(partitionSpec.get(partKey)); // Partition-keys added in order.
+                }
+                else {
+                    throw new HCatException("Invalid partition-key specified: " + partKey);
+                }
+            }
             Partition hivePartition = hmsClient.getPartition(checkDB(dbName),
                     tableName, ptnValues);
             if (hivePartition != null) {
@@ -383,19 +415,22 @@
     }
 
     @Override
-    public void dropPartition(String dbName, String tableName,
+    public void dropPartitions(String dbName, String tableName,
             Map<String, String> partitionSpec, boolean ifExists)
             throws HCatException {
         try {
-            List<String> ptnValues = new ArrayList<String>();
-            ptnValues.addAll(partitionSpec.values());
-            hmsClient.dropPartition(checkDB(dbName), tableName, ptnValues,
-                    ifExists);
-        } catch (NoSuchObjectException e) {
-            if (!ifExists) {
-                throw new ObjectNotFoundException(
-                        "NoSuchObjectException while dropping partition.", e);
+            dbName = checkDB(dbName);
+            List<Partition> partitions = hmsClient.listPartitionsByFilter(dbName, tableName,
+                    getFilterString(partitionSpec), (short)-1);
+
+            for (Partition partition : partitions) {
+                dropPartition(partition, ifExists);
             }
+
+        } catch (NoSuchObjectException e) {
+            throw new ObjectNotFoundException(
+                    "NoSuchObjectException while dropping partition. " +
+                            "Either db(" + dbName + ") or table(" + tableName + ") missing.", e);
         } catch (MetaException e) {
             throw new HCatException("MetaException while dropping partition.",
                     e);
@@ -405,6 +440,18 @@
         }
     }
 
+    private void dropPartition(Partition partition, boolean ifExists)
+            throws HCatException, MetaException, TException {
+        try {
+            hmsClient.dropPartition(partition.getDbName(), partition.getTableName(), partition.getValues());
+        } catch (NoSuchObjectException e) {
+            if (!ifExists) {
+                throw new ObjectNotFoundException(
+                        "NoSuchObjectException while dropping partition: " + partition.getValues(), e);
+            }
+        }
+    }
+
     @Override
     public List<HCatPartition> listPartitionsByFilter(String dbName,
             String tblName, String filter) throws HCatException {
diff --git a/webhcat/java-client/src/test/java/org/apache/hcatalog/api/TestHCatClient.java b/webhcat/java-client/src/test/java/org/apache/hcatalog/api/TestHCatClient.java
index 2b230cf..546bb0b 100644
--- a/webhcat/java-client/src/test/java/org/apache/hcatalog/api/TestHCatClient.java
+++ b/webhcat/java-client/src/test/java/org/apache/hcatalog/api/TestHCatClient.java
@@ -215,7 +215,7 @@
         HCatPartition ptn = client.getPartition(dbName, tableName, firstPtn);
         assertTrue(ptn != null);
 
-        client.dropPartition(dbName, tableName, firstPtn, true);
+        client.dropPartitions(dbName, tableName, firstPtn, true);
         ptnList = client.listPartitionsByFilter(dbName,
                 tableName, null);
         assertTrue(ptnList.size() == 2);
@@ -503,6 +503,104 @@
     }
 
     @Test
+    public void testGetPartitionsWithPartialSpec() throws Exception {
+        try {
+            HCatClient client = HCatClient.create(new Configuration(hcatConf));
+            final String dbName = "myDb";
+            final String tableName = "myTable";
+
+            client.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE);
+
+            client.createDatabase(HCatCreateDBDesc.create(dbName).build());
+            List<HCatFieldSchema> columnSchema = Arrays.asList(new HCatFieldSchema("foo", Type.INT, ""),
+                    new HCatFieldSchema("bar", Type.STRING, ""));
+
+            List<HCatFieldSchema> partitionSchema = Arrays.asList(new HCatFieldSchema("dt", Type.STRING, ""),
+                    new HCatFieldSchema("grid", Type.STRING, ""));
+
+            client.createTable(HCatCreateTableDesc.create(dbName, tableName, columnSchema).partCols(new ArrayList<HCatFieldSchema>(partitionSchema)).build());
+
+            Map<String, String> partitionSpec = new HashMap<String, String>();
+            partitionSpec.put("grid", "AB");
+            partitionSpec.put("dt", "2011_12_31");
+            client.addPartition(HCatAddPartitionDesc.create(dbName, tableName, "", partitionSpec).build());
+            partitionSpec.put("grid", "AB");
+            partitionSpec.put("dt", "2012_01_01");
+            client.addPartition(HCatAddPartitionDesc.create(dbName, tableName, "", partitionSpec).build());
+            partitionSpec.put("dt", "2012_01_01");
+            partitionSpec.put("grid", "OB");
+            client.addPartition(HCatAddPartitionDesc.create(dbName, tableName, "", partitionSpec).build());
+            partitionSpec.put("dt", "2012_01_01");
+            partitionSpec.put("grid", "XB");
+            client.addPartition(HCatAddPartitionDesc.create(dbName, tableName, "", partitionSpec).build());
+
+            Map<String, String> partialPartitionSpec = new HashMap<String, String>();
+            partialPartitionSpec.put("dt", "2012_01_01");
+
+            List<HCatPartition> partitions = client.getPartitions(dbName, tableName, partialPartitionSpec);
+            assertEquals("Unexpected number of partitions.", 3, partitions.size());
+            assertArrayEquals("Mismatched partition.", new String[]{"2012_01_01", "AB"}, partitions.get(0).getValues().toArray());
+            assertArrayEquals("Mismatched partition.", new String[]{"2012_01_01", "OB"}, partitions.get(1).getValues().toArray());
+            assertArrayEquals("Mismatched partition.", new String[]{"2012_01_01", "XB"}, partitions.get(2).getValues().toArray());
+
+            client.dropDatabase(dbName, false, HCatClient.DropDBMode.CASCADE);
+        }
+        catch (Exception unexpected) {
+            LOG.error("Unexpected exception!", unexpected);
+            assertTrue("Unexpected exception! " + unexpected.getMessage(), false);
+        }
+    }
+
+    @Test
+    public void testDropPartitionsWithPartialSpec() throws Exception {
+        try {
+            HCatClient client = HCatClient.create(new Configuration(hcatConf));
+            final String dbName = "myDb";
+            final String tableName = "myTable";
+
+            client.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE);
+
+            client.createDatabase(HCatCreateDBDesc.create(dbName).build());
+            List<HCatFieldSchema> columnSchema = Arrays.asList(new HCatFieldSchema("foo", Type.INT, ""),
+                    new HCatFieldSchema("bar", Type.STRING, ""));
+
+            List<HCatFieldSchema> partitionSchema = Arrays.asList(new HCatFieldSchema("dt", Type.STRING, ""),
+                    new HCatFieldSchema("grid", Type.STRING, ""));
+
+            client.createTable(HCatCreateTableDesc.create(dbName, tableName, columnSchema).partCols(new ArrayList<HCatFieldSchema>(partitionSchema)).build());
+
+            Map<String, String> partitionSpec = new HashMap<String, String>();
+            partitionSpec.put("grid", "AB");
+            partitionSpec.put("dt", "2011_12_31");
+            client.addPartition(HCatAddPartitionDesc.create(dbName, tableName, "", partitionSpec).build());
+            partitionSpec.put("grid", "AB");
+            partitionSpec.put("dt", "2012_01_01");
+            client.addPartition(HCatAddPartitionDesc.create(dbName, tableName, "", partitionSpec).build());
+            partitionSpec.put("dt", "2012_01_01");
+            partitionSpec.put("grid", "OB");
+            client.addPartition(HCatAddPartitionDesc.create(dbName, tableName, "", partitionSpec).build());
+            partitionSpec.put("dt", "2012_01_01");
+            partitionSpec.put("grid", "XB");
+            client.addPartition(HCatAddPartitionDesc.create(dbName, tableName, "", partitionSpec).build());
+
+            Map<String, String> partialPartitionSpec = new HashMap<String, String>();
+            partialPartitionSpec.put("dt", "2012_01_01");
+
+            client.dropPartitions(dbName, tableName, partialPartitionSpec, true);
+
+            List<HCatPartition> partitions = client.getPartitions(dbName, tableName);
+            assertEquals("Unexpected number of partitions.", 1, partitions.size());
+            assertArrayEquals("Mismatched partition.", new String[]{"2011_12_31", "AB"}, partitions.get(0).getValues().toArray());
+
+            client.dropDatabase(dbName, false, HCatClient.DropDBMode.CASCADE);
+        }
+        catch (Exception unexpected) {
+            LOG.error("Unexpected exception!", unexpected);
+            assertTrue("Unexpected exception! " + unexpected.getMessage(), false);
+        }
+    }
+
+    @Test
     public void testPartitionSchema() throws Exception {
         try {
             HCatClient client = HCatClient.create(new Configuration(hcatConf));