blob: 55c8d8d8c1c9e848ce6c1dbe1cc3a5c41f3ad708 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.system;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.cluster.Cluster;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Status;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.system.Backend.BackendState;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.apache.commons.validator.routines.InetAddressValidator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
public class SystemInfoService {
private static final Logger LOG = LogManager.getLogger(SystemInfoService.class);
public static final String DEFAULT_CLUSTER = "default_cluster";
private volatile AtomicReference<ImmutableMap<Long, Backend>> idToBackendRef;
private volatile AtomicReference<ImmutableMap<Long, AtomicLong>> idToReportVersionRef;
// last backend id used by round robin for sequential choosing backends for
// tablet creation
private ConcurrentHashMap<String, Long> lastBackendIdForCreationMap;
// last backend id used by round robin for sequential choosing backends in
// other jobs
private ConcurrentHashMap<String, Long> lastBackendIdForOtherMap;
private long lastBackendIdForCreation = -1;
private long lastBackendIdForOther = -1;
private AtomicReference<ImmutableMap<Long, DiskInfo>> pathHashToDishInfoRef;
// Sorts per-host backend lists by number of backends, descending.
// NOTE: the previous implementation returned 1 for BOTH orderings of two
// equal-sized lists, which violates the Comparator contract (sgn(compare(a,b))
// must equal -sgn(compare(b,a))). TimSort (used by Collections.sort since
// Java 7) may throw "Comparison method violates its general contract!" on
// such comparators. Integer.compare yields 0 for ties and fixes this.
private static final Comparator<List<Backend>> hostBackendsListComparator = new Comparator<List<Backend>>() {
    @Override
    public int compare(List<Backend> list1, List<Backend> list2) {
        // descending by size: larger list sorts first
        return Integer.compare(list2.size(), list1.size());
    }
};
// Initializes all membership state to empty. The backend and report-version
// maps are immutable snapshots published through AtomicReferences; every
// membership change replaces the whole map (copy-on-write), so readers never
// see a partially updated map.
public SystemInfoService() {
    idToBackendRef = new AtomicReference<ImmutableMap<Long, Backend>>(ImmutableMap.<Long, Backend> of());
    idToReportVersionRef = new AtomicReference<ImmutableMap<Long, AtomicLong>>(
            ImmutableMap.<Long, AtomicLong> of());
    // per-cluster round-robin cursors for backend selection
    lastBackendIdForCreationMap = new ConcurrentHashMap<String, Long>();
    lastBackendIdForOtherMap = new ConcurrentHashMap<String, Long>();
    pathHashToDishInfoRef = new AtomicReference<ImmutableMap<Long, DiskInfo>>(ImmutableMap.<Long, DiskInfo>of());
}
// for deploy manager
// Convenience overload: adds backends without a destination cluster
// (empty destCluster means "free" placement is controlled by isFree alone).
public void addBackends(List<Pair<String, Integer>> hostPortPairs, boolean isFree) throws DdlException {
    addBackends(hostPortPairs, isFree, "");
}
/**
 * Adds a batch of backends. All (host, port) pairs are validated for
 * duplicates first, so the batch fails fast before any backend is created.
 *
 * @param hostPortPairs backend host and heartbeat port pairs
 * @param isFree        if true, the backend is not owned by any cluster
 * @param destCluster   if non-empty, the backend is added to this cluster
 * @throws DdlException if any backend already exists
 */
public void addBackends(List<Pair<String, Integer>> hostPortPairs,
        boolean isFree, String destCluster) throws DdlException {
    // pass 1: reject the whole batch if any backend already exists
    for (Pair<String, Integer> hostPort : hostPortPairs) {
        if (getBackendWithHeartbeatPort(hostPort.first, hostPort.second) != null) {
            throw new DdlException("Same backend already exists[" + hostPort.first + ":" + hostPort.second + "]");
        }
    }
    // pass 2: create the backends
    for (Pair<String, Integer> hostPort : hostPortPairs) {
        addBackend(hostPort.first, hostPort.second, isFree, destCluster);
    }
}
// for test
// Publishes a new immutable snapshot containing the given backend
// (copy-on-write: snapshot, mutate the copy, swap the reference).
public void addBackend(Backend backend) {
    Map<Long, Backend> snapshot = Maps.newHashMap(idToBackendRef.get());
    snapshot.put(backend.getId(), backend);
    idToBackendRef.set(ImmutableMap.copyOf(snapshot));
}
// Assigns a backend to the named cluster: registers its id with the Cluster
// object and marks the backend as owned/in-use.
// Precondition: the cluster must already exist in the catalog.
private void setBackendOwner(Backend backend, String clusterName) {
    final Cluster cluster = Catalog.getInstance().getCluster(clusterName);
    Preconditions.checkState(cluster != null);
    cluster.addBackend(backend.getId());
    backend.setOwnerClusterName(clusterName);
    backend.setBackendState(BackendState.using);
}
// Final entry of adding backend.
// Creates the Backend, publishes it in the id->backend and id->reportVersion
// snapshots, optionally assigns cluster ownership, then writes the edit log.
// NOTE: the in-memory maps are updated BEFORE the edit log entry is written;
// replay relies on replayAddBackend performing the same updates.
private void addBackend(String host, int heartbeatPort, boolean isFree, String destCluster) throws DdlException {
    Backend newBackend = new Backend(Catalog.getInstance().getNextId(), host, heartbeatPort);
    // update idToBackend (copy-on-write snapshot swap)
    Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef.get());
    copiedBackends.put(newBackend.getId(), newBackend);
    ImmutableMap<Long, Backend> newIdToBackend = ImmutableMap.copyOf(copiedBackends);
    idToBackendRef.set(newIdToBackend);
    // set new backend's report version as 0L
    Map<Long, AtomicLong> copiedReportVerions = Maps.newHashMap(idToReportVersionRef.get());
    copiedReportVerions.put(newBackend.getId(), new AtomicLong(0L));
    ImmutableMap<Long, AtomicLong> newIdToReportVersion = ImmutableMap.copyOf(copiedReportVerions);
    idToReportVersionRef.set(newIdToReportVersion);
    if (!Strings.isNullOrEmpty(destCluster)) {
        // add backend to destCluster
        setBackendOwner(newBackend, destCluster);
    } else if (!isFree) {
        // add backend to DEFAULT_CLUSTER
        setBackendOwner(newBackend, DEFAULT_CLUSTER);
    } else {
        // backend is free: owned by no cluster until claimed
    }
    // log
    Catalog.getInstance().getEditLog().logAddBackend(newBackend);
    LOG.info("finished to add {} ", newBackend);
    // backends is changed, regenerated tablet number metrics
    MetricRepo.generateBackendsTabletMetrics();
}
/**
 * Drops a batch of backends identified by (host, heartbeat port).
 * Existence of every backend is verified up front so the batch fails fast
 * before any backend is removed.
 *
 * @throws DdlException if any listed backend does not exist
 */
