blob: c935197796fe98a69d647d9b0bc5d944f1891424 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.client.internal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.logging.log4j.Logger;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.cache.FixedPartitionAttributes;
import org.apache.geode.cache.PartitionResolver;
import org.apache.geode.cache.Region;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.internal.ClassPathLoader;
import org.apache.geode.internal.cache.BucketServerLocation66;
import org.apache.geode.internal.cache.FixedPartitionAttributesImpl;
import org.apache.geode.internal.logging.LogService;
/**
* Stores the information such as partition attributes and meta data details
*
*
* @since GemFire 6.5
*
*/
public class ClientPartitionAdvisor {
private static final Logger logger = LogService.getLogger();
private final ConcurrentMap<Integer, List<BucketServerLocation66>> bucketServerLocationsMap =
new ConcurrentHashMap<Integer, List<BucketServerLocation66>>();
private final int totalNumBuckets;
private String serverGroup = "";
private final String colocatedWith;
private PartitionResolver partitionResolver = null;
private Map<String, List<Integer>> fixedPAMap = null;
private boolean fpaAttrsCompletes = false;
private Random random = new Random();
@SuppressWarnings("unchecked")
public ClientPartitionAdvisor(int totalNumBuckets, String colocatedWith,
String partitionResolverName, Set<FixedPartitionAttributes> fpaSet) {
this.totalNumBuckets = totalNumBuckets;
this.colocatedWith = colocatedWith;
try {
if (partitionResolverName != null) {
this.partitionResolver = (PartitionResolver) ClassPathLoader.getLatest()
.forName(partitionResolverName).newInstance();
}
} catch (Exception e) {
if (logger.isErrorEnabled()) {
logger.error(e.getMessage(), e);
}
throw new InternalGemFireException(
String.format("Cannot create an instance of PartitionResolver : %s",
partitionResolverName));
}
if (fpaSet != null) {
fixedPAMap = new ConcurrentHashMap<String, List<Integer>>();
int totalFPABuckets = 0;
for (FixedPartitionAttributes fpa : fpaSet) {
List attrList = new ArrayList();
totalFPABuckets += fpa.getNumBuckets();
attrList.add(fpa.getNumBuckets());
attrList.add(((FixedPartitionAttributesImpl) fpa).getStartingBucketID());
fixedPAMap.put(fpa.getPartitionName(), attrList);
}
if (totalFPABuckets == this.totalNumBuckets) {
this.fpaAttrsCompletes = true;
}
}
}
public ServerLocation adviseServerLocation(int bucketId) {
if (this.bucketServerLocationsMap.containsKey(bucketId)) {
List<BucketServerLocation66> locations = this.bucketServerLocationsMap.get(bucketId);
List<BucketServerLocation66> locationsCopy = new ArrayList<BucketServerLocation66>(locations);
if (locationsCopy.isEmpty()) {
return null;
}
if (locationsCopy.size() == 1) {
return locationsCopy.get(0);
}
int index = random.nextInt(locationsCopy.size());
return locationsCopy.get(index);
}
return null;
}
public ServerLocation adviseRandomServerLocation() {
ArrayList<Integer> bucketList = new ArrayList<Integer>(this.bucketServerLocationsMap.keySet());
int size = bucketList.size();
if (size > 0) {
List<BucketServerLocation66> locations =
this.bucketServerLocationsMap.get(bucketList.get(random.nextInt(size)));
if (locations != null) {
List<BucketServerLocation66> serverList = new ArrayList<BucketServerLocation66>(locations);
if (serverList.size() == 0)
return null;
return serverList.get(0);
}
}
return null;
}
public List<BucketServerLocation66> adviseServerLocations(int bucketId) {
if (this.bucketServerLocationsMap.containsKey(bucketId)) {
List<BucketServerLocation66> locationsCopy =
new ArrayList<BucketServerLocation66>(this.bucketServerLocationsMap.get(bucketId));
return locationsCopy;
}
return null;
}
public ServerLocation advisePrimaryServerLocation(int bucketId) {
if (this.bucketServerLocationsMap.containsKey(bucketId)) {
List<BucketServerLocation66> locations = this.bucketServerLocationsMap.get(bucketId);
List<BucketServerLocation66> locationsCopy = new ArrayList<BucketServerLocation66>(locations);
for (BucketServerLocation66 loc : locationsCopy) {
if (loc.isPrimary()) {
return loc;
}
}
}
return null;
}
public void updateBucketServerLocations(int bucketId,
List<BucketServerLocation66> bucketServerLocations, ClientMetadataService cms) {
List<BucketServerLocation66> locationCopy = new ArrayList<BucketServerLocation66>();
List<BucketServerLocation66> locations;
boolean honourSeverGroup = cms.honourServerGroup();
if (this.serverGroup.length() != 0 && honourSeverGroup) {
for (BucketServerLocation66 s : bucketServerLocations) {
String[] groups = s.getServerGroups();
if (groups.length > 0) {
for (String str : groups) {
if (str.equals(this.serverGroup)) {
locationCopy.add(s);
break;
}
}
} else {
locationCopy.add(s);
}
}
locations = Collections.unmodifiableList(locationCopy);
} else {
locations = Collections.unmodifiableList(bucketServerLocations);
}
this.bucketServerLocationsMap.put(bucketId, locations);
}
public void removeBucketServerLocation(ServerLocation serverLocation) {
Iterator<Map.Entry<Integer, List<BucketServerLocation66>>> iter =
this.bucketServerLocationsMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<Integer, List<BucketServerLocation66>> entry = iter.next();
Integer key = entry.getKey();
List<BucketServerLocation66> oldLocations = entry.getValue();
List<BucketServerLocation66> newLocations =
new ArrayList<BucketServerLocation66>(oldLocations);
// if this serverLocation contains in the list the remove the
// serverLocation and update the map with new List
while (newLocations.remove(serverLocation)
&& !this.bucketServerLocationsMap.replace(key, oldLocations, newLocations)) {
oldLocations = this.bucketServerLocationsMap.get(key);
newLocations = new ArrayList<BucketServerLocation66>(oldLocations);
}
}
}
public Map<Integer, List<BucketServerLocation66>> getBucketServerLocationsMap_TEST_ONLY() {
return this.bucketServerLocationsMap;
}
/**
* This method returns total number of buckets for a PartitionedRegion.
*
* @return total number of buckets for a PartitionedRegion.
*/
public int getTotalNumBuckets() {
return this.totalNumBuckets;
}
/**
* @return the serverGroup
*/
public String getServerGroup() {
return this.serverGroup;
}
public void setServerGroup(String group) {
this.serverGroup = group;
}
/**
* Returns name of the colocated PartitionedRegion on CacheServer
*/
public String getColocatedWith() {
return this.colocatedWith;
}
/**
* Returns the PartitionResolver set for custom partitioning
*
* @return <code>PartitionResolver</code> for the PartitionedRegion
*/
public PartitionResolver getPartitionResolver() {
return this.partitionResolver;
}
public Set<String> getFixedPartitionNames() {
return this.fixedPAMap.keySet();
}
public int assignFixedBucketId(Region region, String partition, Object resolveKey) {
if (this.fixedPAMap.containsKey(partition)) {
List<Integer> attList = this.fixedPAMap.get(partition);
int hc = resolveKey.hashCode();
int bucketId = Math.abs(hc % (attList.get(0)));
int partitionBucketID = bucketId + attList.get(1);
return partitionBucketID;
} else {
// We don't know as we might not have got the all FPAttributes
// from the FPR, So don't throw the exception but send the request
// to the server and update the FPA attributes
// This exception should be thrown from the server as we will
// not be sure of partition not available unless we contact the server.
return -1;
}
}
public Map<String, List<Integer>> getFixedPAMap() {
return this.fixedPAMap;
}
public void updateFixedPAMap(Map<String, List<Integer>> map) {
this.fixedPAMap.putAll(map);
}
public boolean isFPAAttrsComplete() {
return this.fpaAttrsCompletes;
}
}