/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.slider.server.services.curator;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceInstanceBuilder;
import org.apache.curator.x.discovery.ServiceType;
import org.apache.curator.x.discovery.UriSpec;
import org.apache.slider.core.exceptions.BadClusterStateException;
import org.apache.slider.core.persist.JsonSerDeser;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * YARN service for Curator service discovery; the discovery instance's
 * start/close methods are tied to the lifecycle of this service
 * @param <Payload> the payload of the operation
 */
public class RegistryBinderService<Payload> extends CuratorService {
  private static final Logger log =
    LoggerFactory.getLogger(RegistryBinderService.class);

  private final ServiceDiscovery<Payload> discovery;

  private final Map<String, ServiceInstance<Payload>> entries =
    new HashMap<>();

  private JsonSerDeser<CuratorServiceInstance<Payload>> deser =
    new JsonSerDeser<>(CuratorServiceInstance.class);

  /**
   * Create an instance
   * @param curator  Does not need to be started
   * @param basePath base directory
   * @param discovery discovery instance -not yet started
   */
  public RegistryBinderService(CuratorFramework curator,
                               String basePath,
                               ServiceDiscovery<Payload> discovery) {
    super("RegistryBinderService", curator, basePath);

    this.discovery =
      Preconditions.checkNotNull(discovery, "null discovery arg");
  }


  public ServiceDiscovery<Payload> getDiscovery() {
    return discovery;
  }

  @Override
  protected void serviceStart() throws Exception {
    super.serviceStart();
    discovery.start();
  }

  @Override
  protected void serviceStop() throws Exception {
    closeCuratorComponent(discovery);
    super.serviceStop();
  }

  /**
   * register an instance -only valid once the service is started
   * @param id ID -must be unique
   * @param name name
   * @param url URL
   * @param payload payload (may be null)
   * @return the instance
   * @throws Exception on registration problems
   */
  public ServiceInstance<Payload> register(String name,
                                           String id,
                                           URL url,
                                           Payload payload) throws Exception {
    Preconditions.checkNotNull(id, "null `id` arg");
    Preconditions.checkNotNull(name, "null `name` arg");
    Preconditions.checkState(isInState(STATE.STARTED), "Not started: " + this);

    ServiceInstanceBuilder<Payload> instanceBuilder = builder()
        .name(name)
        .id(id)
        .payload(payload)
        .serviceType(ServiceType.DYNAMIC);
    if (url != null) {
      UriSpec uriSpec = new UriSpec(url.toString());

      int port = url.getPort();
      if (port == 0) {
        throw new IOException("Port undefined in " + url);
      }
      instanceBuilder
          .uriSpec(uriSpec)
          .port(port);
    }
    ServiceInstance<Payload> instance = instanceBuilder.build();
    log.info("registering {}", instance.toString());
    discovery.registerService(instance);
    log.info("registration completed {}", instance.toString());
    synchronized (this) {
      entries.put(id, instance);
    }
    return instance;
  }

  /**
   * Create a builder. This is already pre-prepared with address, registration
   * time and a (random) UUID
   * @return a builder
   * @throws IOException IO problems, including enumerating network ports
   */
  public ServiceInstanceBuilder<Payload> builder() throws
                                                   IOException {
    try {
      return ServiceInstance.builder();
    } catch (IOException e) {
      throw e;
    } catch (Exception e) {
      throw new IOException(e);
    }
  }


  /**
   * List all instance IDs of a service type
   * @param servicetype service type
   * @return list of matches
   * @throws Exception
   */
  public List<String> instanceIDs(String servicetype) throws Exception {
    Preconditions.checkNotNull(servicetype);
    List<String> instanceIds;
    try {
      instanceIds =
        getCurator().getChildren().forPath(pathForServicetype(servicetype));
    } catch (KeeperException.NoNodeException e) {
      instanceIds = Lists.newArrayList();
    }
    return instanceIds;
  }

