/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ambari.metrics.core.timeline;


import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_APP_ID;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES;
import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
import static org.easymock.EasyMock.anyBoolean;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.powermock.api.easymock.PowerMock.mockStatic;
import static org.powermock.api.easymock.PowerMock.replayAll;

@RunWith(PowerMockRunner.class)
@PrepareForTest(TimelineMetricConfiguration.class)

@PowerMockIgnore("javax.management.*")
public class TimelineMetricsIgniteCacheTest {
  private static TimelineMetricsIgniteCache timelineMetricsIgniteCache;
  @BeforeClass
  public static void setupConf() throws Exception {
    TimelineMetricConfiguration conf = new TimelineMetricConfiguration(new
      Configuration(), new Configuration());
    mockStatic(TimelineMetricConfiguration.class);
    expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes();
    conf.getMetricsConf().set(CLUSTER_AGGREGATOR_APP_IDS, "appIdForHostsAggr");
    conf.getMetricsConf().set(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES, "localhost");
    replayAll();

    timelineMetricsIgniteCache = new TimelineMetricsIgniteCache();
  }

  @Test
  public void putEvictMetricsFromCacheSlicesMerging() throws Exception {
    long cacheSliceIntervalMillis = 30000L;

    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
    expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class), anyBoolean())).andReturn(new byte[16]).once();
    replay(metricMetadataManagerMock);

    long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), cacheSliceIntervalMillis);

    long seconds = 1000;
    TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
    /*

    0        +30s      +60s
    |         |         |
     (1)(2)(3) (4)(5)(6)  h1

    */
    // Case 1 : data points are distributed equally, no values are lost, single host.
    metricValues.put(startTime + 4*seconds, 1.0);
    metricValues.put(startTime + 14*seconds, 2.0);
    metricValues.put(startTime + 24*seconds, 3.0);
    metricValues.put(startTime + 34*seconds, 4.0);
    metricValues.put(startTime + 44*seconds, 5.0);
    metricValues.put(startTime + 54*seconds, 6.0);

    TimelineMetric timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
    timelineMetric.setStartTime(metricValues.firstKey());
    timelineMetric.addMetricValues(metricValues);

    Collection<TimelineMetric> timelineMetrics = new ArrayList<>();
    timelineMetrics.add(timelineMetric);
    timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);

    Assert.assertEquals(aggregateMap.size(), 2);
    TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(),
      timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 30*seconds);

    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
    Assert.assertEquals(2.0, aggregateMap.get(timelineClusterMetric).getSum());

    timelineClusterMetric.setTimestamp(startTime + 2*30*seconds);
    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
    Assert.assertEquals(5.0, aggregateMap.get(timelineClusterMetric).getSum());

    metricValues.clear();
    timelineMetrics.clear();

    /*

    0        +30s      +60s
    |         |         |
     (1)(2)(3) (4)(5)(6)   h1, h2

    */
    // Case 2 : data points are distributed equally, no values are lost, two hosts.
    metricValues.put(startTime + 4*seconds, 1.0);
    metricValues.put(startTime + 14*seconds, 2.0);
    metricValues.put(startTime + 24*seconds, 3.0);
    metricValues.put(startTime + 34*seconds, 4.0);
    metricValues.put(startTime + 44*seconds, 5.0);
    metricValues.put(startTime + 54*seconds, 6.0);

    timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
    timelineMetric.setMetricValues(metricValues);

    metricValues = new TreeMap<>();
    metricValues.put(startTime + 5*seconds, 2.0);
    metricValues.put(startTime + 15*seconds, 4.0);
    metricValues.put(startTime + 25*seconds, 6.0);
    metricValues.put(startTime + 35*seconds, 8.0);
    metricValues.put(startTime + 45*seconds, 10.0);
    metricValues.put(startTime + 55*seconds, 12.0);
    TimelineMetric timelineMetric2 = new TimelineMetric("metric1", "host2", "app1", "instance1");
    timelineMetric2.setMetricValues(metricValues);

    timelineMetrics = new ArrayList<>();
    timelineMetrics.add(timelineMetric);
    timelineMetrics.add(timelineMetric2);
    timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
    aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);

    Assert.assertEquals(aggregateMap.size(), 2);
    timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(),
      timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 30*seconds);

    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
    Assert.assertEquals(6.0, aggregateMap.get(timelineClusterMetric).getSum());

    timelineClusterMetric.setTimestamp(startTime + 2*30*seconds);
    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
    Assert.assertEquals(15.0, aggregateMap.get(timelineClusterMetric).getSum());

    metricValues.clear();
    timelineMetrics.clear();

    Assert.assertEquals(0d, timelineMetricsIgniteCache.getPointInTimeCacheMetrics().get("Cluster_KeySize"));
  }

  @Test
  public void updateAppAggregatesFromHostMetricTest() {
    //make sure hosts metrics are aggregated for appIds from "timeline.metrics.service.cluster.aggregator.appIds"

    long cacheSliceIntervalMillis = 30000L;

    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
    expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class), anyBoolean())).andReturn(new byte[16]).once();
    expect(metricMetadataManagerMock.getHostedAppsCache()).andReturn(new HashMap<>()).anyTimes();
    replay(metricMetadataManagerMock);

    long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), cacheSliceIntervalMillis);

    long seconds = 1000;

    TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
    List<TimelineMetric> timelineMetrics = new ArrayList<>();
    TimelineMetric timelineMetric;

    metricValues = new TreeMap<>();
    metricValues.put(startTime + 15*seconds, 1.0);
    metricValues.put(startTime + 55*seconds, 2.0);
    timelineMetric = new TimelineMetric("host_metric", "host1", HOST_APP_ID, "instance1");
    timelineMetric.setMetricValues(metricValues);
    timelineMetrics.add(timelineMetric);

    metricValues = new TreeMap<>();
    metricValues.put(startTime + 45*seconds, 3.0);
    metricValues.put(startTime + 85*seconds, 4.0);
    timelineMetric = new TimelineMetric("app_metric", "host1", "appIdForHostsAggr", "instance1");
    timelineMetric.setMetricValues(metricValues);
    timelineMetrics.add(timelineMetric);

    metricValues = new TreeMap<>();
    metricValues.put(startTime + 85*seconds, 5.0);
    timelineMetric = new TimelineMetric("host_metric", "host1", HOST_APP_ID, "instance1");
    timelineMetric.setMetricValues(metricValues);
    timelineMetrics.add(timelineMetric);

    metricValues = new TreeMap<>();
    metricValues.put(startTime + 85*seconds, 6.0);
    timelineMetric = new TimelineMetric("host_metric", "host2", HOST_APP_ID, "instance1");
    timelineMetric.setMetricValues(metricValues);
    timelineMetrics.add(timelineMetric);

    timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);

    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);
    Assert.assertEquals(aggregateMap.size(), 6);
    TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(),
        timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 90*seconds);

    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
    Assert.assertEquals(11.0, aggregateMap.get(timelineClusterMetric).getSum());

    timelineClusterMetric = new TimelineClusterMetric("app_metric",
        "appIdForHostsAggr", "instance1", startTime + 90*seconds);
    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
    Assert.assertEquals(4.0, aggregateMap.get(timelineClusterMetric).getSum());

    timelineClusterMetric = new TimelineClusterMetric("host_metric",
        "appIdForHostsAggr", "instance1", startTime + 90*seconds);
    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
    Assert.assertEquals(5.0, aggregateMap.get(timelineClusterMetric).getSum());

    Assert.assertEquals(0d, timelineMetricsIgniteCache.getPointInTimeCacheMetrics().get("Cluster_KeySize"));
  }
}
