Support dictionary lookup for CassandraPasswordValidator

patch by Stefan Miklosovic; reviewed by Dinesh Joshi, Francisco Guerrero for CASSANDRA-19762
diff --git a/CHANGES.txt b/CHANGES.txt
index aa37acb..72d6e8f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Support dictionary lookup for CassandraPasswordValidator (CASSANDRA-19762)
  * Disallow denylisting keys in system_cluster_metadata (CASSANDRA-19713)
  * Fix gossip status after replacement (CASSANDRA-19712)
  * Ignore repair requests for system_cluster_metadata (CASSANDRA-19711)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index e49e9db..c5740f3 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -2200,6 +2200,10 @@
 #  # numerical (form '34567'), or US qwerty (form 'asdfg') as well as sequencies from supported character sets.
 #  # The minimum value for this property is 3, by default it is set to 5.
 #  illegal_sequence_length: 5
+#  # Dictionary to check the passwords against. Defaults to no dictionary.
+#  # The whole dictionary is cached in memory, so use relatively big dictionaries with caution.
+#  # Entries in a dictionary, one per line, have to be sorted per String's compareTo contract.
+#  #dictionary: /path/to/dictionary/file
 #  # If set to true, a user will be informed what policies a suggested password is missing in order to be valid.
 #  # Defaults to true.
 #  detailed_messages: true
diff --git a/src/java/org/apache/cassandra/db/guardrails/CassandraPasswordConfiguration.java b/src/java/org/apache/cassandra/db/guardrails/CassandraPasswordConfiguration.java
index ed7939b..01a585a 100644
--- a/src/java/org/apache/cassandra/db/guardrails/CassandraPasswordConfiguration.java
+++ b/src/java/org/apache/cassandra/db/guardrails/CassandraPasswordConfiguration.java
@@ -21,6 +21,7 @@
 import java.util.Arrays;
 
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.File;
 import org.passay.IllegalSequenceRule;
 
 import static java.lang.String.format;
@@ -71,6 +72,7 @@
     public static final String SPECIAL_FAIL_KEY = "special_fail";
 
     public static final String ILLEGAL_SEQUENCE_LENGTH_KEY = "illegal_sequence_length";
+    public static final String DICTIONARY_KEY = "dictionary";
 
     public static final String DETAILED_MESSAGES_KEY = "detailed_messages";
 
@@ -93,7 +95,9 @@
     protected final int specialsWarn;
     protected final int specialsFail;
 
+    // various
     protected final int illegalSequenceLength;
+    protected final String dictionary;
 
     public boolean detailedMessages;
 
@@ -115,6 +119,7 @@
         config.put(SPECIAL_FAIL_KEY, specialsFail);
         config.put(ILLEGAL_SEQUENCE_LENGTH_KEY, illegalSequenceLength);
         config.put(DETAILED_MESSAGES_KEY, detailedMessages);
+        config.put(DICTIONARY_KEY, dictionary);
 
         return config;
     }
@@ -141,6 +146,7 @@
         specialsFail = config.resolveInteger(SPECIAL_FAIL_KEY, DEFAULT_SPECIAL_FAIL);
 
         illegalSequenceLength = config.resolveInteger(ILLEGAL_SEQUENCE_LENGTH_KEY, DEFAULT_ILLEGAL_SEQUENCE_LENGTH);
+        dictionary = config.resolveString(DICTIONARY_KEY);
         detailedMessages = config.resolveBoolean(DETAILED_MESSAGES_KEY, true);
 
         validateParameters();
@@ -258,6 +264,16 @@
                                                     minimumLenghtOfFailCharacteristics,
                                                     LENGTH_FAIL_KEY,
                                                     lengthFail));
+
+        if (dictionary != null)
+        {
+            File dictionaryFile = new File(dictionary);
+            if (!dictionaryFile.exists())
+                throw new ConfigurationException(format("Dictionary file %s does not exist.", dictionary));
+
+            if (!dictionaryFile.isReadable())
+                throw new ConfigurationException(format("Dictionary file %s is not readable.", dictionary));
+        }
     }
 
     private ConfigurationException getValidationException(String key1, int value1, String key2, int value2)
