NIFIREG-253 Updated KerberosIdentityProvider to use the "Default Realm" property

  Implemented prioritized handling of appending the default realm
    A realm-qualified principal will not be modified before authentication
    A principal shortname will have Default Realm appended to it when it is not blank before authentication
    A principal shortname will not be modified if Default Realm is blank, and the underlying kerberos implementation will append the default_realm configured in krb5.conf
In nifi-registry-security-utils
  added KerberosPrincipalParser for determining the realm of a kerberos principal
  added tests for KerberosPrincipalParser
  updated pom with spock-core as a test dependency

This closes #172.

Signed-off-by: Kevin Doran <kdoran@apache.org>
diff --git a/nifi-registry-core/nifi-registry-security-utils/pom.xml b/nifi-registry-core/nifi-registry-security-utils/pom.xml
index 952f769..c60ca0e 100644
--- a/nifi-registry-core/nifi-registry-security-utils/pom.xml
+++ b/nifi-registry-core/nifi-registry-security-utils/pom.xml
@@ -37,6 +37,12 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.spockframework</groupId>
+            <artifactId>spock-core</artifactId>
+            <version>1.0-groovy-2.4</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/nifi-registry-core/nifi-registry-security-utils/src/main/java/org/apache/nifi/registry/security/util/kerberos/KerberosPrincipalParser.java b/nifi-registry-core/nifi-registry-security-utils/src/main/java/org/apache/nifi/registry/security/util/kerberos/KerberosPrincipalParser.java
new file mode 100644
index 0000000..22328e9
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-security-utils/src/main/java/org/apache/nifi/registry/security/util/kerberos/KerberosPrincipalParser.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.security.util.kerberos;
+
+import org.apache.commons.lang3.StringUtils;
+
+public class KerberosPrincipalParser {
+
+    /**
+     * <p>Determines the realm specified in the given kerberos principal.
+     *
+     * <p>The content of the given {@code principal} after the occurrence
+     * of the last non-escaped realm delimiter ("@") will be considered
+     * the realm of the principal.
+     *
+     * <p>The validity of the given {@code principal} and the determined realm
+     * is not be verified by this method.
+     *
+     * @param principal the principal for which the realm will be determined
+     * @return the realm of the given principal
+     */
+    public static String getRealm(String principal) {
+        if (StringUtils.isBlank(principal)) {
+            throw new IllegalArgumentException("principal can not be null or empty");
+        }
+
+        char previousChar = 0;
+        int realmDelimiterIndex = -1;
+        char currentChar;
+        boolean realmDelimiterFound = false;
+        int principalLength = principal.length();
+
+        // find the last non-escaped occurrence of the realm delimiter
+        for (int i = 0; i < principalLength; ++i) {
+            currentChar = principal.charAt(i);
+            if (currentChar == '@' && previousChar != '\\' ) {
+                realmDelimiterIndex = i;
+                realmDelimiterFound = true;
+            }
+            previousChar = currentChar;
+        }
+
+        String principalAfterLastRealmDelimiter = principal.substring(realmDelimiterIndex + 1);
+        return realmDelimiterFound && realmDelimiterIndex + 1 < principalLength ? principalAfterLastRealmDelimiter : null;
+    }
+}
diff --git a/nifi-registry-core/nifi-registry-security-utils/src/test/groovy/org/apache/nifi/registry/security/util/kerberos/KerberosPrincipalParserSpec.groovy b/nifi-registry-core/nifi-registry-security-utils/src/test/groovy/org/apache/nifi/registry/security/util/kerberos/KerberosPrincipalParserSpec.groovy
new file mode 100644
index 0000000..e96307b
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-security-utils/src/test/groovy/org/apache/nifi/registry/security/util/kerberos/KerberosPrincipalParserSpec.groovy
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.security.util.kerberos
+
+import spock.lang.Specification
+import spock.lang.Unroll
+
+class KerberosPrincipalParserSpec extends Specification {
+
+    @Unroll
+    def "Verify parsed realm from '#testPrincipal' == '#expectedRealm'"() {
+        expect:
+        KerberosPrincipalParser.getRealm(testPrincipal) == expectedRealm
+
+        where:
+        testPrincipal                     || expectedRealm
+        "user"                            || null
+        "user@"                           || null
+        "user@EXAMPLE.COM"                || "EXAMPLE.COM"
+        "user@name@EXAMPLE.COM"           || "EXAMPLE.COM"
+        "user\\@"                         || null
+        "user\\@name"                     || null
+        "user\\@name@EXAMPLE.COM"         || "EXAMPLE.COM"
+        "user@EXAMPLE.COM\\@"             || "EXAMPLE.COM\\@"
+        "user@@name@\\@@\\@"              || "\\@"
+        "user@@name@\\@@\\@@EXAMPLE.COM"  || "EXAMPLE.COM"
+        "user@@name@\\@@\\@@EXAMPLE.COM@" || null
+        "user\\@\\@name@EXAMPLE.COM"      || "EXAMPLE.COM"
+    }
+}
diff --git a/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/security/authentication/kerberos/KerberosIdentityProvider.java b/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/security/authentication/kerberos/KerberosIdentityProvider.java
index 5e6e7bb..ee55c02 100644
--- a/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/security/authentication/kerberos/KerberosIdentityProvider.java
+++ b/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/security/authentication/kerberos/KerberosIdentityProvider.java
@@ -25,6 +25,7 @@
 import org.apache.nifi.registry.security.authentication.exception.InvalidCredentialsException;
 import org.apache.nifi.registry.security.exception.SecurityProviderCreationException;
 import org.apache.nifi.registry.security.exception.SecurityProviderDestructionException;
