SENTRY-2495: Support Conjunctive Matching in Solr QueryDocAuthorizationComponent (Tristan Stevens reviewed by Hrishikesh Gadre and Kalyan Kumar Kalvagadda)

Change-Id: I7c9e8f17420c81c47da63961f6651b937e36eb20
diff --git a/sentry-solr/solr-sentry-handlers/src/main/java/org/apache/solr/handler/component/DocAuthorizationComponent.java b/sentry-solr/solr-sentry-handlers/src/main/java/org/apache/solr/handler/component/DocAuthorizationComponent.java
new file mode 100644
index 0000000..cac8c74
--- /dev/null
+++ b/sentry-solr/solr-sentry-handlers/src/main/java/org/apache/solr/handler/component/DocAuthorizationComponent.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import org.apache.sentry.binding.solr.authz.SentrySolrPluginImpl;
+import org.apache.sentry.core.common.exception.SentryUserException;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.security.AuthorizationPlugin;
+
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.util.Set;
+
+public abstract class DocAuthorizationComponent extends SearchComponent {
+  public static final String SUPERUSER = System.getProperty("solr.authorization.superuser", "solr");
+
+  public abstract boolean getEnabled();
+
+  public abstract void prepare(ResponseBuilder rb, String userName) throws IOException;
+
+  /**
+   * This method returns the roles associated with the specified <code>userName</code>
+   */
+  protected final Set<String> getRoles(SolrQueryRequest req, String userName) {
+    SolrCore solrCore = req.getCore();
+
+    AuthorizationPlugin plugin = solrCore.getCoreContainer().getAuthorizationPlugin();
+    if (!(plugin instanceof SentrySolrPluginImpl)) {
+      throw new SolrException(SolrException.ErrorCode.UNAUTHORIZED,
+          getClass().getSimpleName() + " can only be used with Sentry authorization plugin for Solr");
+    }
+    try {
+      return ((SentrySolrPluginImpl) plugin).getRoles(userName);
+    } catch (SentryUserException e) {
+      throw new SolrException(SolrException.ErrorCode.UNAUTHORIZED,
+          "Request from user: " + userName + " rejected due to SentryUserException: ", e);
+    }
+  }
+
+  /**
+   * This method return the user name from the provided {@linkplain SolrQueryRequest}
+   */
+  protected final String getUserName(SolrQueryRequest req) {
+    // If a local request, treat it like a super user request; i.e. it is equivalent to an
+    // http request from the same process.
+    if (req instanceof LocalSolrQueryRequest) {
+      return SUPERUSER;
+    }
+
+    SolrCore solrCore = req.getCore();
+
+    HttpServletRequest httpServletRequest = (HttpServletRequest) req.getContext().get("httpRequest");
+    if (httpServletRequest == null) {
+      StringBuilder builder = new StringBuilder("Unable to locate HttpServletRequest");
+      if (solrCore != null && !solrCore.getSolrConfig().getBool("requestDispatcher/requestParsers/@addHttpRequestToContext", true)) {
+        builder.append(", ensure requestDispatcher/requestParsers/@addHttpRequestToContext is set to true in solrconfig.xml");
+      }
+      throw new SolrException(SolrException.ErrorCode.UNAUTHORIZED, builder.toString());
+    }
+
+    String userName = httpServletRequest.getRemoteUser();
+    if (userName == null) {
+      userName = SentrySolrPluginImpl.getShortUserName(httpServletRequest.getUserPrincipal());
+    }
+    if (userName == null) {
+      throw new SolrException(SolrException.ErrorCode.UNAUTHORIZED, "This request is not authenticated.");
+    }
+
+    return userName;
+  }
+
+  @Override
+  public final void prepare(ResponseBuilder rb) throws IOException {
+    if (!getEnabled()) {
+      return;
+    }
+
+    String userName = getUserName(rb.req);
+    if (SUPERUSER.equals(userName)) {
+      return;
+    }
+
+    prepare(rb, userName);
+  }
+}
diff --git a/sentry-solr/solr-sentry-handlers/src/main/java/org/apache/solr/handler/component/QueryDocAuthorizationComponent.java b/sentry-solr/solr-sentry-handlers/src/main/java/org/apache/solr/handler/component/QueryDocAuthorizationComponent.java
index 9da3d6e..84c6eaa 100644
--- a/sentry-solr/solr-sentry-handlers/src/main/java/org/apache/solr/handler/component/QueryDocAuthorizationComponent.java
+++ b/sentry-solr/solr-sentry-handlers/src/main/java/org/apache/solr/handler/component/QueryDocAuthorizationComponent.java
@@ -14,67 +14,86 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.solr.handler.component;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.sentry.binding.solr.authz.SentrySolrPluginImpl;
-import org.apache.sentry.core.common.exception.SentryUserException;
+import com.google.common.base.Joiner;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.request.LocalSolrQueryRequest;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.security.AuthorizationPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Set;
 