public void dropBackends(List<Pair<String, Integer>> hostPortPairs) throws DdlException {
    // pass 1: verify all backends exist
    for (Pair<String, Integer> hostPort : hostPortPairs) {
        if (getBackendWithHeartbeatPort(hostPort.first, hostPort.second) == null) {
            throw new DdlException("backend does not exists[" + hostPort.first + ":" + hostPort.second + "]");
        }
    }
    // pass 2: drop them
    for (Pair<String, Integer> hostPort : hostPortPairs) {
        dropBackend(hostPort.first, hostPort.second);
    }
}
// for decommission
// Resolves the backend by id, then delegates to the (host, port) drop path.
public void dropBackend(long backendId) throws DdlException {
    Backend toDrop = getBackend(backendId);
    if (toDrop == null) {
        throw new DdlException("Backend[" + backendId + "] does not exist");
    }
    dropBackend(toDrop.getHost(), toDrop.getHeartbeatPort());
}
// final entry of dropping backend
// Removes the backend from the id->backend and id->reportVersion snapshots,
// detaches it from its cluster, writes the edit log, and refreshes metrics.
// FIX: the original looked up the backend twice (once for the existence
// check, once for the value); a concurrent drop between the two lookups made
// the second return null, causing an NPE. Now we look up exactly once.
public void dropBackend(String host, int heartbeatPort) throws DdlException {
    Backend droppedBackend = getBackendWithHeartbeatPort(host, heartbeatPort);
    if (droppedBackend == null) {
        throw new DdlException("backend does not exists[" + host + ":" + heartbeatPort + "]");
    }
    // update idToBackend
    Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef.get());
    copiedBackends.remove(droppedBackend.getId());
    ImmutableMap<Long, Backend> newIdToBackend = ImmutableMap.copyOf(copiedBackends);
    idToBackendRef.set(newIdToBackend);
    // update idToReportVersion
    Map<Long, AtomicLong> copiedReportVerions = Maps.newHashMap(idToReportVersionRef.get());
    copiedReportVerions.remove(droppedBackend.getId());
    ImmutableMap<Long, AtomicLong> newIdToReportVersion = ImmutableMap.copyOf(copiedReportVerions);
    idToReportVersionRef.set(newIdToReportVersion);
    // update cluster
    final Cluster cluster = Catalog.getInstance().getCluster(droppedBackend.getOwnerClusterName());
    if (null != cluster) {
        cluster.removeBackend(droppedBackend.getId());
    } else {
        LOG.error("Cluster " + droppedBackend.getOwnerClusterName() + " no exist.");
    }
    // log
    Catalog.getInstance().getEditLog().logDropBackend(droppedBackend);
    LOG.info("finished to drop {}", droppedBackend);
    // backends is changed, regenerated tablet number metrics
    MetricRepo.generateBackendsTabletMetrics();
}
// only for test
// Resets both membership snapshots to empty maps. Does NOT touch clusters,
// the edit log, or metrics — intended purely for test teardown.
public void dropAllBackend() {
    // update idToBackend
    idToBackendRef.set(ImmutableMap.<Long, Backend> of());
    // update idToReportVersion
    idToReportVersionRef.set(ImmutableMap.<Long, AtomicLong> of());
}
// Returns the backend with the given id, or null if unknown.
public Backend getBackend(long backendId) {
    return idToBackendRef.get().get(backendId);
}
// Returns true iff the backend exists and reports itself available.
public boolean checkBackendAvailable(long backendId) {
    Backend candidate = idToBackendRef.get().get(backendId);
    return candidate != null && candidate.isAvailable();
}
// Returns true iff the backend exists and is currently alive.
public boolean checkBackendAlive(long backendId) {
    Backend candidate = idToBackendRef.get().get(backendId);
    return candidate != null && candidate.isAlive();
}
// Finds the backend on the given host listening on the given heartbeat port.
// Linear scan over the snapshot; backend counts are small. Returns null if absent.
public Backend getBackendWithHeartbeatPort(String host, int heartPort) {
    for (Backend candidate : idToBackendRef.get().values()) {
        boolean sameHost = candidate.getHost().equals(host);
        if (sameHost && candidate.getHeartbeatPort() == heartPort) {
            return candidate;
        }
    }
    return null;
}
// Finds the backend on the given host listening on the given BE port.
// Linear scan over the snapshot. Returns null if absent.
public Backend getBackendWithBePort(String host, int bePort) {
    for (Backend candidate : idToBackendRef.get().values()) {
        boolean sameHost = candidate.getHost().equals(host);
        if (sameHost && candidate.getBePort() == bePort) {
            return candidate;
        }
    }
    return null;
}
// Finds the backend on the given host listening on the given HTTP port.
// Linear scan over the snapshot. Returns null if absent.
public Backend getBackendWithHttpPort(String host, int httpPort) {
    for (Backend candidate : idToBackendRef.get().values()) {
        boolean sameHost = candidate.getHost().equals(host);
        if (sameHost && candidate.getHttpPort() == httpPort) {
            return candidate;
        }
    }
    return null;
}
// Returns ids of all backends; when needAlive is set, dead or vanished
// backends are filtered out. Always returns a fresh mutable list.
public List<Long> getBackendIds(boolean needAlive) {
    List<Long> ids = Lists.newArrayList(idToBackendRef.get().keySet());
    if (needAlive) {
        for (Iterator<Long> it = ids.iterator(); it.hasNext();) {
            Backend candidate = this.getBackend(it.next());
            boolean keep = candidate != null && candidate.isAlive();
            if (!keep) {
                it.remove();
            }
        }
    }
    return ids;
}
// Returns ids of backends currently marked as decommissioned.
public List<Long> getDecommissionedBackendIds() {
    List<Long> ids = Lists.newArrayList(idToBackendRef.get().keySet());
    for (Iterator<Long> it = ids.iterator(); it.hasNext();) {
        Backend candidate = this.getBackend(it.next());
        boolean keep = candidate != null && candidate.isDecommissioned();
        if (!keep) {
            it.remove();
        }
    }
    return ids;
}
/**
 * choose backends to create cluster
 *
 * Picks instanceNum backends from free, alive, non-decommissioned hosts,
 * spreading the selection across hosts round-robin (hosts with the most
 * backends are visited first) so no single host is over-used.
 *
 * @param clusterName
 * @param instanceNum
 * @return if BE avaliable is less than requested , return null.
 */
