| /*========================================================================= |
| * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *======================================================================== |
| */ |
| |
| package com.gemstone.gemfire.cache.hdfs.internal; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| |
| import com.gemstone.gemfire.cache.CacheClosedException; |
| import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent; |
| import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener; |
| import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog; |
| import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer; |
| import com.gemstone.gemfire.i18n.LogWriterI18n; |
| import com.gemstone.gemfire.internal.cache.BucketRegion; |
| import com.gemstone.gemfire.internal.cache.ForceReattemptException; |
| import com.gemstone.gemfire.internal.cache.LocalRegion; |
| import com.gemstone.gemfire.internal.cache.PartitionedRegion; |
| import com.gemstone.gemfire.internal.cache.PrimaryBucketException; |
| import com.gemstone.gemfire.internal.cache.execute.BucketMovedException; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| |
| /** |
| * Listener that persists events to a write only HDFS store |
| * |
| * @author Hemant Bhanawat |
| */ |
| public class HDFSWriteOnlyStoreEventListener implements |
| AsyncEventListener { |
| |
| private final LogWriterI18n logger; |
| private volatile boolean senderStopped = false; |
| private final FailureTracker failureTracker = new FailureTracker(10L, 60 * 1000L, 1.5f); |
| |
| |
| public HDFSWriteOnlyStoreEventListener(LogWriterI18n logger) { |
| this.logger = logger; |
| } |
| |
| @Override |
| public void close() { |
| senderStopped = true; |
| } |
| |
| @Override |
| public boolean processEvents(List<AsyncEvent> events) { |
| if (Hoplog.NOP_WRITE) { |
| return true; |
| } |
| |
| if (logger.fineEnabled()) |
| logger.fine("HDFSWriteOnlyStoreEventListener: A total of " + events.size() + " events are sent from GemFire to persist on HDFS"); |
| boolean success = false; |
| try { |
| failureTracker.sleepIfRetry(); |
| HDFSGatewayEventImpl hdfsEvent = null; |
| int previousBucketId = -1; |
| BatchManager bm = null; |
| for (AsyncEvent asyncEvent : events) { |
| if (senderStopped){ |
| if (logger.fineEnabled()) { |
| logger.fine("HDFSWriteOnlyStoreEventListener.processEvents: Cache is closing down. Ignoring the batch of data."); |
| } |
| return false; |
| } |
| hdfsEvent = (HDFSGatewayEventImpl)asyncEvent; |
| if (previousBucketId != hdfsEvent.getBucketId()){ |
| if (previousBucketId != -1) |
| persistBatch(bm, previousBucketId); |
| |
| previousBucketId = hdfsEvent.getBucketId(); |
| bm = new BatchManager(); |
| } |
| bm.addEvent(hdfsEvent); |
| } |
| try { |
| persistBatch(bm, hdfsEvent.getBucketId()); |
| } catch (BucketMovedException e) { |
| logger.fine("Batch could not be written to HDFS as the bucket moved. bucket id: " + |
| hdfsEvent.getBucketId() + " Exception: " + e); |
| return false; |
| } |
| success = true; |
| } catch (IOException e) { |
| logger.warning(LocalizedStrings.HOPLOG_FLUSH_FOR_BATCH_FAILED, e); |
| return false; |
| } |
| catch (ClassNotFoundException e) { |
| logger.warning(LocalizedStrings.HOPLOG_FLUSH_FOR_BATCH_FAILED, e); |
| return false; |
| } |
| catch (CacheClosedException e) { |
| // exit silently |
| if (logger.fineEnabled()) |
| logger.fine(e); |
| return false; |
| } catch (ForceReattemptException e) { |
| if (logger.fineEnabled()) |
| logger.fine(e); |
| return false; |
| } catch (InterruptedException e1) { |
| // TODO Auto-generated catch block |
| e1.printStackTrace(); |
| } finally { |
| failureTracker.record(success); |
| } |
| return true; |
| } |
| |
| /** |
| * Persists batches of multiple regions specified by the batch manager |
| * |
| */ |
| private void persistBatch(BatchManager bm, int bucketId) throws IOException, ForceReattemptException { |
| Iterator<Map.Entry<LocalRegion,ArrayList<QueuedPersistentEvent>>> eventsPerRegion = |
| bm.iterator(); |
| HoplogOrganizer bucketOrganizer = null; |
| while (eventsPerRegion.hasNext()) { |
| Map.Entry<LocalRegion, ArrayList<QueuedPersistentEvent>> eventsForARegion = eventsPerRegion.next(); |
| bucketOrganizer = getOrganizer((PartitionedRegion) eventsForARegion.getKey(), bucketId); |
| // bucket organizer cannot be null. |
| if (bucketOrganizer == null) |
| throw new BucketMovedException("Bucket moved. BucketID: " + bucketId + " HdfsRegion: " + eventsForARegion.getKey().getName()); |
| bucketOrganizer.flush(eventsForARegion.getValue().iterator(), eventsForARegion.getValue().size()); |
| if (logger.fineEnabled()) { |
| logger.fine("Batch written to HDFS of size " + eventsForARegion.getValue().size() + |
| " for region " + eventsForARegion.getKey()); |
| } |
| } |
| } |
| |
| private HoplogOrganizer getOrganizer(PartitionedRegion region, int bucketId) { |
| BucketRegion br = region.getDataStore().getLocalBucketById(bucketId); |
| if (br == null) { |
| // got rebalanced or something |
| throw new BucketMovedException("Bucket region is no longer available. BucketId: "+ |
| bucketId + " HdfsRegion: " + region.getName()); |
| } |
| |
| return br.getHoplogOrganizer(); |
| } |
| |
| /** |
| * Sorts out events of the multiple regions into lists per region |
| * |
| */ |
| private class BatchManager implements Iterable<Map.Entry<LocalRegion,ArrayList<QueuedPersistentEvent>>> { |
| private HashMap<LocalRegion, ArrayList<QueuedPersistentEvent>> regionBatches = |
| new HashMap<LocalRegion, ArrayList<QueuedPersistentEvent>>(); |
| |
| public void addEvent (HDFSGatewayEventImpl hdfsEvent) throws IOException, ClassNotFoundException { |
| LocalRegion region = (LocalRegion) hdfsEvent.getRegion(); |
| ArrayList<QueuedPersistentEvent> regionList = regionBatches.get(region); |
| if (regionList == null) { |
| regionList = new ArrayList<QueuedPersistentEvent>(); |
| regionBatches.put(region, regionList); |
| } |
| regionList.add(new UnsortedHDFSQueuePersistedEvent(hdfsEvent)); |
| } |
| |
| @Override |
| public Iterator<Map.Entry<LocalRegion,ArrayList<QueuedPersistentEvent>>> iterator() { |
| return regionBatches.entrySet().iterator(); |
| } |
| |
| } |
| } |