-import javax.servlet.http.HttpServletRequest;
-
-public class QueryDocAuthorizationComponent extends SearchComponent
-{
+public class QueryDocAuthorizationComponent extends DocAuthorizationComponent {
   private static final Logger LOG =
     LoggerFactory.getLogger(QueryDocAuthorizationComponent.class);
   public static final String AUTH_FIELD_PROP = "sentryAuthField";
   public static final String DEFAULT_AUTH_FIELD = "sentry_auth";
   public static final String ALL_ROLES_TOKEN_PROP = "allRolesToken";
   public static final String ENABLED_PROP = "enabled";
-  private static final String superUser = System.getProperty("solr.authorization.superuser", "solr");
+  public static final String MODE_PROP = "matchMode";
+  public static final String DEFAULT_MODE_PROP = MatchType.DISJUNCTIVE.toString();
+
+  public static final String ALLOW_MISSING_VAL_PROP = "allow_missing_val";
+  public static final String TOKEN_COUNT_PROP = "tokenCountField";
+  public static final String DEFAULT_TOKEN_COUNT_FIELD_PROP = "sentry_auth_count";
+  public static final String QPARSER_PROP = "qParser";
+
+
   private String authField;
   private String allRolesToken;
   private boolean enabled;
+  private MatchType matchMode;
+  private String tokenCountField;
+  private boolean allowMissingValue;
+
+  private String qParserName;
+
+  private enum MatchType {
+    DISJUNCTIVE,
+    CONJUNCTIVE
+  }
 
   @Override
   public void init(NamedList args) {
-    SolrParams params = SolrParams.toSolrParams(args);
+    SolrParams params = args.toSolrParams();
     this.authField = params.get(AUTH_FIELD_PROP, DEFAULT_AUTH_FIELD);
-    LOG.info("QueryDocAuthorizationComponent authField: " + this.authField);
+    LOG.info("QueryDocAuthorizationComponent authField: {}", this.authField);
     this.allRolesToken = params.get(ALL_ROLES_TOKEN_PROP, "");
-    LOG.info("QueryDocAuthorizationComponent allRolesToken: " + this.allRolesToken);
+    LOG.info("QueryDocAuthorizationComponent allRolesToken: {}", this.allRolesToken);
     this.enabled = params.getBool(ENABLED_PROP, false);
-    LOG.info("QueryDocAuthorizationComponent enabled: " + this.enabled);
+    LOG.info("QueryDocAuthorizationComponent enabled: {}", this.enabled);
+    this.matchMode = MatchType.valueOf(params.get(MODE_PROP, DEFAULT_MODE_PROP).toUpperCase());
+    LOG.info("QueryDocAuthorizationComponent matchType: {}", this.matchMode);
+
+    if (this.matchMode == MatchType.CONJUNCTIVE) {
+      this.qParserName = params.get(QPARSER_PROP, "subset").trim();
+      LOG.debug("QueryDocAuthorizationComponent qParserName: {}", this.qParserName);
+      this.allowMissingValue = params.getBool(ALLOW_MISSING_VAL_PROP, false);
+      LOG.debug("QueryDocAuthorizationComponent allowMissingValue: {}", this.allowMissingValue);
+      this.tokenCountField = params.get(TOKEN_COUNT_PROP, DEFAULT_TOKEN_COUNT_FIELD_PROP);
+      LOG.debug("QueryDocAuthorizationComponent tokenCountField: {}", this.tokenCountField);
+    }
   }
 
-  private void addRawClause(StringBuilder builder, String authField, String value) {
+  private void addDisjunctiveRawClause(StringBuilder builder, String value) {
     // requires a space before the first term, so the
     // default lucene query parser will be used
     builder.append(" {!raw f=").append(authField).append(" v=")
       .append(value).append("}");
   }
 
-  public String getFilterQueryStr(Set<String> roles) {
-    if (roles != null && roles.size() > 0) {
+  public String getDisjunctiveFilterQueryStr(Set<String> roles) {
+    if (roles != null && !roles.isEmpty()) {
       StringBuilder builder = new StringBuilder();
       for (String role : roles) {
-        addRawClause(builder, authField, role);
+        addDisjunctiveRawClause(builder, role);
       }
       if (allRolesToken != null && !allRolesToken.isEmpty()) {
-        addRawClause(builder, authField, allRolesToken);
+        addDisjunctiveRawClause(builder, allRolesToken);
       }
       return builder.toString();
     }
@@ -82,34 +101,43 @@
   }
 
   @Override
-  public void prepare(ResponseBuilder rb) throws IOException {
-    if (!enabled) {
-      return;
-    }
-
-    String userName = getUserName(rb.req);
-    if (superUser.equals(userName)) {
-      return;
-    }
-
+  public void prepare(ResponseBuilder rb, String userName) throws IOException {
     Set<String> roles = getRoles(rb.req, userName);
     if (roles != null && !roles.isEmpty()) {
-      String filterQuery = getFilterQueryStr(roles);
-
+      String filterQuery;
+      if (matchMode == MatchType.DISJUNCTIVE) {
+        filterQuery = getDisjunctiveFilterQueryStr(roles);
+      } else {
+        filterQuery = getConjunctiveFilterQueryStr(roles);
+      }
       ModifiableSolrParams newParams = new ModifiableSolrParams(rb.req.getParams());
       newParams.add("fq", filterQuery);
       rb.req.setParams(newParams);
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Adding filter query {} for user {} with roles {}", new Object[] {filterQuery, userName, roles});
+        LOG.debug("Adding filter query {} for user {} with roles {}", filterQuery, userName, roles);
       }
 
     } else {
       throw new SolrException(SolrException.ErrorCode.UNAUTHORIZED,
-        "Request from user: " + userName +
-        " rejected because user is not associated with any roles");
+        "Request from user: " + userName + " rejected because user is not associated with any roles");
     }
   }
 
+  private String getConjunctiveFilterQueryStr(Set<String> roles) {
+    StringBuilder filterQuery = new StringBuilder();
+    filterQuery
+        .append(" {!").append(qParserName)
+        .append(" set_field=\"").append(authField).append("\"")
+        .append(" set_value=\"").append(Joiner.on(',').join(roles.iterator())).append("\"")
+        .append(" count_field=\"").append(tokenCountField).append("\"");
+    if (allRolesToken != null && !allRolesToken.equals("")) {
+      filterQuery.append(" wildcard_token=\"").append(allRolesToken).append("\"");
+    }
+    filterQuery.append(" allow_missing_val=").append(allowMissingValue).append(" }");
+
+    return filterQuery.toString();
+  }
+
   @Override
   public void process(ResponseBuilder rb) throws IOException {
   }
@@ -119,61 +147,8 @@
     return "Handle Query Document Authorization";
   }
 
+  @Override
   public boolean getEnabled() {
     return enabled;
   }
-
-  /**
-   * This method return the user name from the provided {@linkplain SolrQueryRequest}
-   */
-  private String getUserName (SolrQueryRequest req) {
-    // If a local request, treat it like a super user request; i.e. it is equivalent to an
-    // http request from the same process.
-    if (req instanceof LocalSolrQueryRequest) {
-      return superUser;
-    }
-
-    SolrCore solrCore = req.getCore();
-
-    HttpServletRequest httpServletRequest = (HttpServletRequest)req.getContext().get("httpRequest");
-    if (httpServletRequest == null) {
-      StringBuilder builder = new StringBuilder("Unable to locate HttpServletRequest");
-      if (solrCore != null && solrCore.getSolrConfig().getBool(
-        "requestDispatcher/requestParsers/@addHttpRequestToContext", true) == false) {
-        builder.append(", ensure requestDispatcher/requestParsers/@addHttpRequestToContext is set to true in solrconfig.xml");
-      }
-      throw new SolrException(SolrException.ErrorCode.UNAUTHORIZED, builder.toString());
-    }
-
-    String userName = httpServletRequest.getRemoteUser();
-    if (userName == null) {
-      userName = SentrySolrPluginImpl.getShortUserName(httpServletRequest.getUserPrincipal());
-    }
-    if (userName == null) {
-      throw new SolrException(SolrException.ErrorCode.UNAUTHORIZED, "This request is not authenticated.");
-    }
-
-    return userName;
-  }
-
-  /**
-   * This method returns the roles associated with the specified <code>userName</code>
-   */
-  private Set<String> getRoles (SolrQueryRequest req, String userName) {
-    SolrCore solrCore = req.getCore();
-
-    AuthorizationPlugin plugin = solrCore.getCoreContainer().getAuthorizationPlugin();
-    if (!(plugin instanceof SentrySolrPluginImpl)) {
-      throw new SolrException(SolrException.ErrorCode.UNAUTHORIZED, getClass().getSimpleName() +
-          " can only be used with Sentry authorization plugin for Solr");
-    }
-    try {
-      return ((SentrySolrPluginImpl)plugin).getRoles(userName);
-    } catch (SentryUserException e) {
-      throw new SolrException(SolrException.ErrorCode.UNAUTHORIZED,
-        "Request from user: " + userName +
-        " rejected due to SentryUserException: ", e);
-    }
-  }
-
 }
diff --git a/sentry-solr/solr-sentry-handlers/src/main/java/org/apache/solr/handler/component/SubsetQueryPlugin.java b/sentry-solr/solr-sentry-handlers/src/main/java/org/apache/solr/handler/component/SubsetQueryPlugin.java
new file mode 100644
index 0000000..59c7853
--- /dev/null
+++ b/sentry-solr/solr-sentry-handlers/src/main/java/org/apache/solr/handler/component/SubsetQueryPlugin.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import com.google.common.base.Preconditions;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.CoveringQuery;
+import org.apache.lucene.search.LongValuesSource;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.WildcardQuery;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.search.QParser;
+import org.apache.solr.search.QParserPlugin;
+import org.apache.solr.search.SyntaxError;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * A custom {@linkplain QParserPlugin} which supports subset queries on a given Solr index.
+ * This filter accepts the name of the field whose value should be used for subset matching
+ * and the set against which subset queries are to be run ( as a comma separated string values).
+ */
+public class SubsetQueryPlugin extends QParserPlugin {
+  public static final String SETVAL_PARAM_NAME = "set_value";
+  public static final String SETVAL_FIELD_NAME = "set_field";
+  public static final String COUNT_FIELD_NAME = "count_field";
+  public static final String MISSING_VAL_ALLOWED = "allow_missing_val";
+  public static final String WILDCARD_CHAR = "wildcard_token";
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void init(NamedList arg0) {
+  }
+
+  @Override
+  public QParser createParser(String qstr, SolrParams localParams, SolrParams params, SolrQueryRequest req) {
+    return new QParser(qstr, localParams, params, req) {
+
+      @Override
+      public Query parse() throws SyntaxError {
+        String fieldName = Preconditions.checkNotNull(localParams.get(SETVAL_FIELD_NAME));
+        String countFieldName = Preconditions.checkNotNull(localParams.get(COUNT_FIELD_NAME));
+        boolean allowMissingValues = Boolean.parseBoolean(Preconditions.checkNotNull(localParams.get(MISSING_VAL_ALLOWED)));
+        String wildcardToken = localParams.get(WILDCARD_CHAR);
+
+        LongValuesSource minimumNumberMatch = LongValuesSource.fromIntField(countFieldName);
+        Collection<Query> queries = new ArrayList<>();
+
+        String fieldVals = Preconditions.checkNotNull(localParams.get(SETVAL_PARAM_NAME));
+        for (String v : fieldVals.split(",")) {
+          queries.add(new TermQuery(new Term(fieldName, v)));
+        }
+        if (wildcardToken != null && !wildcardToken.equals("")) {
+          queries.add(new TermQuery(new Term(fieldName, wildcardToken)));
+        }
+        if (allowMissingValues) {
+          // To construct this query we need to do a little trick tho construct a test for an empty field as follows:
+          // (*:* AND -fieldName:*) ==> parses as: (+*:* -fieldName:*)
+          // It is a feature of Lucene that pure negative queries are not allowed (although Solr allows them as a top level construct)
+          // therefore we need to AND with *:*
+          // We can then pass this BooleanQuery to the CoveringQuery as one of its allowed matches.
+          BooleanQuery.Builder builder = new BooleanQuery.Builder();
+          builder.add(new BooleanClause(new MatchAllDocsQuery(), BooleanClause.Occur.SHOULD));
+          builder.add(new BooleanClause(new WildcardQuery(new Term(fieldName, "*")), BooleanClause.Occur.MUST_NOT));
+
+          queries.add(builder.build());
+        }
+        return new CoveringQuery(queries, minimumNumberMatch);
+      }
+    };
+  }
+
+}
diff --git a/sentry-solr/solr-sentry-handlers/src/test/java/org/apache/solr/handler/component/SubsetQueryTest.java b/sentry-solr/solr-sentry-handlers/src/test/java/org/apache/solr/handler/component/SubsetQueryTest.java
new file mode 100644
index 0000000..2308ad3
--- /dev/null
+++ b/sentry-solr/solr-sentry-handlers/src/test/java/org/apache/solr/handler/component/SubsetQueryTest.java
@@ -0,0 +1,329 @@
+package org.apache.solr.handler.component;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.schema.SchemaField;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test for QueryDocAuthorizationComponent (with conjunctive match) and SubsetQueryPlugin
+ */
+public class SubsetQueryTest extends SolrTestCaseJ4 {
+  private static final String f = "stringdv";
+  private static final String countField = "valcount";
+  private static final String qParser = "subset";
+
+  @BeforeClass
+  public static void beforeTests() throws Exception {
+    initCore("solrconfig-subsetquery.xml", "schema-docValuesSubsetMatch.xml");
+
+    // sanity check our schema meets our expectations
+    final IndexSchema schema = h.getCore().getLatestSchema();
+
+    final SchemaField sf = schema.getField(f);
+    assert(sf.indexed());
+    final SchemaField sfCount = schema.getField(countField);
+    assert(sfCount.indexed());
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+    assertU(delQ("*:*"));
+  }
+
+  /** Tests the ability to do basic queries using SubsetQueryPlugin
+   */
+  @Test
+  public void testSubsetQueryPluginSimple()  {
+    assertU(adoc("id", "1", f, "a", countField, "1"));
+    assertU(adoc("id", "2", f, "b", countField, "1"));
+    assertU(adoc("id", "3", f, "c", countField, "1"));
+    assertU(adoc("id", "4", f, "d", countField, "1"));
+    assertU(adoc("id", "5", f, "a", f, "b", countField, "2"));
+    assertU(adoc("id", "6", f, "a", f, "b", f, "c", countField, "3"));
+    assertU(adoc("id", "7", f, "a", f, "b", f, "c", f, "d", countField, "4"));
+    assertU(adoc("id", "8", f, "a", f, "b", f, "c", f, "d", f, "bar", countField, "5"));
+    assertU(adoc("id", "9", countField, "0"));
+    assertU(commit());
+
+    // string: normal fq
+    assertQ(req("q", "*:*", "fq", "stringdv:b", "sort", "id asc"),
+        "//*[@numFound='5']",
+        "//result/doc[1]/str[@name='id'][.=2]",
+        "//result/doc[2]/str[@name='id'][.=5]",
+        "//result/doc[3]/str[@name='id'][.=6]",
+        "//result/doc[4]/str[@name='id'][.=7]",
+        "//result/doc[5]/str[@name='id'][.=8]"
+    );
+
+    assertQ(req("q", "*:*", "fq", "stringdv:b", "fq", "valcount:1", "sort", "id asc"),
+        "//*[@numFound='1']",
+        "//result/doc[1]/str[@name='id'][.=2]"
+    );
+
+    assertQ(req("q", "*:*", "fq", "{!" + qParser + " count_field=\"valcount\" set_value=\"b\" set_field=\"stringdv\" allow_missing_val=false wildcard_token=\"foo\" }", "sort", "id asc"),
+        "//*[@numFound='1']",
+        "//result/doc[1]/str[@name='id'][.=2]"
+    );
+
+    assertQ(req("q", "*:*", "fq", "{!" + qParser + " count_field=\"valcount\" set_value=\"a,b\" set_field=\"stringdv\" allow_missing_val=false wildcard_token=\"foo\" }", "sort", "id asc"),
+        "//*[@numFound='3']",
+        "//result/doc[1]/str[@name='id'][.=1]",
+        "//result/doc[2]/str[@name='id'][.=2]",
+        "//result/doc[3]/str[@name='id'][.=5]"
+    );
+
+    assertQ(req("q", "*:*", "fq", "{!" + qParser + " count_field=\"valcount\" set_value=\"a,b\" set_field=\"stringdv\" allow_missing_val=true wildcard_token=\"foo\" }", "sort", "id asc"),
+        "//*[@numFound='4']",
+        "//result/doc[1]/str[@name='id'][.=1]",
+        "//result/doc[2]/str[@name='id'][.=2]",
+        "//result/doc[3]/str[@name='id'][.=5]",
+        "//result/doc[4]/str[@name='id'][.=9]"
+    );
+
+    assertQ(req("q", "*:*", "fq", "{!" + qParser + " count_field=\"valcount\" set_value=\"a,b,c,d\" set_field=\"stringdv\" allow_missing_val=true wildcard_token=\"foo\" }", "sort", "id asc"),
+        "//*[@numFound='8']"
+    );
+
+    assertQ(req("q", "*:*", "fq", "{!" + qParser + " count_field=\"valcount\" set_value=\"a,b,c,d\" set_field=\"stringdv\" allow_missing_val=true wildcard_token=\"bar\" }", "sort", "id asc"),
+        "//*[@numFound='9']"
+    );
+
+  }
+
+  /** Tests the missingValues capability, both +ve and -ve testing
+   */
+  @Test
+  public void testMissingValues() throws Exception {
+    assertU(adoc("id", "1", f, "a", countField, "1"));
+    assertU(adoc("id", "2", countField, "0"));
+    assertU(adoc("id", "3", f, "b", countField, "1"));
+    assertU(adoc("id", "4", countField, "0"));
+    assertU(adoc("id", "5", f, "a", f, "b", countField, "2"));
+    assertU(adoc("id", "6", countField, "0"));
+    assertU(adoc("id", "7", f, "a", f, "b", f, "c", countField, "3"));
+    assertU(adoc("id", "8", countField, "0"));
+    assertU(commit());
+
+    // string: normal fq
+    assertQ(req("q", "*:*", "sort", "id asc"),
+    "//*[@numFound='8']"
+    );
+
+    // string: normal fq
+    assertQ(req("q", "*:*", "fq", "stringdv:b", "sort", "id asc"),
+    "//*[@numFound='3']",
+    "//result/doc[1]/str[@name='id'][.=3]",
+    "//result/doc[2]/str[@name='id'][.=5]",
+    "//result/doc[3]/str[@name='id'][.=7]"
+    );
+
+    // Just matching docs with only b
+    assertQ(req("q", "*:*", "fq", "{!" + qParser + " count_field=\"valcount\" set_value=\"b\" set_field=\"stringdv\" allow_missing_val=false wildcard_token=\"foo\" }", "sort", "id asc"),
+    "//*[@numFound='1']",
+    "//result/doc[1]/str[@name='id'][.=3]"
+    );
+
+    // Matching docs with only b and also the docs with no values (2, 4, 6, 8)
+    assertQ(req("q", "*:*", "fq", "{!" + qParser + " count_field=\"valcount\" set_value=\"b\" set_field=\"stringdv\" allow_missing_val=true wildcard_token=\"foo\" }", "sort", "id asc"),
+    "//*[@numFound='5']",
+    "//result/doc[1]/str[@name='id'][.=2]",
+    "//result/doc[2]/str[@name='id'][.=3]",
+    "//result/doc[3]/str[@name='id'][.=4]",
+    "//result/doc[4]/str[@name='id'][.=6]",
+    "//result/doc[5]/str[@name='id'][.=8]"
+    );
+
+    // Matching docs with a, b or a and b
+    assertQ(req("q", "*:*", "fq", "{!" + qParser + " count_field=\"valcount\" set_value=\"a,b\" set_field=\"stringdv\" allow_missing_val=false wildcard_token=\"foo\" }", "sort", "id asc"),
+    "//*[@numFound='3']",
+    "//result/doc[1]/str[@name='id'][.=1]",
+    "//result/doc[2]/str[@name='id'][.=3]",
+    "//result/doc[3]/str[@name='id'][.=5]"
+    );
+
+    // Matching docs with a, b or a and b and also the docs with no values (2, 4, 6, 8)
+    assertQ(req("q", "*:*", "fq", "{!" + qParser + " count_field=\"valcount\" set_value=\"a,b\" set_field=\"stringdv\" allow_missing_val=true wildcard_token=\"foo\" }", "sort", "id asc"),
+    "//*[@numFound='7']",
+    "//result/doc[1]/str[@name='id'][.=1]",
+    "//result/doc[2]/str[@name='id'][.=2]",
+    "//result/doc[3]/str[@name='id'][.=3]",
+    "//result/doc[4]/str[@name='id'][.=4]",
+    "//result/doc[5]/str[@name='id'][.=5]",
+    "//result/doc[6]/str[@name='id'][.=6]",
+    "//result/doc[7]/str[@name='id'][.=8]"
+    );
+  }
+
+  /** Tests the wildcardToken capability, both +ve and -ve testing
+   * Wildcard token means you should match those documents with that term, as well as those listed
+   */
+  @Test
+  public void testWildcardToken() throws Exception {
+    assertU(adoc("id", "1", f, "a", countField, "1"));
+    assertU(adoc("id", "2", f, "a", f, "foo", countField, "2"));
+    assertU(adoc("id", "3", f, "b", countField, "1"));
+    assertU(adoc("id", "4", f, "b", f, "foo", countField, "2"));
+    assertU(adoc("id", "5", f, "a", f, "b", countField, "2"));
+    assertU(adoc("id", "6", f, "a", f, "b", f, "foo", countField, "3"));
+    assertU(adoc("id", "7", f, "a", f, "b", f, "c", countField, "3"));
+    assertU(adoc("id", "8", f, "a", f, "b", f, "c", f, "foo", countField, "4"));
+    assertU(adoc("id", "9", f, "foo", countField, "1"));
+    assertU(commit());
+
+    // string: normal fq
+    assertQ(req("q", "*:*", "sort", "id asc"),
+        "//*[@numFound='9']"
+    );
+
+    // string: normal fq
+    assertQ(req("q", "*:*", "fq", "stringdv:b", "sort", "id asc"),
+        "//*[@numFound='6']",
+        "//result/doc[1]/str[@name='id'][.=3]",
+        "//result/doc[2]/str[@name='id'][.=4]",
+        "//result/doc[3]/str[@name='id'][.=5]",
+        "//result/doc[4]/str[@name='id'][.=6]",
+        "//result/doc[5]/str[@name='id'][.=7]",
+        "//result/doc[6]/str[@name='id'][.=8]"
+    );
+
+    // Matching docs with only b, plus wildcard token of bar (has no matches)
+    assertQ(req("q", "*:*", "fq", "{!" + qParser + " count_field=\"valcount\" set_value=\"b\" set_field=\"stringdv\" allow_missing_val=false wildcard_token=\"bar\" }", "sort", "id asc"),
+        "//*[@numFound='1']",
+        "//result/doc[1]/str[@name='id'][.=3]"
+    );
+
+    // Matching docs with only b, plus wildcard token of foo (matches 4 and 9)
+    assertQ(req("q", "*:*", "fq", "{!" + qParser + " count_field=\"valcount\" set_value=\"b\" set_field=\"stringdv\" allow_missing_val=false wildcard_token=\"foo\" }", "sort", "id asc"),
+        "//*[@numFound='3']",
+        "//result/doc[1]/str[@name='id'][.=3]",
+        "//result/doc[2]/str[@name='id'][.=4]",
+        "//result/doc[3]/str[@name='id'][.=9]"
+    );
+
+    // Matching docs with a, b or a and b, plus wildcard token of bar (has no matches)
+    assertQ(req("q", "*:*", "fq", "{!" + qParser + " count_field=\"valcount\" set_value=\"a,b\" set_field=\"stringdv\" allow_missing_val=false wildcard_token=\"bar\" }", "sort", "id asc"),
+        "//*[@numFound='3']",
+        "//result/doc[1]/str[@name='id'][.=1]",
+        "//result/doc[2]/str[@name='id'][.=3]",
+        "//result/doc[3]/str[@name='id'][.=5]"
+    );
+
+    // Matching docs with a, b or a and b, plus wildcard token of foo (matches 2, 4, 6 and 9)
+    assertQ(req("q", "*:*", "fq", "{!" + qParser + " count_field=\"valcount\" set_value=\"a,b\" set_field=\"stringdv\" allow_missing_val=false wildcard_token=\"foo\" }", "sort", "id asc"),
+        "//*[@numFound='7']",
+        "//result/doc[1]/str[@name='id'][.=1]",
+        "//result/doc[2]/str[@name='id'][.=2]",
+        "//result/doc[3]/str[@name='id'][.=3]",
+        "//result/doc[4]/str[@name='id'][.=4]",
+        "//result/doc[5]/str[@name='id'][.=5]",
+        "//result/doc[6]/str[@name='id'][.=6]",
+        "//result/doc[7]/str[@name='id'][.=9]"
+    );
+  }
+
+  /** Tests the wildcardToken capability and the missingValues together, both +ve and -ve testing
+   */
+  @Test
+  public void testWildcardTokenWithMissing() throws Exception {
+    assertU(adoc("id", "1", f, "a", countField, "1"));
+    assertU(adoc("id", "2", f, "a", f, "foo", countField, "2"));
+    assertU(adoc("id", "3", f, "b", countField, "1"));
+    assertU(adoc("id", "4", f, "b", f, "foo", countField, "2"));
+    assertU(adoc("id", "5", f, "a", f, "b", countField, "2"));
+    assertU(adoc("id", "6", f, "a", f, "b", f, "foo", countField, "3"));
+    assertU(adoc("id", "7", f, "a", f, "b", f, "c", countField, "3"));
+    assertU(adoc("id", "8", f, "a", f, "b", f, "c", f, "foo", countField, "4"));
+    assertU(adoc("id", "9", f, "foo", countField, "1"));
+    assertU(adoc("id", "10", countField, "0"));
+    assertU(commit());
+
+    // string: normal fq
+    assertQ(req("q", "*:*", "sort", "id asc"),
+        "//*[@numFound='10']"
+    );
+
+    // string: normal fq
+    assertQ(req("q", "*:*", "fq", "stringdv:b", "sort", "id asc"),
+        "//*[@numFound='6']",
+        "//result/doc[1]/str[@name='id'][.=3]",
+        "//result/doc[2]/str[@name='id'][.=4]",
+        "//result/doc[3]/str[@name='id'][.=5]",
+        "//result/doc[4]/str[@name='id'][.=6]",
+        "//result/doc[5]/str[@name='id'][.=7]",
+        "//result/doc[6]/str[@name='id'][.=8]"
+    );
+
+    // Matches docs with only b, allowMissing=false, wildcard of bar
+    assertQ(req("q", "*:*", "fq", "{!" + qParser + " count_field=\"valcount\" set_value=\"b\" set_field=\"stringdv\" allow_missing_val=false wildcard_token=\"bar\" }", "sort", "id asc"),
+        "//*[@numFound='1']",
+        "//result/doc[1]/str[@name='id'][.=3]"
+    );
+
+    // Matches docs with only b, allowMissing=false, wildcard of foo (matches 4 and 9)
+    assertQ(req("q", "*:*", "fq", "{!" + qParser + " count_field=\"valcount\" set_value=\"b\" set_field=\"stringdv\" allow_missing_val=false wildcard_token=\"foo\" }", "sort", "id asc"),
+        "//*[@numFound='3']",
+        "//result/doc[1]/str[@name='id'][.=3]",
+        "//result/doc[2]/str[@name='id'][.=4]",
+        "//result/doc[3]/str[@name='id'][.=9]"
+    );
+
+    // Matches docs with only b, allowMissing=true (matches 10), wildcard of foo (matches 4 and 9)
+    assertQ(req("q", "*:*", "fq", "{!" + qParser + " count_field=\"valcount\" set_value=\"b\" set_field=\"stringdv\" allow_missing_val=true wildcard_token=\"foo\" }", "sort", "id asc"),
+        "//*[@numFound='4']",
+        "//result/doc[1]/str[@name='id'][.=10]",
+        "//result/doc[2]/str[@name='id'][.=3]",
+        "//result/doc[3]/str[@name='id'][.=4]",
+        "//result/doc[4]/str[@name='id'][.=9]"
+    );
+
+    // Matching docs with a, b or a and b,
+    assertQ(req("q", "*:*", "fq", "{!" + qParser + " count_field=\"valcount\" set_value=\"a,b\" set_field=\"stringdv\" allow_missing_val=false wildcard_token=\"bar\" }", "sort", "id asc"),
+        "//*[@numFound='3']",
+        "//result/doc[1]/str[@name='id'][.=1]",
+        "//result/doc[2]/str[@name='id'][.=3]",
+        "//result/doc[3]/str[@name='id'][.=5]"
+    );
+
+    assertQ(req("q", "*:*", "fq", "{!" + qParser + " count_field=\"valcount\" set_value=\"a,b\" set_field=\"stringdv\" allow_missing_val=false wildcard_token=\"foo\" }", "sort", "id asc"),
+        "//*[@numFound='7']",
+        "//result/doc[1]/str[@name='id'][.=1]",
+        "//result/doc[2]/str[@name='id'][.=2]",
+        "//result/doc[3]/str[@name='id'][.=3]",
+        "//result/doc[4]/str[@name='id'][.=4]",
+        "//result/doc[5]/str[@name='id'][.=5]",
+        "//result/doc[6]/str[@name='id'][.=6]",
+        "//result/doc[7]/str[@name='id'][.=9]"
+    );
+
+    assertQ(req("q", "*:*", "fq", "{!" + qParser + " count_field=\"valcount\" set_value=\"a,b\" set_field=\"stringdv\" allow_missing_val=true wildcard_token=\"foo\" }", "sort", "id asc"),
+        "//*[@numFound='8']",
+        "//result/doc[1]/str[@name='id'][.=1]",
+        "//result/doc[2]/str[@name='id'][.=10]",
+        "//result/doc[3]/str[@name='id'][.=2]",
+        "//result/doc[4]/str[@name='id'][.=3]",
+        "//result/doc[5]/str[@name='id'][.=4]",
+        "//result/doc[6]/str[@name='id'][.=5]",
+        "//result/doc[7]/str[@name='id'][.=6]",
+        "//result/doc[8]/str[@name='id'][.=9]"
+    );
+  }
+
+}
diff --git a/sentry-solr/solr-sentry-handlers/src/test/resources/solr/collection1/schema-docValuesSubsetMatch.xml b/sentry-solr/solr-sentry-handlers/src/test/resources/solr/collection1/schema-docValuesSubsetMatch.xml
new file mode 100644
index 0000000..b9d9719
--- /dev/null
+++ b/sentry-solr/solr-sentry-handlers/src/test/resources/solr/collection1/schema-docValuesSubsetMatch.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<schema name="schema-docValuesSubsetMatch" version="1.6">
+
+  <fieldType name="string" class="solr.StrField"/>
+  <fieldType name="int" class="solr.TrieIntField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+  <fieldType name="long" class="solr.TrieLongField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+
+  <field name="id" type="string" required="true"/>
+  <field name="stringdv" type="string" indexed="true" stored="false" docValues="true" multiValued="true"/>
+  <field name="valcount" type="int" indexed="true" stored="true" multiValued="false" />
+  <uniqueKey>id</uniqueKey>
+
+</schema>
diff --git a/sentry-solr/solr-sentry-handlers/src/test/resources/solr/collection1/solrconfig-subsetmatchcomponent.xml b/sentry-solr/solr-sentry-handlers/src/test/resources/solr/collection1/solrconfig-subsetmatchcomponent.xml
new file mode 100644
index 0000000..c915d5e
--- /dev/null
+++ b/sentry-solr/solr-sentry-handlers/src/test/resources/solr/collection1/solrconfig-subsetmatchcomponent.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<config>
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+  <dataDir>${solr.data.dir:}</dataDir>
+  <xi:include href="solrconfig.snippet.randomindexconfig.xml" xmlns:xi="http://www.w3.org/2001/XInclude"/>
+  <directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}"/>
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+  <requestHandler name="/select" class="solr.SearchHandler">
+    <arr name="first-components">
+      <str>SubsetMatchComponent</str>
+    </arr>
+  </requestHandler>
+
+  <queryParser name="subset" class="org.apache.solr.handler.component.SubsetQueryPlugin"/>
+
+  <searchComponent name="SubsetMatchComponent" class="org.apache.solr.handler.component.QueryDocAuthorizationComponent" >
+    <str name="matchMode">CONJUNCTIVE</str>
+    <bool name="enabled">true</bool>
+    <str name="sentryAuthField">stringdv</str>
+    <str name="allRolesToken">testAllRolesToken</str>
+    <str name="allow_missing_val">true</str>
+  </searchComponent>
+</config>
diff --git a/sentry-solr/solr-sentry-handlers/src/test/resources/solr/collection1/solrconfig-subsetquery.xml b/sentry-solr/solr-sentry-handlers/src/test/resources/solr/collection1/solrconfig-subsetquery.xml
new file mode 100644
index 0000000..540f3f8
--- /dev/null
+++ b/sentry-solr/solr-sentry-handlers/src/test/resources/solr/collection1/solrconfig-subsetquery.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<config>
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+  <dataDir>${solr.data.dir:}</dataDir>
+  <xi:include href="solrconfig.snippet.randomindexconfig.xml" xmlns:xi="http://www.w3.org/2001/XInclude"/>
+  <directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}"/>
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+  <requestHandler name="/select" class="solr.SearchHandler" />
+  <queryParser name="subset" class="org.apache.solr.handler.component.SubsetQueryPlugin"/>
+</config>
diff --git a/sentry-solr/solr-sentry-handlers/src/test/resources/solr/collection1/solrconfig.snippet.randomindexconfig.xml b/sentry-solr/solr-sentry-handlers/src/test/resources/solr/collection1/solrconfig.snippet.randomindexconfig.xml
new file mode 100644
index 0000000..d80542d
--- /dev/null
+++ b/sentry-solr/solr-sentry-handlers/src/test/resources/solr/collection1/solrconfig.snippet.randomindexconfig.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!--
+
+A solrconfig.xml snippet containing indexConfig settings for randomized testing.
+
+-->
+<indexConfig>
+  <!-- this sys property is not set by SolrTestCaseJ4 because we ideally want to use
+       the RandomMergePolicy in all tests - but some tests expect very specific
+       Merge behavior, so those tests can set it as needed.
+  -->
+  <mergePolicyFactory class="${solr.tests.mergePolicyFactory:org.apache.solr.util.RandomMergePolicyFactory}" />
+
+  <useCompoundFile>${useCompoundFile:false}</useCompoundFile>
+
+  <maxBufferedDocs>${solr.tests.maxBufferedDocs}</maxBufferedDocs>
+  <ramBufferSizeMB>${solr.tests.ramBufferSizeMB}</ramBufferSizeMB>
+
+  <mergeScheduler class="${solr.tests.mergeScheduler}" />
+
+  <writeLockTimeout>1000</writeLockTimeout>
+  <commitLockTimeout>10000</commitLockTimeout>
+
+  <!-- this sys property is not set by SolrTestCaseJ4 because almost all tests should
+       use the single process lockType for speed - but tests that explicitly need
+       to vary the lockType canset it as needed.
+  -->
+  <lockType>${solr.tests.lockType:single}</lockType>
+
+  <infoStream>${solr.tests.infostream:false}</infoStream>
+
+</indexConfig>
diff --git a/sentry-tests/sentry-tests-solr/src/test/java/org/apache/sentry/tests/e2e/solr/AbstractSolrSentryTestCase.java b/sentry-tests/sentry-tests-solr/src/test/java/org/apache/sentry/tests/e2e/solr/AbstractSolrSentryTestCase.java
index 3d4d555..6aceb30 100644
--- a/sentry-tests/sentry-tests-solr/src/test/java/org/apache/sentry/tests/e2e/solr/AbstractSolrSentryTestCase.java
+++ b/sentry-tests/sentry-tests-solr/src/test/java/org/apache/sentry/tests/e2e/solr/AbstractSolrSentryTestCase.java
@@ -101,6 +101,19 @@
     }
   }
 