public List<Long> createCluster(String clusterName, int instanceNum) {
    final List<Long> chosenBackendIds = Lists.newArrayList();
    final Map<String, List<Backend>> hostBackendsMap = getHostBackendsMap(true /* need alive*/,
                                                                          true /* need free */,
                                                                          false /* can not be in decommission*/);
    LOG.info("begin to create cluster {} with instance num: {}", clusterName, instanceNum);
    int availableBackendsCount = 0;
    // list of backends on each host.
    List<List<Backend>> hostBackendsList = Lists.newArrayList();
    for (List<Backend> list : hostBackendsMap.values()) {
        availableBackendsCount += list.size();
        hostBackendsList.add(list);
    }
    if (instanceNum > availableBackendsCount) {
        LOG.warn("not enough available backends. requires :" + instanceNum
                + ", available:" + availableBackendsCount);
        return null;
    }
    // sort by number of backend in host, descending
    Collections.sort(hostBackendsList, hostBackendsListComparator);
    // hostIsEmpty is used to mark if host is empty, so avoid
    // iterating hostIsEmpty with numOfHost in every circle.
    boolean[] hostIsEmpty = new boolean[hostBackendsList.size()];
    for (int i = 0; i < hostBackendsList.size(); i++) {
        hostIsEmpty[i] = false;
    }
    // to select backend in circle: visit hosts round-robin, taking one
    // backend per visit; stop when enough were chosen or all hosts drained
    int numOfHost = hostBackendsList.size();
    for (int i = 0; ; i = ++i % hostBackendsList.size()) {
        if (hostBackendsList.get(i).size() > 0) {
            chosenBackendIds.add(hostBackendsList.get(i).remove(0).getId());
        } else {
            // avoid counting repeatedly
            if (hostIsEmpty[i] == false) {
                hostIsEmpty[i] = true;
                numOfHost--;
            }
        }
        if (chosenBackendIds.size() == instanceNum || numOfHost == 0) {
            break;
        }
    }
    if (chosenBackendIds.size() != instanceNum) {
        LOG.warn("not enough available backends. require :" + instanceNum + " get:" + chosenBackendIds.size());
        return null;
    }
    // initialize the round-robin cursors for the new cluster
    lastBackendIdForCreationMap.put(clusterName, (long) -1);
    lastBackendIdForOtherMap.put(clusterName, (long) -1);
    return chosenBackendIds;
}
/**
 * remove backends in cluster
 *
 * Marks every backend of the cluster as free and clears its cluster name.
 * When not replaying, each state change is written to the edit log.
 * Also drops the cluster's round-robin cursors.
 *
 * @throws DdlException
 */
public void releaseBackends(String clusterName, boolean isReplay) {
    ImmutableMap<Long, Backend> idToBackend = idToBackendRef.get();
    final List<Long> backendIds = getClusterBackendIds(clusterName);
    final Iterator<Long> iterator = backendIds.iterator();
    while (iterator.hasNext()) {
        final Long id = iterator.next();
        if (!idToBackend.containsKey(id)) {
            LOG.warn("cluster {} contain backend {} that does't exist", clusterName, id);
        } else {
            final Backend backend = idToBackend.get(id);
            backend.setBackendState(BackendState.free);
            backend.clearClusterName();
            if (!isReplay) {
                // persist the state change for recovery
                Catalog.getInstance().getEditLog().logBackendStateChange(backend);
            }
        }
    }
    lastBackendIdForCreationMap.remove(clusterName);
    lastBackendIdForOtherMap.remove(clusterName);
}
/**
 * select host where has least free backends , be's state become free when decommission finish
 *
 * Repeatedly takes one backend from the host that currently has the most
 * backends in the cluster (re-sorting after each pick), until shrinkNum
 * backends are collected or the hosts run out.
 *
 * @param clusterName cluster to shrink
 * @param shrinkNum   number of backends to decommission
 * @return the chosen backend ids, or null if not enough backends exist
 */
