SOLR-13619: Forwarded requests with Kerberos should carry forward the original user principal
diff --git a/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java b/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
index 5995f2b..fb658da 100644
--- a/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
+++ b/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
@@ -31,6 +31,7 @@
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.handler.component.ResponseBuilder;
 import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.servlet.SolrDispatchFilter;
 import org.apache.solr.util.TimeZoneUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,6 +47,7 @@
   protected TimeZone tz;
   protected ResponseBuilder rb;
   protected List<Closeable> closeHooks;
+  protected SolrDispatchFilter.Action action;
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -86,11 +88,21 @@
     this.req = req;
     this.rsp = rsp;    
   }
-  public SolrRequestInfo(HttpServletRequest  httpReq, SolrQueryResponse rsp) {
+  public SolrRequestInfo(SolrQueryRequest req, SolrQueryResponse rsp, SolrDispatchFilter.Action action) {
+    this(req, rsp);
+    this.setAction(action);
+  }
+
+  public SolrRequestInfo(HttpServletRequest httpReq, SolrQueryResponse rsp) {
     this.httpRequest = httpReq;
     this.rsp = rsp;
   }
 
+  public SolrRequestInfo(HttpServletRequest httpReq, SolrQueryResponse rsp, SolrDispatchFilter.Action action) {
+    this(httpReq, rsp);
+    this.action = action;
+  }
+
   public Principal getUserPrincipal() {
     if (req != null) return req.getUserPrincipal();
     if (httpRequest != null) return httpRequest.getUserPrincipal();
@@ -149,6 +161,14 @@
     }
   }
 
