blob: 34ed7a53fcfe78a10d5e0bd88b09afe1a9ed2e18 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type.CORE_IDX;
import static org.apache.solr.common.params.CoreAdminParams.NODE;
/**
* Each instance represents a node in the cluster
*/
public class Row implements MapWriter {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
  // fixed seed: synthetic replica names must be reproducible across identical runs
  private static final Random random = new Random(0);
  public final String node;
  // one cell per node attribute, in the order the params were supplied to the constructor
  final Cell[] cells;
  //this holds the details of each replica in the node
  private Map<String, Map<String, List<ReplicaInfo>>> collectionVsShardVsReplicas;
  // true when at least one attribute value could not be obtained for this node
  boolean anyValueMissing = false;
  boolean isLive = true;
  Policy.Session session;
  // node-level computation cache, indexed by GlobalCacheEntryKey.ordinal()
  Object[] globalCache;
  // coll -> shard -> cacheName -> key -> value; invalidated per-collection on mutation
  Map<String, Map> perCollCache;
  // coll -> shard (plus a Policy.ANY aggregate entry) -> replica counts by type
  Map<String, Map<String, ReplicaCount>> replicaCounts;
  /** Keys into {@link #globalCache}; one slot per enum constant. */
  public enum GlobalCacheEntryKey {
    TOTALCORES,
    FREEDISK
  }
  /**
   * Builds a fresh row for one node from live cluster state.
   *
   * @param node node name
   * @param params the attributes (name + type) to capture as cells, in order
   * @param perReplicaAttributes extra attributes to fetch for each replica
   * @param session the owning policy session
   */
  public Row(String node, List<Pair<String, Variable.Type>> params, List<String> perReplicaAttributes, Policy.Session session) {
    this.session = session;
    collectionVsShardVsReplicas = session.nodeStateProvider.getReplicaInfo(node, perReplicaAttributes);
    if (collectionVsShardVsReplicas == null) collectionVsShardVsReplicas = new HashMap<>();
    this.node = node;
    cells = new Cell[params.size()];
    isLive = session.cloudManager.getClusterStateProvider().getLiveNodes().contains(node);
    List<String> paramNames = params.stream().map(Pair::first).collect(Collectors.toList());
    // attribute values are only fetched for live nodes; a dead node gets all-null cells
    Map<String, Object> vals = isLive ? session.nodeStateProvider.getNodeValues(node, paramNames) : Collections.emptyMap();
    for (int i = 0; i < params.size(); i++) {
      Pair<String, Variable.Type> pair = params.get(i);
      cells[i] = new Cell(i, pair.first(), Clause.validate(pair.first(), vals.get(pair.first()), false), null, pair.second(), this);
      // the "node" cell always carries the node name, even when not returned by the provider
      if (NODE.equals(pair.first())) cells[i].val = node;
      if (cells[i].val == null) anyValueMissing = true;
    }
    this.globalCache = new Object[GlobalCacheEntryKey.values().length];
    this.perCollCache = new HashMap<>();
    _initReplicaCounts();
    // a freshly built row owns all its maps, so lazyCopyReplicas() may skip the deep copy
    isAlreadyCopied = true;
    initPerClauseData();
  }
private void _initReplicaCounts() {
replicaCounts = new HashMap<>();
for (Map.Entry<String, Map<String, List<ReplicaInfo>>> perCollEntry : collectionVsShardVsReplicas.entrySet()) {
Map<String, ReplicaCount> perShard = new HashMap<>();
replicaCounts.put(perCollEntry.getKey(), perShard);
ReplicaCount anyShard = new ReplicaCount();
perShard.put(Policy.ANY, anyShard);
for (Map.Entry<String, List<ReplicaInfo>> perShardEntry : perCollEntry.getValue().entrySet()) {
ReplicaCount rc = new ReplicaCount();
perShard.put(perShardEntry.getKey(), rc);
for (int i = 0; i < perShardEntry.getValue().size(); i++) {
ReplicaInfo info = perShardEntry.getValue().get(i);
rc.increment(info);
anyShard.increment(info);
}
}
}
}
private void _addReplicaCount(String collection, String shard, ReplicaInfo ri) {
Map<String, ReplicaCount> perShard = replicaCounts.get(collection);
if (perShard == null) {
perShard = new HashMap<>();
replicaCounts.put(collection, perShard);
}
ReplicaCount rc = perShard.get(shard);
if (rc == null) {
rc = new ReplicaCount();
perShard.put(shard, rc);
}
ReplicaCount anyShard = perShard.get(Policy.ANY);
if (anyShard == null) {
anyShard = new ReplicaCount();
perShard.put(Policy.ANY, anyShard);
}
rc.increment(ri);
anyShard.increment(ri);
}
private void _removeReplicaCount(String collection, String shard, ReplicaInfo ri) {
Map<String, ReplicaCount> perShard = replicaCounts.get(collection);
if (perShard == null) {
throw new RuntimeException("missing ReplicaCount for " + collection + "/" + shard + ": " + ri);
}
ReplicaCount rc = perShard.get(shard);
if (rc == null) {
throw new RuntimeException("missing ReplicaCount for " + collection + "/" + shard + ": " + ri);
}
ReplicaCount anyShard = perShard.get(Policy.ANY);
if (anyShard == null) {
throw new RuntimeException("missing ANY ReplicaCount for " + collection + "/" + shard + ": " + ri);
}
rc.decrement(ri);
anyShard.decrement(ri);
}
public ReplicaCount getReplicaCount(String collection, String shard) {
return replicaCounts.getOrDefault(collection, Collections.emptyMap()).get(shard);
}
  /**
   * Read-only view of this node's replica layout (coll -> shard -> replicas).
   * Only the outer map is wrapped; the nested maps/lists are the live internals —
   * callers must not mutate them.
   */
  public Map<String, Map<String, List<ReplicaInfo>>> getCollectionVsShardVsReplicas() {
    return Collections.unmodifiableMap(collectionVsShardVsReplicas);
  }
public static final Map<String, CacheEntry> cacheStats = new HashMap<>();
static class CacheEntry implements MapWriter {
AtomicLong hits = new AtomicLong(), misses = new AtomicLong();
@Override
public void writeMap(EntryWriter ew) throws IOException {
ew.put("hits", hits.get());
ew.put("misses", misses.get());
}
public static boolean hit(String cacheName) {
getCacheEntry(cacheName).hits.incrementAndGet();
return true;
}
private static CacheEntry getCacheEntry(String cacheName) {
CacheEntry cacheEntry = cacheStats.get(cacheName);
if (cacheEntry == null) {
cacheStats.put(cacheName, cacheEntry = new CacheEntry());
}
return cacheEntry;
}
public static boolean miss(String cacheName) {
getCacheEntry(cacheName).misses.incrementAndGet();
return true;
}
}
public void forEachShard(String collection, BiConsumer<String, List<ReplicaInfo>> consumer) {
collectionVsShardVsReplicas
.getOrDefault(collection, Collections.emptyMap())
.forEach(consumer);
}
public <R> R computeCacheIfAbsent(GlobalCacheEntryKey key, Function<Object, R> supplier) {
R result = (R) globalCache[key.ordinal()];
if (result != null) {
//assert CacheEntry.hit(key.toString());
return result;
} else {
//assert CacheEntry.miss(key.toString());
globalCache[key.ordinal()] = result = supplier.apply(key);
return result;
}
}
public <R> R computeCacheIfAbsent(String coll, String shard, String cacheName, Object key, Function<Object, R> supplier) {
Map collMap = (Map) this.perCollCache.get(coll);
if (collMap == null) this.perCollCache.put(coll, collMap = new HashMap());
Map shardMap = (Map) collMap.get(shard);
if (shardMap == null) collMap.put(shard, shardMap = new HashMap());
Map cacheNameMap = (Map) shardMap.get(cacheName);
if (cacheNameMap == null) shardMap.put(cacheName, cacheNameMap = new HashMap());
R result = (R) cacheNameMap.get(key);
if (result == null) {
//CacheEntry.miss(cacheName);
cacheNameMap.put(key, result = supplier.apply(key));
return result;
} else {
//CacheEntry.hit(cacheName);
return result;
}
}
  // copy constructor
  // NOTE(review): cells and (when supplied) replicaCounts are deep-copied, but
  // collectionVsShardVsReplicas, perRowCache and globalCache are shared BY REFERENCE with
  // the source row — lazyCopyReplicas() appears to be the copy-on-write point; confirm all
  // mutation paths go through it before touching those maps.
  Row(String node, Cell[] cells, boolean anyValueMissing, Map<String,
  Map<String, List<ReplicaInfo>>> collectionVsShardVsReplicas, boolean isLive, Policy.Session session,
  Map<String, Map> perRowCache, Object[] globalCache, Map<String, Map<String, ReplicaCount>> replicaCounts) {
    this.session = session;
    this.node = node;
    this.isLive = isLive;
    // cells are deep-copied and re-pointed at this row
    this.cells = new Cell[cells.length];
    for (int i = 0; i < this.cells.length; i++) {
      this.cells[i] = cells[i].copy();
      this.cells[i].row = this;
    }
    this.anyValueMissing = anyValueMissing;
    this.collectionVsShardVsReplicas = collectionVsShardVsReplicas;
    this.perCollCache = perRowCache != null ? perRowCache : new HashMap<>();
    this.globalCache = globalCache != null ? globalCache : new Object[GlobalCacheEntryKey.values().length];
    if (replicaCounts != null) { // deep copy
      this.replicaCounts = new HashMap<>();
      for (Map.Entry<String, Map<String, ReplicaCount>> perCollEntry : replicaCounts.entrySet()) {
        Map<String, ReplicaCount> perShard = new HashMap<>();
        this.replicaCounts.put(perCollEntry.getKey(), perShard);
        for (Map.Entry<String, ReplicaCount> perShardEntry : perCollEntry.getValue().entrySet()) {
          perShard.put(perShardEntry.getKey(), perShardEntry.getValue().copy());
        }
      }
    } else {
      // no counts supplied: rebuild them from the replica layout
      _initReplicaCounts();
    }
  }
  /** Serializes this row (node name, replicas, liveness, attribute cells) for diagnostics/JSON. */
  @Override
  public void writeMap(EntryWriter ew) throws IOException {
    ew.put(NODE, node);
    ew.put("replicas", collectionVsShardVsReplicas);
    ew.put("isLive", isLive);
    ew.put("attributes", Arrays.asList(cells));
  }
  /**
   * Creates a copy of this row bound to {@code session}. Cells and counts are
   * deep-copied by the copy constructor; replica maps and caches are shared until
   * a mutation triggers lazyCopyReplicas().
   */
  Row copy(Policy.Session session) {
    return new Row(node, cells, anyValueMissing, collectionVsShardVsReplicas, isLive, session,
    this.perCollCache, this.globalCache, this.replicaCounts);
  }
Object getVal(String name) {
if (NODE.equals(name)) return this.node;
for (Cell cell : cells) if (cell.name.equals(name)) return cell.val;
return null;
}
public Object getVal(String name, Object def) {
for (Cell cell : cells)
if (cell.name.equals(name)) {
return cell.val == null ? def : cell.val;
}
return def;
}
  /** JSON rendering via MapWriter#jsonStr(), driven by {@link #writeMap}. */
  @Override
  public String toString() {
    return jsonStr();
  }
  /** Adds a replica in strict mode; see {@link #addReplica(String, String, Replica.Type, int, boolean)}. */
  public Row addReplica(String coll, String shard, Replica.Type type) {
    return addReplica(coll, shard, type, 0, true);
  }