diff --git a/src/java/org/apache/cassandra/db/guardrails/CassandraPasswordGenerator.java b/src/java/org/apache/cassandra/db/guardrails/CassandraPasswordGenerator.java
index 4c07f7a..87ba5d5 100644
--- a/src/java/org/apache/cassandra/db/guardrails/CassandraPasswordGenerator.java
+++ b/src/java/org/apache/cassandra/db/guardrails/CassandraPasswordGenerator.java
@@ -59,11 +59,16 @@
         if (size > configuration.maxLength)
             throw new ConfigurationException("Unable to generate a password of length " + size);
 
+        boolean dictionaryAware = validator instanceof PasswordDictionaryAware;
+
         for (int i = 0; i < maxPasswordGenerationAttempts; i++)
         {
             String generatedPassword = passwordGenerator.generatePassword(size, characterRules);
             if (validator.shouldWarn(generatedPassword, false).isEmpty())
-                return generatedPassword;
+            {
+                if (!dictionaryAware || ((PasswordDictionaryAware<?>) validator).foundInDictionary(generatedPassword).isValid())
+                    return generatedPassword;
+            }
         }
 
         throw new ConfigurationException("It was not possible to generate a valid password " +
diff --git a/src/java/org/apache/cassandra/db/guardrails/CassandraPasswordValidator.java b/src/java/org/apache/cassandra/db/guardrails/CassandraPasswordValidator.java
index dc5f350..e0d0291 100644
--- a/src/java/org/apache/cassandra/db/guardrails/CassandraPasswordValidator.java
+++ b/src/java/org/apache/cassandra/db/guardrails/CassandraPasswordValidator.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.db.guardrails;
 
+import java.io.IOException;
+import java.io.RandomAccessFile; // checkstyle: permit this import
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -38,6 +40,7 @@
 import org.passay.CyrillicSequenceData;
 import org.passay.CzechCharacterData;
 import org.passay.CzechSequenceData;
+import org.passay.DictionaryRule;
 import org.passay.EnglishCharacterData;
 import org.passay.EnglishSequenceData;
 import org.passay.GermanCharacterData;
@@ -54,6 +57,8 @@
 import org.passay.RuleResultMetadata;
 import org.passay.SequenceData;
 import org.passay.WhitespaceRule;
+import org.passay.dictionary.FileWordList;
+import org.passay.dictionary.WordListDictionary;
 
 import static java.util.Optional.empty;
 import static org.passay.EnglishCharacterData.Digit;
@@ -129,6 +134,10 @@
  *    <li>tests if a password does not violate a warning threshold, if it does, warning is emitted</li>
  * </ol>
  * <p>
+ * Passwords are looked up in a dictionary if the "dictionary" configuration property is specified, pointing to
+ * a dictionary file to read passwords from. The whole dictionary is cached in memory. Dictionary lookup is done
+ * only upon {@link #shouldFail(String, boolean)} method call.
+ * <p>
  *
  * @see CharacterCharacteristicsRule
  * @see CassandraPasswordConfiguration
@@ -138,13 +147,16 @@
  * @see ValueValidator#shouldWarn(Object, boolean)
  * @see ValueValidator#shouldFail(Object, boolean)
  */
-public class CassandraPasswordValidator extends ValueValidator<String>
+public class CassandraPasswordValidator extends ValueValidator<String> implements PasswordDictionaryAware<CassandraPasswordConfiguration>
 {
+    private static final RuleResult VALID = new RuleResult(true);
+
     protected final PasswordValidator warnValidator;
     protected final PasswordValidator failValidator;
 
     protected final CassandraPasswordConfiguration configuration;
     private final UnsupportedCharsetRule unsupportedCharsetRule = new UnsupportedCharsetRule();
+    private final DictionaryRule dictionaryRule;
     private final boolean provideDetailedMessages;
 
     public CassandraPasswordValidator(CustomGuardrailConfig config)
@@ -153,6 +165,8 @@
         configuration = new CassandraPasswordConfiguration(config);
         provideDetailedMessages = configuration.detailedMessages;
 
+        dictionaryRule = initializeDictionaryRule(configuration);
+
         warnValidator = new PasswordValidator(getRules(configuration.lengthWarn,
                                                        configuration.maxLength,
                                                        configuration.characteristicsWarn,
@@ -204,6 +218,12 @@
         }
         else
         {
+            if (!toWarn && configuration.dictionary != null) // for shouldFail
+            {
+                RuleResult result = foundInDictionary(passwordData);
+                if (!result.isValid())
+                    return Optional.of(getValidationMessage(calledBySuperUser, validator, false, result));
+            }
             RuleResult result = validator.validate(passwordData);
             return result.isValid() ? empty() : Optional.of(getValidationMessage(calledBySuperUser, validator, toWarn, result));
         }
@@ -296,6 +316,24 @@
         }
     }
 
+    @Override
+    public RuleResult foundInDictionary(String password)
+    {
+        // without a dictionary rule there is nothing to look the password up in, so it is reported as valid
+        return dictionaryRule == null
+               ? VALID
+               : dictionaryRule.validate(new PasswordData(password));
+    }
+
+    @Override
+    public RuleResult foundInDictionary(PasswordData passwordData)
+    {
+        // a missing dictionary rule means the lookup is disabled and the password data passes as valid
+        return dictionaryRule != null
+               ? dictionaryRule.validate(passwordData)
+               : VALID;
+    }
+
     protected static class CustomLowerCaseCharacterData implements CharacterData
     {
         @Override
@@ -436,4 +474,33 @@
                         "0123456789").toCharArray();
         }
     }