+  protected void deleteCollection (String userName, String collectionName) throws SolrServerException, IOException {
+    String tmp = getAuthenticatedUser();
+    try {
+      setAuthenticationUser(userName);
+      // Create collection.
+      CollectionAdminRequest.Delete deleteCmd =
+          CollectionAdminRequest.deleteCollection(collectionName);
+      assertEquals(0, deleteCmd.process(cluster.getSolrClient()).getStatus());
+    } finally {
+      setAuthenticationUser(tmp);
+    }
+  }
+
   /**
    * Function to clean Solr collections
    * @param userName Name of the user performing this operation
diff --git a/sentry-tests/sentry-tests-solr/src/test/java/org/apache/sentry/tests/e2e/solr/DocLevelGenerator.java b/sentry-tests/sentry-tests-solr/src/test/java/org/apache/sentry/tests/e2e/solr/DocLevelGenerator.java
index 40cc153..91f0125 100644
--- a/sentry-tests/sentry-tests-solr/src/test/java/org/apache/sentry/tests/e2e/solr/DocLevelGenerator.java
+++ b/sentry-tests/sentry-tests-solr/src/test/java/org/apache/sentry/tests/e2e/solr/DocLevelGenerator.java
@@ -18,6 +18,7 @@
 
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.common.SolrInputDocument;
+import org.junit.Assert;
 
 import java.util.ArrayList;
 
@@ -69,4 +70,33 @@
     client.add(collection, docs);
     client.commit(collection, true, true);
   }
+
+  public void generateDocsForSubsetQueries(CloudSolrClient client, int numDocs, int numTokens, String tokenPrefix) throws Exception {
+
+    Assert.assertTrue("Num Tokens should be divisible by numTokens",numDocs % numTokens == 0);
+
+    // create documents
+    ArrayList<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
+    for (int i = 0; i < numDocs; ++i) {
+      int tokenCount = 0;
+      SolrInputDocument doc = new SolrInputDocument();
+      String iStr = Long.toString(i);
+      doc.addField("id", iStr);
+      doc.addField("description", "description" + iStr);
+
+      for (int j=0; j < numTokens; j++) {
+        if ((i & 1 << j ) == 1 << j) {
+          doc.addField(authField, tokenPrefix + j);
+          tokenCount++;
+        }
+      }
+      doc.addField("sentry_auth_count", tokenCount);
+System.out.println(doc);
+      docs.add(doc);
+    }
+
+    client.add(collection, docs);
+    client.commit(collection,true, true);
+  }
+
 }
diff --git a/sentry-tests/sentry-tests-solr/src/test/java/org/apache/sentry/tests/e2e/solr/SolrSentryServiceTestBase.java b/sentry-tests/sentry-tests-solr/src/test/java/org/apache/sentry/tests/e2e/solr/SolrSentryServiceTestBase.java
index e1f789c..09f095a 100644
--- a/sentry-tests/sentry-tests-solr/src/test/java/org/apache/sentry/tests/e2e/solr/SolrSentryServiceTestBase.java
+++ b/sentry-tests/sentry-tests-solr/src/test/java/org/apache/sentry/tests/e2e/solr/SolrSentryServiceTestBase.java
@@ -25,6 +25,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.Sets;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.sentry.core.common.exception.SentryUserException;
 import org.apache.sentry.core.model.solr.SolrConstants;
@@ -90,6 +91,10 @@
         .addConfig("cloud-managed", TEST_PATH().resolve("configsets").resolve("cloud-managed").resolve("conf"))
         .addConfig("cloud-minimal_doc_level_security", TEST_PATH().resolve("configsets")
                       .resolve("cloud-minimal_doc_level_security").resolve("conf"))
+        .addConfig("cloud-minimal_subset_match", TEST_PATH().resolve("configsets")
+                     .resolve("cloud-minimal_subset_match").resolve("conf"))
+        .addConfig("cloud-minimal_subset_match_missing_false", TEST_PATH().resolve("configsets")
+              .resolve("cloud-minimal_subset_match_missing_false").resolve("conf"))
         .configure();
       log.info("Successfully started Solr service");
 
@@ -204,6 +209,19 @@
     result.put("solr", Collections.singleton("solr"));
     result.put("junit", Collections.singleton("junit"));
     result.put("doclevel", Collections.singleton("doclevel"));
+    result.put("user3", Collections.singleton("group3"));
+
+    result.put("subset_user_012", Sets.newHashSet("subset_group0", "subset_group1", "subset_group2", "subset_nogroup"));
+    result.put("subset_user_013", Sets.newHashSet("subset_group0", "subset_group1", "subset_group3", "subset_nogroup"));
+    result.put("subset_user_023", Sets.newHashSet("subset_group0", "subset_group2", "subset_group3", "subset_nogroup"));
+    result.put("subset_user_123", Sets.newHashSet("subset_group1", "subset_group2", "subset_group3", "subset_nogroup"));
+    result.put("subset_user_0", Sets.newHashSet("subset_group0", "subset_nogroup"));
+    result.put("subset_user_2", Sets.newHashSet("subset_group2", "subset_nogroup"));
+    result.put("subset_user_01", Sets.newHashSet("subset_group0", "subset_group1", "subset_nogroup", "subset_delete"));
+    result.put("subset_user_23", Sets.newHashSet("subset_group2", "subset_group3", "subset_nogroup"));
+    result.put("subset_user_0123", Sets.newHashSet("subset_group0", "subset_group1", "subset_group2", "subset_group3", "subset_nogroup", "subset_delete"));
+    result.put("subset_user_no", Sets.newHashSet("subset_nogroup"));
+
     return Collections.unmodifiableMap(result);
   }
 
diff --git a/sentry-tests/sentry-tests-solr/src/test/java/org/apache/sentry/tests/e2e/solr/TestDocLevelOperations.java b/sentry-tests/sentry-tests-solr/src/test/java/org/apache/sentry/tests/e2e/solr/TestDocLevelOperations.java
index 7834f33..99be4d3 100644
--- a/sentry-tests/sentry-tests-solr/src/test/java/org/apache/sentry/tests/e2e/solr/TestDocLevelOperations.java
+++ b/sentry-tests/sentry-tests-solr/src/test/java/org/apache/sentry/tests/e2e/solr/TestDocLevelOperations.java
@@ -361,15 +361,18 @@
 
     createDocsAndQuerySimple(collectionName, true);
 
-    // test deleteByQuery "*:*"
+    // test deleteByQuery "*:*" - we expect this to delete all docs, and it does.
     deleteByQueryTest(collectionName, "junit", "*:*", "doclevel", 0);
 
-    // test deleteByQuery non-*:*
+    // test deleteByQuery non-*:* - this proves that the junit can delete documents (sentry_auth:doclevel_role) that he can't see.
+    //                              We verify this with querying as doclevel (who can see all, and he now sees zero of these docs)
     deleteByQueryTest(collectionName, "junit", "sentry_auth:doclevel_role", "doclevel", 0);
 
-    // test deleting all documents by Id
+    // test deleting all documents by Id - in this case the junit user can delete all docs
     deleteByIdTest(collectionName);
 
+    // This is testing the expected behaviour that if we have been granted the ALL role then we can
+    // update docs that we can't see, even to the point that we make them visible.
     updateDocsTest(collectionName);
 
   }
diff --git a/sentry-tests/sentry-tests-solr/src/test/java/org/apache/sentry/tests/e2e/solr/TestSubsetQueryOperations.java b/sentry-tests/sentry-tests-solr/src/test/java/org/apache/sentry/tests/e2e/solr/TestSubsetQueryOperations.java
new file mode 100644
index 0000000..dff85e3
--- /dev/null
+++ b/sentry-tests/sentry-tests-solr/src/test/java/org/apache/sentry/tests/e2e/solr/TestSubsetQueryOperations.java
@@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.tests.e2e.solr;
+
+import com.google.common.collect.Sets;
+import org.apache.sentry.core.common.exception.SentryUserException;
+import org.apache.sentry.core.model.solr.SolrConstants;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.servlet.http.HttpServletResponse;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.sentry.tests.e2e.solr.TestSentryServer.ADMIN_USER;
+
+/**
+ * Test the document-level security features
+ */
+public class TestSubsetQueryOperations extends SolrSentryServiceTestBase {
+  private static final String AUTH_FIELD = "sentry_auth";
+  private static final String COUNTER_FIELD = "sentry_auth_count";
+  private static final int NUM_DOCS = 16;
+  private static final int NUM_AUTH_TOKENS = 4;
+  private static final String AUTH_TOKEN_PREFIX = "subset_role";
+  private static final String AUTH_GROUP_PREFIX = "subset_group";
+
+  @BeforeClass
+  public static void setupPermissions() throws SentryUserException {
+    sentryClient.createRole(ADMIN_USER, "junit_role", COMPONENT_SOLR);
+    sentryClient.createRole(ADMIN_USER, "doclevel_role", COMPONENT_SOLR);
+    sentryClient.grantRoleToGroups(ADMIN_USER, "junit_role", COMPONENT_SOLR,
+        Collections.singleton("junit"));
+    sentryClient.grantRoleToGroups(ADMIN_USER, "doclevel_role", COMPONENT_SOLR,
+        Collections.singleton("doclevel"));
+
+    // junit user
+    grantAdminPrivileges(ADMIN_USER, "junit_role", SolrConstants.ALL, SolrConstants.ALL);
+    grantCollectionPrivileges(ADMIN_USER, "junit_role", "docLevelCollection", SolrConstants.ALL);
+    grantCollectionPrivileges(ADMIN_USER, "junit_role", "allRolesCollection1", SolrConstants.ALL);
+    grantCollectionPrivileges(ADMIN_USER, "junit_role", "allRolesCollection2", SolrConstants.ALL);
+    grantCollectionPrivileges(ADMIN_USER, "junit_role", "testUpdateDeleteOperations", SolrConstants.ALL);
+    grantCollectionPrivileges(ADMIN_USER, "junit_role", "subsetCollection", SolrConstants.QUERY);
+
+    // docLevel user
+    grantCollectionPrivileges(ADMIN_USER, "doclevel_role", "docLevelCollection", SolrConstants.ALL);
+    grantCollectionPrivileges(ADMIN_USER, "doclevel_role", "testUpdateDeleteOperations", SolrConstants.ALL);
+
+    // admin user
+    grantCollectionPrivileges(ADMIN_USER, ADMIN_ROLE, SolrConstants.ALL, SolrConstants.ALL);
+
+    for (int i=0; i<NUM_AUTH_TOKENS; i++) {
+      String roleName = AUTH_TOKEN_PREFIX + i;
+      sentryClient.createRole(ADMIN_USER, roleName, COMPONENT_SOLR);
+      sentryClient.grantRoleToGroups(ADMIN_USER, roleName, COMPONENT_SOLR, Collections.singleton(AUTH_GROUP_PREFIX + i));
+      grantCollectionPrivileges(ADMIN_USER, roleName, "subsetCollection", SolrConstants.QUERY);
+      grantCollectionPrivileges(ADMIN_USER, roleName, "allRolesCollection1", SolrConstants.QUERY);
+      grantCollectionPrivileges(ADMIN_USER, roleName, "allRolesCollection2", SolrConstants.QUERY);
+      grantCollectionPrivileges(ADMIN_USER, roleName, "testUpdateDeleteOperations", SolrConstants.QUERY);
+      grantCollectionPrivileges(ADMIN_USER, roleName, "testIndexlevelDoclevelOperations", SolrConstants.QUERY);
+    }
+
+    sentryClient.createRole(ADMIN_USER, "subset_norole", COMPONENT_SOLR);
+    sentryClient.createRole(ADMIN_USER, "subset_delete", COMPONENT_SOLR);
+    sentryClient.grantRoleToGroups(ADMIN_USER, "subset_norole", COMPONENT_SOLR, Collections.singleton("subset_nogroup"));
+    sentryClient.grantRoleToGroups(ADMIN_USER, "subset_delete", COMPONENT_SOLR, Collections.singleton("subset_delete"));
+    grantCollectionPrivileges(ADMIN_USER, "subset_norole", "subsetCollection", SolrConstants.QUERY);
+    grantCollectionPrivileges(ADMIN_USER, "subset_norole", "allRolesCollection1", SolrConstants.QUERY);
+    grantCollectionPrivileges(ADMIN_USER, "subset_norole", "allRolesCollection2", SolrConstants.QUERY);
+    grantCollectionPrivileges(ADMIN_USER, "subset_norole", "testUpdateDeleteOperations", SolrConstants.QUERY);
+    grantCollectionPrivileges(ADMIN_USER, "subset_norole", "testIndexlevelDoclevelOperations", SolrConstants.QUERY);
+    grantCollectionPrivileges(ADMIN_USER, "subset_delete", "testUpdateDeleteOperations", SolrConstants.ALL);
+
+  }
+
+  @Before
+  public void resetAuthenticatedUser() {
+    setAuthenticationUser("admin");
+  }
+
+  private QueryRequest getRealTimeGetRequest() {
+    // real time get request
+    StringBuilder idsBuilder = new StringBuilder("0");
+    for (int i = 1; i < NUM_DOCS; ++i) {
+      idsBuilder.append("," + i);
+    }
+    return getRealTimeGetRequest(idsBuilder.toString());
+  }
+
+  @SuppressWarnings("serial")
+  private QueryRequest getRealTimeGetRequest(String ids) {
+    final ModifiableSolrParams idsParams = new ModifiableSolrParams();
+    idsParams.add("ids", ids);
+    return new QueryRequest() {
+      @Override
+      public String getPath() {
+        return "/get";
+      }
+
+      @Override
+      public SolrParams getParams() {
+        return idsParams;
+      }
+    };
+  }
+
+  /**
+   * Creates docs as follows and verifies queries work as expected:
+   * - creates NUM_DOCS documents, where the document id equals the order
+   *   it was created in, starting at 0
+   * - even-numbered documents get "junit_role" auth token
+   * - odd-numbered documents get "admin_role" auth token
+   * - all documents get some bogus auth tokens
+   * - all documents get a docLevel_role auth token
+   */
+  private void createDocsAndQuerySimple(String collectionName) throws Exception {
+
+    // ensure no current documents
+    verifyDeletedocsPass(ADMIN_USER, collectionName, true);
+
+    DocLevelGenerator generator = new DocLevelGenerator(collectionName, AUTH_FIELD);
+    generator.generateDocsForSubsetQueries(cluster.getSolrClient(), NUM_DOCS, NUM_AUTH_TOKENS, AUTH_TOKEN_PREFIX);
+
+    querySimple(collectionName, new QueryRequest(new SolrQuery("*:*")), cluster.getSolrClient());
+    querySimple(collectionName, getRealTimeGetRequest(), cluster.getSolrClient());
+  }
+
+  private void querySimple(String collectionName, QueryRequest request, CloudSolrClient client) throws Exception {
+    /*
+    subset_user_012  => subset_role0, subset_role1, subset_role2
+    subset_user_013  => subset_role0, subset_role1, subset_role3
+    subset_user_023  => subset_role0, subset_role2, subset_role3
+    subset_user_123  => subset_role1, subset_role2, subset_role3
+    subset_user_0    => subset_role0
+    subset_user_2    => subset_role2
+    subset_user_01   => subset_role0, subset_role1
+    subset_user_23   => subset_role2, subset_role3
+    subset_user_0123 => subset_role0, subset_role1, subset_role2, subset_role3
+    Note: All users have an extra role for good measure. This should not impact the results
+     */
+
+    // as junit  -- should only get docs with no labels as allowMissing=true
+
+
+    int expectedResultMultiplier = (int) (NUM_DOCS / (Math.pow(2, NUM_AUTH_TOKENS)));
+
+    Set<String> roleSet = Sets.newHashSet();
+    checkSimpleQueryResults(collectionName, request, client, "subset_user_no", expectedResultMultiplier, roleSet);
+
+
+    setAuthenticationUser("subset_user_no");
+    QueryResponse  rsp = request.process(client, collectionName);
+    SolrDocumentList docList = rsp.getResults();
+    assertEquals(expectedResultMultiplier, docList.getNumFound());
+    for (SolrDocument doc : docList) {
+      String id = doc.getFieldValue("id").toString();
+      assertEquals(0, Long.valueOf(id) % 16);
+      assertTrue(doc.getFieldValues(AUTH_FIELD) == null || doc.getFieldValues(AUTH_FIELD).isEmpty());
+    }
+
+    // as subset_user_1234 - should see all docs
+    roleSet = Sets.newHashSet("subset_role0", "subset_role1", "subset_role2", "subset_role3");
+    checkSimpleQueryResults(collectionName, request, client, "subset_user_0123", NUM_DOCS, roleSet);
+
+    roleSet = Sets.newHashSet("subset_role0", "subset_role1", "subset_role2");
+    checkSimpleQueryResults(collectionName, request, client, "subset_user_012", 8 * expectedResultMultiplier, roleSet);
+
+    roleSet = Sets.newHashSet("subset_role0", "subset_role1", "subset_role3");
+    checkSimpleQueryResults(collectionName, request, client, "subset_user_013", 8 * expectedResultMultiplier, roleSet);
+
+    roleSet = Sets.newHashSet("subset_role0", "subset_role2", "subset_role3");
+    checkSimpleQueryResults(collectionName, request, client, "subset_user_023", 8 * expectedResultMultiplier, roleSet);
+
+    roleSet = Sets.newHashSet("subset_role1", "subset_role2", "subset_role3");
+    checkSimpleQueryResults(collectionName, request, client, "subset_user_123", 8 * expectedResultMultiplier, roleSet);
+
+
+    roleSet = Sets.newHashSet("subset_role0");
+    checkSimpleQueryResults(collectionName, request, client, "subset_user_0", 2 * expectedResultMultiplier, roleSet);
+
+    roleSet = Sets.newHashSet("subset_role2");
+    checkSimpleQueryResults(collectionName, request, client, "subset_user_2", 2 * expectedResultMultiplier, roleSet);
+
+
+    roleSet = Sets.newHashSet("subset_role0", "subset_role1");
+    checkSimpleQueryResults(collectionName, request, client, "subset_user_01", 4 * expectedResultMultiplier, roleSet);
+
+    roleSet = Sets.newHashSet("subset_role2", "subset_role3");
+    checkSimpleQueryResults(collectionName, request, client, "subset_user_23", 4 * expectedResultMultiplier, roleSet);
+
+
+  }
+
+  private void checkSimpleQueryResults(String collectionName, QueryRequest request, CloudSolrClient client, String username, int expectedResults, Set<String> roles) throws Exception {
+    setAuthenticationUser(username);
+    QueryResponse rsp = request.process(client, collectionName);
+    SolrDocumentList docList = rsp.getResults();
+
+    assertEquals(expectedResults, docList.getNumFound());
+    for (SolrDocument doc : docList) {
+      Collection<Object> fieldValues = doc.getFieldValues(AUTH_FIELD);
+      if (fieldValues != null && !fieldValues.isEmpty()) {
+        assertTrue(roles.containsAll(fieldValues));
+      }
+    }
+  }
+
+  /**
+   * Test that queries from different users only return the documents they have access to.
+   */
+  @Test
+  public void testDocLevelOperations() throws Exception {
+    String collectionName = "subsetCollection";
+    createCollection(ADMIN_USER, collectionName, "cloud-minimal_subset_match", NUM_SERVERS, 1);
+
+    /*
+       Going to test using subset_user_01 - he should only be able to access 4 / 16th of the docs available
+       We're going try and break out and access docs with subset_role2.
+    */
+
+    String targetRole = "subset_role2";
+
+    createDocsAndQuerySimple(collectionName);
+    CloudSolrClient client = cluster.getSolrClient();
+
+    // test filter queries work as AND -- i.e. user can't avoid doc-level
+    // checks by prefixing their own filterQuery
+    setAuthenticationUser("subset_user_01");
+    String fq = URLEncoder.encode(" {!raw f=" + AUTH_FIELD + " v=" + targetRole + "}");
+    String path = "/" + collectionName + "/select?q=*:*&fq="+fq;
+    String retValue = makeHttpRequest(client, "GET", path, null, null, HttpServletResponse.SC_OK);
+    assertTrue(retValue.contains("numFound\":" + 0 + ",\""));
+
+    // test that user can't use a simple q
+    path = "/" + collectionName + "/select?q=" + AUTH_FIELD + ":" + targetRole + "&fq="+fq;
+    retValue = makeHttpRequest(client, "GET", path, null, null, HttpServletResponse.SC_OK);
+    assertTrue(retValue.contains("numFound\":" + 0 + ",\""));
+
+
+    // test that user can't inject an "OR" into the query
+    final String syntaxErrorMsg = "org.apache.solr.search.SyntaxError: Cannot parse";
+    fq = URLEncoder.encode(" {!raw f=" + AUTH_FIELD + " v=" + targetRole + "} OR ");
+    path = "/" + collectionName + "/select?q=*:*&fq="+fq;
+    retValue = makeHttpRequest(client, "GET", path, null, null, HttpServletResponse.SC_BAD_REQUEST);
+    assertTrue(retValue.contains(syntaxErrorMsg));
+
+    // same test, prefix OR this time
+    fq = URLEncoder.encode(" OR {!raw f=" + AUTH_FIELD + " v=" + targetRole + "}");
+    path = "/" + collectionName + "/select?q=*:*&fq="+fq;
+    retValue = makeHttpRequest(client, "GET", path, null, null, HttpServletResponse.SC_BAD_REQUEST);
+    assertTrue(retValue.contains(syntaxErrorMsg));
+
+  }
+
+  /**
+   * Test the allRolesToken.  Make it a keyword in the query language ("OR")
+   * to make sure it is treated literally rather than interpreted.
+   * Note: In the {@link org.apache.solr.handler.component.QueryDocAuthorizationComponent}, allRoles is a role that is automatically given to everyone.
+   * It is then added to the list of things that is ANDed together.
+   * So if a doc has ROLE1, ROLE2, ROLE3 and ROLE1 is the allRoles token, the user must have ROLE2 AND ROLE3
+   * Note: This test performs differently if the allow_missing_val is set (allow_missing_val works in a similar way to allRoles)
+   * i.e. If turned on, everyone can see a doc that has no values for the auth token.
+   */
+  @Test
+  public void testAllRolesToken() throws Exception {
+    testAllRolesAndMissingValues(true);
+  }
+
+  @Test
+  public void testAllRolesTokenWithMissingFalse() throws Exception {
+    testAllRolesAndMissingValues(false);
+  }
+
+
+
+  private void testAllRolesAndMissingValues(boolean allowMissingValues) throws Exception {
+    String collectionName;
+
+    if (allowMissingValues) {
+      collectionName = "allRolesCollection1";
+      createCollection(ADMIN_USER, collectionName, "cloud-minimal_subset_match", NUM_SERVERS, 1);
+    } else {
+      collectionName = "allRolesCollection2";
+      createCollection(ADMIN_USER, collectionName, "cloud-minimal_subset_match_missing_false", NUM_SERVERS, 1);
+    }
+
+    String allRolesToken = "OR";
+    /* Going to create:
+       4 with no auth tokens
+       5 with the junit role, 3 of which have all roles token
+       13 with the junit2 role, 7 of which have all roles token
+       17 with junit2 AND junit1, 2 of which have no roles
+       19 with just all roles token
+
+       Expected results:
+       junit user can see 19 + 5 + 4 = 28 docs when allow_missing_val is true, and 24 when not
+       admin user can see 19 + 4 = 23 roles when allow_missing_val is true, and 19 when not
+     */
+
+    String junitRole = "junit_role";
+    String junit2Role = "junit_role2";
+
+    int junit = 5;
+    int junitAllRoles = 3;
+    int junit2 = 13;
+    int junit2AllRoles = 7;
+    int allRolesOnly = 19;
+    int noRoles = 4;
+    int junit1Andjunit2 = 17;
+    int junit1Andjunit2AllRoles = 2;
+    int counter=0;
+
+    ArrayList<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
+
+    for (int i=0; i<junit; i++) {
+      Set<String> roles = Sets.newHashSet(junitRole);
+      if (i<junitAllRoles) {
+        roles.add(allRolesToken);
+      }
+      docs.add(createAllDocsTestDocument(counter++, roles));
+    }
+
+    for (int i=0; i<junit2; i++) {
+      Set<String> roles = Sets.newHashSet(junit2Role);
+      if (i<junit2AllRoles) {
+        roles.add(allRolesToken);
+      }
+      docs.add(createAllDocsTestDocument(counter++, roles));
+    }
+
+    for (int i=0; i<allRolesOnly; i++) {
+      Set<String> roles = Sets.newHashSet(allRolesToken);
+      docs.add(createAllDocsTestDocument(counter++, roles));
+    }
+
+    for (int i=0; i<noRoles; i++) {
+      Set<String> roles = Sets.newHashSet();
+      docs.add(createAllDocsTestDocument(counter++, roles));
+    }
+
+    for (int i=0; i<junit1Andjunit2; i++) {
+      Set<String> roles = Sets.newHashSet(junitRole, junit2Role);
+      if (i<junit1Andjunit2AllRoles) {
+        roles.add(allRolesToken);
+      }
+      docs.add(createAllDocsTestDocument(counter++, roles));
+    }
+
+    CloudSolrClient client = cluster.getSolrClient();
+
+    client.add(collectionName, docs);
+    client.commit(collectionName, true, true);
+
+    testAllRolesTokenQueries(collectionName, allowMissingValues, junit, allRolesOnly, noRoles, client);
+
+    //TODO SecureRealTimeGetRequest
+    //checkAllRolesToken(getRealTimeGetRequest(), server,
+    //     totalAllRolesAdded, totalOnlyAllRolesAdded, allRolesFactor, totalJunitAdded, junitFactor);
+
+
+
+  }
+
+  private void testAllRolesTokenQueries(String collectionName, boolean allowMissingValues, int junit, int allRolesOnly, int noRoles, CloudSolrClient client) throws Exception {
+    QueryRequest request = new QueryRequest(new SolrQuery("*:*"));
+
+    setAuthenticationUser("admin");
+    QueryResponse rsp = request.process(client, collectionName);
+    SolrDocumentList docList = rsp.getResults();
+
+    int expectedResults = allRolesOnly;
+    if (allowMissingValues) {
+      expectedResults += noRoles;
+    }
+
+    assertEquals(expectedResults, docList.getNumFound());
+
+    // as junit -- should get junit added + onlyAllRolesAdded
+    setAuthenticationUser("junit");
+    rsp = request.process(client, collectionName);
+    docList = rsp.getResults();
+
+    expectedResults = junit + allRolesOnly;
+    if (allowMissingValues) {
+      expectedResults += noRoles;
+    }
+
+    assertEquals("junit user, with allowMissingValues: " + allowMissingValues, expectedResults, docList.getNumFound());
+  }
+
+  private SolrInputDocument createAllDocsTestDocument(int id, Set<String> roles) {
+    SolrInputDocument doc = new SolrInputDocument();
+    String iStr = Long.toString(id);
+    doc.addField("id", iStr);
+    doc.addField("description", "description" + iStr);
+
+    for (String role : roles) {
+      doc.addField(AUTH_FIELD, role);
+    }
+    doc.addField(COUNTER_FIELD, roles.size());
+    return doc;
+  }
+
+
+
+  /**
+   * delete the docs as "deleteUser" using deleteByQuery "deleteQueryStr".
+   * Verify that number of docs returned for "queryUser" equals
+   * "expectedQueryDocs" after deletion.
+   */
+  private void deleteByQueryTest(String collectionName, String deleteUser,
+      String deleteByQueryStr, String queryUser, int expectedQueryDocs) throws Exception {
+    setAuthenticationUser(ADMIN_USER);
+    createDocsAndQuerySimple(collectionName);
+
+    // First check that the end result isn't yet true
+    setAuthenticationUser(queryUser);
+    assertFalse(expectedQueryDocs == checkDeleteByQuery(collectionName, new QueryRequest(new SolrQuery("*:*")), cluster.getSolrClient(), queryUser));
+
+    // Now delete the docs
+    setAuthenticationUser(deleteUser);
+    cluster.getSolrClient().deleteByQuery(collectionName, deleteByQueryStr);
+    cluster.getSolrClient().commit(collectionName);
+
+    // Now check that the end result is now true
+    assertEquals(expectedQueryDocs, checkDeleteByQuery(collectionName, new QueryRequest(new SolrQuery("*:*")), cluster.getSolrClient(), queryUser));
+    assertEquals(expectedQueryDocs, checkDeleteByQuery(collectionName, getRealTimeGetRequest(), cluster.getSolrClient(), queryUser));
+  }
+
+  private long checkDeleteByQuery(String collectionName, QueryRequest query, CloudSolrClient server,
+      String queryUser) throws Exception {
+
+    setAuthenticationUser(queryUser);
+    QueryResponse rsp =  query.process(server, collectionName);
+    long docLevelResults = rsp.getResults().getNumFound();
+    return docLevelResults;
+  }
+
+  private void deleteByIdTest(String collectionName) throws Exception {
+    createDocsAndQuerySimple(collectionName);
+    setAuthenticationUser("subset_user_01");
+    List<String> allIds = new ArrayList<String>(NUM_DOCS);
+    for (int i = 0; i < NUM_DOCS; ++i) {
+      allIds.add(Long.toString(i));
+    }
+    cluster.getSolrClient().deleteById(collectionName, allIds);
+    cluster.getSolrClient().commit(collectionName);
+
+    checkDeleteById(collectionName, new QueryRequest(new SolrQuery("*:*")), cluster.getSolrClient());
+    checkDeleteById(collectionName, getRealTimeGetRequest(), cluster.getSolrClient());
+
+  }
+
+  private void checkDeleteById(String collectionName, QueryRequest request, CloudSolrClient server)
+      throws Exception {
+    QueryResponse rsp = request.process(server, collectionName);
+    long junitResults = rsp.getResults().getNumFound();
+    assertEquals(0, junitResults);
+
+    setAuthenticationUser("subset_user_0123");
+    rsp =  request.process(server, collectionName);
+    long docLevelResults = rsp.getResults().getNumFound();
+    assertEquals(0, docLevelResults);
+  }
+
+  private void updateDocsTest(String collectionName) throws Exception {
+    createDocsAndQuerySimple(collectionName);
+    setAuthenticationUser("subset_user_01");
+    String docIdStr = Long.toString(4);
+
+    // verify we can't view one of the odd documents
+    QueryRequest query = new QueryRequest(new SolrQuery("id:"+docIdStr));
+    QueryRequest rtgQuery = getRealTimeGetRequest(docIdStr);
+    checkUpdateDocsQuery(collectionName, query, cluster.getSolrClient(), 0);
+    checkUpdateDocsQuery(collectionName, rtgQuery, cluster.getSolrClient(), 0);
+
+    // overwrite the document that we can't see
+    ArrayList<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
+    SolrInputDocument doc = new SolrInputDocument();
+    doc.addField("id", docIdStr);
+    doc.addField("description", "description" + docIdStr);
+    doc.addField(AUTH_FIELD, "subset_role1");
+    doc.addField(COUNTER_FIELD, 1);
+    docs.add(doc);
+    cluster.getSolrClient().add(collectionName, docs);
+    cluster.getSolrClient().commit(collectionName);
+
+    // verify we can now view the document
+    checkUpdateDocsQuery(collectionName, query, cluster.getSolrClient(), 1);
+    //checkUpdateDocsQuery(collectionName, rtgQuery, cluster.getSolrClient(), 1);
+
+  }
+
+  private void checkUpdateDocsQuery(String collectionName, QueryRequest request, CloudSolrClient server, int expectedDocs)
+      throws Exception {
+    QueryResponse rsp = request.process(server, collectionName);
+    assertEquals(expectedDocs, rsp.getResults().getNumFound());
+  }
+
+  @Test
+  public void testUpdateDeleteOperations() throws Exception {
+    String collectionName = "testUpdateDeleteOperations";
+    createCollection(ADMIN_USER, collectionName, "cloud-minimal_subset_match", NUM_SERVERS, 1);
+
+    createDocsAndQuerySimple(collectionName);
+
+    // test deleteByQuery "*:*"
+    deleteByQueryTest(collectionName, "subset_user_01", "*:*", "subset_user_0123", 0);
+
+    // test deleteByQuery non-*:* - There should have been 8 of these documents in the collection before, and zero after, therefore leaving 8 docs remaining
+    deleteByQueryTest(collectionName, "subset_user_01", AUTH_FIELD +":subset_role2", "subset_user_0123", 8);
+
+    // test deleting all documents by Id
+    deleteByIdTest(collectionName);
+
+    // Test that, by design, users who have been granted ALL on index-level perms can update documents, even if they can't see them
+    updateDocsTest(collectionName);
+
+  }
+
+  /**
+   * Test to validate doc level security on collections without perm for Index level auth.
+   * @throws Exception
+   */
+  @Test
+  public void indexDocAuthTests() throws Exception {
+    String collectionName = "testIndexlevelDoclevelOperations";
+    createCollection(ADMIN_USER, collectionName, "cloud-minimal_subset_match", NUM_SERVERS, 1);
+
+    createDocsAndQuerySimple(collectionName);
+
+    // test query for "*:*" fails as junit user (junit user doesn't have index level permissions but has doc level permissions set)
+    verifyQueryFail("junit", collectionName, ALL_DOCS);
+
+    // test query for "*:*" fails as docLevel user (docLevel user has neither index level permissions nor doc level permissions set)
+    verifyQueryFail("doclevel", collectionName, ALL_DOCS);
+  }
+}
diff --git a/sentry-tests/sentry-tests-solr/src/test/resources/log4j.properties b/sentry-tests/sentry-tests-solr/src/test/resources/log4j.properties
index d941816..f6a6e16 100644
--- a/sentry-tests/sentry-tests-solr/src/test/resources/log4j.properties
+++ b/sentry-tests/sentry-tests-solr/src/test/resources/log4j.properties
@@ -31,5 +31,6 @@
 
 log4j.logger.org.apache.hadoop.conf.Configuration=ERROR
 log4j.logger.org.apache.sentry=DEBUG
