| /*========================================================================= |
| * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.cache.hdfs.internal.hoplog; |
| |
| import java.util.Collection; |
| |
| import org.apache.hadoop.fs.FileSystem; |
| |
| import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager; |
| import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| import com.gemstone.gemfire.internal.logging.LogService; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| /** |
| * For streaming case, if the bucket traffic goes down after writing few batches of data, |
| * the flush doesn't get called. In that case, the file is left in tmp state |
| * until the flush restarts. To avoid this issue, added this timer task |
| * that periodically iterates over the buckets and closes their writer |
| * if the time for rollover has passed. |
| * |
| * It also has got an extra responsibility of fixing the file sizes of the files |
| * that weren't closed properly last time. |
| * |
| * @author hemantb |
| * |
| */ |
| class CloseTmpHoplogsTimerTask extends SystemTimerTask { |
| |
| private HdfsRegionManager hdfsRegionManager; |
| private static final Logger logger = LogService.getLogger(); |
| private FileSystem filesystem; |
| |
| public CloseTmpHoplogsTimerTask(HdfsRegionManager hdfsRegionManager) { |
| this.hdfsRegionManager = hdfsRegionManager; |
| |
| // Create a new filesystem |
| // This is added for the following reason: |
| // For HDFS, if a file wasn't closed properly last time, |
| // while calling FileSystem.append for this file, FSNamesystem.startFileInternal-> |
| // FSNamesystem.recoverLeaseInternal function gets called. |
| // This function throws AlreadyBeingCreatedException if there is an open handle, to any other file, |
| // created using the same FileSystem object. This is a bug and is being tracked at: |
| // https://issues.apache.org/jira/browse/HDFS-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel |
| // |
| // The fix for this bug is not yet part of Pivotal HD. So to overcome the bug, |
| // we create a new file system for the timer task so that it does not encounter the bug. |
| this.filesystem = this.hdfsRegionManager.getStore().createFileSystem(); |
| if (logger.isDebugEnabled()) |
| logger.debug("created a new file system specifically for timer task"); |
| } |
| |
| |
| /** |
| * Iterates over all the bucket organizers and closes their writer if the time for |
| * rollover has passed. It also has the additional responsibility of fixing the tmp |
| * files that were left over in the last unsuccessful run. |
| */ |
| @Override |
| public void run2() { |
| Collection<HoplogOrganizer> organizers = hdfsRegionManager.getBucketOrganizers(); |
| if (logger.isDebugEnabled()) |
| logger.debug("Starting the close temp logs run."); |
| |
| for (HoplogOrganizer organizer: organizers) { |
| |
| HDFSUnsortedHoplogOrganizer unsortedOrganizer = (HDFSUnsortedHoplogOrganizer)organizer; |
| long timeSinceLastFlush = (System.currentTimeMillis() - unsortedOrganizer.getLastFlushTime())/1000 ; |
| try { |
| this.hdfsRegionManager.getRegion().checkReadiness(); |
| } catch (Exception e) { |
| break; |
| } |
| |
| try { |
| // the time since last flush has exceeded file rollover interval, roll over the |
| // file. |
| if (timeSinceLastFlush >= unsortedOrganizer.getfileRolloverInterval()) { |
| if (logger.isDebugEnabled()) |
| logger.debug("Closing writer for bucket: " + unsortedOrganizer.bucketId); |
| unsortedOrganizer.synchronizedCloseWriter(false, timeSinceLastFlush, 0); |
| } |
| |
| // fix the tmp hoplogs, if any. Pass the new file system here. |
| unsortedOrganizer.identifyAndFixTmpHoplogs(this.filesystem); |
| |
| } catch (Exception e) { |
| logger.warn(LocalizedStrings.HOPLOG_CLOSE_FAILED, e); |
| } |
| } |
| |
| } |
| } |
| |