/*
 * Druid - a distributed column store.
 * Copyright (C) 2012  Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 */

package com.metamx.druid.realtime;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.BaseServerNode;
import com.metamx.druid.client.ClientConfig;
import com.metamx.druid.client.ClientInventoryManager;
import com.metamx.druid.client.MutableServerView;
import com.metamx.druid.client.OnlyNewSegmentWatcherServerView;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.http.QueryServlet;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.Monitor;


import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 */
public class RealtimeNode extends BaseServerNode<RealtimeNode>
{
  private static final Logger log = new Logger(RealtimeNode.class);

  public static Builder builder()
  {
    return new Builder();
  }

  private final Map<String, Object> injectablesMap = Maps.newLinkedHashMap();

  private MetadataUpdater metadataUpdater = null;
  private DataSegmentPusher dataSegmentPusher = null;
  private List<FireDepartment> fireDepartments = null;
  private ServerView view = null;

  private boolean initialized = false;

  public RealtimeNode(
      Properties props,
      Lifecycle lifecycle,
      ObjectMapper jsonMapper,
      ObjectMapper smileMapper,
      ConfigurationObjectFactory configFactory
  )
  {
    super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
  }

  public RealtimeNode setView(ServerView view)
  {
    Preconditions.checkState(this.view == null, "Cannot set view once it has already been set.");
    this.view = view;
    return this;
  }

  public RealtimeNode setMetadataUpdater(MetadataUpdater metadataUpdater)
  {
    Preconditions.checkState(this.metadataUpdater == null, "Cannot set metadataUpdater once it has already been set.");
    this.metadataUpdater = metadataUpdater;
    return this;
  }

  public RealtimeNode setDataSegmentPusher(DataSegmentPusher dataSegmentPusher)
  {
    Preconditions.checkState(this.dataSegmentPusher == null, "Cannot set segmentPusher once it has already been set.");
    this.dataSegmentPusher = dataSegmentPusher;
    return this;
  }

  public RealtimeNode setFireDepartments(List<FireDepartment> fireDepartments)
  {
    Preconditions.checkState(this.fireDepartments == null, "Cannot set fireDepartments once it has already been set.");
    this.fireDepartments = fireDepartments;
    return this;
  }

  public RealtimeNode registerJacksonInjectable(String name, Object object)
  {
    Preconditions.checkState(injectablesMap.containsKey(name), "Already registered jackson object[%s]", name);
    injectablesMap.put(name, object);
    return this;
  }

  public MetadataUpdater getMetadataUpdater()
  {
    initializeMetadataUpdater();
    return metadataUpdater;
  }

  public DataSegmentPusher getDataSegmentPusher()
  {
    initializeSegmentPusher();
    return dataSegmentPusher;
  }

  public List<FireDepartment> getFireDepartments()
  {
    initializeFireDepartments();
    return fireDepartments;
  }

  public ServerView getView()
  {
    initializeView();
    return view;
  }

  protected void doInit() throws Exception
  {
    initializeView();
    initializeMetadataUpdater();
    initializeSegmentPusher();
    initializeJacksonInjectables();

    initializeFireDepartments();

    final Lifecycle lifecycle = getLifecycle();
    final ServiceEmitter emitter = getEmitter();
    final QueryRunnerFactoryConglomerate conglomerate = getConglomerate();
    final List<Monitor> monitors = getMonitors();

    monitors.add(new RealtimeMetricsMonitor(fireDepartments));

    final RealtimeManager realtimeManager = new RealtimeManager(fireDepartments, conglomerate);
    lifecycle.addManagedInstance(realtimeManager);

    startMonitoring(monitors);

    final Context v2Druid = new Context(getServer(), "/druid/v2", Context.SESSIONS);
    v2Druid.addServlet(new ServletHolder(new StatusServlet()), "/status");
    v2Druid.addServlet(
        new ServletHolder(
            new QueryServlet(getJsonMapper(), getSmileMapper(), realtimeManager, emitter, getRequestLogger())
        ),
        "/*"
    );

    initialized = true;
  }

