ACCESS-220:A badly formatted db level policy file shouldn't impact access to rest of the databases
diff --git a/access-provider/access-provider-file/src/main/java/org/apache/access/provider/file/SimplePolicyEngine.java b/access-provider/access-provider-file/src/main/java/org/apache/access/provider/file/SimplePolicyEngine.java
index 21f6d9c..8e68dc0 100644
--- a/access-provider/access-provider-file/src/main/java/org/apache/access/provider/file/SimplePolicyEngine.java
+++ b/access-provider/access-provider-file/src/main/java/org/apache/access/provider/file/SimplePolicyEngine.java
@@ -16,7 +16,11 @@
  */
 package org.apache.access.provider.file;
 
-import static org.apache.access.provider.file.PolicyFileConstants.*;
+import static org.apache.access.provider.file.PolicyFileConstants.DATABASES;
+import static org.apache.access.provider.file.PolicyFileConstants.GROUPS;
+import static org.apache.access.provider.file.PolicyFileConstants.ROLES;
+import static org.apache.access.provider.file.PolicyFileConstants.ROLE_SPLITTER;
+import static org.apache.access.provider.file.PolicyFileConstants.USERS;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -118,11 +122,11 @@
             if(perDbIni.containsKey(DATABASES)) {
               throw new ConfigurationException("Per-db policy files cannot contain " + DATABASES + " section");
             }
-            perDatabaseRoles.put(database, parseIni(database, perDbIni));
+            ImmutableSetMultimap<String, String> currentDbRoles = parseIni(database, perDbIni);
+            perDatabaseRoles.put(database, currentDbRoles);
             perDbResources.add(perDbPolicy);
           } catch (Exception e) {
             LOGGER.error("Error processing key " + entry.getKey() + ", skipping " + entry.getValue(), e);
-            throw e;
           }
         }
       }
diff --git a/access-provider/access-provider-file/src/test/java/org/apache/access/provider/file/TestPolicyParsingNegative.java b/access-provider/access-provider-file/src/test/java/org/apache/access/provider/file/TestPolicyParsingNegative.java
index 3e9fe0f..b85ef71 100644
--- a/access-provider/access-provider-file/src/test/java/org/apache/access/provider/file/TestPolicyParsingNegative.java
+++ b/access-provider/access-provider-file/src/test/java/org/apache/access/provider/file/TestPolicyParsingNegative.java
@@ -105,7 +105,7 @@
         Arrays.asList(new Authorizable[] {
             new Server("server1")
     }), Lists.newArrayList("admin")).get("admin");
-    Assert.assertTrue(permissions.toString(), permissions.isEmpty());
+    Assert.assertEquals(permissions.toString(), "[server=server1]");
     // test to ensure [databases] fails parsing of per-db file
     // by removing the user mapping from the per-db policy file
     policyFile.removeGroupsFromUser("admin1", "admin")
@@ -115,7 +115,7 @@
         Arrays.asList(new Authorizable[] {
             new Server("server1")
     }), Lists.newArrayList("admin")).get("admin");
-    Assert.assertTrue(permissions.toString(), permissions.isEmpty());
+    Assert.assertEquals(permissions.toString(), "[server=server1]");
   }
   @Test
   public void testDatabaseRequiredInRole() throws Exception {
@@ -177,4 +177,61 @@
     Assert.assertTrue(permissions.toString(), permissions.isEmpty());
   }
 
