blob: e852a14bbf5857793b6c330a088a7e2df9419c99 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.util;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TUpdateExecutorMembershipRequest;
import com.google.common.collect.Sets;
/**
* Singleton class that represents a snapshot of the Impalad executor membership. Host
* membership is determined by both IP address and hostname (to mimic the backend's
* Scheduler). A new snapshot is created whenever the cluster membership changes
* so that clients don't need to hold a lock while examining a snapshot.
*/
public class ExecutorMembershipSnapshot {
// The latest instance of the ExecutorMembershipSnapshot.
private static AtomicReference<ExecutorMembershipSnapshot> cluster_ =
new AtomicReference<ExecutorMembershipSnapshot>(new ExecutorMembershipSnapshot());
// The set of hosts that are members of the cluster given by hostname. Can be empty when
// using executor groups, in which case the planner shall assume that all reads are
// remote.
private final Set<String> hostnames_;
// The set of hosts that are members of the cluster given by IP address. Can be empty
// when using executor groups, in which case the planner shall assume that all reads are
// remote.
private final Set<String> ipAddresses_;
// The number of executor nodes of the cluster.
//
// When not using executor groups, this value reflects the number of executors in the
// cluster. It will be equal to hostnames_.size(), except in the test minicluster where
// multiple impalads are running on a single host.
// When using executor groups, this value reflects the number of executors in the
// largest healthy group. If all group become unhealthy, the backend will not send a
// membership update and this value will retain the last non-zero value. This allows the
// planner to work on the assumption that a healthy executor group of the same size will
// eventually come online to execute queries.
private final int numExecutors_;
// Used only to construct the initial ExecutorMembershipSnapshot.
private ExecutorMembershipSnapshot() {
hostnames_ = Sets.newHashSet();
ipAddresses_ = Sets.newHashSet();
// We use 0 for the number of executors to indicate that no update from the
// ClusterMembershipManager has arrived yet and we will return the value
// '-num_expected_executors' in numExecutors().
numExecutors_ = 0;
}
// Construct a new snapshot based on the TUpdateExecutorMembershipRequest.
private ExecutorMembershipSnapshot(TUpdateExecutorMembershipRequest request) {
hostnames_ = request.getHostnames();
ipAddresses_ = request.getIp_addresses();
// If the updates snapshot does not contain any executors we fall back to the previous
// value. This can happen if no healthy executor groups are currently online.
if (request.getNum_executors() > 0) {
numExecutors_ = request.getNum_executors();
} else {
numExecutors_ = cluster_.get().numExecutors_;
}
}
// Determine whether a host, given either by IP address or hostname, is a member of
// this snapshot. Returns true if it is, false otherwise.
public boolean contains(TNetworkAddress address) {
String host = address.getHostname();
return ipAddresses_.contains(host) || hostnames_.contains(host);
}
// Return the number of executors that should be used for planning. If no executors have
// been registered so far, this method will return a configurable default to allow the
// planner to operated based on the expected number of executors.
public int numExecutors() {
if (numExecutors_ == 0) {
return BackendConfig.INSTANCE.getBackendCfg().num_expected_executors;
}
return numExecutors_;
}
// Atomically update the singleton snapshot instance. After the update completes,
// all calls to getCluster() will return the new snapshot.
public static void update(TUpdateExecutorMembershipRequest request) {
cluster_.set(new ExecutorMembershipSnapshot(request));
}
// Return the current singleton snapshot instance.
public static ExecutorMembershipSnapshot getCluster() { return cluster_.get(); }
}