/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.CoreAdminParams.NODE;
/**
* Each instance represents a node in the cluster
*
* @deprecated to be removed in Solr 9.0 (see SOLR-14656)
*/
public class Row implements MapWriter {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public final String node;
final Cell[] cells;
// Holds the details of each replica hosted on this node, keyed by collection name, then shard name.
public Map<String, Map<String, List<ReplicaInfo>>> collectionVsShardVsReplicas;
boolean anyValueMissing = false;
boolean isLive = true;
Policy.Session session;
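// Row-level caches used by computeCacheIfAbsent(): globalCache is keyed by cache name,
// perCollCache by collection -> shard -> cache name -> key.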
@SuppressWarnings({"rawtypes"})
Map globalCache;
@SuppressWarnings({"rawtypes"})
Map perCollCache;
public Row(String node, List<Pair<String, Variable.Type>> params, List<String> perReplicaAttributes, Policy.Session session) {
this(node, params, perReplicaAttributes, session, session.nodeStateProvider, session.cloudManager);
}
/**
* Constructor that allows explicitly passing a {@link NodeStateProvider} and a {@link SolrCloudManager}
* rather than using the ones obtained through the passed <code>session</code>.
* <p>Note the resulting row has a {@link Policy.Session} that may not be consistent with the rest of the Row's state. When rows are copied
* as part of a {@link Policy.Session} copy, the copied rows' sessions are eventually updated in
* {@link org.apache.solr.client.solrj.cloud.autoscaling.Policy.Session#Session(List, SolrCloudManager, List, Set, List, NodeStateProvider, Policy, Policy.Transaction)}
* once the new {@link Policy.Session} instance is available.</p>
*/
@SuppressWarnings({"rawtypes"})
Row(String node, List<Pair<String, Variable.Type>> params, List<String> perReplicaAttributes, Policy.Session session,
NodeStateProvider nsp, SolrCloudManager cloudManager) {
this.session = session;
collectionVsShardVsReplicas = nsp.getReplicaInfo(node, perReplicaAttributes);
if (collectionVsShardVsReplicas == null) collectionVsShardVsReplicas = new HashMap<>();
this.node = node;
cells = new Cell[params.size()];
isLive = cloudManager.getClusterStateProvider().getLiveNodes().contains(node);
List<String> paramNames = params.stream().map(Pair::first).collect(Collectors.toList());
Map<String, Object> vals = isLive ? nsp.getNodeValues(node, paramNames) : Collections.emptyMap();
for (int i = 0; i < params.size(); i++) {
Pair<String, Variable.Type> pair = params.get(i);
cells[i] = new Cell(i, pair.first(), Clause.validate(pair.first(), vals.get(pair.first()), false), null, pair.second(), this);
if (NODE.equals(pair.first())) cells[i].val = node;
if (cells[i].val == null) anyValueMissing = true;
}
this.globalCache = new HashMap();
this.perCollCache = new HashMap();
isAlreadyCopied = true;
}
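/** Global cache hit/miss statistics, keyed by cache name. */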
public static final Map<String, CacheEntry> cacheStats = new HashMap<>();
static class CacheEntry implements MapWriter {
AtomicLong hits = new AtomicLong(), misses = new AtomicLong();
@Override
public void writeMap(EntryWriter ew) throws IOException {
ew.put("hits", hits.get());
ew.put("misses", misses.get());
}
public static boolean hit(String cacheName) {
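// Hit counting is disabled (see the commented-out increment below); returning true keeps this call
// usable inside an assert.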
// getCacheEntry(cacheName).hits.incrementAndGet();
return true;
}
private static CacheEntry getCacheEntry(String cacheName) {
CacheEntry cacheEntry = cacheStats.get(cacheName);
if (cacheEntry == null) {
cacheStats.put(cacheName, cacheEntry = new CacheEntry());
}
return cacheEntry;
}
public static boolean miss(String cacheName) {
getCacheEntry(cacheName).misses.incrementAndGet();
return true;
}
}
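/**
* Iterates over the shards of the given collection on this node, passing each shard name and its
* replicas to the consumer. Does nothing if the collection has no replicas on this node.
*/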
public void forEachShard(String collection, BiConsumer<String, List<ReplicaInfo>> consumer) {
collectionVsShardVsReplicas
.getOrDefault(collection, Collections.emptyMap())
.forEach(consumer);
}
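/**
* Returns the value cached under <code>cacheName</code> in this row's global cache, computing and
* caching it with the supplier on a miss.
*/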
@SuppressWarnings({"unchecked"})
public <R> R computeCacheIfAbsent(String cacheName, Function<Object, R> supplier) {
R result = (R) globalCache.get(cacheName);
if (result != null) {
assert CacheEntry.hit(cacheName);
return result;
} else {
assert CacheEntry.miss(cacheName);
globalCache.put(cacheName, result = supplier.apply(cacheName));
return result;
}
}
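/**
* Per-collection variant of the cache lookup above: values are cached under
* collection -> shard -> cacheName -> key, and computed with the supplier on a miss.
*/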
@SuppressWarnings({"unchecked", "rawtypes"})
public <R> R computeCacheIfAbsent(String coll, String shard, String cacheName, Object key, Function<Object, R> supplier) {
Map collMap = (Map) this.perCollCache.get(coll);
if (collMap == null) this.perCollCache.put(coll, collMap = new HashMap());
Map shardMap = (Map) collMap.get(shard);
if (shardMap == null) collMap.put(shard, shardMap = new HashMap());
Map cacheNameMap = (Map) shardMap.get(cacheName);
if (cacheNameMap == null) shardMap.put(cacheName, cacheNameMap = new HashMap());
R result = (R) cacheNameMap.get(key);
if (result == null) {
CacheEntry.miss(cacheName);
cacheNameMap.put(key, result = supplier.apply(key));
return result;
} else {
CacheEntry.hit(cacheName);
return result;
}
}
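/**
* Copy constructor used by {@link #copy()}. Cells are deep-copied and re-bound to this row, while the
* replica map and caches are shared with the source row until lazyCopyReplicas runs.
*/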
public Row(String node, Cell[] cells, boolean anyValueMissing, Map<String,Map<String, List<ReplicaInfo>>> collectionVsShardVsReplicas,
boolean isLive, Policy.Session session,
@SuppressWarnings({"rawtypes"}) Map perRowCache,
@SuppressWarnings({"rawtypes"})Map globalCache) {
this.session = session;
this.node = node;
this.isLive = isLive;
this.cells = new Cell[cells.length];
for (int i = 0; i < this.cells.length; i++) {
this.cells[i] = cells[i].copy();
this.cells[i].row = this;
}
this.anyValueMissing = anyValueMissing;
this.collectionVsShardVsReplicas = collectionVsShardVsReplicas;
this.perCollCache = perRowCache;
this.globalCache = globalCache;
}
@Override
public void writeMap(EntryWriter ew) throws IOException {
ew.put(NODE, node);
ew.put("replicas", collectionVsShardVsReplicas);
ew.put("isLive", isLive);
ew.put("attributes", Arrays.asList(cells));
}
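/**
* Returns a copy of this row bound to the same session; see the copy constructor above for what is
* shared and what is deep-copied.
*/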
Row copy() {
return new Row(node, cells, anyValueMissing, collectionVsShardVsReplicas, isLive, session, this.perCollCache, this.globalCache);
}
Object getVal(String name) {
if (NODE.equals(name)) return this.node;
for (Cell cell : cells) if (cell.name.equals(name)) return cell.val;
return null;
}
public Object getVal(String name, Object def) {
for (Cell cell : cells)
if (cell.name.equals(name)) {
return cell.val == null ? def : cell.val;
}
return def;
}
@Override
public String toString() {
return jsonStr();
}
public Row addReplica(String coll, String shard, Replica.Type type) {
return addReplica(coll, shard, type, 0, true);
}
public Row addReplica(String coll, String shard, Replica.Type type, boolean strictMode) {
return addReplica(coll, shard, type, 0, strictMode);
}
/**
* Simulates adding a replica of a given collection and shard to this node. As a result of adding the
* replica, the values of certain attributes are modified on this node as well as on other nodes. Note
* that the state of the current session is kept intact while this operation is performed.
*
* @param coll collection name
* @param shard shard name
* @param type replica type
* @param recursionCount the number of times we have recursed to add more replicas
* @param strictMode whether the suggester is operating in strict mode or not
*/
Row addReplica(String coll, String shard, Replica.Type type, int recursionCount, boolean strictMode) {
if (recursionCount > 3) {
log.error("more than 3 levels of recursion ", new RuntimeException());
return this;
}
lazyCopyReplicas(coll, shard);
List<OperationInfo> furtherOps = new LinkedList<>();
Consumer<OperationInfo> opCollector = furtherOps::add;
Row row = session.copy().getNode(this.node);
if (row == null) throw new RuntimeException("couldn't get a row");
row.lazyCopyReplicas(coll, shard);
Map<String, List<ReplicaInfo>> c = row.collectionVsShardVsReplicas.computeIfAbsent(coll, k -> new HashMap<>());
List<ReplicaInfo> replicas = c.computeIfAbsent(shard, k -> new ArrayList<>());
String replicaname = "SYNTHETIC." + (new Random().nextInt(1000) + 1000);
ReplicaInfo ri = new ReplicaInfo(replicaname, replicaname, coll, shard, type, this.node,
Utils.makeMap(ZkStateReader.REPLICA_TYPE, type != null ? type.toString() : Replica.Type.NRT.toString()));
replicas.add(ri);
for (Cell cell : row.cells) {
cell.type.projectAddReplica(cell, ri, opCollector, strictMode);
}
for (OperationInfo op : furtherOps) {
if (op.isAdd) {
row = row.session.getNode(op.node).addReplica(op.coll, op.shard, op.type, recursionCount + 1, strictMode);
} else {
row.session.getNode(op.node).removeReplica(op.coll, op.shard, op.type, recursionCount + 1);
}
}
return row;
}
boolean isAlreadyCopied = false;
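/**
* Copy-on-write helper invoked before this row's replica data is mutated. It always resets the global
* cache and drops the per-collection cache entries for the given collection; on the first call it also
* copies the replica map and deep-copies the affected collection's shard lists so the original
* session's data stays intact. <code>isAlreadyCopied</code> guards against copying more than once.
*/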
@SuppressWarnings({"unchecked", "rawtypes"})
private void lazyCopyReplicas(String coll, String shard) {
globalCache = new HashMap();
Map cacheCopy = new HashMap<>(perCollCache);
cacheCopy.remove(coll); // TODO: optimize at shard level later
perCollCache = cacheCopy;
if (isAlreadyCopied) return; // caches need to be invalidated but the rest can remain as is
Map<String, Map<String, List<ReplicaInfo>>> replicasCopy = new HashMap<>(collectionVsShardVsReplicas);
Map<String, List<ReplicaInfo>> oneColl = replicasCopy.get(coll);
if (oneColl != null) {
replicasCopy.put(coll, Utils.getDeepCopy(oneColl, 2));
}
collectionVsShardVsReplicas = replicasCopy;
isAlreadyCopied = true;
}
boolean hasColl(String coll) {
return collectionVsShardVsReplicas.containsKey(coll);
}
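/**
* Ensures that entries exist in this row's replica map for the given collection and, if non-null, the
* given shard, creating empty ones as needed.
*/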
public void createCollShard(Pair<String, String> collShard) {
Map<String, List<ReplicaInfo>> shardInfo = collectionVsShardVsReplicas.computeIfAbsent(collShard.first(), o -> new HashMap<String, List<ReplicaInfo>>());
if (collShard.second() != null) shardInfo.computeIfAbsent(collShard.second(), o -> new ArrayList<ReplicaInfo>());
}
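/**
* A follow-up add/remove replica operation collected while a cell's variable type projects the effect
* of an add or remove onto this row; collected operations are then applied to the relevant nodes.
*/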
static class OperationInfo {
final String coll, shard, node, cellName;
final boolean isAdd; // true = addReplica, false = removeReplica
final Replica.Type type;
OperationInfo(String coll, String shard, String node, String cellName, boolean isAdd, Replica.Type type) {
this.coll = coll;
this.shard = shard;
this.node = node;
this.cellName = cellName;
this.isAdd = isAdd;
this.type = type;
}
}
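/**
* Returns the first replica of the given collection and shard on this node that matches the requested
* type (any type if <code>type</code> is null), or null if there is none.
*/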
public ReplicaInfo getReplica(String coll, String shard, Replica.Type type) {
Map<String, List<ReplicaInfo>> c = collectionVsShardVsReplicas.get(coll);
if (c == null) return null;
List<ReplicaInfo> r = c.get(shard);
if (r == null) return null;
int idx = -1;
for (int i = 0; i < r.size(); i++) {
ReplicaInfo info = r.get(i);
if (type == null || info.getType() == type) {
idx = i;
break;
}
}
if (idx == -1) return null;
return r.get(idx);
}
public Row removeReplica(String coll, String shard, Replica.Type type) {
return removeReplica(coll, shard, type, 0);
}
// Simulates removing a replica of the given collection and shard from this node; returns the updated
// copy of the row, or null if no matching replica is found.
public Row removeReplica(String coll, String shard, Replica.Type type, int recursionCount) {
if (recursionCount > 3) {
log.error("more than 3 levels of recursion ", new RuntimeException());
return this;
}
List<OperationInfo> furtherOps = new LinkedList<>();
Consumer<OperationInfo> opCollector = furtherOps::add;
Row row = session.copy().getNode(this.node);
row.lazyCopyReplicas(coll, shard);
Map<String, List<ReplicaInfo>> c = row.collectionVsShardVsReplicas.get(coll);
if (c == null) return null;
List<ReplicaInfo> r = c.get(shard);
if (r == null) return null;
int idx = -1;
for (int i = 0; i < r.size(); i++) {
ReplicaInfo info = r.get(i);
if (type == null || info.getType() == type) {
idx = i;
break;
}
}
if (idx == -1) return null;
ReplicaInfo removed = r.remove(idx);
for (Cell cell : row.cells) {
cell.type.projectRemoveReplica(cell, removed, opCollector);
}
return row;
}
public Cell[] getCells() {
return cells;
}
public boolean isLive() {
return isLive;
}
public void forEachReplica(Consumer<ReplicaInfo> consumer) {
forEachReplica(collectionVsShardVsReplicas, consumer);
}
public void forEachReplica(String coll, Consumer<ReplicaInfo> consumer) {
collectionVsShardVsReplicas.getOrDefault(coll, Collections.emptyMap()).forEach((shard, replicaInfos) -> {
for (ReplicaInfo replicaInfo : replicaInfos) {
consumer.accept(replicaInfo);
}
});
}
public static void forEachReplica(Map<String, Map<String, List<ReplicaInfo>>> collectionVsShardVsReplicas, Consumer<ReplicaInfo> consumer) {
collectionVsShardVsReplicas.forEach((coll, shardVsReplicas) -> shardVsReplicas
.forEach((shard, replicaInfos) -> {
for (int i = 0; i < replicaInfos.size(); i++) {
ReplicaInfo r = replicaInfos.get(i);
consumer.accept(r);
}
}));
}
}