public List<Long> calculateDecommissionBackends(String clusterName, int shrinkNum) {
    LOG.info("calculate decommission backend in cluster: {}. decommission num: {}", clusterName, shrinkNum);
    final List<Long> decomBackendIds = Lists.newArrayList();
    ImmutableMap<Long, Backend> idToBackends = idToBackendRef.get();
    final List<Long> clusterBackends = getClusterBackendIds(clusterName);
    // host -> backends of this cluster
    final Map<String, List<Backend>> hostBackendsMapInCluster = Maps.newHashMap();
    // put backend in same host in list
    for (Long id : clusterBackends) {
        final Backend backend = idToBackends.get(id);
        if (hostBackendsMapInCluster.containsKey(backend.getHost())) {
            hostBackendsMapInCluster.get(backend.getHost()).add(backend);
        } else {
            List<Backend> list = Lists.newArrayList();
            list.add(backend);
            hostBackendsMapInCluster.put(backend.getHost(), list);
        }
    }
    List<List<Backend>> hostList = Lists.newArrayList(hostBackendsMapInCluster.values());
    Collections.sort(hostList, hostBackendsListComparator);
    // in each cycle, choose one backend from the host which has maximal backends num.
    // break if all hosts are empty or get enough backends.
    // FIX: guard against an empty host list (cluster with no backends) —
    // the original unconditionally called hostList.get(0) and threw
    // IndexOutOfBoundsException instead of returning null.
    while (!hostList.isEmpty()) {
        if (hostList.get(0).size() > 0) {
            decomBackendIds.add(hostList.get(0).remove(0).getId());
            if (decomBackendIds.size() == shrinkNum) {
                // enough
                break;
            }
            // re-sort so the next pick again comes from the fullest host
            Collections.sort(hostList, hostBackendsListComparator);
        } else {
            // all hosts empty (descending order: head empty => all empty)
            break;
        }
    }
    if (decomBackendIds.size() != shrinkNum) {
        LOG.info("failed to get enough backends to shrink in cluster: {}. required: {}, get: {}",
                clusterName, shrinkNum, decomBackendIds.size());
        return null;
    }
    return decomBackendIds;
}
/**
 * to expand backends in cluster.
 * firstly, acquire backends from hosts not in this cluster.
 * if not enough, secondly acquire backends from hosts in this cluster, returns a list of hosts
 * sorted by the descending order of the number of backend in the first two ways,
 * and get backends from the list in cycle.
 *
 * @param clusterName
 * @param expansionNum
 * @return chosen backend ids, or null if not enough free backends exist
 */
public List<Long> calculateExpansionBackends(String clusterName, int expansionNum) {
    LOG.debug("calculate expansion backend in cluster: {}, new instance num: {}", clusterName, expansionNum);
    final List<Long> chosenBackendIds = Lists.newArrayList();
    ImmutableMap<Long, Backend> idToBackends = idToBackendRef.get();
    // host -> backends (only free, alive, non-decommissioned candidates)
    final Map<String, List<Backend>> hostBackendsMap = getHostBackendsMap(true /* need alive*/,
                                                                          true /* need free */,
                                                                          false /* can not be in decommission */);
    final List<Long> clusterBackends = getClusterBackendIds(clusterName);
    // hosts not in cluster
    List<List<Backend>> hostsNotInCluster = Lists.newArrayList();
    // hosts in cluster
    List<List<Backend>> hostsInCluster = Lists.newArrayList();
    int availableBackendsCount = 0;
    Set<String> hostsSet = Sets.newHashSet();
    for (Long beId : clusterBackends) {
        hostsSet.add(getBackend(beId).getHost());
    }
    // distinguish backend in or not in cluster
    for (Map.Entry<String, List<Backend>> entry : hostBackendsMap.entrySet()) {
        availableBackendsCount += entry.getValue().size();
        if (hostsSet.contains(entry.getKey())) {
            hostsInCluster.add(entry.getValue());
        } else {
            hostsNotInCluster.add(entry.getValue());
        }
    }
    if (expansionNum > availableBackendsCount) {
        LOG.info("not enough available backends. requires :" + expansionNum
                + ", available:" + availableBackendsCount);
        return null;
    }
    Collections.sort(hostsNotInCluster, hostBackendsListComparator);
    Collections.sort(hostsInCluster, hostBackendsListComparator);
    // first select backends which belong to the hosts NOT IN this cluster
    if (hostsNotInCluster.size() > 0) {
        // hostIsEmpty is userd to mark if host is empty, so
        // avoid iterating hostIsEmpty with numOfHost in every circle
        boolean[] hostIsEmpty = new boolean[hostsNotInCluster.size()];
        for (int i = 0; i < hostsNotInCluster.size(); i++) {
            hostIsEmpty[i] = false;
        }
        int numOfHost = hostsNotInCluster.size();
        // round-robin over hosts, one backend per visit
        for (int i = 0;; i = ++i % hostsNotInCluster.size()) {
            if (hostsNotInCluster.get(i).size() > 0) {
                chosenBackendIds.add(hostsNotInCluster.get(i).remove(0).getId());
            } else {
                // avoid counting repeatedly
                if (hostIsEmpty[i] == false) {
                    hostIsEmpty[i] = true;
                    numOfHost--;
                }
            }
            if (chosenBackendIds.size() == expansionNum || numOfHost == 0) {
                break;
            }
        }
    }
    // if not enough, select backends which belong to the hosts IN this cluster
    if (hostsInCluster.size() > 0 && chosenBackendIds.size() != expansionNum) {
        boolean[] hostIsEmpty = new boolean[hostsInCluster.size()];
        for (int i = 0; i < hostsInCluster.size(); i++) {
            hostIsEmpty[i] = false;
        }
        int numOfHost = hostsInCluster.size();
        for (int i = 0;; i = ++i % hostsInCluster.size()) {
            if (hostsInCluster.get(i).size() > 0) {
                chosenBackendIds.add(hostsInCluster.get(i).remove(0).getId());
            } else {
                if (hostIsEmpty[i] == false) {
                    hostIsEmpty[i] = true;
                    numOfHost--;
                }
            }
            if (chosenBackendIds.size() == expansionNum || numOfHost == 0) {
                break;
            }
        }
    }
    if (chosenBackendIds.size() != expansionNum) {
        LOG.info("not enough available backends. requires :" + expansionNum
                + ", get:" + chosenBackendIds.size());
        return null;
    }
    // set be state and owner, and persist each change to the edit log
    Iterator<Long> iterator = chosenBackendIds.iterator();
    while (iterator.hasNext()) {
        final Long id = iterator.next();
        final Backend backend = idToBackends.get(id);
        backend.setOwnerClusterName(clusterName);
        backend.setBackendState(BackendState.using);
        Catalog.getInstance().getEditLog().logBackendStateChange(backend);
    }
    return chosenBackendIds;
}
/**
 * Returns the backends owned by the given cluster.
 *
 * @param name cluster name
 * @return matching backends; empty list when name is null/empty
 */