  /** Adds a replica with the given strictness; recursion depth starts at 0. */
  public Row addReplica(String coll, String shard, Replica.Type type, boolean strictMode) {
    return addReplica(coll, shard, type, 0, strictMode);
  }
  /**
   * this simulates adding a replica of a certain coll+shard to node. as a result of adding a replica ,
   * values of certain attributes will be modified, in this node as well as other nodes. Please note that
   * the state of the current session is kept intact while this operation is being performed
   *
   * @param coll collection name
   * @param shard shard name
   * @param type replica type
   * @param recursionCount the number of times we have recursed to add more replicas
   * @param strictMode whether suggester is operating in strict mode or not
   */
  Row addReplica(String coll, String shard, Replica.Type type, int recursionCount, boolean strictMode) {
    // cap cascading add/remove projections; deeper recursion indicates a policy feedback loop
    if (recursionCount > 3) {
      log.error("more than 3 levels of recursion ", new RuntimeException());
      return this;
    }
    // invalidate this (pre-copy) row's caches for the affected collection
    lazyCopyReplicas(coll, shard);
    List<OperationInfo> furtherOps = new ArrayList<>(3);
    Consumer<OperationInfo> opCollector = it -> furtherOps.add(it);
    // work on a copy of the whole session so the caller's session stays untouched
    Row row = session.copy().getNode(this.node);
    if (row == null) throw new RuntimeException("couldn't get a row");
    row.lazyCopyReplicas(coll, shard);
    Map<String, List<ReplicaInfo>> c = row.collectionVsShardVsReplicas.computeIfAbsent(coll, Utils.NEW_HASHMAP_FUN);
    List<ReplicaInfo> replicas = c.computeIfAbsent(shard, Utils.NEW_ARRAYLIST_FUN);
    // seeded Random keeps synthetic names deterministic across identical runs
    String replicaname = "SYNTHETIC." + (random.nextInt(1000) + 1000);
    ReplicaInfo ri = new ReplicaInfo(replicaname, replicaname, coll, shard, type, this.node, null);
    replicas.add(ri);
    row._addReplicaCount(coll, shard, ri);
    // let each attribute project the effect of the add; cells may queue follow-up ops
    for (Cell cell : row.cells) {
      cell.type.projectAddReplica(cell, ri, opCollector, strictMode);
    }
    // apply follow-up operations (possibly on other nodes), deepening the recursion count
    for (OperationInfo op : furtherOps) {
      if (op.isAdd) {
        row = row.session.getNode(op.node).addReplica(op.coll, op.shard, op.type, recursionCount + 1, strictMode);
      } else {
        row.session.getNode(op.node).removeReplica(op.coll, op.shard, op.type, recursionCount + 1);
      }
    }
    row.modifyPerClauseCount(ri, 1, this);
    return row;
  }
  // true once this row owns private copies of its replica maps (set by the main
  // constructor and by lazyCopyReplicas); copy-constructed rows start out sharing maps
  boolean isAlreadyCopied = false;