  /**
   * List all service types registered
   * @return a list of service types
   * @throws Exception
   */
  public List<String> serviceTypes() throws Exception {
    List<String> types;
    try {
      types =
        getCurator().getChildren().forPath(getBasePath());
    } catch (KeeperException.NoNodeException e) {
      types = Lists.newArrayList();
    }
    return types;
  }


  /**
   * Return a service instance POJO
   *
   * @param servicetype name of the service
   * @param id ID of the instance
   * @return the instance or <code>null</code> if not found
   * @throws Exception errors
   */
  public CuratorServiceInstance<Payload> queryForInstance(String servicetype, String id)
      throws Exception {
    CuratorServiceInstance<Payload> instance = null;
    String path = pathForInstance(servicetype, id);
    try {
      byte[] bytes = getCurator().getData().forPath(path);
      if (bytes!=null &&  bytes.length>0) {
        instance = deser.fromBytes(bytes);
      }
    } catch (KeeperException.NoNodeException ignore) {
      // ignore
    }
    return instance;
  }
  
  /**
   * List all the instances
   * @param servicetype name of the service
   * @return a list of instances and their payloads
   * @throws IOException any problem
   */
  public List<CuratorServiceInstance<Payload>> listInstances(String servicetype) throws
    IOException {
    try {
      List<String> instanceIDs = instanceIDs(servicetype);
      List<CuratorServiceInstance<Payload>> instances =
        new ArrayList<>(instanceIDs.size());
      for (String instanceID : instanceIDs) {
        CuratorServiceInstance<Payload> instance =
          queryForInstance(servicetype, instanceID);
        if (instance != null) {
          instances.add(instance);
        }
      }
      return instances;
    } catch (IOException e) {
      throw e;
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  /**
   * Find an instance with a given ID
   * @param instances instances
   * @param name ID to look for
   * @return the discovered instance or null
   */
  public CuratorServiceInstance<Payload> findByID(List<CuratorServiceInstance<Payload>> instances, String name) {
    Preconditions.checkNotNull(name);
    for (CuratorServiceInstance<Payload> instance : instances) {
      if (instance.id.equals(name)) {
        return instance;
      }
    }
    return null;
  }

  /**
   * Find a single instance -return that value or raise an exception
   * @param serviceType service type
   * @param name the name (required(
   * @return the instance that matches the criteria
   * @throws FileNotFoundException if there were no matches
   * @throws IOException any network problem
   */
  public CuratorServiceInstance<Payload> findInstance(String serviceType,
      String name) throws IOException {
    Preconditions.checkArgument(StringUtils.isNotEmpty(name), "name");
    return findInstances(serviceType, name).get(0);
  }
  /**
   * List registry entries. If a name was given, then the single match is returned
   * -otherwise all entries matching the service type
   * @param serviceType service type
   * @param name an optional name
   * @return the (non-empty) list of instances that match the criteria
   * @throws FileNotFoundException if there were no matches
   * @throws IOException any network problem
   */
  public List<CuratorServiceInstance<Payload>> findInstances(String serviceType,
      String name)
      throws FileNotFoundException, IOException {
    List<CuratorServiceInstance<Payload>> instances =
        listInstances(serviceType);
    if (instances.isEmpty()) {
      throw new FileNotFoundException(
          "No registry entries for service type " + serviceType);
    }
    if (StringUtils.isNotEmpty(name)) {
      CuratorServiceInstance<Payload> foundInstance = findByID(instances, name);
      if (foundInstance == null) {
        throw new FileNotFoundException(
            "No registry entries for service name " + name
            + " and service type " + serviceType);
      }
      instances.clear();
      instances.add(foundInstance);
    }
    return instances;
  }

  /**
   * Enum all service types in the registry
   * @return a possibly empty collection of service types
   * @throws IOException networking
   */
  public Collection<String> getServiceTypes() throws IOException {
    try {
      return getDiscovery().queryForNames();
    } catch (IOException e) {
      throw e;
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

}