+  public SolrDispatchFilter.Action getAction() {
+    return action;
+  }
+
+  public void setAction(SolrDispatchFilter.Action action) {
+    this.action = action;
+  }
+
   public static ExecutorUtil.InheritableThreadLocalProvider getInheritableThreadLocalProvider() {
     return new ExecutorUtil.InheritableThreadLocalProvider() {
       @Override
diff --git a/solr/core/src/java/org/apache/solr/security/KerberosPlugin.java b/solr/core/src/java/org/apache/solr/security/KerberosPlugin.java
index 8bc5625..870b25e 100644
--- a/solr/core/src/java/org/apache/solr/security/KerberosPlugin.java
+++ b/solr/core/src/java/org/apache/solr/security/KerberosPlugin.java
@@ -33,6 +33,8 @@
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.collections.iterators.IteratorEnumeration;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationHandler;
+import org.apache.http.HttpRequest;
+import org.apache.http.protocol.HttpContext;
 import org.apache.solr.client.solrj.impl.Http2SolrClient;
 import org.apache.solr.client.solrj.impl.Krb5HttpClientBuilder;
 import org.apache.solr.client.solrj.impl.SolrHttpClientBuilder;
@@ -41,6 +43,9 @@
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.SecurityAwareZkACLProvider;
 import org.apache.solr.core.CoreContainer;
+import org.apache.solr.request.SolrRequestInfo;
+import org.apache.solr.servlet.HttpSolrCall;
+import org.apache.solr.servlet.SolrDispatchFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -236,6 +241,20 @@
   }
 
   @Override
+  protected boolean interceptInternodeRequest(HttpRequest httpRequest, HttpContext httpContext) {
+    SolrRequestInfo info = SolrRequestInfo.getRequestInfo();
+    if (info != null && (info.getAction() == SolrDispatchFilter.Action.FORWARD ||
+        info.getAction() == SolrDispatchFilter.Action.REMOTEQUERY)) {
+      if (info.getUserPrincipal() != null) {
+        log.info("Setting original user principal: {}", info.getUserPrincipal().getName());
+        httpRequest.setHeader(HttpSolrCall.ORIGINAL_USER_PRINCIPAL_HEADER, info.getUserPrincipal().getName());
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
   public SolrHttpClientBuilder getHttpClientBuilder(SolrHttpClientBuilder builder) {
     return kerberosBuilder.getBuilder(builder);
   }
diff --git a/solr/core/src/java/org/apache/solr/security/RuleBasedAuthorizationPlugin.java b/solr/core/src/java/org/apache/solr/security/RuleBasedAuthorizationPlugin.java
index 8b4a65c..850517d 100644
--- a/solr/core/src/java/org/apache/solr/security/RuleBasedAuthorizationPlugin.java
+++ b/solr/core/src/java/org/apache/solr/security/RuleBasedAuthorizationPlugin.java
@@ -30,6 +30,7 @@
 import org.apache.solr.common.SpecProvider;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.common.util.ValidatingJsonMap;
+import org.apache.solr.servlet.HttpSolrCall;
 import org.apache.solr.common.util.CommandOperation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,7 +112,7 @@
 
   private MatchStatus checkPathPerm(List<Permission> permissions, AuthorizationContext context) {
     if (permissions == null || permissions.isEmpty()) return MatchStatus.NO_PERMISSIONS_FOUND;
-    Principal principal = context.getUserPrincipal();
+    Principal principal = getUserPrincipal(context);
 
     final Permission governingPermission = findFirstGoverningPermission(permissions, context);
     if (governingPermission == null) {
@@ -122,6 +123,49 @@
     return determineIfPermissionPermitsPrincipal(principal, governingPermission);
   }
 
+  private Principal getUserPrincipal(AuthorizationContext context) {
+    Principal principal = context.getUserPrincipal();
+
+    // If principal is an admin user, i.e. has ALL permissions (e.g. request coming from Solr
+    // node), and "originalUserPrincipal" is specified, then set originalUserPrincipal
+    // as the principal. This is typically the case in forwarded/remote requests
+    // through KerberosPlugin. This is needed because the original node that received
+    // this request did not perform any authorization, and hence we are the first ones
+    // to authorize the request (and we need the original user principal to do so).
+    if (context.getHttpHeader("originalUserPrincipal") != null) {
+      Set<String> roles = usersVsRoles.get(principal.getName());
+      if (roles != null) {
+        for (String role: roles) {
+          if (mapping.get(null) == null) continue;
+          List<Permission> permissions = mapping.get(null).get(null);
+          if (permissions != null) {
+            for (Permission p: permissions) {
+              if (PermissionNameProvider.Name.ALL.equals(p.wellknownName) && p.role.contains(role)) {
+                // The role for the principal has ALL permissions
+                String originalUserRequest = context.getHttpHeader(HttpSolrCall.ORIGINAL_USER_PRINCIPAL_HEADER);
+                log.info("Principal: " + principal + ", original user principal: " + originalUserRequest);
+                principal = new Principal() {
+                  @Override
+                  public String getName() {
+                    return originalUserRequest;
+                  }
+
+                  @Override
+                  public String toString() {
+                    return originalUserRequest;
+                  }
+                };
+                return principal;
+              }
+            }
+          }
+        }
+      }
+    }
+
+    return principal;
+  }
+
   private Permission findFirstGoverningPermission(List<Permission> permissions, AuthorizationContext context) {
     for (int i = 0; i < permissions.size(); i++) {
       Permission permission = permissions.get(i);
diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
index 2ed00f4..546a195 100644
--- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
@@ -137,6 +137,8 @@
 public class HttpSolrCall {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  public static final String ORIGINAL_USER_PRINCIPAL_HEADER = "originalUserPrincipal";
+
   public static final Random random;
   static {
     // We try to make things reproducible in the context of our tests by initializing the random instance
@@ -414,10 +416,12 @@
           if (path.equals(req.getServletPath())) {
             // avoid endless loop - pass through to Restlet via webapp
             action = PASSTHROUGH;
+            SolrRequestInfo.getRequestInfo().setAction(action);
             return;
           } else {
             // forward rewritten URI (without path prefix and core/collection name) to Restlet
             action = FORWARD;
+            SolrRequestInfo.getRequestInfo().setAction(action);
             return;
           }
         }
@@ -542,7 +546,7 @@
           handleAdminRequest();
           return RETURN;
         case REMOTEQUERY:
-          SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, new SolrQueryResponse()));
+          SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, new SolrQueryResponse(), action));
           remoteQuery(coreUrl + path, resp);
           return RETURN;
         case PROCESS:
@@ -558,7 +562,7 @@
                * QueryResponseWriter is selected and we get the correct
                * Content-Type)
                */
-            SolrRequestInfo.setRequestInfo(new SolrRequestInfo(solrReq, solrRsp));
+            SolrRequestInfo.setRequestInfo(new SolrRequestInfo(solrReq, solrRsp, action));
             execute(solrRsp);
             if (shouldAudit()) {
               EventType eventType = solrRsp.getException() == null ? EventType.COMPLETED : EventType.ERROR;