Fix wrong encoding in PredicateFilteredDimensionSelector.getRow (#11339)

diff --git a/processing/src/main/java/org/apache/druid/query/dimension/ListFilteredDimensionSpec.java b/processing/src/main/java/org/apache/druid/query/dimension/ListFilteredDimensionSpec.java
index 777483b..9c2f6a1 100644
--- a/processing/src/main/java/org/apache/druid/query/dimension/ListFilteredDimensionSpec.java
+++ b/processing/src/main/java/org/apache/druid/query/dimension/ListFilteredDimensionSpec.java
@@ -21,7 +21,6 @@
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
 import org.apache.druid.common.config.NullHandling;
@@ -30,8 +29,8 @@
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.IdLookup;
 
-import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
+import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -88,7 +87,7 @@
   private DimensionSelector filterWhiteList(DimensionSelector selector)
   {
     final int selectorCardinality = selector.getValueCardinality();
-    if (selectorCardinality < 0 || (selector.idLookup() == null && !selector.nameLookupPossibleInAdvance())) {
+    if (selectorCardinality < 0 || !selector.nameLookupPossibleInAdvance()) {
       return new PredicateFilteredDimensionSelector(selector, Predicates.in(values));
     }
     final int maxPossibleFilteredCardinality = values.size();
@@ -122,14 +121,7 @@
     if (selectorCardinality < 0 || !selector.nameLookupPossibleInAdvance()) {
       return new PredicateFilteredDimensionSelector(
           selector,
-          new Predicate<String>()
-          {
-            @Override
-            public boolean apply(@Nullable String input)
-            {
-              return !values.contains(input);
-            }
-          }
+          input -> !values.contains(input)
       );
     }
     final int maxPossibleFilteredCardinality = selectorCardinality;
@@ -187,22 +179,16 @@
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-
     ListFilteredDimensionSpec that = (ListFilteredDimensionSpec) o;
-
-    if (isWhitelist != that.isWhitelist) {
-      return false;
-    }
-    return values.equals(that.values);
-
+    return Objects.equals(getDelegate(), that.getDelegate())
+           && isWhitelist == that.isWhitelist
+           && Objects.equals(values, that.values);
   }
 
   @Override
   public int hashCode()
   {
-    int result = values.hashCode();
-    result = 31 * result + (isWhitelist ? 1 : 0);
-    return result;
+    return Objects.hash(getDelegate(), values, isWhitelist);
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/dimension/PredicateFilteredDimensionSelector.java b/processing/src/main/java/org/apache/druid/query/dimension/PredicateFilteredDimensionSelector.java
index b607c9e..535442b 100644
--- a/processing/src/main/java/org/apache/druid/query/dimension/PredicateFilteredDimensionSelector.java
+++ b/processing/src/main/java/org/apache/druid/query/dimension/PredicateFilteredDimensionSelector.java
@@ -51,8 +51,9 @@
     row.ensureSize(baseRowSize);
     int resultSize = 0;
     for (int i = 0; i < baseRowSize; i++) {
-      if (predicate.apply(selector.lookupName(baseRow.get(i)))) {
-        row.setValue(resultSize, i);
+      int id = baseRow.get(i);
+      if (predicate.apply(selector.lookupName(id))) {
+        row.setValue(resultSize, id);
         resultSize++;
       }
     }
diff --git a/processing/src/test/java/org/apache/druid/query/dimension/ListFilteredDimensionSpecDimensionSelectorTest.java b/processing/src/test/java/org/apache/druid/query/dimension/ListFilteredDimensionSpecDimensionSelectorTest.java
new file mode 100644
index 0000000..b9befa2
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/dimension/ListFilteredDimensionSpecDimensionSelectorTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.dimension;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.function.Supplier;
+
+public class ListFilteredDimensionSpecDimensionSelectorTest extends InitializedNullHandlingTest
+{
+  private final ListFilteredDimensionSpec targetWithAllowList = new ListFilteredDimensionSpec(
+      new DefaultDimensionSpec("foo", "bar"),
+      ImmutableSet.of("val1_1", "val2_2", "val2_3"),
+      true
+  );
+  private final ListFilteredDimensionSpec targetWithDenyList = new ListFilteredDimensionSpec(
+      new DefaultDimensionSpec("foo", "bar"),
+      ImmutableSet.of("val1_1", "val2_2", "val2_3"),
+      false
+  );
+  private final List<List<String>> data = ImmutableList.of(
+      ImmutableList.of("val1_1", "val1_2"),
+      ImmutableList.of("val2_1", "val2_2", "val2_3"),
+      ImmutableList.of("val3_1")
+  );
+
+  @Test
+  public void testNullDimensionSelectorReturnNull()
+  {
+    Assert.assertNull(targetWithAllowList.decorate((DimensionSelector) null));
+    Assert.assertNull(targetWithDenyList.decorate((DimensionSelector) null));
+  }
+
+  @Test
+  public void testAllowListWhenDictionaryLookupIsAvailable()
+  {
+    testAllowList(false, true, true, ForwardingFilteredDimensionSelector.class);
+  }
+
+  @Test
+  public void testAllowListWhenIdLookupIsNotAvailable()
+  {
+    testAllowList(false, false, true, ForwardingFilteredDimensionSelector.class);
+  }
+
+  @Test
+  public void testAllowListWhenCardinalityIsUnknown()
+  {
+    testAllowList(true, true, true, PredicateFilteredDimensionSelector.class);
+  }
+
+  @Test
+  public void testAllowListWhenNameLookupIsNotPossibleInAdvance()
+  {
+    testAllowList(false, true, false, PredicateFilteredDimensionSelector.class);
+  }
+
+  @Test
+  public void testDenyListWhenDictionaryLookupIsAvailable()
+  {
+    testDenyList(false, true, true, ForwardingFilteredDimensionSelector.class);
+  }
+
+  @Test
+  public void testDenyListWhenIdLookupIsNotAvailable()
+  {
+    testDenyList(false, false, true, ForwardingFilteredDimensionSelector.class);
+  }
+
+  @Test
+  public void testDenyListWhenCardinalityIsUnknown()
+  {
+    testDenyList(true, true, true, PredicateFilteredDimensionSelector.class);
+  }
+
+  @Test
+  public void testDenyListWhenNameLookupIsNotPossibleInAdvance()
+  {
+    testDenyList(false, true, false, PredicateFilteredDimensionSelector.class);
+  }
+
+  private void testAllowList(
+      boolean unknownCardinality,
+      boolean validIdLookup,
+      boolean nameLookupPossibleInAdvance,
+      Class<? extends DimensionSelector> expectedDimensionSelectorClass
+  )
+  {
+    RowSupplier rowSupplier = new RowSupplier();
+    NonnullPair<Object2IntMap<String>, Int2ObjectMap<String>> dictionaries = createDictionaries(data);
+    DimensionSelector selector = targetWithAllowList.decorate(
+        new StringDimensionSelectorForTest(
+            rowSupplier,
+            dictionaries.lhs,
+            dictionaries.rhs,
+            unknownCardinality,
+            validIdLookup,
+            nameLookupPossibleInAdvance
+        )
+    );
+    Assert.assertSame(expectedDimensionSelectorClass, selector.getClass());
+    assertAllowListFiltering(rowSupplier, selector);
+  }
+
+  private void testDenyList(
+      boolean unknownCardinality,
+      boolean validIdLookup,
+      boolean nameLookupPossibleInAdvance,
+      Class<? extends DimensionSelector> expectedDimensionSelectorClass
+  )
+  {
+    RowSupplier rowSupplier = new RowSupplier();
+    NonnullPair<Object2IntMap<String>, Int2ObjectMap<String>> dictionaries = createDictionaries(data);
+    DimensionSelector selector = targetWithDenyList.decorate(
+        new StringDimensionSelectorForTest(
+            rowSupplier,
+            dictionaries.lhs,
+            dictionaries.rhs,
+            unknownCardinality,
+            validIdLookup,
+            nameLookupPossibleInAdvance
+        )
+    );
+    Assert.assertSame(expectedDimensionSelectorClass, selector.getClass());
+
+    assertDenyListFiltering(rowSupplier, selector);
+  }
+
+  private NonnullPair<Object2IntMap<String>, Int2ObjectMap<String>> createDictionaries(List<List<String>> values)
+  {
+    Object2IntMap<String> dictionary = new Object2IntOpenHashMap<>();
+    Int2ObjectMap<String> reverseDictionary = new Int2ObjectOpenHashMap<>();
+    MutableInt nextId = new MutableInt(0);
+    for (List<String> multiValue : values) {
+      for (String value : multiValue) {
+        int dictId = dictionary.computeIntIfAbsent(value, k -> nextId.getAndIncrement());
+        reverseDictionary.putIfAbsent(dictId, value);
+      }
+    }
+    return new NonnullPair<>(dictionary, reverseDictionary);
+  }
+
+  private void assertAllowListFiltering(RowSupplier rowSupplier, DimensionSelector selector)
+  {
+    rowSupplier.set(data.get(0));
+    IndexedInts encodedRow = selector.getRow();
+    Assert.assertEquals(1, encodedRow.size());
+    Assert.assertEquals("val1_1", selector.lookupName(encodedRow.get(0)));
+
+    rowSupplier.set(data.get(1));
+    encodedRow = selector.getRow();
+    Assert.assertEquals(2, encodedRow.size());
+    Assert.assertEquals("val2_2", selector.lookupName(encodedRow.get(0)));
+    Assert.assertEquals("val2_3", selector.lookupName(encodedRow.get(1)));
+
+    rowSupplier.set(data.get(2));
+    encodedRow = selector.getRow();
+    Assert.assertEquals(0, encodedRow.size());
+  }
+
+  private void assertDenyListFiltering(RowSupplier rowSupplier, DimensionSelector selector)
+  {
+    rowSupplier.set(data.get(0));
+    IndexedInts encodedRow = selector.getRow();
+    Assert.assertEquals(1, encodedRow.size());
+    Assert.assertEquals("val1_2", selector.lookupName(encodedRow.get(0)));
+
+    rowSupplier.set(data.get(1));
+    encodedRow = selector.getRow();
+    Assert.assertEquals(1, encodedRow.size());
+    Assert.assertEquals("val2_1", selector.lookupName(encodedRow.get(0)));
+
+    rowSupplier.set(data.get(2));
+    encodedRow = selector.getRow();
+    Assert.assertEquals(1, encodedRow.size());
+    Assert.assertEquals("val3_1", selector.lookupName(encodedRow.get(0)));
+  }
+
+  private static class RowSupplier implements Supplier<List<String>>
+  {
+    private List<String> row;
+
+    public void set(List<String> row)
+    {
+      this.row = row;
+    }
+
+    @Override
+    public List<String> get()
+    {
+      return row;
+    }
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/dimension/ListFilteredDimensionSpecTest.java b/processing/src/test/java/org/apache/druid/query/dimension/ListFilteredDimensionSpecTest.java
index 636da05..7a7ee4d 100644
--- a/processing/src/test/java/org/apache/druid/query/dimension/ListFilteredDimensionSpecTest.java
+++ b/processing/src/test/java/org/apache/druid/query/dimension/ListFilteredDimensionSpecTest.java
@@ -21,6 +21,7 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableSet;
+import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.data.IndexedInts;
@@ -189,4 +190,10 @@
     Assert.assertEquals(0, selector.idLookup().lookupId("a"));
     Assert.assertEquals(24, selector.idLookup().lookupId("z"));
   }
+
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(ListFilteredDimensionSpec.class).withNonnullFields("values").usingGetClass().verify();
+  }
 }
diff --git a/processing/src/test/java/org/apache/druid/query/dimension/StringDimensionSelectorForTest.java b/processing/src/test/java/org/apache/druid/query/dimension/StringDimensionSelectorForTest.java
new file mode 100644
index 0000000..141ac65
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/dimension/StringDimensionSelectorForTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.dimension;
+
+import com.google.common.base.Predicate;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.AbstractDimensionSelector;
+import org.apache.druid.segment.DimensionDictionarySelector;
+import org.apache.druid.segment.DimensionSelectorUtils;
+import org.apache.druid.segment.IdLookup;
+import org.apache.druid.segment.data.ArrayBasedIndexedInts;
+import org.apache.druid.segment.data.IndexedInts;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.function.Supplier;
+
+public class StringDimensionSelectorForTest extends AbstractDimensionSelector
+{
+  private final Supplier<List<String>> rowSupplier;
+  private final boolean unknownCardinality;
+  private final boolean validIdLookup;
+  private final boolean nameLookupPossibleInAdvance;
+  private final Object2IntMap<String> dictionary;
+  private final Int2ObjectMap<String> reverseDictionary;
+
+  private final ArrayBasedIndexedInts currentRow = new ArrayBasedIndexedInts();
+
+  public StringDimensionSelectorForTest(
+      Supplier<List<String>> rowSupplier,
+      Object2IntMap<String> dictionary,
+      Int2ObjectMap<String> reverseDictionary,
+      boolean unknownCardinality,
+      boolean validIdLookup,
+      boolean nameLookupPossibleInAdvance
+  )
+  {
+    this.rowSupplier = rowSupplier;
+    this.unknownCardinality = unknownCardinality;
+    this.validIdLookup = validIdLookup;
+    this.nameLookupPossibleInAdvance = nameLookupPossibleInAdvance;
+    this.dictionary = dictionary;
+    this.reverseDictionary = reverseDictionary;
+  }
+
+  @Override
+  public int getValueCardinality()
+  {
+    return unknownCardinality ? DimensionDictionarySelector.CARDINALITY_UNKNOWN : dictionary.size();
+  }
+
+  @Nullable
+  @Override
+  public String lookupName(int id)
+  {
+    return reverseDictionary.get(id);
+  }
+
+  @Override
+  public boolean nameLookupPossibleInAdvance()
+  {
+    return nameLookupPossibleInAdvance;
+  }
+
+  @Nullable
+  @Override
+  public IdLookup idLookup()
+  {
+    return validIdLookup ? dictionary::getInt : null;
+  }
+
+  @Override
+  public IndexedInts getRow()
+  {
+    List<String> multiValues = rowSupplier.get();
+    currentRow.ensureSize(multiValues.size());
+    for (int i = 0; i < multiValues.size(); i++) {
+      currentRow.setValue(i, dictionary.getInt(multiValues.get(i)));
+    }
+    currentRow.setSize(multiValues.size());
+    return currentRow;
+  }
+
+  @Override
+  public ValueMatcher makeValueMatcher(@Nullable String value)
+  {
+    return DimensionSelectorUtils.makeValueMatcherGeneric(this, value);
+  }
+
+  @Override
+  public ValueMatcher makeValueMatcher(Predicate<String> predicate)
+  {
+    return DimensionSelectorUtils.makeValueMatcherGeneric(this, predicate);
+  }
+
+  @Override
+  public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+  {
+  }
+
+  @Override
+  public Class<?> classOfObject()
+  {
+    return Object.class;
+  }
+}