  /**
   * Copy-on-write hook: invalidates the per-collection cache for {@code coll} and,
   * on first call for a shared (copied) row, replaces the replica maps with copies
   * so mutations don't leak into the source row. The {@code shard} param is
   * currently unused — see the nocommit note below.
   */
  private void lazyCopyReplicas(String coll, String shard) {
    //log.info("--- drop globalCache " + node);
    // nocommit: not needed?
    //globalCache = new Object[GlobalCacheEntryKey.values().length];
    // always drop the cached computations for this collection, even if maps are owned
    Map<String, Map> cacheCopy = new HashMap<>(perCollCache);
    // nocommit: per-shard removal has unexpected side-effects, TestPolicy.testFreeDiskSuggestions fails
    cacheCopy.remove(coll);
    perCollCache = cacheCopy;
    if (isAlreadyCopied) return;//caches need to be invalidated but the rest can remain as is
    // shallow-copy the outer map, deep-copy only the collection about to be mutated
    Map<String, Map<String, List<ReplicaInfo>>> replicasCopy = new HashMap<>(collectionVsShardVsReplicas);
    Map<String, List<ReplicaInfo>> oneColl = replicasCopy.get(coll);
    if (oneColl != null) {
      replicasCopy.put(coll, Utils.getDeepCopy(oneColl, 2));
    }
    collectionVsShardVsReplicas = replicasCopy;
    isAlreadyCopied = true;
  }
  /** True when this node hosts no replicas at all. */
  boolean isEmpty() {
    return collectionVsShardVsReplicas.isEmpty();
  }

