/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.autoscaling.gce;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.compute.Compute;
import com.google.api.services.compute.ComputeScopes;
import com.google.api.services.compute.model.Instance;
import com.google.api.services.compute.model.InstanceGroupManagersDeleteInstancesRequest;
import com.google.api.services.compute.model.InstanceGroupManagersListManagedInstancesResponse;
import com.google.api.services.compute.model.InstanceList;
import com.google.api.services.compute.model.ManagedInstance;
import com.google.api.services.compute.model.NetworkInterface;
import com.google.api.services.compute.model.Operation;
import com.google.common.base.Preconditions;
import com.google.common.net.InetAddresses;
import org.apache.druid.indexing.overlord.autoscaling.AutoScaler;
import org.apache.druid.indexing.overlord.autoscaling.AutoScalingData;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import javax.annotation.Nullable;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
* This module permits the autoscaling of the workers in GCE
*
* General notes:
* - The IPs are IPs as in Internet Protocol, and they look like 1.2.3.4
 * - The IDs are the names of the created instances, they look like prefix-abcd,
* where the prefix is chosen by you and abcd is a suffix assigned by GCE
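 *
 * A minimal JSON sketch of this autoscaler's own configuration (the type name and the min/max
 * properties come from the annotations below; the envConfig contents are defined in
 * GceEnvironmentConfig and are elided here):
 *
 * <pre>
 *   {
 *     "type": "gce",
 *     "minNumWorkers": 2,
 *     "maxNumWorkers": 4,
 *     "envConfig": { ... }
 *   }
 * </pre>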
*/
@JsonTypeName("gce")
public class GceAutoScaler implements AutoScaler<GceEnvironmentConfig>
{
private static final EmittingLogger log = new EmittingLogger(GceAutoScaler.class);
private final GceEnvironmentConfig envConfig;
private final int minNumWorkers;
private final int maxNumWorkers;
private Compute cachedComputeService = null;
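  // polling interval and retry caps used while waiting for GCE operations to complete and for
  // the requested instances to actually appear or disappear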
private static final long POLL_INTERVAL_MS = 5 * 1000; // 5 sec
private static final int RUNNING_INSTANCES_MAX_RETRIES = 10;
private static final int OPERATION_END_MAX_RETRIES = 10;
@JsonCreator
public GceAutoScaler(
@JsonProperty("minNumWorkers") int minNumWorkers,
@JsonProperty("maxNumWorkers") int maxNumWorkers,
@JsonProperty("envConfig") GceEnvironmentConfig envConfig
)
{
Preconditions.checkArgument(minNumWorkers > 0,
"minNumWorkers must be greater than 0");
this.minNumWorkers = minNumWorkers;
Preconditions.checkArgument(maxNumWorkers > 0,
"maxNumWorkers must be greater than 0");
Preconditions.checkArgument(maxNumWorkers > minNumWorkers,
"maxNumWorkers must be greater than minNumWorkers");
this.maxNumWorkers = maxNumWorkers;
this.envConfig = envConfig;
}
@Override
@JsonProperty
public int getMinNumWorkers()
{
return minNumWorkers;
}
@Override
@JsonProperty
public int getMaxNumWorkers()
{
return maxNumWorkers;
}
@Override
@JsonProperty
public GceEnvironmentConfig getEnvConfig()
{
return envConfig;
}
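  /**
   * Builds a Compute client authenticated with the Application Default Credentials of the
   * environment and scoped for the Compute Engine APIs. Credentials that are not service
   * accounts are rejected with a GceServiceException.
   */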
@Nullable
Compute createComputeServiceImpl()
throws IOException, GeneralSecurityException, GceServiceException
{
HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport();
JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
GoogleCredential credential = GoogleCredential.getApplicationDefault(
httpTransport,
jsonFactory
);
if (credential.createScopedRequired()) {
List<String> scopes = new ArrayList<>();
scopes.add(ComputeScopes.CLOUD_PLATFORM);
scopes.add(ComputeScopes.COMPUTE);
credential = credential.createScoped(scopes);
}
if (credential.getClientAuthentication() != null) {
throw new GceServiceException("Not using a service account");
}
return new Compute.Builder(httpTransport, jsonFactory, credential)
.setApplicationName("DruidAutoscaler")
.build();
}
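  /**
   * Returns the cached Compute client, building it (with a handful of retries, see below) the
   * first time it is needed.
   */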
private synchronized Compute createComputeService()
throws IOException, GeneralSecurityException, InterruptedException, GceServiceException
{
final int maxRetries = 5;
int retries = 0;
    // This retry loop is here to catch the cases in which the underlying call to
    // Compute.Builder(...).build() returns null, a situation that has been observed
    // sporadically at startup
while (cachedComputeService == null && retries < maxRetries) {
if (retries > 0) {
Thread.sleep(POLL_INTERVAL_MS);
}
log.info("Creating new ComputeService [%d/%d]", retries + 1, maxRetries);
try {
cachedComputeService = createComputeServiceImpl();
retries++;
}
catch (Throwable e) {
log.error(e, "Got Exception in creating the ComputeService");
throw e;
}
}
return cachedComputeService;
}
  /**
   * Polls GCE until the given operation reports the DONE status, waiting POLL_INTERVAL_MS between
   * polls for at most OPERATION_END_MAX_RETRIES attempts. Returns the operation error (null if
   * the operation succeeded) or throws InterruptedException if it does not complete in time.
   */
@Nullable
private Operation.Error waitForOperationEnd(
Compute compute,
Operation operation) throws Exception
{
String status = operation.getStatus();
String opId = operation.getName();
for (int i = 0; i < OPERATION_END_MAX_RETRIES; i++) {
if (operation == null || "DONE".equals(status)) {
return operation == null ? null : operation.getError();
}
log.info("Waiting for operation %s to end", opId);
Thread.sleep(POLL_INTERVAL_MS);
Compute.ZoneOperations.Get get = compute.zoneOperations().get(
envConfig.getProjectId(),
envConfig.getZoneName(),
opId
);
operation = get.execute();
if (operation != null) {
status = operation.getStatus();
}
}
throw new InterruptedException(
StringUtils.format("Timed out waiting for operation %s to complete", opId)
);
}
/**
 * When called, resizes envConfig.getManagedInstanceGroupName(), increasing it by creating
 * envConfig.getNumInstances() new workers (unless the maximum is reached). Returns the
 * IDs of the workers created
*/
@Override
public AutoScalingData provision()
{
final String project = envConfig.getProjectId();
final String zone = envConfig.getZoneName();
final int numInstances = envConfig.getNumInstances();
final String managedInstanceGroupName = envConfig.getManagedInstanceGroupName();
try {
List<String> before = getRunningInstances();
log.debug("Existing instances [%s]", String.join(",", before));
int toSize = Math.min(before.size() + numInstances, getMaxNumWorkers());
if (before.size() >= toSize) {
// nothing to scale
return new AutoScalingData(new ArrayList<>());
}
log.info("Asked to provision instances, will resize to %d", toSize);
Compute computeService = createComputeService();
Compute.InstanceGroupManagers.Resize request =
computeService.instanceGroupManagers().resize(project, zone,
managedInstanceGroupName, toSize);
Operation response = request.execute();
Operation.Error err = waitForOperationEnd(computeService, response);
if (err == null || err.isEmpty()) {
List<String> after = null;
        // as waitForOperationEnd only waits for the operation to be scheduled,
        // this loop waits until the requested machines actually come up (giving
        // up after a certain number of retries)
for (int i = 0; i < RUNNING_INSTANCES_MAX_RETRIES; i++) {
after = getRunningInstances();
if (after.size() == toSize) {
break;
}
log.info("Machines not up yet, waiting");
Thread.sleep(POLL_INTERVAL_MS);
}
after.removeAll(before); // these should be the new ones
log.info("Added instances [%s]", String.join(",", after));
return new AutoScalingData(after);
} else {
log.error("Unable to provision instances: %s", err.toPrettyString());
}
}
catch (Exception e) {
log.error(e, "Unable to provision any gce instances.");
}
return new AutoScalingData(new ArrayList<>());
}
/**
* Terminates the instances in the list of IPs provided by the caller
*/
@Override
public AutoScalingData terminate(List<String> ips)
{
log.info("Asked to terminate: [%s]", String.join(",", ips));
if (ips.isEmpty()) {
return new AutoScalingData(new ArrayList<>());
}
List<String> nodeIds = ipToIdLookup(ips); // if they are not IPs, they will be unchanged
try {
return terminateWithIds(nodeIds != null ? nodeIds : new ArrayList<>());
}
catch (Exception e) {
log.error(e, "Unable to terminate any instances.");
}
return new AutoScalingData(new ArrayList<>());
}
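  /**
   * Converts plain instance names into the zone-scoped paths (zones/[zone]/instances/[name])
   * expected by the instance group manager calls used in terminateWithIds.
   */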
private List<String> namesToInstances(List<String> names)
{
List<String> instances = new ArrayList<>();
for (String name : names) {
instances.add(
// convert the name into a URL's path to be used in calls to the API
StringUtils.format("zones/%s/instances/%s", envConfig.getZoneName(), name)
);
}
return instances;
}
/**
* Terminates the instances in the list of IDs provided by the caller
*/
@Override
public AutoScalingData terminateWithIds(List<String> ids)
{
log.info("Asked to terminate IDs: [%s]", String.join(",", ids));
if (ids.isEmpty()) {
return new AutoScalingData(new ArrayList<>());
}
try {
final String project = envConfig.getProjectId();
final String zone = envConfig.getZoneName();
final String managedInstanceGroupName = envConfig.getManagedInstanceGroupName();
List<String> before = getRunningInstances();
InstanceGroupManagersDeleteInstancesRequest requestBody =
new InstanceGroupManagersDeleteInstancesRequest();
requestBody.setInstances(namesToInstances(ids));
Compute computeService = createComputeService();
Compute.InstanceGroupManagers.DeleteInstances request =
computeService
.instanceGroupManagers()
.deleteInstances(project, zone, managedInstanceGroupName, requestBody);
Operation response = request.execute();
Operation.Error err = waitForOperationEnd(computeService, response);
if (err == null || err.isEmpty()) {
List<String> after = null;
        // as waitForOperationEnd only waits for the operation to be scheduled,
        // this loop waits until the requested machines actually go down (giving
        // up after a certain number of retries)
for (int i = 0; i < RUNNING_INSTANCES_MAX_RETRIES; i++) {
after = getRunningInstances();
if (after.size() == (before.size() - ids.size())) {
break;
}
log.info("Machines not down yet, waiting");
Thread.sleep(POLL_INTERVAL_MS);
}
        before.removeAll(after); // keep only the ones that are no longer present
return new AutoScalingData(before);
} else {
log.error("Unable to terminate instances: %s", err.toPrettyString());
}
}
catch (Exception e) {
log.error(e, "Unable to terminate any instances.");
}
return new AutoScalingData(new ArrayList<>());
}
// Returns the list of the IDs of the machines running in the MIG
private List<String> getRunningInstances()
{
final long maxResults = 500L; // 500 is sadly the max, see below
ArrayList<String> ids = new ArrayList<>();
try {
final String project = envConfig.getProjectId();
final String zone = envConfig.getZoneName();
final String managedInstanceGroupName = envConfig.getManagedInstanceGroupName();
Compute computeService = createComputeService();
Compute.InstanceGroupManagers.ListManagedInstances request =
computeService
.instanceGroupManagers()
.listManagedInstances(project, zone, managedInstanceGroupName);
      // Notice that, while the doc says otherwise, there is no nextPageToken to page
      // through results and so everything needs to fit in the same page
request.setMaxResults(maxResults);
InstanceGroupManagersListManagedInstancesResponse response = request.execute();
for (ManagedInstance mi : response.getManagedInstances()) {
ids.add(GceUtils.extractNameFromInstance(mi.getInstance()));
}
log.debug("Found running instances [%s]", String.join(",", ids));
}
catch (Exception e) {
log.error(e, "Unable to get instances.");
}
return ids;
}
/**
* Converts the IPs to IDs
*/
@Override
public List<String> ipToIdLookup(List<String> ips)
{
log.info("Asked IPs -> IDs for: [%s]", String.join(",", ips));
if (ips.isEmpty()) {
return new ArrayList<>();
}
    // If the first entry is not an IP, assume the others are not either and return them
    // unchanged. This check is here because Druid does not verify that IPs are actually
    // IPs and can send IDs to this function instead
if (!InetAddresses.isInetAddress(ips.get(0))) {
log.debug("Not IPs, doing nothing");
return ips;
}
final String project = envConfig.getProjectId();
final String zone = envConfig.getZoneName();
try {
Compute computeService = createComputeService();
Compute.Instances.List request = computeService.instances().list(project, zone);
// Cannot filter by IP atm, see below
// request.setFilter(GceUtils.buildFilter(ips, "networkInterfaces[0].networkIP"));
List<String> instanceIds = new ArrayList<>();
InstanceList response;
do {
response = request.execute();
if (response.getItems() == null) {
continue;
}
for (Instance instance : response.getItems()) {
          // This linear lookup is needed because it is currently not possible to filter
          // by IP, see https://issuetracker.google.com/issues/73455339
for (NetworkInterface ni : instance.getNetworkInterfaces()) {
if (ips.contains(ni.getNetworkIP())) {
instanceIds.add(instance.getName());
}
}
}
request.setPageToken(response.getNextPageToken());
} while (response.getNextPageToken() != null);
log.debug("Converted to [%s]", String.join(",", instanceIds));
return instanceIds;
}
catch (Exception e) {
log.error(e, "Unable to convert IPs to IDs.");
}
return new ArrayList<>();
}
/**
 * Converts the IDs to IPs - this is never called from the outside (nor from inside this class,
 * which uses ipToIdLookup instead); it is implemented to satisfy the AutoScaler interface
*/
@Override
public List<String> idToIpLookup(List<String> nodeIds)
{
log.info("Asked IDs -> IPs for: [%s]", String.join(",", nodeIds));
if (nodeIds.isEmpty()) {
return new ArrayList<>();
}
final String project = envConfig.getProjectId();
final String zone = envConfig.getZoneName();
try {
Compute computeService = createComputeService();
Compute.Instances.List request = computeService.instances().list(project, zone);
request.setFilter(GceUtils.buildFilter(nodeIds, "name"));
List<String> instanceIps = new ArrayList<>();
InstanceList response;
do {
response = request.execute();
if (response.getItems() == null) {
continue;
}
for (Instance instance : response.getItems()) {
        // Assuming that every server has at least one network interface...
        String ip = instance.getNetworkInterfaces().get(0).getNetworkIP();
        // ...even though some IPs are reported as null at first and only become available later,
        // so we skip the ones that are null. Fear not, they are picked up later; this just
        // prevents having a machine called 'null' around, which would make the caller wait
        // for maxScalingDuration before doing anything else
if (ip != null && !"null".equals(ip)) {
instanceIps.add(ip);
} else {
// log and skip it
log.warn("Call returned null IP for %s, skipping", instance.getName());
}
}
request.setPageToken(response.getNextPageToken());
} while (response.getNextPageToken() != null);
return instanceIps;
}
catch (Exception e) {
log.error(e, "Unable to convert IDs to IPs.");
}
return new ArrayList<>();
}
@Override
public String toString()
{
return "gceAutoScaler={" +
"envConfig=" + envConfig +
", maxNumWorkers=" + maxNumWorkers +
", minNumWorkers=" + minNumWorkers +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
GceAutoScaler that = (GceAutoScaler) o;
return Objects.equals(envConfig, that.envConfig) &&
minNumWorkers == that.minNumWorkers &&
maxNumWorkers == that.maxNumWorkers;
}
@Override
public int hashCode()
{
int result = 0;
result = 31 * result + Objects.hashCode(envConfig);
result = 31 * result + minNumWorkers;
result = 31 * result + maxNumWorkers;
return result;
}
}