+
+    @Override
+    public DictionaryRule initializeDictionaryRule(CassandraPasswordConfiguration configuration)
+    {
+        if (configuration.dictionary == null)
+            return null; // no dictionary configured, callers treat a null rule as "lookup disabled"
+
+        try
+        {
+            // NOTE(review): raf is never closed; presumably the word list keeps reading from it, but it also
+            // stays open when construction below fails - consider closing it on the failure path.
+            RandomAccessFile raf = new RandomAccessFile(configuration.dictionary, "r");
+            return new DictionaryRule(new WordListDictionary(new FileWordList(raf, true, 100)));
+        }
+        catch (IllegalArgumentException ex)
+        {
+            // improve message a little bit, keeping the original exception as the cause
+            if ("File is not sorted correctly for this comparator".equals(ex.getMessage()))
+                throw new ConfigurationException("Dictionary file " + configuration.dictionary + " is not correctly " +
+                                                 "sorted for case-sensitive comparator according to String's " +
+                                                 "compareTo contract.", ex);
+            else
+                throw new ConfigurationException(ex.getMessage(), ex);
+        }
+        catch (IOException ex)
+        {
+            throw new ConfigurationException(ex.getMessage(), ex);
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/guardrails/PasswordDictionaryAware.java b/src/java/org/apache/cassandra/db/guardrails/PasswordDictionaryAware.java
new file mode 100644
index 0000000..5f7f209
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/guardrails/PasswordDictionaryAware.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.guardrails;
+
+import org.passay.DictionaryRule;
+import org.passay.PasswordData;
+import org.passay.RuleResult;
+
+/**
+ * Marks a validator, e.g. {@link CassandraPasswordValidator}, as able to check values (passwords)
+ * against a dictionary. It is up to the implementor of a validator to call
+ * {@link #initializeDictionaryRule(CassandraPasswordConfiguration)} and to decide in that method
+ * how the {@link DictionaryRule} should be constructed.
+ */
+public interface PasswordDictionaryAware<T extends CassandraPasswordConfiguration>
+{
+    /**
+     * Looks up a password, given as a plain string, in a dictionary.
+     *
+     * @param password password to check if it is in a dictionary or not
+     * @return result of the lookup; callers treat a valid result as "not found in the dictionary"
+     */
+    RuleResult foundInDictionary(String password);
+
+    /**
+     * Looks up a password, given as {@link PasswordData}, in a dictionary.
+     *
+     * @param passwordData password data to check if it is in a dictionary or not
+     * @return result of the lookup; callers treat a valid result as "not found in the dictionary"
+     */
+    RuleResult foundInDictionary(PasswordData passwordData);
+
+    /**
+     * Initializes the dictionary rule for a validator.
+     *
+     * @param configuration configuration to use to initialize the dictionary rule
+     * @return initialized dictionary rule ({@link CassandraPasswordValidator} returns null without a dictionary)
+     */
+    DictionaryRule initializeDictionaryRule(T configuration);
+}
diff --git a/test/resources/passwordDictionary.txt b/test/resources/passwordDictionary.txt
new file mode 100644
index 0000000..8da4180
--- /dev/null
+++ b/test/resources/passwordDictionary.txt
@@ -0,0 +1 @@
+thisIsSOmePasswOrdInADictionary
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/db/guardrails/CassandraPasswordValidatorTest.java b/test/unit/org/apache/cassandra/db/guardrails/CassandraPasswordValidatorTest.java
index d0604c5..ddb9c42 100644
--- a/test/unit/org/apache/cassandra/db/guardrails/CassandraPasswordValidatorTest.java
+++ b/test/unit/org/apache/cassandra/db/guardrails/CassandraPasswordValidatorTest.java
@@ -27,14 +27,17 @@
 
 import org.apache.cassandra.db.guardrails.ValueValidator.ValidationViolation;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.File;
 import org.passay.IllegalSequenceRule;
 
 import static java.lang.Boolean.FALSE;
 import static java.lang.Boolean.TRUE;
 import static java.lang.String.format;
+import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.CHARACTERISTIC_FAIL_KEY;
+import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.CHARACTERISTIC_WARN_KEY;
 import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.DEFAULT_CHARACTERISTIC_FAIL;
-import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.DEFAULT_ILLEGAL_SEQUENCE_LENGTH;
 import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.DEFAULT_CHARACTERISTIC_WARN;
+import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.DEFAULT_ILLEGAL_SEQUENCE_LENGTH;
 import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.DEFAULT_LENGTH_FAIL;
 import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.DEFAULT_LENGTH_WARN;
 import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.DEFAULT_LOWER_CASE_FAIL;
@@ -43,17 +46,16 @@
 import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.DEFAULT_SPECIAL_WARN;
 import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.DEFAULT_UPPER_CASE_FAIL;
 import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.DEFAULT_UPPER_CASE_WARN;
-import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.ILLEGAL_SEQUENCE_LENGTH_KEY;
-import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.MAX_CHARACTERISTICS;
-import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.MAX_LENGTH_KEY;
-import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.CHARACTERISTIC_FAIL_KEY;
-import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.CHARACTERISTIC_WARN_KEY;
+import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.DICTIONARY_KEY;
 import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.DIGIT_FAIL_KEY;
 import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.DIGIT_WARN_KEY;
+import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.ILLEGAL_SEQUENCE_LENGTH_KEY;
 import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.LENGTH_FAIL_KEY;
 import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.LENGTH_WARN_KEY;
 import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.LOWER_CASE_FAIL_KEY;
 import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.LOWER_CASE_WARN_KEY;
+import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.MAX_CHARACTERISTICS;
+import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.MAX_LENGTH_KEY;
 import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.SPECIAL_FAIL_KEY;
 import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.SPECIAL_WARN_KEY;
 import static org.apache.cassandra.db.guardrails.CassandraPasswordConfiguration.UPPER_CASE_FAIL_KEY;
@@ -309,6 +311,40 @@
         }
     }
 