+  /**
+   * Create policy file with multiple per db files.
+   * Verify that a file with bad format is the only one that's ignored
+   * @throws Exception
+   */
+  @Test
+  public void testMultiDbWithErrors() throws Exception {
+    File db1PolicyFile = new File(baseDir, "db1.ini");
+    File db2PolicyFile = new File(baseDir, "db2.ini");
+
+    // global policy file
+    append("[databases]", globalPolicyFile);
+    append("db1 = " + db1PolicyFile.getPath(), globalPolicyFile);
+    append("db2 = " + db2PolicyFile.getPath(), globalPolicyFile);
+    append("[groups]", globalPolicyFile);
+    append("db3_group = db3_rule", globalPolicyFile);
+    append("[roles]", globalPolicyFile);
+    append("db3_rule = server=server1->db=db3->table=sales->action=select", globalPolicyFile);
+
+    //db1 policy file with badly formatted rule
+    append("[groups]", db1PolicyFile);
+    append("db1_group = bad_rule", db1PolicyFile);
+    append("[roles]", db1PolicyFile);
+    append("bad_rule = server=server1->db=customers->=purchases->action=", db1PolicyFile);
+
+    //db2 policy file with proper rule
+    append("[groups]", db2PolicyFile);
+    append("db2_group = db2_rule", db2PolicyFile);
+    append("[roles]", db2PolicyFile);
+    append("db2_rule = server=server1->db=db2->table=purchases->action=select", db2PolicyFile);
+
+    PolicyEngine policy = new SimplePolicyEngine(globalPolicyFile.getPath(), "server1");
+
+    // verify that the db1 rule is empty
+    ImmutableSet<String> permissions = policy.getPermissions(
+        Arrays.asList(new Authorizable[] {
+            new Server("server1"),
+            new Database("db1")
+    }), Lists.newArrayList("db1_group")).get("db1_group");
+    Assert.assertTrue(permissions.toString(), permissions.isEmpty());
+
+    permissions = policy.getPermissions(
+        Arrays.asList(new Authorizable[] {
+            new Server("server1"),
+            new Database("db2")
+    }), Lists.newArrayList("db2_group")).get("db2_group");
+    Assert.assertEquals(permissions.toString(), 1, permissions.size());
+
+    permissions = policy.getPermissions(
+        Arrays.asList(new Authorizable[] {
+            new Server("server1"),
+            new Database("db2")
+    }), Lists.newArrayList("db2_group")).get("db2_group");
+    Assert.assertEquals(permissions.toString(), 1, permissions.size());
+
+  }
+
 }
diff --git a/access-tests/src/test/java/org/apache/access/tests/e2e/TestPerDBConfiguration.java b/access-tests/src/test/java/org/apache/access/tests/e2e/TestPerDBConfiguration.java
index 17ab997..442f7b3 100644
--- a/access-tests/src/test/java/org/apache/access/tests/e2e/TestPerDBConfiguration.java
+++ b/access-tests/src/test/java/org/apache/access/tests/e2e/TestPerDBConfiguration.java
@@ -26,8 +26,6 @@
 import java.sql.SQLException;
 import java.sql.Statement;
 
-import junit.framework.Assert;
-
 import org.junit.After;
 import org.junit.Test;
 
@@ -151,6 +149,156 @@
     statement.close();
     connection.close();
   }