  @LifecycleStart
  public synchronized void start() throws Exception
  {
    if (! initialized) {
      init();
    }

    getLifecycle().start();
  }

  @LifecycleStop
  public synchronized void stop()
  {
    getLifecycle().stop();
  }

  protected void initializeJacksonInjectables()
  {
    final Map<String, Object> injectables = Maps.newHashMap();

    for (Map.Entry<String, Object> entry : injectablesMap.entrySet()) {
      injectables.put(entry.getKey(), entry.getValue());
    }

    injectables.put("queryRunnerFactoryConglomerate", getConglomerate());
    injectables.put("segmentPusher", dataSegmentPusher);
    injectables.put("metadataUpdater", metadataUpdater);
    injectables.put("serverView", view);
    injectables.put("serviceEmitter", getEmitter());

    getJsonMapper().setInjectableValues(
        new InjectableValues()
        {
          @Override
          public Object findInjectableValue(
              Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
          )
          {
            return injectables.get(valueId);
          }
        }
    );
  }

  private void initializeFireDepartments()
  {
    if (fireDepartments == null) {
      try {
        fireDepartments = getJsonMapper().readValue(
            new File(PropUtils.getProperty(getProps(), "druid.realtime.specFile")),
            new TypeReference<List<FireDepartment>>(){}
        );
      }
      catch (IOException e) {
        throw Throwables.propagate(e);
      }
    }
  }

  private void initializeSegmentPusher()
  {
    if (dataSegmentPusher == null) {
      dataSegmentPusher = ServerInit.getSegmentPusher(getProps(), getConfigFactory(), getJsonMapper());
    }
  }

  protected void initializeMetadataUpdater()
  {
    if (metadataUpdater == null) {
      metadataUpdater = new MetadataUpdater(
          getJsonMapper(),
          getConfigFactory().build(MetadataUpdaterConfig.class),
          getPhoneBook(),
          new DbConnector(getConfigFactory().build(DbConnectorConfig.class)).getDBI()
      );
      getLifecycle().addManagedInstance(metadataUpdater);
    }
  }

  private void initializeView()
  {
    if (view == null) {
      final MutableServerView view = new OnlyNewSegmentWatcherServerView();
      final ClientInventoryManager clientInventoryManager = new ClientInventoryManager(
          getConfigFactory().build(ClientConfig.class),
          getPhoneBook(),
          view
      );
      getLifecycle().addManagedInstance(clientInventoryManager);

      this.view = view;
    }
  }

  public static class Builder
  {
    private ObjectMapper jsonMapper = null;
    private ObjectMapper smileMapper = null;
    private Lifecycle lifecycle = null;
    private Properties props = null;
    private ConfigurationObjectFactory configFactory = null;

    public Builder withMappers(ObjectMapper jsonMapper, ObjectMapper smileMapper)
    {
      this.jsonMapper = jsonMapper;
      this.smileMapper = smileMapper;
      return this;
    }

    public Builder withProps(Properties props)
    {
      this.props = props;
      return this;
    }

    public Builder withConfigFactory(ConfigurationObjectFactory configFactory)
    {
      this.configFactory = configFactory;
      return this;
    }

    public RealtimeNode build()
    {
      if (jsonMapper == null && smileMapper == null) {
        jsonMapper = new DefaultObjectMapper();
        smileMapper = new DefaultObjectMapper(new SmileFactory());
        smileMapper.getJsonFactory().setCodec(smileMapper);
      }
      else if (jsonMapper == null || smileMapper == null) {
        throw new ISE("Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", jsonMapper, smileMapper);
      }

      if (lifecycle == null) {
        lifecycle = new Lifecycle();
      }

      if (props == null) {
        props = Initialization.loadProperties();
      }

      if (configFactory == null) {
        configFactory = Config.createFactory(props);
      }

      return new RealtimeNode(props, lifecycle, jsonMapper, smileMapper, configFactory);
    }
  }
}