| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.doris.system; |
| |
| import org.apache.doris.alter.DecommissionBackendJob.DecommissionType; |
| import org.apache.doris.catalog.Catalog; |
| import org.apache.doris.catalog.DiskInfo; |
| import org.apache.doris.catalog.DiskInfo.DiskState; |
| import org.apache.doris.common.FeMetaVersion; |
| import org.apache.doris.common.io.Text; |
| import org.apache.doris.common.io.Writable; |
| import org.apache.doris.system.HeartbeatResponse.HbStatus; |
| import org.apache.doris.thrift.TDisk; |
| |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| |
| import org.apache.logging.log4j.LogManager; |
| import org.apache.logging.log4j.Logger; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| /** |
| * This class extends the primary identifier of a Backend with ephemeral state, |
| * eg usage information, current administrative state etc. |
| */ |
| public class Backend implements Writable { |
| |
| public enum BackendState { |
| using, /* backend is belong to a cluster*/ |
| offline, |
| free /* backend is not belong to any clusters */ |
| } |
| |
| private static final Logger LOG = LogManager.getLogger(Backend.class); |
| |
| private long id; |
| private String host; |
| private String version; |
| |
| private int heartbeatPort; // heartbeat |
| private AtomicInteger bePort; // be |
| private AtomicInteger httpPort; // web service |
| private AtomicInteger beRpcPort; // be rpc port |
| private AtomicInteger brpcPort = new AtomicInteger(-1); |
| |
| private AtomicLong lastUpdateMs; |
| private AtomicLong lastStartTime; |
| private AtomicBoolean isAlive; |
| |
| private AtomicBoolean isDecommissioned; |
| private AtomicInteger decommissionType; |
| private AtomicReference<String> ownerClusterName; |
| // to index the state in some cluster |
| private AtomicInteger backendState; |
| // private BackendState backendState; |
| |
| // rootPath -> DiskInfo |
| private AtomicReference<ImmutableMap<String, DiskInfo>> disksRef; |
| |
| private String heartbeatErrMsg = ""; |
| |
| // This is used for the first time we init pathHashToDishInfo in SystemInfoService. |
| // after init it, this variable is set to true. |
| private boolean initPathInfo = false; |
| |
| private long lastMissingHeartbeatTime = -1; |
| // the max tablet compaction score of this backend. |
| // this field is set by tablet report, and just for metric monitor, no need to persist. |
| private AtomicLong tabletMaxCompactionScore = new AtomicLong(0); |
| |
| public Backend() { |
| this.host = ""; |
| this.version = ""; |
| this.lastUpdateMs = new AtomicLong(); |
| this.lastStartTime = new AtomicLong(); |
| this.isAlive = new AtomicBoolean(); |
| this.isDecommissioned = new AtomicBoolean(false); |
| |
| this.bePort = new AtomicInteger(); |
| this.httpPort = new AtomicInteger(); |
| this.beRpcPort = new AtomicInteger(); |
| this.disksRef = new AtomicReference<ImmutableMap<String, DiskInfo>>(ImmutableMap.<String, DiskInfo> of()); |
| |
| this.ownerClusterName = new AtomicReference<String>(""); |
| this.backendState = new AtomicInteger(BackendState.free.ordinal()); |
| |
| this.decommissionType = new AtomicInteger(DecommissionType.SystemDecommission.ordinal()); |
| } |
| |
| public Backend(long id, String host, int heartbeatPort) { |
| this.id = id; |
| this.host = host; |
| this.version = ""; |
| this.heartbeatPort = heartbeatPort; |
| this.bePort = new AtomicInteger(-1); |
| this.httpPort = new AtomicInteger(-1); |
| this.beRpcPort = new AtomicInteger(-1); |
| this.lastUpdateMs = new AtomicLong(-1L); |
| this.lastStartTime = new AtomicLong(-1L); |
| this.disksRef = new AtomicReference<ImmutableMap<String, DiskInfo>>(ImmutableMap.<String, DiskInfo> of()); |
| |
| this.isAlive = new AtomicBoolean(false); |
| this.isDecommissioned = new AtomicBoolean(false); |
| |
| this.ownerClusterName = new AtomicReference<String>(""); |
| this.backendState = new AtomicInteger(BackendState.free.ordinal()); |
| this.decommissionType = new AtomicInteger(DecommissionType.SystemDecommission.ordinal()); |
| } |
| |
| public long getId() { |
| return id; |
| } |
| |
| public String getHost() { |
| return host; |
| } |
| |
| public String getVersion() { |
| return version; |
| } |
| |
| public int getBePort() { |
| return bePort.get(); |
| } |
| |
| public int getHeartbeatPort() { |
| return heartbeatPort; |
| } |
| |
| public int getHttpPort() { |
| return httpPort.get(); |
| } |
| |
| public int getBeRpcPort() { |
| return beRpcPort.get(); |
| } |
| |
| public int getBrpcPort() { |
| return brpcPort.get(); |
| } |
| |
| public String getHeartbeatErrMsg() { |
| return heartbeatErrMsg; |
| } |
| |
| // for test only |
| public void updateOnce(int bePort, int httpPort, int beRpcPort) { |
| if (this.bePort.get() != bePort) { |
| this.bePort.set(bePort); |
| } |
| |
| if (this.httpPort.get() != httpPort) { |
| this.httpPort.set(httpPort); |
| } |
| |
| if (this.beRpcPort.get() != beRpcPort) { |
| this.beRpcPort.set(beRpcPort); |
| } |
| |
| long currentTime = System.currentTimeMillis(); |
| this.lastUpdateMs.set(currentTime); |
| if (!isAlive.get()) { |
| this.lastStartTime.set(currentTime); |
| LOG.info("{} is alive,", this.toString()); |
| this.isAlive.set(true); |
| } |
| |
| heartbeatErrMsg = ""; |
| } |
| |
| public boolean setDecommissioned(boolean isDecommissioned) { |
| if (this.isDecommissioned.compareAndSet(!isDecommissioned, isDecommissioned)) { |
| LOG.warn("{} set decommission: {}", this.toString(), isDecommissioned); |
| return true; |
| } |
| return false; |
| } |
| |
| public void setBackendState(BackendState state) { |
| this.backendState.set(state.ordinal()); |
| } |
| |
| public void setAlive(boolean isAlive) { |
| this.isAlive.set(isAlive); |
| } |
| |
| public void setBePort(int agentPort) { |
| this.bePort.set(agentPort); |
| } |
| |
| public void setHttpPort(int httpPort) { |
| this.httpPort.set(httpPort); |
| } |
| |
| public void setBeRpcPort(int beRpcPort) { |
| this.beRpcPort.set(beRpcPort); |
| } |
| |
| public void setBrpcPort(int brpcPort) { |
| this.brpcPort.set(brpcPort); |
| } |
| |
| public long getLastUpdateMs() { |
| return this.lastUpdateMs.get(); |
| } |
| |
| public void setLastUpdateMs(long currentTime) { |
| this.lastUpdateMs.set(currentTime); |
| } |
| |
| public long getLastStartTime() { |
| return this.lastStartTime.get(); |
| } |
| |
| public void setLastStartTime(long currentTime) { |
| this.lastStartTime.set(currentTime); |
| } |
| |
| public long getLastMissingHeartbeatTime() { |
| return lastMissingHeartbeatTime; |
| } |
| |
| public boolean isAlive() { |
| return this.isAlive.get(); |
| } |
| |
| public boolean isDecommissioned() { |
| return this.isDecommissioned.get(); |
| } |
| |
| public boolean isAvailable() { |
| return this.isAlive.get() && !this.isDecommissioned.get(); |
| } |
| |
| public void setDisks(ImmutableMap<String, DiskInfo> disks) { |
| this.disksRef.set(disks); |
| } |
| |
| /** |
| * backend belong to some cluster |
| * |
| * @return |
| */ |
| public boolean isUsedByCluster() { |
| return this.backendState.get() == BackendState.using.ordinal(); |
| } |
| |
| /** |
| * backend is free, and it isn't belong to any cluster |
| * |
| * @return |
| */ |
| public boolean isFreeFromCluster() { |
| return this.backendState.get() == BackendState.free.ordinal(); |
| } |
| |
| /** |
| * backend execute discommission in cluster , and backendState will be free |
| * finally |
| * |
| * @return |
| */ |
| public boolean isOffLineFromCluster() { |
| return this.backendState.get() == BackendState.offline.ordinal(); |
| } |
| |
| public ImmutableMap<String, DiskInfo> getDisks() { |
| return this.disksRef.get(); |
| } |
| |
| public boolean hasPathHash() { |
| return disksRef.get().values().stream().allMatch(v -> v.hasPathHash()); |
| } |
| |
| public long getTotalCapacityB() { |
| ImmutableMap<String, DiskInfo> disks = disksRef.get(); |
| long totalCapacityB = 0L; |
| for (DiskInfo diskInfo : disks.values()) { |
| if (diskInfo.getState() == DiskState.ONLINE) { |
| totalCapacityB += diskInfo.getTotalCapacityB(); |
| } |
| } |
| return totalCapacityB; |
| } |
| |
| public long getAvailableCapacityB() { |
| // when cluster init, disks is empty, return 1L. |
| ImmutableMap<String, DiskInfo> disks = disksRef.get(); |
| long availableCapacityB = 1L; |
| for (DiskInfo diskInfo : disks.values()) { |
| if (diskInfo.getState() == DiskState.ONLINE) { |
| availableCapacityB += diskInfo.getAvailableCapacityB(); |
| } |
| } |
| return availableCapacityB; |
| } |
| |
| public long getDataUsedCapacityB() { |
| ImmutableMap<String, DiskInfo> disks = disksRef.get(); |
| long dataUsedCapacityB = 0L; |
| for (DiskInfo diskInfo : disks.values()) { |
| if (diskInfo.getState() == DiskState.ONLINE) { |
| dataUsedCapacityB += diskInfo.getDataUsedCapacityB(); |
| } |
| } |
| return dataUsedCapacityB; |
| } |
| |
| public double getMaxDiskUsedPct() { |
| ImmutableMap<String, DiskInfo> disks = disksRef.get(); |
| double maxPct = 0.0; |
| for (DiskInfo diskInfo : disks.values()) { |
| if (diskInfo.getState() == DiskState.ONLINE) { |
| double percent = diskInfo.getUsedPct(); |
| if (percent > maxPct) { |
| maxPct = percent; |
| } |
| } |
| } |
| return maxPct; |
| } |
| |
| public String getPathByPathHash(long pathHash) { |
| for (DiskInfo diskInfo : disksRef.get().values()) { |
| if (diskInfo.getPathHash() == pathHash) { |
| return diskInfo.getRootPath(); |
| } |
| } |
| return null; |
| } |
| |
| public void updateDisks(Map<String, TDisk> backendDisks) { |
| ImmutableMap<String, DiskInfo> disks = disksRef.get(); |
| // The very first time to init the path info |
| if (!initPathInfo) { |
| boolean allPathHashUpdated = true; |
| for (DiskInfo diskInfo : disks.values()) { |
| if (diskInfo.getPathHash() == 0) { |
| allPathHashUpdated = false; |
| break; |
| } |
| } |
| if (allPathHashUpdated) { |
| initPathInfo = true; |
| Catalog.getCurrentSystemInfo().updatePathInfo(new ArrayList<>(disks.values()), Lists.newArrayList()); |
| } |
| } |
| |
| // update status or add new diskInfo |
| Map<String, DiskInfo> newDiskInfos = Maps.newHashMap(); |
| List<DiskInfo> addedDisks = Lists.newArrayList(); |
| List<DiskInfo> removedDisks = Lists.newArrayList(); |
| /* |
| * set isChanged to true only if new disk is added or old disk is dropped. |
| * we ignore the change of capacity, because capacity info is only used in master FE. |
| */ |
| boolean isChanged = false; |
| for (TDisk tDisk : backendDisks.values()) { |
| String rootPath = tDisk.getRoot_path(); |
| long totalCapacityB = tDisk.getDisk_total_capacity(); |
| long dataUsedCapacityB = tDisk.getData_used_capacity(); |
| long diskAvailableCapacityB = tDisk.getDisk_available_capacity(); |
| boolean isUsed = tDisk.isUsed(); |
| |
| DiskInfo diskInfo = disks.get(rootPath); |
| if (diskInfo == null) { |
| diskInfo = new DiskInfo(rootPath); |
| addedDisks.add(diskInfo); |
| isChanged = true; |
| LOG.info("add new disk info. backendId: {}, rootPath: {}", id, rootPath); |
| } |
| newDiskInfos.put(rootPath, diskInfo); |
| |
| diskInfo.setTotalCapacityB(totalCapacityB); |
| diskInfo.setDataUsedCapacityB(dataUsedCapacityB); |
| diskInfo.setAvailableCapacityB(diskAvailableCapacityB); |
| if (tDisk.isSetPath_hash()) { |
| diskInfo.setPathHash(tDisk.getPath_hash()); |
| } |
| |
| if (tDisk.isSetStorage_medium()) { |
| diskInfo.setStorageMedium(tDisk.getStorage_medium()); |
| } |
| |
| if (isUsed) { |
| if (diskInfo.setState(DiskState.ONLINE)) { |
| isChanged = true; |
| } |
| } else { |
| if (diskInfo.setState(DiskState.OFFLINE)) { |
| isChanged = true; |
| } |
| } |
| LOG.debug("update disk info. backendId: {}, diskInfo: {}", id, diskInfo.toString()); |
| } |
| |
| // remove not exist rootPath in backend |
| for (DiskInfo diskInfo : disks.values()) { |
| String rootPath = diskInfo.getRootPath(); |
| if (!backendDisks.containsKey(rootPath)) { |
| removedDisks.add(diskInfo); |
| isChanged = true; |
| LOG.warn("remove not exist rootPath. backendId: {}, rootPath: {}", id, rootPath); |
| } |
| } |
| |
| if (isChanged) { |
| // update disksRef |
| disksRef.set(ImmutableMap.copyOf(newDiskInfos)); |
| Catalog.getCurrentSystemInfo().updatePathInfo(addedDisks, removedDisks); |
| // log disk changing |
| Catalog.getInstance().getEditLog().logBackendStateChange(this); |
| } |
| } |
| |
| public static Backend read(DataInput in) throws IOException { |
| Backend backend = new Backend(); |
| backend.readFields(in); |
| return backend; |
| } |
| |
| @Override |
| public void write(DataOutput out) throws IOException { |
| out.writeLong(id); |
| Text.writeString(out, host); |
| out.writeInt(heartbeatPort); |
| out.writeInt(bePort.get()); |
| out.writeInt(httpPort.get()); |
| out.writeInt(beRpcPort.get()); |
| out.writeBoolean(isAlive.get()); |
| out.writeBoolean(isDecommissioned.get()); |
| out.writeLong(lastUpdateMs.get()); |
| |
| out.writeLong(lastStartTime.get()); |
| |
| ImmutableMap<String, DiskInfo> disks = disksRef.get(); |
| out.writeInt(disks.size()); |
| for (Map.Entry<String, DiskInfo> entry : disks.entrySet()) { |
| Text.writeString(out, entry.getKey()); |
| entry.getValue().write(out); |
| } |
| |
| Text.writeString(out, ownerClusterName.get()); |
| out.writeInt(backendState.get()); |
| out.writeInt(decommissionType.get()); |
| |
| out.writeInt(brpcPort.get()); |
| } |
| |
| public void readFields(DataInput in) throws IOException { |
| id = in.readLong(); |
| host = Text.readString(in); |
| heartbeatPort = in.readInt(); |
| bePort.set(in.readInt()); |
| httpPort.set(in.readInt()); |
| if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_31) { |
| beRpcPort.set(in.readInt()); |
| } |
| isAlive.set(in.readBoolean()); |
| |
| if (Catalog.getCurrentCatalogJournalVersion() >= 5) { |
| isDecommissioned.set(in.readBoolean()); |
| } |
| |
| lastUpdateMs.set(in.readLong()); |
| |
| if (Catalog.getCurrentCatalogJournalVersion() >= 2) { |
| lastStartTime.set(in.readLong()); |
| |
| Map<String, DiskInfo> disks = Maps.newHashMap(); |
| int size = in.readInt(); |
| for (int i = 0; i < size; i++) { |
| String rootPath = Text.readString(in); |
| DiskInfo diskInfo = DiskInfo.read(in); |
| disks.put(rootPath, diskInfo); |
| } |
| |
| disksRef.set(ImmutableMap.copyOf(disks)); |
| } |
| if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_30) { |
| ownerClusterName.set(Text.readString(in)); |
| backendState.set(in.readInt()); |
| decommissionType.set(in.readInt()); |
| } else { |
| ownerClusterName.set(SystemInfoService.DEFAULT_CLUSTER); |
| backendState.set(BackendState.using.ordinal()); |
| decommissionType.set(DecommissionType.SystemDecommission.ordinal()); |
| } |
| |
| if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_40) { |
| brpcPort.set(in.readInt()); |
| } |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (this == obj) { |
| return true; |
| } |
| if (!(obj instanceof Backend)) { |
| return false; |
| } |
| |
| Backend backend = (Backend) obj; |
| |
| return (id == backend.id) && (host.equals(backend.host)) && (heartbeatPort == backend.heartbeatPort) |
| && (bePort.get() == backend.bePort.get()) && (isAlive.get() == backend.isAlive.get()); |
| } |
| |
| @Override |
| public String toString() { |
| return "Backend [id=" + id + ", host=" + host + ", heartbeatPort=" + heartbeatPort + ", alive=" + isAlive.get() |
| + "]"; |
| } |
| |
| public String getOwnerClusterName() { |
| return ownerClusterName.get(); |
| } |
| |
| public void setOwnerClusterName(String name) { |
| ownerClusterName.set(name); |
| } |
| |
| public void clearClusterName() { |
| ownerClusterName.set(""); |
| } |
| |
| public BackendState getBackendState() { |
| switch (backendState.get()) { |
| case 0: |
| return BackendState.using; |
| case 1: |
| return BackendState.offline; |
| default: |
| return BackendState.free; |
| } |
| } |
| |
| public void setDecommissionType(DecommissionType type) { |
| decommissionType.set(type.ordinal()); |
| } |
| |
| public DecommissionType getDecommissionType() { |
| if (decommissionType.get() == DecommissionType.ClusterDecommission.ordinal()) { |
| return DecommissionType.ClusterDecommission; |
| } |
| return DecommissionType.SystemDecommission; |
| } |
| |
| /* |
| * handle Backend's heartbeat response. |
| * return true if any port changed, or alive state is changed. |
| */ |
| public boolean handleHbResponse(BackendHbResponse hbResponse) { |
| boolean isChanged = false; |
| if (hbResponse.getStatus() == HbStatus.OK) { |
| if (!this.version.equals(hbResponse.getVersion())) { |
| isChanged = true; |
| this.version = hbResponse.getVersion(); |
| } |
| |
| if (this.bePort.get() != hbResponse.getBePort()) { |
| isChanged = true; |
| this.bePort.set(hbResponse.getBePort()); |
| } |
| |
| if (this.httpPort.get() != hbResponse.getHttpPort()) { |
| isChanged = true; |
| this.httpPort.set(hbResponse.getHttpPort()); |
| } |
| |
| if (this.brpcPort.get() != hbResponse.getBrpcPort()) { |
| isChanged = true; |
| this.brpcPort.set(hbResponse.getBrpcPort()); |
| } |
| |
| this.lastUpdateMs.set(hbResponse.getHbTime()); |
| if (!isAlive.get()) { |
| isChanged = true; |
| this.lastStartTime.set(hbResponse.getHbTime()); |
| LOG.info("{} is alive, last start time: {}", this.toString(), hbResponse.getHbTime()); |
| this.isAlive.set(true); |
| } else if (this.lastStartTime.get() <= 0) { |
| this.lastStartTime.set(hbResponse.getHbTime()); |
| } |
| |
| heartbeatErrMsg = ""; |
| } else { |
| if (isAlive.compareAndSet(true, false)) { |
| isChanged = true; |
| LOG.info("{} is dead,", this.toString()); |
| } |
| |
| heartbeatErrMsg = hbResponse.getMsg() == null ? "Unknown error" : hbResponse.getMsg(); |
| lastMissingHeartbeatTime = System.currentTimeMillis(); |
| } |
| |
| return isChanged; |
| } |
| |
| public void setTabletMaxCompactionScore(long compactionScore) { |
| tabletMaxCompactionScore.set(compactionScore); |
| } |
| |
| public long getTabletMaxCompactionScore() { |
| return tabletMaxCompactionScore.get(); |
| } |
| } |
| |