blob: ff4f7ce1522af58a33653a80d3b1be087c75d906 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.redis.internal.executor.cluster;
import static org.apache.geode.redis.internal.RedisConstants.ERROR_UNKNOWN_CLUSTER_SUBCOMMAND;
import static org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS;
import static org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS_PER_BUCKET;
import static org.apache.geode.redis.internal.netty.Coder.bytesToString;
import static org.apache.geode.redis.internal.netty.Coder.equalsIgnoreCaseBytes;
import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bINFO;
import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bKEYSLOT;
import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bNODES;
import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bSLOTS;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.partition.PartitionMemberInfo;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.cache.partition.PartitionRegionInfo;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.redis.internal.SlotAdvisor;
import org.apache.geode.redis.internal.data.KeyHashUtil;
import org.apache.geode.redis.internal.data.RedisData;
import org.apache.geode.redis.internal.data.RedisKey;
import org.apache.geode.redis.internal.executor.CommandExecutor;
import org.apache.geode.redis.internal.executor.RedisResponse;
import org.apache.geode.redis.internal.netty.Command;
import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
import org.apache.geode.redis.internal.parameters.RedisParametersMismatchException;
public class ClusterExecutor implements CommandExecutor {
private static final Logger logger = LogService.getLogger();
@Override
public RedisResponse executeCommand(Command command, ExecutionHandlerContext context)
throws Exception {
List<byte[]> args = command.getProcessedCommand();
byte[] subcommand = args.get(1);
logger.error("CLUSTER, arg is:" + bytesToString(subcommand));
if (equalsIgnoreCaseBytes(subcommand, bINFO)) {
checkNumArgs(command, subcommand, 2);
return getInfo(context);
} else if (equalsIgnoreCaseBytes(subcommand, bNODES)) {
checkNumArgs(command, subcommand, 2);
return getNodes(context);
} else if (equalsIgnoreCaseBytes(subcommand, bSLOTS)) {
checkNumArgs(command, subcommand, 2);
return getSlots(context);
} else if (equalsIgnoreCaseBytes(subcommand, bKEYSLOT)) {
checkNumArgs(command, subcommand, 3);
return RedisResponse.integer(KeyHashUtil.slotForKey(args.get(2)));
} else {
return RedisResponse.error(
String.format(ERROR_UNKNOWN_CLUSTER_SUBCOMMAND, bytesToString(subcommand)));
}
}
private void checkNumArgs(Command command, byte[] subcommand, int expectedNumArgs) {
if (command.getProcessedCommand().size() != expectedNumArgs) {
throw new RedisParametersMismatchException(
String.format(ERROR_UNKNOWN_CLUSTER_SUBCOMMAND, bytesToString(subcommand)));
}
}
private RedisResponse getSlots(ExecutionHandlerContext ctx) throws InterruptedException {
List<Object> slots = new ArrayList<>();
for (SlotAdvisor.MemberBucketSlot mbs : ctx.getRegionProvider().getSlotAdvisor()
.getBucketSlots()) {
if (mbs == null) {
continue;
}
List<Object> entry = new ArrayList<>();
entry.add(mbs.getSlotStart());
entry.add(mbs.getSlotEnd());
entry.add(Arrays.asList(mbs.getPrimaryIpAddress(), mbs.getPrimaryPort()));
slots.add(entry);
}
return RedisResponse.array(slots, true);
}
/**
* The format being produced is something like this:
*
* <pre>
* 67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1:30002@31002 master - 0 1426238316232 2 connected 5461-10922
* 292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f 127.0.0.1:30003@31003 master - 0 1426238318243 3 connected 10923-16383
* e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001 myself,master - 0 0 1 connected 0-5460
* </pre>
*
* Note that there are no 'slave' entries since Geode does not host all secondary data apart from
* primary as redis does. The cluster port is provided only for consistency with the format of the
* output.
*/
private RedisResponse getNodes(ExecutionHandlerContext ctx) throws InterruptedException {
String memberId = ctx.getMemberName();
List<SlotAdvisor.MemberBucketSlot> memberBucketSlots =
ctx.getRegionProvider().getSlotAdvisor().getBucketSlots();
Map<String, List<Integer>> memberBuckets = getMemberBuckets(memberBucketSlots);
StringBuilder response = new StringBuilder();
for (Map.Entry<String, List<Integer>> member : memberBuckets.entrySet()) {
List<Integer> buckets = member.getValue();
SlotAdvisor.MemberBucketSlot mbs = memberBucketSlots.get(buckets.get(0));
if (mbs == null) {
continue;
}
response.append(String.format("%s %s:%3$d@%3$d master",
member.getKey(), mbs.getPrimaryIpAddress(), mbs.getPrimaryPort()));
if (member.getKey().equals(memberId)) {
response.append(",myself");
}
response.append(" - 0 0 1 connected");
for (int bucket : member.getValue()) {
response.append(" ");
response.append(bucket * REDIS_SLOTS_PER_BUCKET);
response.append("-");
response.append(((bucket + 1) * REDIS_SLOTS_PER_BUCKET) - 1);
}
response.append("\n");
}
logger.error("CLUSTER NODES response:" + response.toString());
return RedisResponse.bulkString(response.toString());
}
private Map<String, List<Integer>> getMemberBuckets(
List<SlotAdvisor.MemberBucketSlot> bucketSlots) {
Map<String, List<Integer>> memberBuckets = new HashMap<>();
for (SlotAdvisor.MemberBucketSlot mbs : bucketSlots) {
memberBuckets.computeIfAbsent(mbs.getMember().getUniqueId(), k -> new ArrayList<>())
.add(mbs.getBucketId());
}
return memberBuckets;
}
private RedisResponse getInfo(ExecutionHandlerContext ctx) {
int memberCount = getRegionMembers(ctx).size();
return RedisResponse.bulkString(
"cluster_state:ok\r\n"
+ "cluster_slots_assigned:" + REDIS_SLOTS + "\r\n"
+ "cluster_slots_ok:" + REDIS_SLOTS + "\r\n"
+ "cluster_slots_pfail:0\r\n"
+ "cluster_slots_fail:0\r\n"
+ "cluster_known_nodes:" + memberCount + "\r\n"
+ "cluster_size:" + memberCount + "\r\n"
+ "cluster_current_epoch:1\r\n"
+ "cluster_my_epoch:1\r\n"
+ "cluster_stats_messages_sent:0\r\n"
+ "cluster_stats_messages_received:0\r\n");
}
private Set<PartitionMemberInfo> getRegionMembers(ExecutionHandlerContext ctx) {
Region<RedisKey, RedisData> dataRegion = ctx.getRegionProvider().getDataRegion();
PartitionRegionInfo info = PartitionRegionHelper.getPartitionRegionInfo(dataRegion);
assert info != null; // Mostly to appease IJ since the region is always a PR
return info.getPartitionMemberInfo();
}
}