+    @Test
+    public void testDictionary()
+    {
+        String word = "thisIsSOmePasswOrdInADictionary";
+
+        // dictionary configured: the word is reported as an illegal dictionary word
+        CustomGuardrailConfig withDictionary = new CustomGuardrailConfig();
+        withDictionary.put(DICTIONARY_KEY, new File("test/resources/passwordDictionary.txt").absolutePath());
+        CassandraPasswordValidator dictionaryValidator = new CassandraPasswordValidator(withDictionary);
+        Optional<ValidationViolation> found = dictionaryValidator.shouldFail(word, false);
+        assertTrue(found.isPresent());
+        assertEquals("[ILLEGAL_WORD]", found.get().redactedMessage);
+
+        // no dictionary: the same word only violates the regular characteristics rules
+        CassandraPasswordValidator plainValidator = new CassandraPasswordValidator(new CustomGuardrailConfig());
+        Optional<ValidationViolation> notFound = plainValidator.shouldFail(word, false);
+        assertTrue(notFound.isPresent());
+        assertEquals("[INSUFFICIENT_DIGIT, INSUFFICIENT_CHARACTERISTICS, INSUFFICIENT_SPECIAL]",
+                     notFound.get().redactedMessage);
+    }
+
+    @Test
+    public void testMissingDictionary()
+    {
+        // a dictionary path that does not exist must be rejected when the validator is constructed
+        String missingDictionary = new File("this/file/does/not/exist").absolutePath();
+        CustomGuardrailConfig missingDictionaryConfig = new CustomGuardrailConfig();
+        missingDictionaryConfig.put(DICTIONARY_KEY, missingDictionary);
+
+        assertThatThrownBy(() -> new CassandraPasswordValidator(missingDictionaryConfig))
+        .isInstanceOf(ConfigurationException.class)
+        .hasMessageContaining("does not exist");
+    }
+
     private void validateWithConfig(Supplier<Map<String, Object>> configSupplier, String expectedMessage)
     {
         CustomGuardrailConfig customConfig = new CustomGuardrailConfig();