| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.partitioned; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelException; |
| import org.apache.geode.DataSerializer; |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.cache.CacheException; |
| import org.apache.geode.cache.RegionDestroyedException; |
| import org.apache.geode.cache.query.Index; |
| import org.apache.geode.cache.query.IndexCreationException; |
| import org.apache.geode.cache.query.MultiIndexCreationException; |
| import org.apache.geode.cache.query.RegionNotFoundException; |
| import org.apache.geode.cache.query.internal.index.IndexCreationData; |
| import org.apache.geode.cache.query.internal.index.PartitionedIndex; |
| import org.apache.geode.distributed.internal.ClusterDistributionManager; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.distributed.internal.ReplyException; |
| import org.apache.geode.distributed.internal.ReplyMessage; |
| import org.apache.geode.distributed.internal.ReplyProcessor21; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.cache.ForceReattemptException; |
| import org.apache.geode.internal.cache.PartitionedRegion; |
| import org.apache.geode.internal.cache.PartitionedRegionException; |
| import org.apache.geode.internal.logging.log4j.LogMarker; |
| import org.apache.geode.internal.serialization.DeserializationContext; |
| import org.apache.geode.internal.serialization.SerializationContext; |
| import org.apache.geode.internal.serialization.Version; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| public class IndexCreationMsg extends PartitionMessage { |
| private static final Logger logger = LogService.getLogger(); |
| |
| HashSet<IndexCreationData> indexDefinitions; |
| |
| /** |
| * Constructor for the index creation message to be sent over the wire with all the relevant |
| * information. |
| * |
| * @param recipients members to which this message has to be sent |
| * @param regionId partitioned region id |
| * @param processor The processor to reply to |
| * @param indexDefinitions definitions for the indexes |
| * |
| */ |
| |
| IndexCreationMsg(Set recipients, int regionId, ReplyProcessor21 processor, |
| HashSet<IndexCreationData> indexDefinitions) { |
| super(recipients, regionId, processor); |
| this.indexDefinitions = indexDefinitions; |
| } |
| |
| /** |
| * Empty default constructor. |
| * |
| */ |
| public IndexCreationMsg() { |
| |
| } |
| |
| /** |
| * This message may be sent to nodes before the PartitionedRegion is completely initialized due to |
| * the RegionAdvisor(s) knowing about the existence of a partitioned region at a very early part |
| * of the initialization |
| */ |
| @Override |
| protected boolean failIfRegionMissing() { |
| return false; |
| } |
| |
| /** |
| * This method actually operates on the partitioned region and creates given list of indexes from |
| * a index creation message. |
| * |
| * @param dm distribution manager. |
| * @param pr partitioned region on which to create an index. |
| * @throws CacheException indicating a cache level error |
| * @throws ForceReattemptException if the peer is no longer available |
| */ |
| @Override |
| protected boolean operateOnPartitionedRegion(ClusterDistributionManager dm, PartitionedRegion pr, |
| long startTime) throws CacheException, ForceReattemptException { |
| // region exists |
| ReplyException replyEx = null; |
| boolean result = false; |
| List<Index> indexes = null; |
| List<String> failedIndexNames = new ArrayList<String>(); |
| |
| if (logger.isDebugEnabled()) { |
| StringBuilder sb = new StringBuilder(); |
| for (IndexCreationData icd : indexDefinitions) { |
| sb.append(icd.getIndexName()).append(" "); |
| } |
| logger.debug( |
| "Processing index creation message on this remote partitioned region vm for indexes: {}", |
| sb); |
| } |
| |
| try { |
| indexes = pr.createIndexes(true, indexDefinitions); |
| } catch (IndexCreationException e1) { |
| replyEx = new ReplyException( |
| "Remote Index Creation Failed", e1); |
| } catch (MultiIndexCreationException exx) { |
| failedIndexNames.addAll(exx.getExceptionsMap().keySet()); |
| |
| if (logger.isDebugEnabled()) { |
| StringBuffer exceptionMsgs = new StringBuffer(); |
| for (Exception ex : exx.getExceptionsMap().values()) { |
| exceptionMsgs.append(ex.getMessage()).append("\n"); |
| } |
| logger.debug("Got an MultiIndexCreationException with \n: {}", exceptionMsgs); |
| logger.debug("{} indexes were created succesfully", failedIndexNames.size()); |
| } |
| replyEx = new ReplyException( |
| "Remote Index Creation Failed", exx); |
| } |
| |
| if (null == replyEx) { |
| result = true; |
| } |
| |
| if (result) { |
| Map<String, Integer> indexBucketsMap = new HashMap<String, Integer>(); |
| for (Index index : indexes) { |
| PartitionedIndex prIndex = (PartitionedIndex) index; |
| indexBucketsMap.put(prIndex.getName(), prIndex.getNumberOfIndexedBuckets()); |
| } |
| sendReply(getSender(), getProcessorId(), dm, replyEx, result, indexBucketsMap, |
| pr.getDataStore().getAllLocalBuckets().size()); |
| } else { |
| // add the indexes that were successfully created to the map |
| Map<String, Integer> indexBucketsMap = new HashMap<String, Integer>(); |
| for (IndexCreationData icd : indexDefinitions) { |
| // if the index was successfully created |
| if (!failedIndexNames.contains(icd.getIndexName())) { |
| PartitionedIndex prIndex = (PartitionedIndex) pr.getIndex(icd.getIndexName()); |
| indexBucketsMap.put(icd.getIndexName(), prIndex.getNumberOfIndexedBuckets()); |
| } |
| } |
| sendReply(getSender(), getProcessorId(), dm, replyEx, result, indexBucketsMap, |
| pr.getDataStore().getAllLocalBuckets().size()); |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Multi Index creation completed on remote host and has sent the reply to the originating vm."); |
| } |
| return false; |
| } |
| |
| /** |
| * Process this index creation message on the receiver. |
| */ |
| @Override |
| public void process(final ClusterDistributionManager dm) { |
| |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| |
| Throwable thr = null; |
| boolean sendReply = true; |
| PartitionedRegion pr = null; |
| |
| try { |
| if (isDebugEnabled) { |
| logger.debug("Trying to get pr with id: {}", this.regionId); |
| } |
| try { |
| if (isDebugEnabled) { |
| logger.debug("Again trying to get pr with id : {}", this.regionId); |
| } |
| pr = PartitionedRegion.getPRFromId(this.regionId); |
| if (isDebugEnabled) { |
| logger.debug("Index creation message got the pr {}", pr); |
| } |
| if (null == pr) { |
| boolean wait = true; |
| int attempts = 0; |
| while (wait && attempts < 30) { // max 30 seconds of wait. |
| dm.getCancelCriterion().checkCancelInProgress(null); |
| if (isDebugEnabled) { |
| logger.debug( |
| "Waiting for Partitioned Region to be intialized with id {}for processing index creation messages", |
| this.regionId); |
| } |
| try { |
| boolean interrupted = Thread.interrupted(); |
| try { |
| Thread.sleep(500); |
| } catch (InterruptedException e) { |
| interrupted = true; |
| dm.getCancelCriterion().checkCancelInProgress(e); |
| } finally { |
| if (interrupted) |
| Thread.currentThread().interrupt(); |
| } |
| |
| pr = PartitionedRegion.getPRFromId(this.regionId); |
| if (null != pr) { |
| wait = false; |
| if (isDebugEnabled) { |
| logger.debug("Indexcreation message got the pr {}", pr); |
| } |
| } |
| attempts++; |
| } catch (CancelException ignorAndLoopWait) { |
| if (isDebugEnabled) { |
| logger.debug( |
| "IndexCreationMsg waiting for pr to be properly created with prId : {}", |
| this.regionId); |
| } |
| } |
| } |
| |
| } |
| } catch (CancelException letPRInitialized) { |
| // Not sure if the CacheClosedException is still thrown in response |
| // to the PR being initialized. |
| if (logger.isDebugEnabled()) { |
| logger.debug("Waiting for notification from pr being properly created on {}", |
| this.regionId); |
| } |
| |
| boolean wait = true; |
| while (wait) { |
| dm.getCancelCriterion().checkCancelInProgress(null); |
| try { |
| boolean interrupted = Thread.interrupted(); |
| try { |
| Thread.sleep(500); |
| } catch (InterruptedException e) { |
| interrupted = true; |
| dm.getCancelCriterion().checkCancelInProgress(e); |
| } finally { |
| if (interrupted) |
| Thread.currentThread().interrupt(); |
| } |
| pr = PartitionedRegion.getPRFromId(this.regionId); |
| wait = false; |
| if (logger.isDebugEnabled()) { |
| logger.debug("Indexcreation message got the pr {}", pr); |
| } |
| } catch (CancelException ignorAndLoopWait) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("IndexCreationMsg waiting for pr to be properly created with prId : {}", |
| this.regionId); |
| } |
| } |
| } |
| |
| } |
| |
| if (pr == null /* && failIfRegionMissing() */) { |
| String msg = |
| String.format( |
| "Could not get Partitioned Region from Id %s for message %s received on member= %s map= %s", |
| new Object[] {Integer.valueOf(this.regionId), this, dm.getId(), |
| PartitionedRegion.dumpPRId()}); |
| throw new PartitionedRegionException(msg, new RegionNotFoundException(msg)); |
| } |
| sendReply = operateOnPartitionedRegion(dm, pr, 0); |
| |
| } catch (PRLocallyDestroyedException pre) { |
| if (isDebugEnabled) { |
| logger.debug("Region is locally Destroyed "); |
| } |
| thr = pre; |
| } catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } catch (Throwable t) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| // log the exception at fine level if there is no reply to the message |
| if (this.processorId == 0) { |
| logger.debug("{} exception while processing message:{}", this, t.getMessage(), t); |
| } else if (logger.isDebugEnabled(LogMarker.DM_VERBOSE) && (t instanceof RuntimeException)) { |
| logger.debug(LogMarker.DM_VERBOSE, "Exception caught while processing message: {}", |
| t.getMessage(), t); |
| } |
| if (t instanceof RegionDestroyedException && pr != null) { |
| if (pr.isClosed) { |
| logger.info("Region is locally destroyed, throwing RegionDestroyedException for {}", |
| pr); |
| thr = new RegionDestroyedException( |
| String.format("Region is locally destroyed on %s", |
| dm.getId()), |
| pr.getFullPath()); |
| } |
| } else { |
| thr = t; |
| } |
| } finally { |
| if (sendReply && this.processorId != 0) { |
| ReplyException rex = null; |
| if (thr != null) { |
| rex = new ReplyException(thr); |
| } |
| sendReply(getSender(), this.processorId, dm, rex, pr, 0); |
| } |
| } |
| |
| } |
| |
| /** |
| * Methods that sends the actual index creation message to all the members. |
| * |
| * @param recipient set of members. |
| * @param pr partitoned region associated with the index. |
| * @param indexDefinitions set of index definitions |
| * @return partitionresponse a response for the index creation |
| */ |
| public static PartitionResponse send(InternalDistributedMember recipient, PartitionedRegion pr, |
| HashSet<IndexCreationData> indexDefinitions) { |
| |
| RegionAdvisor advisor = (RegionAdvisor) (pr.getDistributionAdvisor()); |
| final Set<InternalDistributedMember> recipients; |
| /* |
| * Will only send create index to all the members storing data |
| */ |
| if (null == recipient) { |
| recipients = new HashSet(advisor.adviseDataStore()); |
| } else { |
| recipients = new HashSet<InternalDistributedMember>(); |
| recipients.add(recipient); |
| } |
| |
| for (InternalDistributedMember rec : recipients) { |
| if (rec.getVersionObject().compareTo(Version.GFE_81) < 0) { |
| throw new UnsupportedOperationException( |
| "Indexes should not be created during rolling upgrade"); |
| } |
| } |
| |
| IndexCreationResponse processor = null; |
| if (logger.isDebugEnabled()) { |
| logger.debug("Will be sending create index msg to : {}", recipients.toString()); |
| } |
| if (recipients.size() > 0) { |
| processor = |
| (IndexCreationResponse) (new IndexCreationMsg()).createReplyProcessor(pr, recipients); |
| } |
| |
| IndexCreationMsg indMsg = |
| new IndexCreationMsg(recipients, pr.getPRId(), processor, indexDefinitions); |
| indMsg.setTransactionDistributed(pr.getCache().getTxManager().isDistributed()); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Sending index creation message: {}, to member(s) {}.", indMsg, recipients); |
| } |
| /* Set failures = */pr.getDistributionManager().putOutgoing(indMsg); |
| // Set failures =r.getDistributionManager().putOutgoing(m); |
| // if (failures != null && failures.size() > 0) { |
| // throw new ForceReattemptException("Failed sending <" + indMsg + ">"); |
| // } |
| return processor; |
| } |
| |
| // override reply processor type from PartitionMessage |
| @Override |
| PartitionResponse createReplyProcessor(PartitionedRegion r, Set recipients) { |
| return new IndexCreationResponse(r.getSystem(), recipients); |
| } |
| |
| /** |
| * Send a reply for index creation message. |
| * |
| * @param member representing the actual index creatro in the system |
| * @param procId waiting processor |
| * @param dm distribution manager to send the message |
| * @param ex any exceptions |
| * @param result represents index created properly or not. |
| * @param indexBucketsMap Map of indexes created and number of buckets indexed |
| * @param numTotalBuckets Number of total buckets in this vm |
| */ |
| void sendReply(InternalDistributedMember member, int procId, DistributionManager dm, |
| ReplyException ex, boolean result, Map<String, Integer> indexBucketsMap, |
| int numTotalBuckets) { |
| IndexCreationReplyMsg.send(member, procId, dm, ex, result, indexBucketsMap, numTotalBuckets); |
| } |
| |
| @Override |
| public int getDSFID() { |
| return PR_INDEX_CREATION_MSG; |
| } |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| super.fromData(in, context); |
| this.indexDefinitions = DataSerializer.readHashSet(in); |
| } |
| |
| @Override |
| public Version[] getSerializationVersions() { |
| return null; |
| } |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| super.toData(out, context); |
| DataSerializer.writeHashSet(this.indexDefinitions, out); |
| } |
| |
| /** |
| * String representation of this message. |
| */ |
| @Override |
| public String toString() { |
| StringBuffer sb = new StringBuffer(); |
| for (IndexCreationData icd : indexDefinitions) { |
| sb.append(icd.getIndexName()).append(" "); |
| } |
| return sb.toString(); |
| } |
| |
| /** |
| * Class representing index creation response. This class has all the information for successful |
| * or unsuccessful index creation on this member of the partitioned region. |
| * |
| * |
| */ |
| public static class IndexCreationResponse extends PartitionResponse { |
| |
| /** Map of indexes created and number of buckets indexed. */ |
| private Map<String, Integer> indexBucketsMap; |
| |
| /** Number of total bukets in this vm. */ |
| private int numTotalBuckets; |
| |
| /** |
| * Construtor for index creation response message. |
| * |
| * @param ds distributed system for this member. |
| * @param recipients all the member associated with the index |
| */ |
| IndexCreationResponse(InternalDistributedSystem ds, Set recipients) { |
| super(ds, recipients); |
| } |
| |
| /** |
| * Waits for the response from the members for index creation. |
| * |
| * @return IndexCreationResult for creation of indexes |
| * @throws CacheException indicating a cache level error |
| * @throws ForceReattemptException if the peer is no longer available |
| */ |
| public IndexCreationResult waitForResult() throws CacheException, ForceReattemptException { |
| try { |
| waitForCacheException(); |
| } catch (RuntimeException re) { |
| if (re instanceof PartitionedRegionException) { |
| if (re.getCause() instanceof RegionNotFoundException) { |
| // Region may not be available at the receiver. |
| // ignore the exception. |
| // This will happen when the region on the remote end is still in |
| // initialization mode and is not yet created the region ID. |
| } else { |
| throw re; |
| } |
| } else { |
| throw re; |
| } |
| } |
| return new IndexCreationResult(this.indexBucketsMap, this.numTotalBuckets); |
| } |
| |
| /** |
| * Sets the relevant information in the response. |
| * |
| * @param result true if index created properly |
| * @param indexBucketsMap Map of indexes created and number of buckets indexed |
| * @param numTotalBuckets Number of total buckets in this vm |
| */ |
| public void setResponse(boolean result, Map<String, Integer> indexBucketsMap, |
| int numTotalBuckets) { |
| this.indexBucketsMap = indexBucketsMap; |
| this.numTotalBuckets = numTotalBuckets; |
| } |
| } |
| |
| /** |
| * Class representing index creation result. |
| * |
| * |
| */ |
| public static class IndexCreationResult { |
| /** Map of indexes created and number of buckets indexed. */ |
| private Map<String, Integer> indexBucketsMap; |
| |
| /** Number of total bukets in this vm. */ |
| private int numTotalBuckets; |
| |
| /** |
| * Constructor for index creation result. |
| * |
| * @param indexBucketsMap Map of indexes created and number of buckets indexed |
| * @param numTotalBuckets Number of total buckets in this vm |
| */ |
| IndexCreationResult(Map<String, Integer> indexBucketsMap, int numTotalBuckets) { |
| this.indexBucketsMap = indexBucketsMap; |
| this.numTotalBuckets = numTotalBuckets; |
| } |
| |
| /** |
| * Returns a map of index names and number of buckets indexed |
| * |
| */ |
| public Map<String, Integer> getIndexBucketsMap() { |
| return this.indexBucketsMap; |
| |
| } |
| |
| } |
| |
| /** |
| * Class for index creation reply. This class has the information about successful index creation. |
| * |
| * |
| */ |
| public static class IndexCreationReplyMsg extends ReplyMessage { |
| |
| /** Index created or not. */ |
| private boolean result; |
| |
| /** Map of indexes created and number of buckets indexed. */ |
| private Map<String, Integer> indexBucketsMap; |
| |
| /** Number of total buckets in this vm. */ |
| private int numTotalBuckets; |
| |
| /** Boolean indicating weather its a data store. */ |
| private boolean isDataStore; |
| |
| public IndexCreationReplyMsg() { |
| |
| } |
| |
| /** |
| * Constructor for index creation reply message. |
| * |
| * @param processorId processor id of the waiting processor |
| * @param ex any exceptions |
| * @param result true if index created properly else false |
| * @param indexBucketsMap Map of indexes created and number of buckets indexed |
| * @param numTotalBuckets Number of total buckets in this vm |
| */ |
| IndexCreationReplyMsg(int processorId, ReplyException ex, boolean result, boolean isDataStore, |
| Map<String, Integer> indexBucketsMap, int numTotalBuckets) { |
| super(); |
| super.setException(ex); |
| this.result = result; |
| this.indexBucketsMap = indexBucketsMap; |
| this.numTotalBuckets = numTotalBuckets; |
| this.isDataStore = isDataStore; |
| setProcessorId(processorId); |
| } |
| |
| @Override |
| public int getDSFID() { |
| return PR_INDEX_CREATION_REPLY_MSG; |
| } |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| super.fromData(in, context); |
| this.result = in.readBoolean(); |
| this.indexBucketsMap = DataSerializer.readObject(in); |
| this.numTotalBuckets = in.readInt(); |
| this.isDataStore = in.readBoolean(); |
| |
| } |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| super.toData(out, context); |
| out.writeBoolean(this.result); |
| DataSerializer.writeObject(this.indexBucketsMap, out); |
| out.writeInt(this.numTotalBuckets); |
| out.writeBoolean(this.isDataStore); |
| } |
| |
| /** |
| * Actual method sending the index creation reply message. |
| * |
| * @param recipient the originator of index creation message |
| * @param processorId waiting processor id |
| * @param dm distribution manager |
| * @param ex any exceptions |
| * @param result true is index created successfully |
| * @param indexBucketsMap Map of indexes created and number of buckets indexed |
| * @param numTotalBuckets Number of total buckets in this vm |
| */ |
| public static void send(InternalDistributedMember recipient, int processorId, |
| DistributionManager dm, ReplyException ex, boolean result, |
| Map<String, Integer> indexBucketsMap, int numTotalBuckets) { |
| IndexCreationReplyMsg indMsg = new IndexCreationReplyMsg(processorId, ex, result, result, |
| indexBucketsMap, numTotalBuckets); |
| indMsg.setRecipient(recipient); |
| dm.putOutgoing(indMsg); |
| } |
| |
| /** |
| * Processes the index creation result. |
| * |
| * @param dm distribution manager |
| */ |
| @Override |
| public void process(final DistributionManager dm, final ReplyProcessor21 p) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Processor id is : {}", this.processorId); |
| } |
| IndexCreationResponse processor = (IndexCreationResponse) p; |
| if (processor != null) { |
| processor.setResponse(this.result, this.indexBucketsMap, this.numTotalBuckets); |
| processor.process(this); |
| } |
| } |
| |
| } |
| } |