+import org.apache.nifi.registry.security.util.kerberos.KerberosPrincipalParser;
 import org.apache.nifi.registry.util.FormatUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +45,7 @@
 
     private KerberosAuthenticationProvider provider;
 
+    private String defaultRealm;
     private long expiration;
 
     @Override
@@ -65,6 +67,11 @@
                     String.format("The Expiration Duration '%s' is not a valid time duration", rawExpiration));
         }
 
+        defaultRealm = configurationContext.getProperty("Default Realm");
+        if (StringUtils.isNotBlank(defaultRealm) && defaultRealm.contains("@")) {
+            throw new SecurityProviderCreationException(String.format("The Default Realm '%s' must not contain \"@\"", defaultRealm));
+        }
+
         provider = new KerberosAuthenticationProvider();
         SunJaasKerberosClient client = new SunJaasKerberosClient();
         client.setDebug(enableDebug);
@@ -80,24 +87,47 @@
             throw new IdentityAccessException("The Kerberos authentication provider is not initialized.");
         }
 
+
         try {
-            // perform the authentication
-            final String username = authenticationRequest.getUsername();
+            final String rawPrincipal = authenticationRequest.getUsername();
             final Object credentials = authenticationRequest.getCredentials();
-            final String password = credentials != null && credentials instanceof String ? (String) credentials : null;
+            final String parsedRealm = KerberosPrincipalParser.getRealm(rawPrincipal);
+
+            // Apply default realm from KerberosIdentityProvider's configuration specified in identity-providers.xml if a principal without a realm was given
+            // Otherwise, the default realm configured from the krb5 configuration specified in the nifi.registry.kerberos.krb5.file property will end up being used
+            boolean realmInRawPrincipal = StringUtils.isNotBlank(parsedRealm);
+            final String identity;
+            if (realmInRawPrincipal) {
+                // there's a realm already in the given principal, use it
+                identity = rawPrincipal;
+                logger.debug("Realm was specified in principal {}, default realm was not added to the identity being authenticated", rawPrincipal);
+            } else if (StringUtils.isNotBlank(defaultRealm)) {
+                // the value for the default realm is not blank, append the realm to the given principal
+                identity = StringUtils.joinWith("@", rawPrincipal, defaultRealm);
+                logger.debug("Realm was not specified in principal {}, default realm {} was added to the identity being authenticated", rawPrincipal, defaultRealm);
+            } else {
+                // otherwise, use the given principal, which will use the default realm as specified in the krb5 configuration
+                identity = rawPrincipal;
+                logger.debug("Realm was not specified in principal {}, default realm is blank and was not added to the identity being authenticated", rawPrincipal);
+            }
 
             // perform the authentication
-            final UsernamePasswordAuthenticationToken token = new UsernamePasswordAuthenticationToken(username, credentials);
-            logger.debug("Created authentication token " + token.toString());
+            final UsernamePasswordAuthenticationToken token = new UsernamePasswordAuthenticationToken(identity, credentials);
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("Created authentication token " + token.toString());
+            }
 
             final Authentication authentication = provider.authenticate(token);
-            logger.debug("Ran provider.authenticate(token) and returned authentication for " +
-                    "principal={} with name={} and isAuthenticated={}",
-                    authentication.getPrincipal(),
-                    authentication.getName(),
-                    authentication.isAuthenticated());
+            if (logger.isDebugEnabled()) {
+                logger.debug("Ran provider.authenticate(token) and returned authentication for " +
+                                "principal={} with name={} and isAuthenticated={}",
+                        authentication.getPrincipal(),
+                        authentication.getName(),
+                        authentication.isAuthenticated());
+            }
 
-            return new AuthenticationResponse(authentication.getName(), username, expiration, issuer);
+            return new AuthenticationResponse(authentication.getName(), identity, expiration, issuer);
         } catch (final AuthenticationException e) {
             throw new InvalidCredentialsException(e.getMessage(), e);
         }