fix serde issues with time-min-max extension (#11146)

* fix serde issues with time-min-max extension

* fix pom dependencies
diff --git a/extensions-contrib/time-min-max/pom.xml b/extensions-contrib/time-min-max/pom.xml
index 07742ce..ff87ef2 100644
--- a/extensions-contrib/time-min-max/pom.xml
+++ b/extensions-contrib/time-min-max/pom.xml
@@ -55,6 +55,16 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
       <scope>provided</scope>
@@ -70,11 +80,6 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
       <groupId>it.unimi.dsi</groupId>
       <artifactId>fastutil</artifactId>
       <scope>provided</scope>
@@ -90,6 +95,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>nl.jqno.equalsverifier</groupId>
+      <artifactId>equalsverifier</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.druid</groupId>
       <artifactId>druid-core</artifactId>
       <version>${project.parent.version}</version>
diff --git a/extensions-contrib/time-min-max/src/main/java/org/apache/druid/query/aggregation/TimestampAggregator.java b/extensions-contrib/time-min-max/src/main/java/org/apache/druid/query/aggregation/TimestampAggregator.java
index c11b50d..d1e8dde 100644
--- a/extensions-contrib/time-min-max/src/main/java/org/apache/druid/query/aggregation/TimestampAggregator.java
+++ b/extensions-contrib/time-min-max/src/main/java/org/apache/druid/query/aggregation/TimestampAggregator.java
@@ -38,7 +38,6 @@
   }
 
   private final BaseObjectColumnValueSelector selector;
-  private final String name;
   private final TimestampSpec timestampSpec;
   private final Comparator<Long> comparator;
   private final Long initValue;
@@ -46,14 +45,12 @@
   private long most;
 
   public TimestampAggregator(
-      String name,
       BaseObjectColumnValueSelector selector,
       TimestampSpec timestampSpec,
       Comparator<Long> comparator,
       Long initValue
   )
   {
-    this.name = name;
     this.selector = selector;
     this.timestampSpec = timestampSpec;
     this.comparator = comparator;
diff --git a/extensions-contrib/time-min-max/src/main/java/org/apache/druid/query/aggregation/TimestampAggregatorFactory.java b/extensions-contrib/time-min-max/src/main/java/org/apache/druid/query/aggregation/TimestampAggregatorFactory.java
index 21f7924..dca89e2 100644
--- a/extensions-contrib/time-min-max/src/main/java/org/apache/druid/query/aggregation/TimestampAggregatorFactory.java
+++ b/extensions-contrib/time-min-max/src/main/java/org/apache/druid/query/aggregation/TimestampAggregatorFactory.java
@@ -22,24 +22,25 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.cache.CacheKeyBuilder;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.column.ValueType;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
 import java.sql.Timestamp;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
 
-public class TimestampAggregatorFactory extends AggregatorFactory
+public abstract class TimestampAggregatorFactory extends AggregatorFactory
 {
   final String name;
+  @Nullable
   final String fieldName;
+  @Nullable
   final String timeFormat;
   private final Comparator<Long> comparator;
   private final Long initValue;
@@ -48,8 +49,8 @@
 
   TimestampAggregatorFactory(
       String name,
-      String fieldName,
-      String timeFormat,
+      @Nullable String fieldName,
+      @Nullable String timeFormat,
       Comparator<Long> comparator,
       Long initValue
   )
@@ -66,13 +67,23 @@
   @Override
   public Aggregator factorize(ColumnSelectorFactory metricFactory)
   {
-    return new TimestampAggregator(name, metricFactory.makeColumnValueSelector(fieldName), timestampSpec, comparator, initValue);
+    return new TimestampAggregator(
+        metricFactory.makeColumnValueSelector(timestampSpec.getTimestampColumn()),
+        timestampSpec,
+        comparator,
+        initValue
+    );
   }
 
   @Override
   public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
   {
-    return new TimestampBufferAggregator(metricFactory.makeColumnValueSelector(fieldName), timestampSpec, comparator, initValue);
+    return new TimestampBufferAggregator(
+        metricFactory.makeColumnValueSelector(timestampSpec.getTimestampColumn()),
+        timestampSpec,
+        comparator,
+        initValue
+    );
   }
 
   @Override
@@ -130,12 +141,6 @@
   }
 
   @Override
-  public AggregatorFactory getCombiningFactory()
-  {
-    return new TimestampAggregatorFactory(name, name, timeFormat, comparator, initValue);
-  }
-
-  @Override
   public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
   {
     if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
@@ -146,14 +151,6 @@
   }
 
   @Override
-  public List<AggregatorFactory> getRequiredColumns()
-  {
-    return Collections.singletonList(
-        new TimestampAggregatorFactory(fieldName, fieldName, timeFormat, comparator, initValue)
-    );
-  }
-
-  @Override
   public Object deserialize(Object object)
   {
     return object;
@@ -173,12 +170,14 @@
     return name;
   }
 
+  @Nullable
   @JsonProperty
   public String getFieldName()
   {
     return fieldName;
   }
 
+  @Nullable
   @JsonProperty
   public String getTimeFormat()
   {
@@ -188,16 +187,15 @@
   @Override
   public List<String> requiredFields()
   {
-    return Collections.singletonList(fieldName);
+    return Collections.singletonList(timestampSpec.getTimestampColumn());
   }
 
   @Override
   public byte[] getCacheKey()
   {
-    byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
-
-    return ByteBuffer.allocate(1 + fieldNameBytes.length)
-                     .put(AggregatorUtil.TIMESTAMP_CACHE_TYPE_ID).put(fieldNameBytes).array();
+    return new CacheKeyBuilder(AggregatorUtil.TIMESTAMP_CACHE_TYPE_ID).appendString(timestampSpec.getTimestampColumn())
+                                                                      .appendString(timestampSpec.getTimestampFormat())
+                                                                      .build();
   }
 
   @Override
@@ -221,43 +219,6 @@
     return Long.BYTES;
   }
 
-  @Override
-  public boolean equals(Object o)
-  {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    TimestampAggregatorFactory that = (TimestampAggregatorFactory) o;
-
-    if (!Objects.equals(fieldName, that.fieldName)) {
-      return false;
-    }
-    if (!Objects.equals(name, that.name)) {
-      return false;
-    }
-    if (!Objects.equals(comparator, that.comparator)) {
-      return false;
-    }
-    if (!Objects.equals(initValue, that.initValue)) {
-      return false;
-    }
-
-    return true;
-  }
-
-  @Override
-  public int hashCode()
-  {
-    int result = fieldName != null ? fieldName.hashCode() : 0;
-    result = 31 * result + (name != null ? name.hashCode() : 0);
-    result = 31 * result + (comparator != null ? comparator.hashCode() : 0);
-    result = 31 * result + (initValue != null ? initValue.hashCode() : 0);
-    return result;
-  }
 
   @Nullable
   static Long convertLong(TimestampSpec timestampSpec, Object input)
@@ -274,4 +235,26 @@
 
     return null;
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    TimestampAggregatorFactory that = (TimestampAggregatorFactory) o;
+    return name.equals(that.name) && Objects.equals(fieldName, that.fieldName) && Objects.equals(
+        timeFormat,
+        that.timeFormat
+    ) && comparator.equals(that.comparator) && initValue.equals(that.initValue);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(name, fieldName, timeFormat, comparator, initValue);
+  }
 }