public List<Backend> getClusterBackends(String name) {
    final List<Backend> members = Lists.newArrayList();
    if (Strings.isNullOrEmpty(name)) {
        return members;
    }
    for (Backend candidate : idToBackendRef.get().values()) {
        if (name.equals(candidate.getOwnerClusterName())) {
            members.add(candidate);
        }
    }
    return members;
}
/**
 * Returns the backends owned by the given cluster, optionally restricted
 * to alive ones.
 *
 * @param name      cluster name
 * @param needAlive when true, only alive backends are returned
 * @return matching backends; null when name is null/empty
 */
public List<Backend> getClusterBackends(String name, boolean needAlive) {
    if (Strings.isNullOrEmpty(name)) {
        return null;
    }
    final List<Backend> members = new ArrayList<Backend>();
    for (Backend candidate : idToBackendRef.get().values()) {
        if (candidate == null || !name.equals(candidate.getOwnerClusterName())) {
            continue;
        }
        if (needAlive && !candidate.isAlive()) {
            continue;
        }
        members.add(candidate);
    }
    return members;
}
/**
 * get cluster's backend id list
 *
 * @param clusterName
 * @return backend ids owned by the cluster; null when clusterName is null/empty
 */
public List<Long> getClusterBackendIds(String clusterName) {
    if (Strings.isNullOrEmpty(clusterName)) {
        return null;
    }
    final List<Long> memberIds = Lists.newArrayList();
    for (Backend candidate : idToBackendRef.get().values()) {
        if (clusterName.equals(candidate.getOwnerClusterName())) {
            memberIds.add(candidate.getId());
        }
    }
    return memberIds;
}
/**
 * get cluster's backend id list, optionally restricted to alive backends
 *
 * @param clusterName
 * @param needAlive when true, only ids of alive backends are returned
 * @return backend ids; null when clusterName is null/empty
 */
public List<Long> getClusterBackendIds(String clusterName, boolean needAlive) {
    if (Strings.isNullOrEmpty(clusterName)) {
        return null;
    }
    final List<Long> memberIds = new ArrayList<Long>();
    for (Backend candidate : idToBackendRef.get().values()) {
        if (candidate == null || !clusterName.equals(candidate.getOwnerClusterName())) {
            continue;
        }
        if (needAlive && !candidate.isAlive()) {
            continue;
        }
        memberIds.add(candidate.getId());
    }
    return memberIds;
}
/**
 * Groups backends by host, filtered by the given criteria.
 *
 * @param needAlive         keep only alive backends
 * @param needFree          keep only backends not owned by any cluster
 * @param canBeDecommission when false, decommissioned backends are excluded
 * @return host -> list of backends on that host
 */
