| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.wan.serial; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| import java.util.stream.Collectors; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.GemFireException; |
| import org.apache.geode.InternalGemFireException; |
| import org.apache.geode.cache.CacheException; |
| import org.apache.geode.cache.EntryEvent; |
| import org.apache.geode.cache.partition.PartitionRegionHelper; |
| import org.apache.geode.cache.wan.GatewayQueueEvent; |
| import org.apache.geode.cache.wan.GatewaySender.OrderPolicy; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.internal.cache.EntryEventImpl; |
| import org.apache.geode.internal.cache.EnumListenerEvent; |
| import org.apache.geode.internal.cache.EventID; |
| import org.apache.geode.internal.cache.PartitionedRegionHelper; |
| import org.apache.geode.internal.cache.RegionQueue; |
| import org.apache.geode.internal.cache.ha.ThreadIdentifier; |
| import org.apache.geode.internal.cache.wan.AbstractGatewaySender; |
| import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor; |
| import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher; |
| import org.apache.geode.internal.cache.wan.GatewaySenderException; |
| import org.apache.geode.internal.monitoring.ThreadsMonitoring; |
| import org.apache.geode.internal.offheap.annotations.Released; |
| import org.apache.geode.logging.internal.executors.LoggingExecutors; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| public class ConcurrentSerialGatewaySenderEventProcessor |
| extends AbstractGatewaySenderEventProcessor { |
| |
| private static final Logger logger = LogService.getLogger(); |
| |
| protected final List<SerialGatewaySenderEventProcessor> processors = |
| new ArrayList<SerialGatewaySenderEventProcessor>(); |
| |
| protected final AbstractGatewaySender sender; |
| |
| private GemFireException ex = null; |
| |
| private final Set<RegionQueue> queues; |
| |
| public ConcurrentSerialGatewaySenderEventProcessor(AbstractGatewaySender sender, |
| ThreadsMonitoring tMonitoring, boolean cleanQueues) { |
| super("Event Processor for GatewaySender_" + sender.getId(), sender, tMonitoring); |
| this.sender = sender; |
| |
| initializeMessageQueue(sender.getId(), cleanQueues); |
| queues = new HashSet<RegionQueue>(); |
| for (SerialGatewaySenderEventProcessor processor : processors) { |
| queues.add(processor.getQueue()); |
| } |
| } |
| |
| @Override |
| public int getTotalQueueSize() { |
| int totalSize = 0; |
| for (RegionQueue regionQueue : queues) { |
| totalSize = totalSize + regionQueue.size(); |
| } |
| return totalSize; |
| } |
| |
| @Override |
| protected void initializeMessageQueue(String id, boolean cleanQueues) { |
| for (int i = 0; i < sender.getDispatcherThreads(); i++) { |
| processors.add( |
| new SerialGatewaySenderEventProcessor(this.sender, id + "." + i, getThreadMonitorObj(), |
| cleanQueues)); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Created the SerialGatewayEventProcessor_{}->{}", i, processors.get(i)); |
| } |
| } |
| } |
| |
| @Override |
| public int eventQueueSize() { |
| int size = 0; |
| for (RegionQueue queue : queues) { |
| size += queue.size(); |
| } |
| return size; |
| } |
| |
| // based on the fix for old wan Bug#46992 .revision is 39437 |
| @Override |
| public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue) |
| throws IOException, CacheException { |
| enqueueEvent(operation, event, substituteValue, false); |
| |
| } |
| |
| @Override |
| public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue, |
| boolean isLastEventInTransaction) throws IOException, CacheException { |
| // Get the appropriate index into the gateways |
| int index = Math.abs(getHashCode(((EntryEventImpl) event)) % this.processors.size()); |
| // Distribute the event to the gateway |
| enqueueEvent(operation, event, substituteValue, index, isLastEventInTransaction); |
| } |
| |
| public void setModifiedEventId(EntryEventImpl clonedEvent, int index) { |
| EventID originalEventId = clonedEvent.getEventId(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("The original EventId is {}", originalEventId); |
| } |
| // PARALLEL_THREAD_BUFFER * (index +1) + originalEventId.getThreadID(); |
| // generating threadId by the algorithm explained above used to clash with |
| // fakeThreadId generated by putAll |
| // below is new way to generate threadId so that it doesn't clash with |
| // any. |
| long newThreadId = |
| ThreadIdentifier.createFakeThreadIDForParallelGateway(index, originalEventId.getThreadID(), |
| 0 /* |
| * gateway sender event id index has already been applied in |
| * SerialGatewaySenderImpl.setModifiedEventId |
| */); |
| EventID newEventId = new EventID(originalEventId.getMembershipID(), newThreadId, |
| originalEventId.getSequenceID()); |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "{}: Generated event id for event with key={}, index={}, original event id={}, threadId={}, new event id={}, newThreadId={}" |
| + ":index=" + this.sender.getEventIdIndex(), |
| this, clonedEvent.getKey(), index, originalEventId, |
| ThreadIdentifier.toDisplayString(originalEventId.getThreadID()), newEventId, |
| ThreadIdentifier.toDisplayString(newThreadId)); |
| } |
| clonedEvent.setEventId(newEventId); |
| } |
| |
| public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue, |
| int index, boolean isLastEventInTransaction) throws CacheException, IOException { |
| // Get the appropriate gateway |
| SerialGatewaySenderEventProcessor serialProcessor = this.processors.get(index); |
| |
| if (sender.getOrderPolicy() == OrderPolicy.KEY |
| || sender.getOrderPolicy() == OrderPolicy.PARTITION) { |
| // Create copy since the event id will be changed, otherwise the same |
| // event will be changed for multiple gateways. Fix for bug 44471. |
| @Released |
| EntryEventImpl clonedEvent = new EntryEventImpl((EntryEventImpl) event); |
| try { |
| setModifiedEventId(clonedEvent, index); |
| serialProcessor.enqueueEvent(operation, clonedEvent, substituteValue, |
| isLastEventInTransaction); |
| } finally { |
| clonedEvent.release(); |
| } |
| } else { |
| serialProcessor.enqueueEvent(operation, event, substituteValue); |
| } |
| |
| } |
| |
| @Override |
| public void run() { |
| boolean isDebugEnabled = logger.isDebugEnabled(); |
| if (this.sender.getEnforceThreadsConnectSameReceiver()) { |
| this.processors.get(0).start(); |
| waitForRunningStatus(this.processors.get(0)); |
| String receiverUniqueId = this.processors.get(0).getExpectedReceiverUniqueId(); |
| if (isDebugEnabled) { |
| logger.debug("First dispatcher is connected to " + receiverUniqueId); |
| } |
| for (int j = 1; j < this.processors.size(); j++) { |
| this.processors.get(j).setExpectedReceiverUniqueId(receiverUniqueId); |
| } |
| } |
| |
| for (int i = this.sender.getEnforceThreadsConnectSameReceiver() ? 1 : 0; i < this.processors |
| .size(); i++) { |
| if (isDebugEnabled) { |
| logger.debug("Starting the serialProcessor {}", i); |
| } |
| this.processors.get(i).start(); |
| } |
| |
| try { |
| waitForRunningStatus(); |
| } catch (GatewaySenderException e) { |
| this.ex = e; |
| } |
| |
| synchronized (this.getRunningStateLock()) { |
| if (ex != null) { |
| this.setException(ex); |
| setIsStopped(true); |
| } else { |
| setIsStopped(false); |
| } |
| this.getRunningStateLock().notifyAll(); |
| } |
| |
| for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) { |
| try { |
| serialProcessor.join(); |
| } catch (InterruptedException e) { |
| if (isDebugEnabled) { |
| logger.debug("Got InterruptedException while waiting for child threads to finish."); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| } |
| |
| @Override |
| protected void rebalance() { |
| // No reason to rebalance a serial sender since all connections are to the same server. |
| throw new UnsupportedOperationException(); |
| } |
| |
| private void waitForRunningStatus(SerialGatewaySenderEventProcessor serialProcessor) { |
| synchronized (serialProcessor.getRunningStateLock()) { |
| while (serialProcessor.getException() == null && serialProcessor.isStopped()) { |
| try { |
| serialProcessor.getRunningStateLock().wait(); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| Exception ex = serialProcessor.getException(); |
| if (ex != null) { |
| throw new GatewaySenderException( |
| String.format("Could not start a gateway sender %s because of exception %s", |
| new Object[] {this.sender.getId(), ex.getMessage()}), |
| ex.getCause()); |
| } |
| } |
| } |
| |
| private void waitForRunningStatus() { |
| for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) { |
| waitForRunningStatus(serialProcessor); |
| } |
| } |
| |
| private int getHashCode(EntryEventImpl event) { |
| // Get the hash code for the event based on the configured order policy |
| int eventHashCode = 0; |
| switch (this.sender.getOrderPolicy()) { |
| case KEY: |
| // key ordering |
| eventHashCode = event.getKey().hashCode(); |
| break; |
| case THREAD: |
| // member id, thread id ordering |
| // requires a lot of threads to achieve parallelism |
| EventID eventId = event.getEventId(); |
| byte[] memberId = eventId.getMembershipID(); |
| long threadId = eventId.getThreadID(); |
| int memberIdHashCode = Arrays.hashCode(memberId); |
| int threadIdHashCode = (int) (threadId ^ (threadId >>> 32)); |
| eventHashCode = memberIdHashCode + threadIdHashCode; |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Generated hashcode for event with key={}, memberId={}, threadId={}: {}", |
| this, event.getKey(), Arrays.toString(memberId), threadId, eventHashCode); |
| } |
| break; |
| case PARTITION: |
| eventHashCode = PartitionRegionHelper.isPartitionedRegion(event.getRegion()) |
| ? PartitionedRegionHelper.getHashKey(event) |
| // Get the partition for the event |
| : event.getKey().hashCode(); |
| // Fall back to key ordering if the region is not partitioned |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: Generated partition hashcode for event with key={}: {}", this, |
| event.getKey(), eventHashCode); |
| } |
| break; |
| |
| } |
| return eventHashCode; |
| } |
| |
| @Override |
| public void stopProcessing() { |
| if (!this.isAlive()) { |
| return; |
| } |
| |
| setIsStopped(true); |
| |
| List<SenderStopperCallable> stopperCallables = new ArrayList<SenderStopperCallable>(); |
| for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) { |
| stopperCallables.add(new SenderStopperCallable(serialProcessor)); |
| } |
| |
| ExecutorService stopperService = LoggingExecutors.newFixedThreadPool( |
| processors.size(), "ConcurrentSerialGatewaySenderEventProcessor Stopper Thread", |
| true); |
| try { |
| List<Future<Boolean>> futures = stopperService.invokeAll(stopperCallables); |
| for (Future<Boolean> f : futures) { |
| try { |
| boolean b = f.get(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("ConcurrentSerialGatewaySenderEventProcessor: {} stopped dispatching: {}", |
| (b ? "Successfully" : "Unsuccessfully"), this); |
| } |
| } catch (ExecutionException e) { |
| // we don't expect any exception but if caught then eat it and log |
| // warning |
| logger.warn("GatewaySender {} caught exception while stopping: {}", |
| new Object[] {sender, e.getCause()}); |
| } |
| } |
| } catch (InterruptedException e) { |
| throw new InternalGemFireException(e.getMessage()); |
| } |
| // shutdown the stopperService. This will release all the stopper threads |
| stopperService.shutdown(); |
| |
| closeProcessor(); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("ConcurrentSerialGatewaySenderEventProcessor: Stopped dispatching: {}", this); |
| } |
| } |
| |
| @Override |
| public void closeProcessor() { |
| for (SerialGatewaySenderEventProcessor processor : processors) { |
| processor.closeProcessor(); |
| } |
| } |
| |
| @Override |
| public void pauseDispatching() { |
| for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) { |
| serialProcessor.pauseDispatching(); |
| } |
| super.pauseDispatching(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("ConcurrentSerialGatewaySenderEventProcessor: Paused dispatching: {}", this); |
| } |
| } |
| |
| @Override |
| public void resumeDispatching() { |
| for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) { |
| serialProcessor.resumeDispatching(); |
| } |
| super.resumeDispatching(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("ConcurrentSerialGatewaySenderEventProcessor: Resumed dispatching: {}", this); |
| } |
| } |
| |
| public List<SerialGatewaySenderEventProcessor> getProcessors() { |
| return new LinkedList<>(processors); |
| } |
| |
| /** |
| * @return the queues |
| */ |
| public Set<RegionQueue> getQueues() { |
| return queues; |
| } |
| |
| @Override |
| public void removeCacheListener() { |
| for (SerialGatewaySenderEventProcessor processor : processors) { |
| processor.removeCacheListener(); |
| } |
| } |
| |
| @Override |
| public void waitForDispatcherToPause() { |
| for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) { |
| serialProcessor.waitForDispatcherToPause(); |
| } |
| // super.waitForDispatcherToPause(); |
| } |
| |
| @Override |
| public GatewaySenderEventDispatcher getDispatcher() { |
| return this.processors.get(0).getDispatcher();// Suranjan is that fine?? |
| } |
| |
| @Override |
| public void initializeEventDispatcher() { |
| // no op for concurrent |
| |
| } |
| |
| @Override |
| protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) { |
| // modified event again for concurrent SGSEP |
| int index = Math.abs(getHashCode(((EntryEventImpl) droppedEvent)) % this.processors.size()); |
| setModifiedEventId(droppedEvent, index); |
| |
| this.processors.get(index).sendBatchDestroyOperationForDroppedEvent(droppedEvent, index); |
| } |
| |
| @Override |
| protected void enqueueEvent(GatewayQueueEvent event) { |
| for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) { |
| serialProcessor.enqueueEvent(event); |
| } |
| } |
| |
| protected ThreadsMonitoring getThreadMonitorObj() { |
| DistributionManager distributionManager = this.sender.getDistributionManager(); |
| if (distributionManager != null) { |
| return distributionManager.getThreadMonitoring(); |
| } else { |
| return null; |
| } |
| } |
| |
| @Override |
| public String printUnprocessedEvents() { |
| return this.processors.stream().map(processor -> processor.printUnprocessedEvents()) |
| .collect(Collectors.joining(", ")); |
| } |
| |
| @Override |
| public String printUnprocessedTokens() { |
| return this.processors.stream().map(processor -> processor.printUnprocessedTokens()) |
| .collect(Collectors.joining(", ")); |
| } |
| } |