+log4j.logger.org.apache.solr.handler.component=DEBUG
 
 log4j.category.DataNucleus=ERROR
diff --git a/sentry-tests/sentry-tests-solr/src/test/resources/solr/configsets/cloud-minimal_subset_match/conf/schema.xml b/sentry-tests/sentry-tests-solr/src/test/resources/solr/configsets/cloud-minimal_subset_match/conf/schema.xml
new file mode 100644
index 0000000..13c1497
--- /dev/null
+++ b/sentry-tests/sentry-tests-solr/src/test/resources/solr/configsets/cloud-minimal_subset_match/conf/schema.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+  <fieldType name="string" class="solr.StrField"/>
+  <fieldType name="int" class="solr.TrieIntField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+  <fieldType name="long" class="solr.TrieLongField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+  <dynamicField name="*" type="string" indexed="true" stored="true"/>
+  <!-- for versioning -->
+  <field name="_version_" type="long" indexed="true" stored="true"/>
+  <field name="_root_" type="string" indexed="true" stored="true" multiValued="false" required="false"/>
+  <field name="id" type="string" indexed="true" stored="true"/>
+  <field name="sentry_auth" type="string" indexed="true" stored="true" docValues="true" multiValued="true"/>
+  <field name="sentry_auth_count" type="int" indexed="true" stored="true" multiValued="false" />
+  <uniqueKey>id</uniqueKey>
+</schema>
diff --git a/sentry-tests/sentry-tests-solr/src/test/resources/solr/configsets/cloud-minimal_subset_match/conf/solrconfig.xml b/sentry-tests/sentry-tests-solr/src/test/resources/solr/configsets/cloud-minimal_subset_match/conf/solrconfig.xml
new file mode 100644
index 0000000..d1e7ca4
--- /dev/null
+++ b/sentry-tests/sentry-tests-solr/src/test/resources/solr/configsets/cloud-minimal_subset_match/conf/solrconfig.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <directoryFactory name="DirectoryFactory"
+                    class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <commitWithin>
+      <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
+    </commitWithin>
+    <updateLog></updateLog>
+  </updateHandler>
+
+   <requestDispatcher handleSelect="false" >
+     <requestParsers enableRemoteStreaming="true"
+                     multipartUploadLimitInKB="2048000"
+                     formdataUploadLimitInKB="2048"
+                     addHttpRequestToContext="true"/>
+
+    <httpCaching never304="true" />
+  </requestDispatcher>
+
+  <requestHandler name="/select" class="solr.SearchHandler">
+    <lst name="defaults">
+      <str name="echoParams">explicit</str>
+      <str name="indent">true</str>
+      <str name="df">text</str>
+    </lst>
+    <arr name="first-components">
+      <str>queryDocAuthorization</str>
+    </arr>
+  </requestHandler>
+
+  <requestHandler name="/get" class="solr.RealTimeGetHandler">
+     <lst name="defaults">
+       <str name="omitHeader">true</str>
+       <str name="wt">json</str>
+       <str name="indent">true</str>
+     </lst>
+     <arr name="first-components">
+       <str>queryDocAuthorization</str>
+     </arr>
+  </requestHandler>
+
+  <queryParser name="subset" class="org.apache.solr.handler.component.SubsetQueryPlugin"/>
+
+  <searchComponent name="queryDocAuthorization" class="org.apache.solr.handler.component.QueryDocAuthorizationComponent" >
+    <str name="matchMode">CONJUNCTIVE</str>
+    <!-- Set to true to enabled document-level authorization -->
+    <bool name="enabled">true</bool>
+
+    <!-- Field where the auth tokens are stored in the document -->
+    <str name="sentryAuthField">sentry_auth</str>
+
+    <!-- Auth token defined to allow any role to access the document.
+         Uncomment to enable. -->
+    <str name="allRolesToken">OR</str>
+    <!-- Configure to permit access to documents that do not have a value for the sentryAuthField -->
+    <str name="allow_missing_val">true</str>
+    <str name="tokenCountField">sentry_auth_count</str>
+  </searchComponent>
+
+</config>
diff --git a/sentry-tests/sentry-tests-solr/src/test/resources/solr/configsets/cloud-minimal_subset_match_missing_false/conf/schema.xml b/sentry-tests/sentry-tests-solr/src/test/resources/solr/configsets/cloud-minimal_subset_match_missing_false/conf/schema.xml
new file mode 100644
index 0000000..f01bc2c
--- /dev/null
+++ b/sentry-tests/sentry-tests-solr/src/test/resources/solr/configsets/cloud-minimal_subset_match_missing_false/conf/schema.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+  <fieldType name="string" class="solr.StrField"/>
+  <fieldType name="int" class="solr.TrieIntField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+  <fieldType name="long" class="solr.TrieLongField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+  <dynamicField name="*" type="string" indexed="true" stored="true"/>
+  <!-- for versioning -->
+  <field name="_version_" type="long" indexed="true" stored="true"/>
+  <field name="_root_" type="string" indexed="true" stored="true" multiValued="false" required="false"/>
+  <field name="id" type="string" indexed="true" stored="true"/>
+  <field name="sentry_auth" type="string" indexed="true" stored="true" docValues="true" multiValued="true"/>
+  <field name="sentry_auth_count" type="int" indexed="true" stored="true" multiValued="false" />
+
+  <uniqueKey>id</uniqueKey>
+</schema>
diff --git a/sentry-tests/sentry-tests-solr/src/test/resources/solr/configsets/cloud-minimal_subset_match_missing_false/conf/solrconfig.xml b/sentry-tests/sentry-tests-solr/src/test/resources/solr/configsets/cloud-minimal_subset_match_missing_false/conf/solrconfig.xml
new file mode 100644
index 0000000..81fe32b
--- /dev/null
+++ b/sentry-tests/sentry-tests-solr/src/test/resources/solr/configsets/cloud-minimal_subset_match_missing_false/conf/solrconfig.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <directoryFactory name="DirectoryFactory"
+                    class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <commitWithin>
+      <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
+    </commitWithin>
+    <updateLog></updateLog>
+  </updateHandler>
+
+   <requestDispatcher handleSelect="false" >
+     <requestParsers enableRemoteStreaming="true"
+                     multipartUploadLimitInKB="2048000"
+                     formdataUploadLimitInKB="2048"
+                     addHttpRequestToContext="true"/>
+
+    <httpCaching never304="true" />
+  </requestDispatcher>
+
+  <requestHandler name="/select" class="solr.SearchHandler">
+    <lst name="defaults">
+      <str name="echoParams">explicit</str>
+      <str name="indent">true</str>
+      <str name="df">text</str>
+    </lst>
+    <arr name="first-components">
+      <str>queryDocAuthorization</str>
+    </arr>
+  </requestHandler>
+
+  <requestHandler name="/get" class="solr.RealTimeGetHandler">
+     <lst name="defaults">
+       <str name="omitHeader">true</str>
+       <str name="wt">json</str>
+       <str name="indent">true</str>
+     </lst>
+     <arr name="first-components">
+       <str>queryDocAuthorization</str>
+     </arr>
+  </requestHandler>
+
+  <queryParser name="subset" class="org.apache.solr.handler.component.SubsetQueryPlugin"/>
+
+  <searchComponent name="queryDocAuthorization" class="org.apache.solr.handler.component.QueryDocAuthorizationComponent" >
+    <str name="matchMode">CONJUNCTIVE</str>
+    <!-- Set to true to enabled document-level authorization -->
+    <bool name="enabled">true</bool>
+
+    <!-- Field where the auth tokens are stored in the document -->
+    <str name="sentryAuthField">sentry_auth</str>
+
+    <!-- Auth token defined to allow any role to access the document.
+         Uncomment to enable. -->
+    <str name="allRolesToken">OR</str>
+    <!-- Configure to permit access to documents that do not have a value for the sentryAuthField -->
+    <str name="allow_missing_val">false</str>
+    <str name="tokenCountField">sentry_auth_count</str>
+  </searchComponent>
+
+</config>