private Map<String, List<Backend>> getHostBackendsMap(boolean needAlive, boolean needFree,
        boolean canBeDecommission) {
    final Map<String, List<Backend>> byHost = Maps.newHashMap();
    for (Backend candidate : idToBackendRef.get().values()) {
        // apply the three filters with early continues
        if (needAlive && !candidate.isAlive()) {
            continue;
        }
        if (needFree && !candidate.isFreeFromCluster()) {
            continue;
        }
        if (!canBeDecommission && candidate.isDecommissioned()) {
            continue;
        }
        List<Backend> bucket = byHost.get(candidate.getHost());
        if (bucket == null) {
            bucket = new ArrayList<Backend>();
            byHost.put(candidate.getHost(), bucket);
        }
        bucket.add(candidate);
    }
    return byHost;
}
// choose backends by round robin
// return null if not enough backend
// use synchronized to run serially
// Selection works on at most one backend per host (picked at random when a
// host has several), continuing after the last backend chosen for this
// cluster (the "cursor"), so consecutive calls spread load across backends.
public synchronized List<Long> seqChooseBackendIds(int backendNum, boolean needAlive, boolean isCreate,
        String clusterName) {
    // load the round-robin cursor for (cluster, purpose); -1 means "start fresh"
    long lastBackendId = -1L;
    if (clusterName.equals(DEFAULT_CLUSTER)) {
        if (isCreate) {
            lastBackendId = lastBackendIdForCreation;
        } else {
            lastBackendId = lastBackendIdForOther;
        }
    } else {
        if (isCreate) {
            if (lastBackendIdForCreationMap.containsKey(clusterName)) {
                lastBackendId = lastBackendIdForCreationMap.get(clusterName);
            } else {
                lastBackendId = -1;
                lastBackendIdForCreationMap.put(clusterName, lastBackendId);
            }
        } else {
            if (lastBackendIdForOtherMap.containsKey(clusterName)) {
                lastBackendId = lastBackendIdForOtherMap.get(clusterName);
            } else {
                lastBackendId = -1;
                lastBackendIdForOtherMap.put(clusterName, lastBackendId);
            }
        }
    }
    // put backend with same host in same list
    final List<Backend> srcBackends = getClusterBackends(clusterName);
    // host -> BE list
    Map<String, List<Backend>> backendMaps = Maps.newHashMap();
    for (Backend backend : srcBackends) {
        if (backendMaps.containsKey(backend.getHost())){
            backendMaps.get(backend.getHost()).add(backend);
        } else {
            List<Backend> list = Lists.newArrayList();
            list.add(backend);
            backendMaps.put(backend.getHost(), list);
        }
    }
    // if more than one backend exists in same host, select a backend at random
    List<Backend> backends = Lists.newArrayList();
    for (List<Backend> list : backendMaps.values()) {
        Collections.shuffle(list);
        backends.add(list.get(0));
    }
    Collections.shuffle(backends);
    List<Long> backendIds = Lists.newArrayList();
    // get last backend index: position of the cursor in this (shuffled) list,
    // or -1 if the cursor's backend was not sampled this time
    int lastBackendIndex = -1;
    int index = -1;
    for (Backend backend : backends) {
        index++;
        if (backend.getId() == lastBackendId) {
            lastBackendIndex = index;
            break;
        }
    }
    Iterator<Backend> iterator = Iterators.cycle(backends);
    index = -1;
    boolean failed = false;
    // 2 cycle at most: one to skip up to the cursor, one full selection pass
    int maxIndex = 2 * backends.size();
    while (iterator.hasNext() && backendIds.size() < backendNum) {
        Backend backend = iterator.next();
        index++;
        // skip everything up to and including the cursor position
        if (index <= lastBackendIndex) {
            continue;
        }
        if (index > maxIndex) {
            failed = true;
            break;
        }
        if (needAlive) {
            if (!backend.isAlive() || backend.isDecommissioned()) {
                continue;
            }
        }
        long backendId = backend.getId();
        if (!backendIds.contains(backendId)) {
            backendIds.add(backendId);
            lastBackendId = backendId;
        } else {
            // wrapped around to an already-chosen backend: not enough candidates
            failed = true;
            break;
        }
    }
    // store the advanced cursor back for the next call
    if (clusterName.equals(DEFAULT_CLUSTER)) {
        if (isCreate) {
            lastBackendIdForCreation = lastBackendId;
        } else {
            lastBackendIdForOther = lastBackendId;
        }
    } else {
        // update last backendId
        if (isCreate) {
            lastBackendIdForCreationMap.put(clusterName, lastBackendId);
        } else {
            lastBackendIdForOtherMap.put(clusterName, lastBackendId);
        }
    }
    if (backendIds.size() != backendNum) {
        failed = true;
    }
    if (!failed) {
        return backendIds;
    }
    // debug
    for (Backend backend : backends) {
        LOG.debug("random select: {}", backend.toString());
    }
    return null;
}
// Returns the current immutable snapshot of all backends keyed by id.
public ImmutableMap<Long, Backend> getIdToBackend() {
    return idToBackendRef.get();
}
// Returns backends belonging to the given cluster, keyed by id.
// A null/empty cluster name returns the full snapshot unfiltered.
public ImmutableMap<Long, Backend> getBackendsInCluster(String cluster) {
    ImmutableMap<Long, Backend> all = idToBackendRef.get();
    if (Strings.isNullOrEmpty(cluster)) {
        return all;
    }
    Map<Long, Backend> filtered = Maps.newHashMap();
    for (Backend candidate : all.values().asList()) {
        if (cluster.equals(candidate.getOwnerClusterName())) {
            filtered.put(candidate.getId(), candidate);
        }
    }
    return ImmutableMap.copyOf(filtered);
}
// Returns the backend's current report version, or -1 if the backend is unknown.
public long getBackendReportVersion(long backendId) {
    AtomicLong version = idToReportVersionRef.get().get(backendId);
    return version == null ? -1L : version.get();
}
// Sets the backend's report version. The write is performed under the
// database's read lock — presumably to serialize against operations that
// hold the db write lock while reading report versions (NOTE(review):
// confirm the intended lock interplay with callers). No-ops with a warning
// if either the backend or the database no longer exists.
public void updateBackendReportVersion(long backendId, long newReportVersion, long dbId) {
    AtomicLong atomicLong = null;
    if ((atomicLong = idToReportVersionRef.get().get(backendId)) != null) {
        Database db = Catalog.getInstance().getDb(dbId);
        if (db != null) {
            db.readLock();
            try {
                atomicLong.set(newReportVersion);
                LOG.debug("update backend {} report version: {}, db: {}", backendId, newReportVersion, dbId);
                return;
            } finally {
                db.readUnlock();
            }
        } else {
            LOG.warn("failed to update backend report version, db {} does not exist", dbId);
        }
    } else {
        LOG.warn("failed to update backend report version, backend {} does not exist", backendId);
    }
}
// Serializes all backends to the image stream: a count followed by
// (id, Backend) pairs, XOR-folding count and ids into the checksum.
// The write order/format must mirror loadBackends exactly.
public long saveBackends(DataOutputStream dos, long checksum) throws IOException {
    ImmutableMap<Long, Backend> idToBackend = idToBackendRef.get();
    int backendCount = idToBackend.size();
    checksum ^= backendCount;
    dos.writeInt(backendCount);
    for (Map.Entry<Long, Backend> entry : idToBackend.entrySet()) {
        long key = entry.getKey();
        checksum ^= key;
        dos.writeLong(key);
        entry.getValue().write(dos);
    }
    return checksum;
}
// Deserializes backends from the image stream (format written by
// saveBackends) and registers each via the replay path, which also
// restores report versions and cluster membership.
public long loadBackends(DataInputStream dis, long checksum) throws IOException {
    int count = dis.readInt();
    checksum ^= count;
    for (int i = 0; i < count; i++) {
        long key = dis.readLong();
        checksum ^= key;
        Backend backend = Backend.read(dis);
        replayAddBackend(backend);
    }
    return checksum;
}
// Releases the membership snapshots by nulling the references.
// NOTE(review): after clear(), most other methods on this object will NPE;
// callers appear to rely on this instance never being used again — confirm.
public void clear() {
    this.idToBackendRef = null;
    this.idToReportVersionRef = null;
}
/**
 * Parses and validates a "host:port" string. The host may be an IP literal
 * or a resolvable hostname (resolved to its address); the port must be in
 * (0, 65536).
 *
 * FIX: the original's trailing {@code catch (Exception e)} also caught the
 * AnalysisException thrown for an out-of-range port and re-wrapped it as
 * "Encounter unknown exception: ...", mangling the intended message. An
 * explicit rethrow of AnalysisException now precedes the generic catch.
 *
 * @param hostPort the "host:port" string (whitespace is stripped)
 * @return resolved host address and port
 * @throws AnalysisException if the string is malformed, the host is unknown,
 *         or the port is out of range
 */