  /** True when this node hosts at least one shard entry for {@code coll}. */
  boolean hasColl(String coll) {
    return collectionVsShardVsReplicas.containsKey(coll);
  }
public void createCollShard(Pair<String, String> collShard) {
Map<String, List<ReplicaInfo>> shardInfo = collectionVsShardVsReplicas.computeIfAbsent(collShard.first(), Utils.NEW_HASHMAP_FUN);
if (collShard.second() != null) shardInfo.computeIfAbsent(collShard.second(), Utils.NEW_ARRAYLIST_FUN);
}
  /** Immutable descriptor for a follow-up add/remove projected by a cell during simulation. */
  static class OperationInfo {
    final String coll, shard, node, cellName;
    final boolean isAdd;// true =addReplica, false=removeReplica
    final Replica.Type type;

    OperationInfo(String coll, String shard, String node, String cellName, boolean isAdd, Replica.Type type) {
      this.coll = coll;
      this.shard = shard;
      this.node = node;
      this.cellName = cellName;
      this.isAdd = isAdd;
      this.type = type;
    }
  }
public ReplicaInfo getReplica(String coll, String shard, Replica.Type type) {
Map<String, List<ReplicaInfo>> c = collectionVsShardVsReplicas.get(coll);
if (c == null) return null;
List<ReplicaInfo> r = c.get(shard);
if (r == null) return null;
for (int i = 0; i < r.size(); i++) {
ReplicaInfo info = r.get(i);
if (type == null || info.getType() == type) {
return info;
}
}
return null;
}
  /** Removes a replica; recursion depth starts at 0. */
  public Row removeReplica(String coll, String shard, Replica.Type type) {
    return removeReplica(coll, shard, type, 0);
  }