+
+  /**
+   * Multiple DB files with some containing badly formatted rules
+   * The privileges should work for good files
+   * No access for bad formatted ones
+   * @throws Exception
+   */
+  @Test
+  public void testMultiPerDBwithErrors() throws Exception {
+    String DB3_POLICY_FILE = "db3-policy-file.ini";
+    String DB4_POLICY_FILE = "db4-policy-file.ini";
+
+    context = createContext();
+    File policyFile = context.getPolicyFile();
+    File db2PolicyFile = new File(policyFile.getParent(), DB2_POLICY_FILE);
+    File db3PolicyFile = new File(policyFile.getParent(), DB3_POLICY_FILE);
+    File db4PolicyFile = new File(policyFile.getParent(), DB4_POLICY_FILE);
+    File dataDir = context.getDataDir();
+    //copy data file to test dir
+    File dataFile = new File(dataDir, MULTI_TYPE_DATA_FILE_NAME);
+    FileOutputStream to = new FileOutputStream(dataFile);
+    Resources.copy(Resources.getResource(MULTI_TYPE_DATA_FILE_NAME), to);
+    to.close();
+    //delete existing policy file; create new policy file
+    assertTrue("Could not delete " + policyFile, context.deletePolicyFile());
+    assertTrue("Could not delete " + db2PolicyFile,!db2PolicyFile.exists() || db2PolicyFile.delete());
+
+    String[] policyFileContents = {
+        // groups : role -> group
+        "[groups]",
+        "admin = all_server",
+        "user_group1 = select_tbl1",
+        "user_group2 = select_tbl2",
+        // roles: privileges -> role
+        "[roles]",
+        "all_server = server=server1",
+        "select_tbl1 = server=server1->db=db1->table=tbl1->action=select",
+        // users: users -> groups
+        "[users]",
+        "hive = admin",
+        "user_1 = user_group1",
+        "user_2 = user_group2",
+        "user_3 = user_group3",
+        "user_4 = user_group4",
+        "[databases]",
+        "db2 = " + db2PolicyFile.getPath(),
+        "db3 = " + db3PolicyFile.getPath(),
+        "db4 = " + db4PolicyFile.getPath(),
+    };
+    context.makeNewPolicy(policyFileContents);
+
+    String[] db2PolicyFileContents = {
+        "[groups]",
+        "user_group2 = select_tbl2",
+        "[roles]",
+        "select_tbl2 = server=server1->db=db2->table=tbl2->action=select"
+    };
+    String[] db3PolicyFileContents = {
+        "[groups]",
+        "user_group3 = select_tbl3_BAD",
+        "[roles]",
+        "select_tbl3_BAD = server=server1->db=db3------>table->action=select"
+    };
+    String[] db4PolicyFileContents = {
+        "[groups]",
+        "user_group4 = select_tbl4",
+        "[roles]",
+        "select_tbl4 = server=server1->db=db4->table=tbl4->action=select"
+    };
+
+    Files.write(Joiner.on("\n").join(db2PolicyFileContents), db2PolicyFile, Charsets.UTF_8);
+    Files.write(Joiner.on("\n").join(db3PolicyFileContents), db3PolicyFile, Charsets.UTF_8);
+    Files.write(Joiner.on("\n").join(db4PolicyFileContents), db4PolicyFile, Charsets.UTF_8);
+
+    // setup db objects needed by the test
+    Connection connection = context.createConnection("hive", "hive");
+    Statement statement = context.createStatement(connection);
+
+    statement.execute("DROP DATABASE IF EXISTS db1 CASCADE");
+    statement.execute("CREATE DATABASE db1");
+    statement.execute("USE db1");
+    statement.execute("CREATE TABLE tbl1(B INT, A STRING) " +
+                      " row format delimited fields terminated by '|'  stored as textfile");
+    statement.execute("LOAD DATA LOCAL INPATH '" + dataFile.getPath() + "' INTO TABLE tbl1");
+
+    statement.execute("DROP DATABASE IF EXISTS db2 CASCADE");
+    statement.execute("CREATE DATABASE db2");
+    statement.execute("USE db2");
+    statement.execute("CREATE TABLE tbl2(B INT, A STRING) " +
+                      " row format delimited fields terminated by '|'  stored as textfile");
+    statement.execute("LOAD DATA LOCAL INPATH '" + dataFile.getPath() + "' INTO TABLE tbl2");
+
+    statement.execute("DROP DATABASE IF EXISTS db3 CASCADE");
+    statement.execute("CREATE DATABASE db3");
+    statement.execute("USE db3");
+    statement.execute("CREATE TABLE tbl3(B INT, A STRING) " +
+                      " row format delimited fields terminated by '|'  stored as textfile");
+    statement.execute("LOAD DATA LOCAL INPATH '" + dataFile.getPath() + "' INTO TABLE tbl3");
+
+    statement.execute("DROP DATABASE IF EXISTS db4 CASCADE");
+    statement.execute("CREATE DATABASE db4");
+    statement.execute("USE db4");
+    statement.execute("CREATE TABLE tbl4(B INT, A STRING) " +
+                      " row format delimited fields terminated by '|'  stored as textfile");
+    statement.execute("LOAD DATA LOCAL INPATH '" + dataFile.getPath() + "' INTO TABLE tbl4");
+
+    statement.close();
+    connection.close();
+
+    // test execution
+    connection = context.createConnection("user_1", "password");
+    statement = context.createStatement(connection);
+    statement.execute("USE db1");
+    // test user1 can execute query on tbl1
+    verifyCount(statement, "SELECT COUNT(*) FROM tbl1");
+    connection.close();
+
+    connection = context.createConnection("user_2", "password");
+    statement = context.createStatement(connection);
+    statement.execute("USE db2");
+    // test user1 can execute query on tbl1
+    verifyCount(statement, "SELECT COUNT(*) FROM tbl2");
+    connection.close();
+
+    // verify no access to db3 due to badly formatted rule in db3 policy file
+    connection = context.createConnection("user_3", "password");
+    statement = context.createStatement(connection);
+    context.assertAuthzException(statement, "USE db3");
+    // test user1 can execute query on tbl1
+    context.assertAuthzException(statement, "SELECT COUNT(*) FROM db3.tbl3");
+    connection.close();
+
+    connection = context.createConnection("user_4", "password");
+    statement = context.createStatement(connection);
+    statement.execute("USE db4");
+    // test user1 can execute query on tbl1
+    verifyCount(statement, "SELECT COUNT(*) FROM tbl4");
+    connection.close();
+
+    //test cleanup
+    connection = context.createConnection("hive", "hive");
+    statement = context.createStatement(connection);
+    statement.execute("DROP DATABASE db1 CASCADE");
+    statement.execute("DROP DATABASE db2 CASCADE");
+    statement.execute("DROP DATABASE db3 CASCADE");
+    statement.execute("DROP DATABASE db4 CASCADE");
+    statement.close();
+    connection.close();
+  }
+
   private void verifyCount(Statement statement, String query) throws SQLException {
     ResultSet resultSet = statement.executeQuery(query);
     int count = 0;