public static Pair<String, Integer> validateHostAndPort(String hostPort) throws AnalysisException {
    hostPort = hostPort.replaceAll("\\s+", "");
    if (hostPort.isEmpty()) {
        throw new AnalysisException("Invalid host port: " + hostPort);
    }
    String[] pair = hostPort.split(":");
    if (pair.length != 2) {
        throw new AnalysisException("Invalid host port: " + hostPort);
    }
    String host = pair[0];
    if (Strings.isNullOrEmpty(host)) {
        throw new AnalysisException("Host is null");
    }
    int heartbeatPort = -1;
    try {
        // validate host
        if (!InetAddressValidator.getInstance().isValid(host)) {
            // maybe this is a hostname
            // if no IP address for the host could be found, 'getByName'
            // will throw UnknownHostException
            InetAddress inetAddress = InetAddress.getByName(host);
            host = inetAddress.getHostAddress();
        }
        // validate port
        heartbeatPort = Integer.parseInt(pair[1]);
        if (heartbeatPort <= 0 || heartbeatPort >= 65536) {
            throw new AnalysisException("Port is out of range: " + heartbeatPort);
        }
        return new Pair<String, Integer>(host, heartbeatPort);
    } catch (AnalysisException e) {
        // our own validation error: propagate unchanged, do not re-wrap
        throw e;
    } catch (UnknownHostException e) {
        throw new AnalysisException("Unknown host: " + e.getMessage());
    } catch (Exception e) {
        throw new AnalysisException("Encounter unknown exception: " + e.getMessage());
    }
}
// Replays an "add backend" edit-log entry (also used when loading the image).
// Performs the same snapshot updates as addBackend() but without writing a
// new log entry.
public void replayAddBackend(Backend newBackend) {
    // update idToBackend
    // pre-VERSION_30 logs had no cluster info: default to DEFAULT_CLUSTER
    if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_30) {
        newBackend.setOwnerClusterName(DEFAULT_CLUSTER);
        newBackend.setBackendState(BackendState.using);
    }
    Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef.get());
    copiedBackends.put(newBackend.getId(), newBackend);
    ImmutableMap<Long, Backend> newIdToBackend = ImmutableMap.copyOf(copiedBackends);
    idToBackendRef.set(newIdToBackend);
    // set new backend's report version as 0L
    Map<Long, AtomicLong> copiedReportVerions = Maps.newHashMap(idToReportVersionRef.get());
    copiedReportVerions.put(newBackend.getId(), new AtomicLong(0L));
    ImmutableMap<Long, AtomicLong> newIdToReportVersion = ImmutableMap.copyOf(copiedReportVerions);
    idToReportVersionRef.set(newIdToReportVersion);
    // to add be to DEFAULT_CLUSTER
    if (newBackend.getBackendState() == BackendState.using) {
        final Cluster cluster = Catalog.getInstance().getCluster(DEFAULT_CLUSTER);
        if (null != cluster) {
            // replay log
            cluster.addBackend(newBackend.getId());
        } else {
            // This happens in loading image when fe is restarted, because loadCluster is after loadBackend,
            // cluster is not created. Be in cluster will be updated in loadCluster.
        }
    }
}
// Replays a "drop backend" edit-log entry: removes the backend from both
// snapshots and from its owning cluster, without writing a new log entry.
public void replayDropBackend(Backend backend) {
    LOG.debug("replayDropBackend: {}", backend);
    // update idToBackend
    Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef.get());
    copiedBackends.remove(backend.getId());
    ImmutableMap<Long, Backend> newIdToBackend = ImmutableMap.copyOf(copiedBackends);
    idToBackendRef.set(newIdToBackend);
    // update idToReportVersion
    Map<Long, AtomicLong> copiedReportVerions = Maps.newHashMap(idToReportVersionRef.get());
    copiedReportVerions.remove(backend.getId());
    ImmutableMap<Long, AtomicLong> newIdToReportVersion = ImmutableMap.copyOf(copiedReportVerions);
    idToReportVersionRef.set(newIdToReportVersion);
    // update cluster
    final Cluster cluster = Catalog.getInstance().getCluster(backend.getOwnerClusterName());
    if (null != cluster) {
        cluster.removeBackend(backend.getId());
    } else {
        LOG.error("Cluster " + backend.getOwnerClusterName() + " no exist.");
    }
}
// Copies the mutable fields of the given (replayed/reported) backend onto
// the in-memory backend with the same id. Silently returns if the backend
// has already been dropped.
public void updateBackendState(Backend be) {
    long id = be.getId();
    Backend memoryBe = getBackend(id);
    if (memoryBe == null) {
        // backend may already be dropped. this may happen when
        // 1. SystemHandler drop the decommission backend
        // 2. at same time, user try to cancel the decommission of that backend.
        // These two operations do not guarantee the order.
        return;
    }
    memoryBe.setBePort(be.getBePort());
    memoryBe.setAlive(be.isAlive());
    memoryBe.setDecommissioned(be.isDecommissioned());
    memoryBe.setHttpPort(be.getHttpPort());
    memoryBe.setBeRpcPort(be.getBeRpcPort());
    memoryBe.setBrpcPort(be.getBrpcPort());
    memoryBe.setLastUpdateMs(be.getLastUpdateMs());
    memoryBe.setLastStartTime(be.getLastStartTime());
    memoryBe.setDisks(be.getDisks());
    memoryBe.setBackendState(be.getBackendState());
    memoryBe.setOwnerClusterName(be.getOwnerClusterName());
    memoryBe.setDecommissionType(be.getDecommissionType());
}
// Computes the cluster's available capacity in bytes. Dead backends are
// counted as if they will come back; decommissioned backends SUBTRACT their
// used bytes, since their data must be relocated onto the rest of the cluster.
private long getClusterAvailableCapacityB(String clusterName) {
    long totalAvailable = 0L;
    for (Backend member : getClusterBackends(clusterName)) {
        // Here we do not check if backend is alive,
        // We suppose the dead backends will back to alive later.
        if (member.isDecommissioned()) {
            // used bytes on a decommissioned backend consume capacity elsewhere
            long usedBytes = member.getTotalCapacityB() - member.getAvailableCapacityB();
            totalAvailable -= usedBytes;
        } else {
            totalAvailable += member.getAvailableCapacityB();
        }
    }
    return totalAvailable;
}
// Throws when the cluster has no usable capacity left (available bytes,
// after accounting for decommissioning backends, is non-positive).
public void checkClusterCapacity(String clusterName) throws DdlException {
    if (getClusterAvailableCapacityB(clusterName) <= 0L) {
        throw new DdlException("Cluster " + clusterName + " has no available capacity");
    }
}
/*
 * Try to randomly get a backend id by given host.
 * If not found, return -1
 */