  // this simulates removing a replica from a node
  // Returns the node's row in a COPY of the session with one matching replica removed,
  // or null when no matching replica exists. The caller's session is left untouched.
  public Row removeReplica(String coll, String shard, Replica.Type type, int recursionCount) {
    // cap cascading projections, mirroring addReplica
    if (recursionCount > 3) {
      log.error("more than 3 levels of recursion ", new RuntimeException());
      return this;
    }
    List<OperationInfo> furtherOps = new ArrayList<>(3);
    Consumer<OperationInfo> opCollector = it -> furtherOps.add(it);
    // operate on a session copy so the current session's state is preserved
    Row row = session.copy().getNode(this.node);
    row.lazyCopyReplicas(coll, shard);
    Map<String, List<ReplicaInfo>> c = row.collectionVsShardVsReplicas.get(coll);
    if (c == null) return null;
    List<ReplicaInfo> r = c.get(shard);
    if (r == null) return null;
    // find the first replica matching the requested type (any type when null)
    int idx = -1;
    for (int i = 0; i < r.size(); i++) {
      ReplicaInfo info = r.get(i);
      if (type == null || info.getType() == type) {
        idx = i;
        break;
      }
    }
    if (idx == -1) return null;
    ReplicaInfo removed = r.remove(idx);
    row._removeReplicaCount(coll, shard, removed);
    // let each attribute project the effect of the removal
    // NOTE(review): unlike addReplica, the furtherOps queued here are never executed —
    // confirm whether that is intentional
    for (Cell cell : row.cells) {
      cell.type.projectRemoveReplica(cell, removed, opCollector);
    }
    row.modifyPerClauseCount(removed, -1, this);
    return row;
  }
  /**
   * Returns the internal cells array (NOT a copy) — callers must treat it as read-only.
   */
  public Cell[] getCells() {
    return cells;
  }

  /** True when the node was in the cluster's live-nodes set at row construction. */
  public boolean isLive() {
    return isLive;
  }