diff --git a/extensions-contrib/time-min-max/src/main/java/org/apache/druid/query/aggregation/TimestampMaxAggregatorFactory.java b/extensions-contrib/time-min-max/src/main/java/org/apache/druid/query/aggregation/TimestampMaxAggregatorFactory.java
index 4514c28..fea1c85 100644
--- a/extensions-contrib/time-min-max/src/main/java/org/apache/druid/query/aggregation/TimestampMaxAggregatorFactory.java
+++ b/extensions-contrib/time-min-max/src/main/java/org/apache/druid/query/aggregation/TimestampMaxAggregatorFactory.java
@@ -22,28 +22,46 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Ordering;
 
+import javax.annotation.Nullable;
+import java.util.List;
+
 public class TimestampMaxAggregatorFactory extends TimestampAggregatorFactory
 {
   @JsonCreator
   public TimestampMaxAggregatorFactory(
       @JsonProperty("name") String name,
-      @JsonProperty("fieldName") String fieldName,
-      @JsonProperty("timeFormat") String timeFormat
+      @JsonProperty("fieldName") @Nullable String fieldName,
+      @JsonProperty("timeFormat") @Nullable String timeFormat
   )
   {
     super(name, fieldName, timeFormat, Ordering.natural(), Long.MIN_VALUE);
     Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
-    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
+  }
+
+  @Override
+  public AggregatorFactory getCombiningFactory()
+  {
+    return new TimestampMaxAggregatorFactory(name, name, timeFormat);
+  }
+
+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return ImmutableList.of(
+        new TimestampMaxAggregatorFactory(name, fieldName, timeFormat)
+    );
   }
 
   @Override
   public String toString()
   {
     return "TimestampMaxAggregatorFactory{" +
-        "fieldName='" + fieldName + '\'' +
-        ", name='" + name + '\'' +
-        '}';
+           "name='" + name + '\'' +
+           ", fieldName='" + fieldName + '\'' +
+           ", timeFormat='" + timeFormat + '\'' +
+           '}';
   }
 }