public long getBackendIdByHost(String host) {
    List<Backend> onHost = Lists.newArrayList();
    for (Backend candidate : idToBackendRef.get().values()) {
        if (candidate.getHost().equals(host)) {
            onHost.add(candidate);
        }
    }
    if (onHost.isEmpty()) {
        return -1L;
    }
    // random pick among the backends on this host
    Collections.shuffle(onHost);
    return onHost.get(0).getId();
}
// Returns the distinct, non-empty cluster names currently owning backends.
public Set<String> getClusterNames() {
    Set<String> names = Sets.newHashSet();
    for (Backend candidate : idToBackendRef.get().values()) {
        String owner = candidate.getOwnerClusterName();
        if (!Strings.isNullOrEmpty(owner)) {
            names.add(owner);
        }
    }
    return names;
}
/*
 * Check if the specified disks' capacity has reached the limit.
 * bePathsMap is (BE id -> list of path hash)
 * If floodStage is true, it will check with the floodStage threshold.
 *
 * return Status.OK if not reach the limit
 */
public Status checkExceedDiskCapacityLimit(Multimap<Long, Long> bePathsMap, boolean floodStage) {
    LOG.debug("pathBeMap: {}", bePathsMap);
    ImmutableMap<Long, DiskInfo> pathHashToDiskInfo = pathHashToDishInfoRef.get();
    for (Long beId : bePathsMap.keySet()) {
        for (Long pathHash : bePathsMap.get(beId)) {
            // unknown path hashes are skipped (treated as not exceeding)
            DiskInfo diskInfo = pathHashToDiskInfo.get(pathHash);
            if (diskInfo != null && diskInfo.exceedLimit(floodStage)) {
                return new Status(TStatusCode.CANCELLED,
                        "disk " + pathHash + " on backend " + beId + " exceed limit usage");
            }
        }
    }
    return Status.OK;
}
// update the path info when disk report
// there is only one thread can update path info, so no need to worry about concurrency control
public void updatePathInfo(List<DiskInfo> addedDisks, List<DiskInfo> removedDisks) {
    // copy-on-write: mutate a snapshot, then publish it atomically
    Map<Long, DiskInfo> updatedPathInfos = Maps.newHashMap(pathHashToDishInfoRef.get());
    for (DiskInfo added : addedDisks) {
        updatedPathInfos.put(added.getPathHash(), added);
    }
    for (DiskInfo removed : removedDisks) {
        updatedPathInfos.remove(removed.getPathHash());
    }
    ImmutableMap<Long, DiskInfo> published = ImmutableMap.copyOf(updatedPathInfos);
    pathHashToDishInfoRef.set(published);
    LOG.debug("update path infos: {}", published);
}
}