  /** Invokes {@code consumer} for every replica on this node, across all collections. */
  public void forEachReplica(Consumer<ReplicaInfo> consumer) {
    forEachReplica(collectionVsShardVsReplicas, consumer);
  }
public void forEachReplica(String coll, Consumer<ReplicaInfo> consumer) {
for (Map.Entry<String, List<ReplicaInfo>> entry : collectionVsShardVsReplicas.getOrDefault(coll, Collections.emptyMap()).entrySet()) {
for (int i = 0; i < entry.getValue().size(); i++) {
consumer.accept(entry.getValue().get(i));
}
}
}
public static void forEachReplica(Map<String, Map<String, List<ReplicaInfo>>> collectionVsShardVsReplicas, Consumer<ReplicaInfo> consumer) {
for (Map.Entry<String, Map<String, List<ReplicaInfo>>> perColl : collectionVsShardVsReplicas.entrySet()) {
for (Map.Entry<String, List<ReplicaInfo>> perShard : perColl.getValue().entrySet()) {
for (int i = 0; i < perShard.getValue().size(); i++) {
consumer.accept(perShard.getValue().get(i));
}
}
}
}
void modifyPerClauseCount(ReplicaInfo ri, int delta, Row previous) {
if (session == null || session.perClauseData == null || ri == null) return;
session.perClauseData.getShardDetails(ri.getCollection(), ri.getShard()).incrReplicas(ri.getType(), delta);
for (Clause clause : session.expandedClauses) {
if (!clause.dataGrouping.storePerClauseData()) continue;
if (!clause.collection.isPass(ri.getCollection()) ||
!clause.shard.isPass(ri.getShard()) ||
!clause.isType(ri.getType())) continue;
if (clause.put == Clause.Put.ON_EACH) continue;
if (clause.dataGrouping.storePerClauseData()) {
if (clause.tag.isPass(this)) {
session.perClauseData.getCountsForClause(
ri.getCollection(),
ri.getShard(),
clause, Row.this)
.incr(ri, delta);
}
}
}
}
  /**
   * Seeds the session's per-clause data from this node's current replicas: shard-level
   * replica counts and index sizes, plus per-clause counters for every clause whose
   * tag/collection/shard/type filters this node's replicas pass. Called once from the
   * main constructor.
   */
  void initPerClauseData() {
    if (session == null || session.perClauseData == null) return;
    Clause.ComputedValueEvaluator evaluator = new Clause.ComputedValueEvaluator(session);
    evaluator.nodeObj = this;
    evaluator.node = this.node;
    forEachReplica(it -> {
      PerClauseData.ShardDetails shardDetails = session.perClauseData.getShardDetails(it.getCollection(), it.getShard());
      shardDetails.incrReplicas(it.getType(), 1);// replicas.increment(it.getType());
      Number idxSize = (Number) it.getVariable(CORE_IDX.tagName);
      if (idxSize != null) {
        shardDetails.indexSize = idxSize.doubleValue();
      }
      // pre-create clause count entries for this replica's coll/shard
      for (Clause clause : session.expandedClauses) {
        if (clause.put == Clause.Put.ON_EACH) continue;
        if (clause.dataGrouping.storePerClauseData()) {
          // NOTE(review): these returns exit the lambda for this replica, which also skips
          // the REMAINING clauses for it — looks like they were meant as per-clause
          // continues; confirm intended behavior
          if(!clause.collection.isPass(it.getCollection())) return;
          if(!clause.shard.isPass(it.getShard())) return;
          session.perClauseData.getCountsForClause(it.getCollection(), it.getShard(), clause, Row.this);
        }
      }
    });
    // second pass: actually count replicas against clauses whose tag matches this node
    for (Clause clause : session.expandedClauses) {
      if (clause.put == Clause.Put.ON_EACH) continue;
      if (clause.dataGrouping.storePerClauseData()) {
        if (clause.tag.isPass(this, evaluator)) {
          forEachReplica(it -> {
            if(!clause.collection.isPass(it.getCollection())) return;
            if(!clause.shard.isPass(it.getShard())) return;
            if(!clause.isType(it.getType())) return;
            session.perClauseData.getCountsForClause(
            it.getCollection(),
            it.getShard(),
            clause, this)
            .incr(it, 1);
          });
        }
      }
    }
  }
}