diff --git a/extensions-contrib/time-min-max/src/main/java/org/apache/druid/query/aggregation/TimestampMinAggregatorFactory.java b/extensions-contrib/time-min-max/src/main/java/org/apache/druid/query/aggregation/TimestampMinAggregatorFactory.java
index 6e4a0ea..79cb1ca 100644
--- a/extensions-contrib/time-min-max/src/main/java/org/apache/druid/query/aggregation/TimestampMinAggregatorFactory.java
+++ b/extensions-contrib/time-min-max/src/main/java/org/apache/druid/query/aggregation/TimestampMinAggregatorFactory.java
@@ -22,8 +22,11 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Ordering;
 
+import java.util.List;
+
 public class TimestampMinAggregatorFactory extends TimestampAggregatorFactory
 {
   @JsonCreator
@@ -35,15 +38,29 @@
   {
     super(name, fieldName, timeFormat, Ordering.natural().reverse(), Long.MAX_VALUE);
     Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
-    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
+  }
+
+  @Override
+  public AggregatorFactory getCombiningFactory()
+  {
+    return new TimestampMinAggregatorFactory(name, name, timeFormat);
+  }
+
+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return ImmutableList.of(
+        new TimestampMinAggregatorFactory(name, fieldName, timeFormat)
+    );
   }
 
   @Override
   public String toString()
   {
     return "TimestampMinAggregatorFactory{" +
-        "fieldName='" + fieldName + '\'' +
-        ", name='" + name + '\'' +
-        '}';
+           "name='" + name + '\'' +
+           ", fieldName='" + fieldName + '\'' +
+           ", timeFormat='" + timeFormat + '\'' +
+           '}';
   }
 }
diff --git a/extensions-contrib/time-min-max/src/test/java/org/apache/druid/query/aggregation/TimestampMinMaxAggregatorFactoryTest.java b/extensions-contrib/time-min-max/src/test/java/org/apache/druid/query/aggregation/TimestampMinMaxAggregatorFactoryTest.java
index c8424c3..4b87f80 100644
--- a/extensions-contrib/time-min-max/src/test/java/org/apache/druid/query/aggregation/TimestampMinMaxAggregatorFactoryTest.java
+++ b/extensions-contrib/time-min-max/src/test/java/org/apache/druid/query/aggregation/TimestampMinMaxAggregatorFactoryTest.java
@@ -19,12 +19,16 @@
 
 package org.apache.druid.query.aggregation;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
 import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.ValueType;
 import org.junit.Assert;
@@ -32,6 +36,54 @@
 
 public class TimestampMinMaxAggregatorFactoryTest
 {
+  private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper();
+
+  @Test
+  public void testSerde() throws JsonProcessingException
+  {
+    TimestampMaxAggregatorFactory maxAgg = new TimestampMaxAggregatorFactory("timeMax", "__time", null);
+    TimestampMinAggregatorFactory minAgg = new TimestampMinAggregatorFactory("timeMin", "__time", null);
+
+    Assert.assertEquals(
+        maxAgg,
+        JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsString(maxAgg), TimestampMaxAggregatorFactory.class)
+    );
+    Assert.assertEquals(
+        maxAgg.getCombiningFactory(),
+        JSON_MAPPER.readValue(
+            JSON_MAPPER.writeValueAsString(maxAgg.getCombiningFactory()),
+            TimestampMaxAggregatorFactory.class
+        )
+    );
+
+    Assert.assertEquals(
+        minAgg,
+        JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsString(minAgg), TimestampMinAggregatorFactory.class)
+    );
+    Assert.assertEquals(
+        minAgg.getCombiningFactory(),
+        JSON_MAPPER.readValue(
+            JSON_MAPPER.writeValueAsString(minAgg.getCombiningFactory()),
+            TimestampMinAggregatorFactory.class
+        )
+    );
+  }
+
+  @Test
+  public void testEqualsAndHashcode()
+  {
+    EqualsVerifier.forClass(TimestampMinAggregatorFactory.class)
+                  .withNonnullFields("name", "comparator", "initValue")
+                  .withIgnoredFields("timestampSpec")
+                  .usingGetClass()
+                  .verify();
+    EqualsVerifier.forClass(TimestampMaxAggregatorFactory.class)
+                  .withNonnullFields("name", "comparator", "initValue")
+                  .withIgnoredFields("timestampSpec")
+                  .usingGetClass()
+                  .verify();
+  }
+
   @Test
   public void testResultArraySignature()
   {