| <!DOCTYPE HTML> |
| <html lang="en"> |
| <head> |
| <!-- Generated by javadoc (17) --> |
| <title>Source code</title> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| <meta name="description" content="source: package: org.apache.hadoop.hbase.backup, class: HFileArchiver, class: FileableStoreFile"> |
| <meta name="generator" content="javadoc/SourceToHTMLConverter"> |
| <link rel="stylesheet" type="text/css" href="../../../../../../stylesheet.css" title="Style"> |
| </head> |
| <body class="source-page"> |
| <main role="main"> |
| <div class="source-container"> |
| <pre><span class="source-line-no">001</span><span id="line-1">/*</span> |
| <span class="source-line-no">002</span><span id="line-2"> * Licensed to the Apache Software Foundation (ASF) under one</span> |
| <span class="source-line-no">003</span><span id="line-3"> * or more contributor license agreements. See the NOTICE file</span> |
| <span class="source-line-no">004</span><span id="line-4"> * distributed with this work for additional information</span> |
| <span class="source-line-no">005</span><span id="line-5"> * regarding copyright ownership. The ASF licenses this file</span> |
| <span class="source-line-no">006</span><span id="line-6"> * to you under the Apache License, Version 2.0 (the</span> |
| <span class="source-line-no">007</span><span id="line-7"> * "License"); you may not use this file except in compliance</span> |
| <span class="source-line-no">008</span><span id="line-8"> * with the License. You may obtain a copy of the License at</span> |
| <span class="source-line-no">009</span><span id="line-9"> *</span> |
| <span class="source-line-no">010</span><span id="line-10"> * http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="source-line-no">011</span><span id="line-11"> *</span> |
| <span class="source-line-no">012</span><span id="line-12"> * Unless required by applicable law or agreed to in writing, software</span> |
| <span class="source-line-no">013</span><span id="line-13"> * distributed under the License is distributed on an "AS IS" BASIS,</span> |
| <span class="source-line-no">014</span><span id="line-14"> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span> |
| <span class="source-line-no">015</span><span id="line-15"> * See the License for the specific language governing permissions and</span> |
| <span class="source-line-no">016</span><span id="line-16"> * limitations under the License.</span> |
| <span class="source-line-no">017</span><span id="line-17"> */</span> |
| <span class="source-line-no">018</span><span id="line-18">package org.apache.hadoop.hbase.backup;</span> |
| <span class="source-line-no">019</span><span id="line-19"></span> |
| <span class="source-line-no">020</span><span id="line-20">import java.io.FileNotFoundException;</span> |
| <span class="source-line-no">021</span><span id="line-21">import java.io.IOException;</span> |
| <span class="source-line-no">022</span><span id="line-22">import java.io.InterruptedIOException;</span> |
| <span class="source-line-no">023</span><span id="line-23">import java.util.ArrayList;</span> |
| <span class="source-line-no">024</span><span id="line-24">import java.util.Collection;</span> |
| <span class="source-line-no">025</span><span id="line-25">import java.util.Collections;</span> |
| <span class="source-line-no">026</span><span id="line-26">import java.util.HashMap;</span> |
| <span class="source-line-no">027</span><span id="line-27">import java.util.List;</span> |
| <span class="source-line-no">028</span><span id="line-28">import java.util.Map;</span> |
| <span class="source-line-no">029</span><span id="line-29">import java.util.Queue;</span> |
| <span class="source-line-no">030</span><span id="line-30">import java.util.concurrent.ConcurrentLinkedQueue;</span> |
| <span class="source-line-no">031</span><span id="line-31">import java.util.concurrent.ExecutionException;</span> |
| <span class="source-line-no">032</span><span id="line-32">import java.util.concurrent.Future;</span> |
| <span class="source-line-no">033</span><span id="line-33">import java.util.concurrent.ThreadFactory;</span> |
| <span class="source-line-no">034</span><span id="line-34">import java.util.concurrent.ThreadPoolExecutor;</span> |
| <span class="source-line-no">035</span><span id="line-35">import java.util.concurrent.TimeUnit;</span> |
| <span class="source-line-no">036</span><span id="line-36">import java.util.concurrent.atomic.AtomicInteger;</span> |
| <span class="source-line-no">037</span><span id="line-37">import java.util.function.Function;</span> |
| <span class="source-line-no">038</span><span id="line-38">import java.util.stream.Collectors;</span> |
| <span class="source-line-no">039</span><span id="line-39">import java.util.stream.Stream;</span> |
| <span class="source-line-no">040</span><span id="line-40">import org.apache.hadoop.conf.Configuration;</span> |
| <span class="source-line-no">041</span><span id="line-41">import org.apache.hadoop.fs.FileStatus;</span> |
| <span class="source-line-no">042</span><span id="line-42">import org.apache.hadoop.fs.FileSystem;</span> |
| <span class="source-line-no">043</span><span id="line-43">import org.apache.hadoop.fs.Path;</span> |
| <span class="source-line-no">044</span><span id="line-44">import org.apache.hadoop.fs.PathFilter;</span> |
| <span class="source-line-no">045</span><span id="line-45">import org.apache.hadoop.hbase.HConstants;</span> |
| <span class="source-line-no">046</span><span id="line-46">import org.apache.hadoop.hbase.client.RegionInfo;</span> |
| <span class="source-line-no">047</span><span id="line-47">import org.apache.hadoop.hbase.regionserver.HStoreFile;</span> |
| <span class="source-line-no">048</span><span id="line-48">import org.apache.hadoop.hbase.util.Bytes;</span> |
| <span class="source-line-no">049</span><span id="line-49">import org.apache.hadoop.hbase.util.CommonFSUtils;</span> |
| <span class="source-line-no">050</span><span id="line-50">import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;</span> |
| <span class="source-line-no">051</span><span id="line-51">import org.apache.hadoop.hbase.util.FSUtils;</span> |
| <span class="source-line-no">052</span><span id="line-52">import org.apache.hadoop.hbase.util.HFileArchiveUtil;</span> |
| <span class="source-line-no">053</span><span id="line-53">import org.apache.hadoop.hbase.util.Threads;</span> |
| <span class="source-line-no">054</span><span id="line-54">import org.apache.hadoop.io.MultipleIOException;</span> |
| <span class="source-line-no">055</span><span id="line-55">import org.apache.yetus.audience.InterfaceAudience;</span> |
| <span class="source-line-no">056</span><span id="line-56">import org.slf4j.Logger;</span> |
| <span class="source-line-no">057</span><span id="line-57">import org.slf4j.LoggerFactory;</span> |
| <span class="source-line-no">058</span><span id="line-58"></span> |
| <span class="source-line-no">059</span><span id="line-59">import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;</span> |
| <span class="source-line-no">060</span><span id="line-60"></span> |
| <span class="source-line-no">061</span><span id="line-61">/**</span> |
| <span class="source-line-no">062</span><span id="line-62"> * Utility class to handle the removal of HFiles (or the respective {@link HStoreFile StoreFiles})</span> |
| <span class="source-line-no">063</span><span id="line-63"> * for a HRegion from the {@link FileSystem}. The hfiles will be archived or deleted, depending on</span> |
| <span class="source-line-no">064</span><span id="line-64"> * the state of the system.</span> |
| <span class="source-line-no">065</span><span id="line-65"> */</span> |
| <span class="source-line-no">066</span><span id="line-66">@InterfaceAudience.Private</span> |
| <span class="source-line-no">067</span><span id="line-67">public class HFileArchiver {</span> |
| <span class="source-line-no">068</span><span id="line-68"> private static final Logger LOG = LoggerFactory.getLogger(HFileArchiver.class);</span> |
| <span class="source-line-no">069</span><span id="line-69"> private static final String SEPARATOR = ".";</span> |
| <span class="source-line-no">070</span><span id="line-70"></span> |
| <span class="source-line-no">071</span><span id="line-71"> /** Number of retries in case of fs operation failure */</span> |
| <span class="source-line-no">072</span><span id="line-72"> private static final int DEFAULT_RETRIES_NUMBER = 3;</span> |
| <span class="source-line-no">073</span><span id="line-73"></span> |
| <span class="source-line-no">074</span><span id="line-74"> private static final Function<File, Path> FUNC_FILE_TO_PATH = new Function<File, Path>() {</span> |
| <span class="source-line-no">075</span><span id="line-75"> @Override</span> |
| <span class="source-line-no">076</span><span id="line-76"> public Path apply(File file) {</span> |
| <span class="source-line-no">077</span><span id="line-77"> return file == null ? null : file.getPath();</span> |
| <span class="source-line-no">078</span><span id="line-78"> }</span> |
| <span class="source-line-no">079</span><span id="line-79"> };</span> |
| <span class="source-line-no">080</span><span id="line-80"></span> |
| <span class="source-line-no">081</span><span id="line-81"> private static ThreadPoolExecutor archiveExecutor;</span> |
| <span class="source-line-no">082</span><span id="line-82"></span> |
| <span class="source-line-no">083</span><span id="line-83"> private HFileArchiver() {</span> |
| <span class="source-line-no">084</span><span id="line-84"> // hidden ctor since this is just a util</span> |
| <span class="source-line-no">085</span><span id="line-85"> }</span> |
| <span class="source-line-no">086</span><span id="line-86"></span> |
| <span class="source-line-no">087</span><span id="line-87"> /** Returns True if the Region exits in the filesystem. */</span> |
| <span class="source-line-no">088</span><span id="line-88"> public static boolean exists(Configuration conf, FileSystem fs, RegionInfo info)</span> |
| <span class="source-line-no">089</span><span id="line-89"> throws IOException {</span> |
| <span class="source-line-no">090</span><span id="line-90"> Path rootDir = CommonFSUtils.getRootDir(conf);</span> |
| <span class="source-line-no">091</span><span id="line-91"> Path regionDir = FSUtils.getRegionDirFromRootDir(rootDir, info);</span> |
| <span class="source-line-no">092</span><span id="line-92"> return fs.exists(regionDir);</span> |
| <span class="source-line-no">093</span><span id="line-93"> }</span> |
| <span class="source-line-no">094</span><span id="line-94"></span> |
| <span class="source-line-no">095</span><span id="line-95"> /**</span> |
| <span class="source-line-no">096</span><span id="line-96"> * Cleans up all the files for a HRegion by archiving the HFiles to the archive directory</span> |
| <span class="source-line-no">097</span><span id="line-97"> * @param conf the configuration to use</span> |
| <span class="source-line-no">098</span><span id="line-98"> * @param fs the file system object</span> |
| <span class="source-line-no">099</span><span id="line-99"> * @param info RegionInfo for region to be deleted</span> |
| <span class="source-line-no">100</span><span id="line-100"> */</span> |
| <span class="source-line-no">101</span><span id="line-101"> public static void archiveRegion(Configuration conf, FileSystem fs, RegionInfo info)</span> |
| <span class="source-line-no">102</span><span id="line-102"> throws IOException {</span> |
| <span class="source-line-no">103</span><span id="line-103"> Path rootDir = CommonFSUtils.getRootDir(conf);</span> |
| <span class="source-line-no">104</span><span id="line-104"> archiveRegion(conf, fs, rootDir, CommonFSUtils.getTableDir(rootDir, info.getTable()),</span> |
| <span class="source-line-no">105</span><span id="line-105"> FSUtils.getRegionDirFromRootDir(rootDir, info));</span> |
| <span class="source-line-no">106</span><span id="line-106"> }</span> |
| <span class="source-line-no">107</span><span id="line-107"></span> |
| <span class="source-line-no">108</span><span id="line-108"> /**</span> |
| <span class="source-line-no">109</span><span id="line-109"> * Cleans up all the files for a HRegion by archiving the HFiles to the archive directory</span> |
| <span class="source-line-no">110</span><span id="line-110"> * @param conf the configuration to use</span> |
| <span class="source-line-no">111</span><span id="line-111"> * @param fs the file system object</span> |
| <span class="source-line-no">112</span><span id="line-112"> * @param info RegionInfo for region to be deleted</span> |
| <span class="source-line-no">113</span><span id="line-113"> * @param rootDir {@link Path} to the root directory where hbase files are stored (for building</span> |
| <span class="source-line-no">114</span><span id="line-114"> * the archive path)</span> |
| <span class="source-line-no">115</span><span id="line-115"> * @param tableDir {@link Path} to where the table is being stored (for building the archive path)</span> |
| <span class="source-line-no">116</span><span id="line-116"> */</span> |
| <span class="source-line-no">117</span><span id="line-117"> public static void archiveRegion(Configuration conf, FileSystem fs, RegionInfo info, Path rootDir,</span> |
| <span class="source-line-no">118</span><span id="line-118"> Path tableDir) throws IOException {</span> |
| <span class="source-line-no">119</span><span id="line-119"> archiveRegion(conf, fs, rootDir, tableDir, FSUtils.getRegionDirFromRootDir(rootDir, info));</span> |
| <span class="source-line-no">120</span><span id="line-120"> }</span> |
| <span class="source-line-no">121</span><span id="line-121"></span> |
| <span class="source-line-no">122</span><span id="line-122"> /**</span> |
| <span class="source-line-no">123</span><span id="line-123"> * Remove an entire region from the table directory via archiving the region's hfiles.</span> |
| <span class="source-line-no">124</span><span id="line-124"> * @param fs {@link FileSystem} from which to remove the region</span> |
| <span class="source-line-no">125</span><span id="line-125"> * @param rootdir {@link Path} to the root directory where hbase files are stored (for building</span> |
| <span class="source-line-no">126</span><span id="line-126"> * the archive path)</span> |
| <span class="source-line-no">127</span><span id="line-127"> * @param tableDir {@link Path} to where the table is being stored (for building the archive</span> |
| <span class="source-line-no">128</span><span id="line-128"> * path)</span> |
| <span class="source-line-no">129</span><span id="line-129"> * @param regionDir {@link Path} to where a region is being stored (for building the archive path)</span> |
| <span class="source-line-no">130</span><span id="line-130"> * @return <tt>true</tt> if the region was successfully deleted. <tt>false</tt> if the filesystem</span> |
| <span class="source-line-no">131</span><span id="line-131"> * operations could not complete.</span> |
| <span class="source-line-no">132</span><span id="line-132"> * @throws IOException if the request cannot be completed</span> |
| <span class="source-line-no">133</span><span id="line-133"> */</span> |
| <span class="source-line-no">134</span><span id="line-134"> public static boolean archiveRegion(Configuration conf, FileSystem fs, Path rootdir,</span> |
| <span class="source-line-no">135</span><span id="line-135"> Path tableDir, Path regionDir) throws IOException {</span> |
| <span class="source-line-no">136</span><span id="line-136"> // otherwise, we archive the files</span> |
| <span class="source-line-no">137</span><span id="line-137"> // make sure we can archive</span> |
| <span class="source-line-no">138</span><span id="line-138"> if (tableDir == null || regionDir == null) {</span> |
| <span class="source-line-no">139</span><span id="line-139"> LOG.error("No archive directory could be found because tabledir (" + tableDir</span> |
| <span class="source-line-no">140</span><span id="line-140"> + ") or regiondir (" + regionDir + "was null. Deleting files instead.");</span> |
| <span class="source-line-no">141</span><span id="line-141"> if (regionDir != null) {</span> |
| <span class="source-line-no">142</span><span id="line-142"> deleteRegionWithoutArchiving(fs, regionDir);</span> |
| <span class="source-line-no">143</span><span id="line-143"> }</span> |
| <span class="source-line-no">144</span><span id="line-144"> // we should have archived, but failed to. Doesn't matter if we deleted</span> |
| <span class="source-line-no">145</span><span id="line-145"> // the archived files correctly or not.</span> |
| <span class="source-line-no">146</span><span id="line-146"> return false;</span> |
| <span class="source-line-no">147</span><span id="line-147"> }</span> |
| <span class="source-line-no">148</span><span id="line-148"></span> |
| <span class="source-line-no">149</span><span id="line-149"> LOG.debug("ARCHIVING {}", regionDir);</span> |
| <span class="source-line-no">150</span><span id="line-150"></span> |
| <span class="source-line-no">151</span><span id="line-151"> // make sure the regiondir lives under the tabledir</span> |
| <span class="source-line-no">152</span><span id="line-152"> Preconditions.checkArgument(regionDir.toString().startsWith(tableDir.toString()));</span> |
| <span class="source-line-no">153</span><span id="line-153"> Path regionArchiveDir = HFileArchiveUtil.getRegionArchiveDir(rootdir,</span> |
| <span class="source-line-no">154</span><span id="line-154"> CommonFSUtils.getTableName(tableDir), regionDir.getName());</span> |
| <span class="source-line-no">155</span><span id="line-155"></span> |
| <span class="source-line-no">156</span><span id="line-156"> FileStatusConverter getAsFile = new FileStatusConverter(fs);</span> |
| <span class="source-line-no">157</span><span id="line-157"> // otherwise, we attempt to archive the store files</span> |
| <span class="source-line-no">158</span><span id="line-158"></span> |
| <span class="source-line-no">159</span><span id="line-159"> // build collection of just the store directories to archive</span> |
| <span class="source-line-no">160</span><span id="line-160"> Collection<File> toArchive = new ArrayList<>();</span> |
| <span class="source-line-no">161</span><span id="line-161"> final PathFilter dirFilter = new FSUtils.DirFilter(fs);</span> |
| <span class="source-line-no">162</span><span id="line-162"> PathFilter nonHidden = new PathFilter() {</span> |
| <span class="source-line-no">163</span><span id="line-163"> @Override</span> |
| <span class="source-line-no">164</span><span id="line-164"> public boolean accept(Path file) {</span> |
| <span class="source-line-no">165</span><span id="line-165"> return dirFilter.accept(file) && !file.getName().startsWith(".");</span> |
| <span class="source-line-no">166</span><span id="line-166"> }</span> |
| <span class="source-line-no">167</span><span id="line-167"> };</span> |
| <span class="source-line-no">168</span><span id="line-168"> FileStatus[] storeDirs = CommonFSUtils.listStatus(fs, regionDir, nonHidden);</span> |
| <span class="source-line-no">169</span><span id="line-169"> // if there no files, we can just delete the directory and return;</span> |
| <span class="source-line-no">170</span><span id="line-170"> if (storeDirs == null) {</span> |
| <span class="source-line-no">171</span><span id="line-171"> LOG.debug("Directory {} empty.", regionDir);</span> |
| <span class="source-line-no">172</span><span id="line-172"> return deleteRegionWithoutArchiving(fs, regionDir);</span> |
| <span class="source-line-no">173</span><span id="line-173"> }</span> |
| <span class="source-line-no">174</span><span id="line-174"></span> |
| <span class="source-line-no">175</span><span id="line-175"> // convert the files in the region to a File</span> |
| <span class="source-line-no">176</span><span id="line-176"> Stream.of(storeDirs).map(getAsFile).forEachOrdered(toArchive::add);</span> |
| <span class="source-line-no">177</span><span id="line-177"> LOG.debug("Archiving " + toArchive);</span> |
| <span class="source-line-no">178</span><span id="line-178"> List<File> failedArchive = resolveAndArchive(conf, fs, regionArchiveDir, toArchive,</span> |
| <span class="source-line-no">179</span><span id="line-179"> EnvironmentEdgeManager.currentTime());</span> |
| <span class="source-line-no">180</span><span id="line-180"> if (!failedArchive.isEmpty()) {</span> |
| <span class="source-line-no">181</span><span id="line-181"> throw new FailedArchiveException(</span> |
| <span class="source-line-no">182</span><span id="line-182"> "Failed to archive/delete all the files for region:" + regionDir.getName() + " into "</span> |
| <span class="source-line-no">183</span><span id="line-183"> + regionArchiveDir + ". Something is probably awry on the filesystem.",</span> |
| <span class="source-line-no">184</span><span id="line-184"> failedArchive.stream().map(FUNC_FILE_TO_PATH).collect(Collectors.toList()));</span> |
| <span class="source-line-no">185</span><span id="line-185"> }</span> |
| <span class="source-line-no">186</span><span id="line-186"> // if that was successful, then we delete the region</span> |
| <span class="source-line-no">187</span><span id="line-187"> return deleteRegionWithoutArchiving(fs, regionDir);</span> |
| <span class="source-line-no">188</span><span id="line-188"> }</span> |
| <span class="source-line-no">189</span><span id="line-189"></span> |
| <span class="source-line-no">190</span><span id="line-190"> /**</span> |
| <span class="source-line-no">191</span><span id="line-191"> * Archive the specified regions in parallel.</span> |
| <span class="source-line-no">192</span><span id="line-192"> * @param conf the configuration to use</span> |
| <span class="source-line-no">193</span><span id="line-193"> * @param fs {@link FileSystem} from which to remove the region</span> |
| <span class="source-line-no">194</span><span id="line-194"> * @param rootDir {@link Path} to the root directory where hbase files are stored (for</span> |
| <span class="source-line-no">195</span><span id="line-195"> * building the archive path)</span> |
| <span class="source-line-no">196</span><span id="line-196"> * @param tableDir {@link Path} to where the table is being stored (for building the archive</span> |
| <span class="source-line-no">197</span><span id="line-197"> * path)</span> |
| <span class="source-line-no">198</span><span id="line-198"> * @param regionDirList {@link Path} to where regions are being stored (for building the archive</span> |
| <span class="source-line-no">199</span><span id="line-199"> * path)</span> |
| <span class="source-line-no">200</span><span id="line-200"> * @throws IOException if the request cannot be completed</span> |
| <span class="source-line-no">201</span><span id="line-201"> */</span> |
| <span class="source-line-no">202</span><span id="line-202"> public static void archiveRegions(Configuration conf, FileSystem fs, Path rootDir, Path tableDir,</span> |
| <span class="source-line-no">203</span><span id="line-203"> List<Path> regionDirList) throws IOException {</span> |
| <span class="source-line-no">204</span><span id="line-204"> List<Future<Void>> futures = new ArrayList<>(regionDirList.size());</span> |
| <span class="source-line-no">205</span><span id="line-205"> for (Path regionDir : regionDirList) {</span> |
| <span class="source-line-no">206</span><span id="line-206"> Future<Void> future = getArchiveExecutor(conf).submit(() -> {</span> |
| <span class="source-line-no">207</span><span id="line-207"> archiveRegion(conf, fs, rootDir, tableDir, regionDir);</span> |
| <span class="source-line-no">208</span><span id="line-208"> return null;</span> |
| <span class="source-line-no">209</span><span id="line-209"> });</span> |
| <span class="source-line-no">210</span><span id="line-210"> futures.add(future);</span> |
| <span class="source-line-no">211</span><span id="line-211"> }</span> |
| <span class="source-line-no">212</span><span id="line-212"> try {</span> |
| <span class="source-line-no">213</span><span id="line-213"> for (Future<Void> future : futures) {</span> |
| <span class="source-line-no">214</span><span id="line-214"> future.get();</span> |
| <span class="source-line-no">215</span><span id="line-215"> }</span> |
| <span class="source-line-no">216</span><span id="line-216"> } catch (InterruptedException e) {</span> |
| <span class="source-line-no">217</span><span id="line-217"> throw new InterruptedIOException(e.getMessage());</span> |
| <span class="source-line-no">218</span><span id="line-218"> } catch (ExecutionException e) {</span> |
| <span class="source-line-no">219</span><span id="line-219"> throw new IOException(e.getCause());</span> |
| <span class="source-line-no">220</span><span id="line-220"> }</span> |
| <span class="source-line-no">221</span><span id="line-221"> }</span> |
| <span class="source-line-no">222</span><span id="line-222"></span> |
| <span class="source-line-no">223</span><span id="line-223"> private static synchronized ThreadPoolExecutor getArchiveExecutor(final Configuration conf) {</span> |
| <span class="source-line-no">224</span><span id="line-224"> if (archiveExecutor == null) {</span> |
| <span class="source-line-no">225</span><span id="line-225"> int maxThreads = conf.getInt("hbase.hfilearchiver.thread.pool.max", 8);</span> |
| <span class="source-line-no">226</span><span id="line-226"> archiveExecutor = Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,</span> |
| <span class="source-line-no">227</span><span id="line-227"> getThreadFactory("HFileArchiver"));</span> |
| <span class="source-line-no">228</span><span id="line-228"></span> |
| <span class="source-line-no">229</span><span id="line-229"> // Shutdown this ThreadPool in a shutdown hook</span> |
| <span class="source-line-no">230</span><span id="line-230"> Runtime.getRuntime().addShutdownHook(new Thread(() -> archiveExecutor.shutdown()));</span> |
| <span class="source-line-no">231</span><span id="line-231"> }</span> |
| <span class="source-line-no">232</span><span id="line-232"> return archiveExecutor;</span> |
| <span class="source-line-no">233</span><span id="line-233"> }</span> |
| <span class="source-line-no">234</span><span id="line-234"></span> |
| <span class="source-line-no">235</span><span id="line-235"> // We need this method instead of Threads.getNamedThreadFactory() to pass some tests.</span> |
| <span class="source-line-no">236</span><span id="line-236"> // The difference from Threads.getNamedThreadFactory() is that it doesn't fix ThreadGroup for</span> |
| <span class="source-line-no">237</span><span id="line-237"> // new threads. If we use Threads.getNamedThreadFactory(), we will face ThreadGroup related</span> |
| <span class="source-line-no">238</span><span id="line-238"> // issues in some tests.</span> |
| <span class="source-line-no">239</span><span id="line-239"> private static ThreadFactory getThreadFactory(String archiverName) {</span> |
| <span class="source-line-no">240</span><span id="line-240"> return new ThreadFactory() {</span> |
| <span class="source-line-no">241</span><span id="line-241"> final AtomicInteger threadNumber = new AtomicInteger(1);</span> |
| <span class="source-line-no">242</span><span id="line-242"></span> |
| <span class="source-line-no">243</span><span id="line-243"> @Override</span> |
| <span class="source-line-no">244</span><span id="line-244"> public Thread newThread(Runnable r) {</span> |
| <span class="source-line-no">245</span><span id="line-245"> final String name = archiverName + "-" + threadNumber.getAndIncrement();</span> |
| <span class="source-line-no">246</span><span id="line-246"> Thread t = new Thread(r, name);</span> |
| <span class="source-line-no">247</span><span id="line-247"> t.setDaemon(true);</span> |
| <span class="source-line-no">248</span><span id="line-248"> return t;</span> |
| <span class="source-line-no">249</span><span id="line-249"> }</span> |
| <span class="source-line-no">250</span><span id="line-250"> };</span> |
| <span class="source-line-no">251</span><span id="line-251"> }</span> |
| <span class="source-line-no">252</span><span id="line-252"></span> |
| <span class="source-line-no">253</span><span id="line-253"> /**</span> |
| <span class="source-line-no">254</span><span id="line-254"> * Remove from the specified region the store files of the specified column family, either by</span> |
| <span class="source-line-no">255</span><span id="line-255"> * archiving them or outright deletion</span> |
| <span class="source-line-no">256</span><span id="line-256"> * @param fs the filesystem where the store files live</span> |
| <span class="source-line-no">257</span><span id="line-257"> * @param conf {@link Configuration} to examine to determine the archive directory</span> |
| <span class="source-line-no">258</span><span id="line-258"> * @param parent Parent region hosting the store files</span> |
| <span class="source-line-no">259</span><span id="line-259"> * @param tableDir {@link Path} to where the table is being stored (for building the archive path)</span> |
| <span class="source-line-no">260</span><span id="line-260"> * @param family the family hosting the store files</span> |
| <span class="source-line-no">261</span><span id="line-261"> * @throws IOException if the files could not be correctly disposed.</span> |
| <span class="source-line-no">262</span><span id="line-262"> */</span> |
| <span class="source-line-no">263</span><span id="line-263"> public static void archiveFamily(FileSystem fs, Configuration conf, RegionInfo parent,</span> |
| <span class="source-line-no">264</span><span id="line-264"> Path tableDir, byte[] family) throws IOException {</span> |
| <span class="source-line-no">265</span><span id="line-265"> Path familyDir = new Path(tableDir, new Path(parent.getEncodedName(), Bytes.toString(family)));</span> |
| <span class="source-line-no">266</span><span id="line-266"> archiveFamilyByFamilyDir(fs, conf, parent, familyDir, family);</span> |
| <span class="source-line-no">267</span><span id="line-267"> }</span> |
| <span class="source-line-no">268</span><span id="line-268"></span> |
| <span class="source-line-no">269</span><span id="line-269"> /**</span> |
| <span class="source-line-no">270</span><span id="line-270"> * Removes from the specified region the store files of the specified column family, either by</span> |
| <span class="source-line-no">271</span><span id="line-271"> * archiving them or outright deletion</span> |
| <span class="source-line-no">272</span><span id="line-272"> * @param fs the filesystem where the store files live</span> |
| <span class="source-line-no">273</span><span id="line-273"> * @param conf {@link Configuration} to examine to determine the archive directory</span> |
| <span class="source-line-no">274</span><span id="line-274"> * @param parent Parent region hosting the store files</span> |
| <span class="source-line-no">275</span><span id="line-275"> * @param familyDir {@link Path} to where the family is being stored</span> |
| <span class="source-line-no">276</span><span id="line-276"> * @param family the family hosting the store files</span> |
| <span class="source-line-no">277</span><span id="line-277"> * @throws IOException if the files could not be correctly disposed.</span> |
| <span class="source-line-no">278</span><span id="line-278"> */</span> |
| <span class="source-line-no">279</span><span id="line-279"> public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf, RegionInfo parent,</span> |
| <span class="source-line-no">280</span><span id="line-280"> Path familyDir, byte[] family) throws IOException {</span> |
| <span class="source-line-no">281</span><span id="line-281"> FileStatus[] storeFiles = CommonFSUtils.listStatus(fs, familyDir);</span> |
| <span class="source-line-no">282</span><span id="line-282"> if (storeFiles == null) {</span> |
| <span class="source-line-no">283</span><span id="line-283"> LOG.debug("No files to dispose of in {}, family={}", parent.getRegionNameAsString(),</span> |
| <span class="source-line-no">284</span><span id="line-284"> Bytes.toString(family));</span> |
| <span class="source-line-no">285</span><span id="line-285"> return;</span> |
| <span class="source-line-no">286</span><span id="line-286"> }</span> |
| <span class="source-line-no">287</span><span id="line-287"></span> |
| <span class="source-line-no">288</span><span id="line-288"> FileStatusConverter getAsFile = new FileStatusConverter(fs);</span> |
| <span class="source-line-no">289</span><span id="line-289"> Collection<File> toArchive = Stream.of(storeFiles).map(getAsFile).collect(Collectors.toList());</span> |
| <span class="source-line-no">290</span><span id="line-290"> Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, parent, family);</span> |
| <span class="source-line-no">291</span><span id="line-291"></span> |
| <span class="source-line-no">292</span><span id="line-292"> // do the actual archive</span> |
| <span class="source-line-no">293</span><span id="line-293"> List<File> failedArchive =</span> |
| <span class="source-line-no">294</span><span id="line-294"> resolveAndArchive(conf, fs, storeArchiveDir, toArchive, EnvironmentEdgeManager.currentTime());</span> |
| <span class="source-line-no">295</span><span id="line-295"> if (!failedArchive.isEmpty()) {</span> |
| <span class="source-line-no">296</span><span id="line-296"> throw new FailedArchiveException(</span> |
| <span class="source-line-no">297</span><span id="line-297"> "Failed to archive/delete all the files for region:"</span> |
| <span class="source-line-no">298</span><span id="line-298"> + Bytes.toString(parent.getRegionName()) + ", family:" + Bytes.toString(family) + " into "</span> |
| <span class="source-line-no">299</span><span id="line-299"> + storeArchiveDir + ". Something is probably awry on the filesystem.",</span> |
| <span class="source-line-no">300</span><span id="line-300"> failedArchive.stream().map(FUNC_FILE_TO_PATH).collect(Collectors.toList()));</span> |
| <span class="source-line-no">301</span><span id="line-301"> }</span> |
| <span class="source-line-no">302</span><span id="line-302"> }</span> |
| <span class="source-line-no">303</span><span id="line-303"></span> |
| <span class="source-line-no">304</span><span id="line-304"> /**</span> |
| <span class="source-line-no">305</span><span id="line-305"> * Remove the store files, either by archiving them or outright deletion</span> |
| <span class="source-line-no">306</span><span id="line-306"> * @param conf {@link Configuration} to examine to determine the archive directory</span> |
| <span class="source-line-no">307</span><span id="line-307"> * @param fs the filesystem where the store files live</span> |
| <span class="source-line-no">308</span><span id="line-308"> * @param regionInfo {@link RegionInfo} of the region hosting the store files</span> |
| <span class="source-line-no">309</span><span id="line-309"> * @param family the family hosting the store files</span> |
| <span class="source-line-no">310</span><span id="line-310"> * @param compactedFiles files to be disposed of. No further reading of these files should be</span> |
| <span class="source-line-no">311</span><span id="line-311"> * attempted; otherwise likely to cause an {@link IOException}</span> |
| <span class="source-line-no">312</span><span id="line-312"> * @throws IOException if the files could not be correctly disposed.</span> |
| <span class="source-line-no">313</span><span id="line-313"> */</span> |
| <span class="source-line-no">314</span><span id="line-314"> public static void archiveStoreFiles(Configuration conf, FileSystem fs, RegionInfo regionInfo,</span> |
| <span class="source-line-no">315</span><span id="line-315"> Path tableDir, byte[] family, Collection<HStoreFile> compactedFiles) throws IOException {</span> |
| <span class="source-line-no">316</span><span id="line-316"> Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);</span> |
| <span class="source-line-no">317</span><span id="line-317"> archive(conf, fs, regionInfo, family, compactedFiles, storeArchiveDir);</span> |
| <span class="source-line-no">318</span><span id="line-318"> }</span> |
| <span class="source-line-no">319</span><span id="line-319"></span> |
| <span class="source-line-no">320</span><span id="line-320"> /**</span> |
| <span class="source-line-no">321</span><span id="line-321"> * Archive recovered edits using existing logic for archiving store files. This is currently only</span> |
| <span class="source-line-no">322</span><span id="line-322"> * relevant when <b>hbase.region.archive.recovered.edits</b> is true, as recovered edits shouldn't</span> |
| <span class="source-line-no">323</span><span id="line-323"> * be kept after replay. In theory, we could use very same method available for archiving store</span> |
| <span class="source-line-no">324</span><span id="line-324"> * files, but supporting WAL dir and store files on different FileSystems added the need for extra</span> |
| <span class="source-line-no">325</span><span id="line-325"> * validation of the passed FileSystem instance and the path where the archiving edits should be</span> |
| <span class="source-line-no">326</span><span id="line-326"> * placed.</span> |
| <span class="source-line-no">327</span><span id="line-327"> * @param conf {@link Configuration} to determine the archive directory.</span> |
| <span class="source-line-no">328</span><span id="line-328"> * @param fs the filesystem used for storing WAL files.</span> |
| <span class="source-line-no">329</span><span id="line-329"> * @param regionInfo {@link RegionInfo} a pseudo region representation for the archiving logic.</span> |
| <span class="source-line-no">330</span><span id="line-330"> * @param family a pseudo familiy representation for the archiving logic.</span> |
| <span class="source-line-no">331</span><span id="line-331"> * @param replayedEdits the recovered edits to be archived.</span> |
| <span class="source-line-no">332</span><span id="line-332"> * @throws IOException if files can't be achived due to some internal error.</span> |
| <span class="source-line-no">333</span><span id="line-333"> */</span> |
| <span class="source-line-no">334</span><span id="line-334"> public static void archiveRecoveredEdits(Configuration conf, FileSystem fs, RegionInfo regionInfo,</span> |
| <span class="source-line-no">335</span><span id="line-335"> byte[] family, Collection<HStoreFile> replayedEdits) throws IOException {</span> |
| <span class="source-line-no">336</span><span id="line-336"> String workingDir = conf.get(CommonFSUtils.HBASE_WAL_DIR, conf.get(HConstants.HBASE_DIR));</span> |
| <span class="source-line-no">337</span><span id="line-337"> // extra sanity checks for the right FS</span> |
| <span class="source-line-no">338</span><span id="line-338"> Path path = new Path(workingDir);</span> |
| <span class="source-line-no">339</span><span id="line-339"> if (path.isAbsoluteAndSchemeAuthorityNull()) {</span> |
| <span class="source-line-no">340</span><span id="line-340"> // no schema specified on wal dir value, so it's on same FS as StoreFiles</span> |
| <span class="source-line-no">341</span><span id="line-341"> path = new Path(conf.get(HConstants.HBASE_DIR));</span> |
| <span class="source-line-no">342</span><span id="line-342"> }</span> |
| <span class="source-line-no">343</span><span id="line-343"> if (path.toUri().getScheme() != null && !path.toUri().getScheme().equals(fs.getScheme())) {</span> |
| <span class="source-line-no">344</span><span id="line-344"> throw new IOException(</span> |
| <span class="source-line-no">345</span><span id="line-345"> "Wrong file system! Should be " + path.toUri().getScheme() + ", but got " + fs.getScheme());</span> |
| <span class="source-line-no">346</span><span id="line-346"> }</span> |
| <span class="source-line-no">347</span><span id="line-347"> path = HFileArchiveUtil.getStoreArchivePathForRootDir(path, regionInfo, family);</span> |
| <span class="source-line-no">348</span><span id="line-348"> archive(conf, fs, regionInfo, family, replayedEdits, path);</span> |
| <span class="source-line-no">349</span><span id="line-349"> }</span> |
| <span class="source-line-no">350</span><span id="line-350"></span> |
| <span class="source-line-no">351</span><span id="line-351"> private static void archive(Configuration conf, FileSystem fs, RegionInfo regionInfo,</span> |
| <span class="source-line-no">352</span><span id="line-352"> byte[] family, Collection<HStoreFile> compactedFiles, Path storeArchiveDir) throws IOException {</span> |
| <span class="source-line-no">353</span><span id="line-353"> // sometimes in testing, we don't have rss, so we need to check for that</span> |
| <span class="source-line-no">354</span><span id="line-354"> if (fs == null) {</span> |
| <span class="source-line-no">355</span><span id="line-355"> LOG.warn(</span> |
| <span class="source-line-no">356</span><span id="line-356"> "Passed filesystem is null, so just deleting files without archiving for {}," + "family={}",</span> |
| <span class="source-line-no">357</span><span id="line-357"> Bytes.toString(regionInfo.getRegionName()), Bytes.toString(family));</span> |
| <span class="source-line-no">358</span><span id="line-358"> deleteStoreFilesWithoutArchiving(compactedFiles);</span> |
| <span class="source-line-no">359</span><span id="line-359"> return;</span> |
| <span class="source-line-no">360</span><span id="line-360"> }</span> |
| <span class="source-line-no">361</span><span id="line-361"></span> |
| <span class="source-line-no">362</span><span id="line-362"> // short circuit if we don't have any files to delete</span> |
| <span class="source-line-no">363</span><span id="line-363"> if (compactedFiles.isEmpty()) {</span> |
| <span class="source-line-no">364</span><span id="line-364"> LOG.debug("No files to dispose of, done!");</span> |
| <span class="source-line-no">365</span><span id="line-365"> return;</span> |
| <span class="source-line-no">366</span><span id="line-366"> }</span> |
| <span class="source-line-no">367</span><span id="line-367"></span> |
| <span class="source-line-no">368</span><span id="line-368"> // build the archive path</span> |
| <span class="source-line-no">369</span><span id="line-369"> if (regionInfo == null || family == null)</span> |
| <span class="source-line-no">370</span><span id="line-370"> throw new IOException("Need to have a region and a family to archive from.");</span> |
| <span class="source-line-no">371</span><span id="line-371"> // make sure we don't archive if we can't and that the archive dir exists</span> |
| <span class="source-line-no">372</span><span id="line-372"> if (!fs.mkdirs(storeArchiveDir)) {</span> |
| <span class="source-line-no">373</span><span id="line-373"> throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:"</span> |
| <span class="source-line-no">374</span><span id="line-374"> + Bytes.toString(family) + ", deleting compacted files instead.");</span> |
| <span class="source-line-no">375</span><span id="line-375"> }</span> |
| <span class="source-line-no">376</span><span id="line-376"></span> |
| <span class="source-line-no">377</span><span id="line-377"> // otherwise we attempt to archive the store files</span> |
| <span class="source-line-no">378</span><span id="line-378"> LOG.debug("Archiving compacted files.");</span> |
| <span class="source-line-no">379</span><span id="line-379"></span> |
| <span class="source-line-no">380</span><span id="line-380"> // Wrap the storefile into a File</span> |
| <span class="source-line-no">381</span><span id="line-381"> StoreToFile getStorePath = new StoreToFile(fs);</span> |
| <span class="source-line-no">382</span><span id="line-382"> Collection<File> storeFiles =</span> |
| <span class="source-line-no">383</span><span id="line-383"> compactedFiles.stream().map(getStorePath).collect(Collectors.toList());</span> |
| <span class="source-line-no">384</span><span id="line-384"></span> |
| <span class="source-line-no">385</span><span id="line-385"> // do the actual archive</span> |
| <span class="source-line-no">386</span><span id="line-386"> List<File> failedArchive = resolveAndArchive(conf, fs, storeArchiveDir, storeFiles,</span> |
| <span class="source-line-no">387</span><span id="line-387"> EnvironmentEdgeManager.currentTime());</span> |
| <span class="source-line-no">388</span><span id="line-388"></span> |
| <span class="source-line-no">389</span><span id="line-389"> if (!failedArchive.isEmpty()) {</span> |
| <span class="source-line-no">390</span><span id="line-390"> throw new FailedArchiveException(</span> |
| <span class="source-line-no">391</span><span id="line-391"> "Failed to archive/delete all the files for region:"</span> |
| <span class="source-line-no">392</span><span id="line-392"> + Bytes.toString(regionInfo.getRegionName()) + ", family:" + Bytes.toString(family)</span> |
| <span class="source-line-no">393</span><span id="line-393"> + " into " + storeArchiveDir + ". Something is probably awry on the filesystem.",</span> |
| <span class="source-line-no">394</span><span id="line-394"> failedArchive.stream().map(FUNC_FILE_TO_PATH).collect(Collectors.toList()));</span> |
| <span class="source-line-no">395</span><span id="line-395"> }</span> |
| <span class="source-line-no">396</span><span id="line-396"> }</span> |
| <span class="source-line-no">397</span><span id="line-397"></span> |
| <span class="source-line-no">398</span><span id="line-398"> /**</span> |
| <span class="source-line-no">399</span><span id="line-399"> * Archive the store file</span> |
| <span class="source-line-no">400</span><span id="line-400"> * @param fs the filesystem where the store files live</span> |
| <span class="source-line-no">401</span><span id="line-401"> * @param regionInfo region hosting the store files</span> |
| <span class="source-line-no">402</span><span id="line-402"> * @param conf {@link Configuration} to examine to determine the archive directory</span> |
| <span class="source-line-no">403</span><span id="line-403"> * @param tableDir {@link Path} to where the table is being stored (for building the archive</span> |
| <span class="source-line-no">404</span><span id="line-404"> * path)</span> |
| <span class="source-line-no">405</span><span id="line-405"> * @param family the family hosting the store files</span> |
| <span class="source-line-no">406</span><span id="line-406"> * @param storeFile file to be archived</span> |
| <span class="source-line-no">407</span><span id="line-407"> * @throws IOException if the files could not be correctly disposed.</span> |
| <span class="source-line-no">408</span><span id="line-408"> */</span> |
| <span class="source-line-no">409</span><span id="line-409"> public static void archiveStoreFile(Configuration conf, FileSystem fs, RegionInfo regionInfo,</span> |
| <span class="source-line-no">410</span><span id="line-410"> Path tableDir, byte[] family, Path storeFile) throws IOException {</span> |
| <span class="source-line-no">411</span><span id="line-411"> Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);</span> |
| <span class="source-line-no">412</span><span id="line-412"> // make sure we don't archive if we can't and that the archive dir exists</span> |
| <span class="source-line-no">413</span><span id="line-413"> if (!fs.mkdirs(storeArchiveDir)) {</span> |
| <span class="source-line-no">414</span><span id="line-414"> throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:"</span> |
| <span class="source-line-no">415</span><span id="line-415"> + Bytes.toString(family) + ", deleting compacted files instead.");</span> |
| <span class="source-line-no">416</span><span id="line-416"> }</span> |
| <span class="source-line-no">417</span><span id="line-417"></span> |
| <span class="source-line-no">418</span><span id="line-418"> // do the actual archive</span> |
| <span class="source-line-no">419</span><span id="line-419"> long start = EnvironmentEdgeManager.currentTime();</span> |
| <span class="source-line-no">420</span><span id="line-420"> File file = new FileablePath(fs, storeFile);</span> |
| <span class="source-line-no">421</span><span id="line-421"> if (!resolveAndArchiveFile(storeArchiveDir, file, Long.toString(start))) {</span> |
| <span class="source-line-no">422</span><span id="line-422"> throw new IOException("Failed to archive/delete the file for region:"</span> |
| <span class="source-line-no">423</span><span id="line-423"> + regionInfo.getRegionNameAsString() + ", family:" + Bytes.toString(family) + " into "</span> |
| <span class="source-line-no">424</span><span id="line-424"> + storeArchiveDir + ". Something is probably awry on the filesystem.");</span> |
| <span class="source-line-no">425</span><span id="line-425"> }</span> |
| <span class="source-line-no">426</span><span id="line-426"> }</span> |
| <span class="source-line-no">427</span><span id="line-427"></span> |
| <span class="source-line-no">428</span><span id="line-428"> /**</span> |
| <span class="source-line-no">429</span><span id="line-429"> * Resolve any conflict with an existing archive file via timestamp-append renaming of the</span> |
| <span class="source-line-no">430</span><span id="line-430"> * existing file and then archive the passed in files.</span> |
| <span class="source-line-no">431</span><span id="line-431"> * @param fs {@link FileSystem} on which to archive the files</span> |
| <span class="source-line-no">432</span><span id="line-432"> * @param baseArchiveDir base archive directory to store the files. If any of the files to archive</span> |
| <span class="source-line-no">433</span><span id="line-433"> * are directories, will append the name of the directory to the base</span> |
| <span class="source-line-no">434</span><span id="line-434"> * archive directory name, creating a parallel structure.</span> |
| <span class="source-line-no">435</span><span id="line-435"> * @param toArchive files/directories that need to be archvied</span> |
| <span class="source-line-no">436</span><span id="line-436"> * @param start time the archiving started - used for resolving archive conflicts.</span> |
| <span class="source-line-no">437</span><span id="line-437"> * @return the list of failed to archive files.</span> |
| <span class="source-line-no">438</span><span id="line-438"> * @throws IOException if an unexpected file operation exception occurred</span> |
| <span class="source-line-no">439</span><span id="line-439"> */</span> |
| <span class="source-line-no">440</span><span id="line-440"> private static List<File> resolveAndArchive(Configuration conf, FileSystem fs,</span> |
| <span class="source-line-no">441</span><span id="line-441"> Path baseArchiveDir, Collection<File> toArchive, long start) throws IOException {</span> |
| <span class="source-line-no">442</span><span id="line-442"> // Early exit if no files to archive</span> |
| <span class="source-line-no">443</span><span id="line-443"> if (toArchive.isEmpty()) {</span> |
| <span class="source-line-no">444</span><span id="line-444"> LOG.trace("No files to archive, returning an empty list.");</span> |
| <span class="source-line-no">445</span><span id="line-445"> return Collections.emptyList();</span> |
| <span class="source-line-no">446</span><span id="line-446"> }</span> |
| <span class="source-line-no">447</span><span id="line-447"></span> |
| <span class="source-line-no">448</span><span id="line-448"> LOG.trace("Preparing to archive files into directory: {}", baseArchiveDir);</span> |
| <span class="source-line-no">449</span><span id="line-449"></span> |
| <span class="source-line-no">450</span><span id="line-450"> // Ensure the archive directory exists</span> |
| <span class="source-line-no">451</span><span id="line-451"> ensureArchiveDirectoryExists(fs, baseArchiveDir);</span> |
| <span class="source-line-no">452</span><span id="line-452"></span> |
| <span class="source-line-no">453</span><span id="line-453"> // Thread-safe collection for storing failures</span> |
| <span class="source-line-no">454</span><span id="line-454"> Queue<File> failures = new ConcurrentLinkedQueue<>();</span> |
| <span class="source-line-no">455</span><span id="line-455"> String startTime = Long.toString(start);</span> |
| <span class="source-line-no">456</span><span id="line-456"></span> |
| <span class="source-line-no">457</span><span id="line-457"> // Separate files and directories for processing</span> |
| <span class="source-line-no">458</span><span id="line-458"> List<File> filesOnly = new ArrayList<>();</span> |
| <span class="source-line-no">459</span><span id="line-459"> for (File file : toArchive) {</span> |
| <span class="source-line-no">460</span><span id="line-460"> if (file.isFile()) {</span> |
| <span class="source-line-no">461</span><span id="line-461"> filesOnly.add(file);</span> |
| <span class="source-line-no">462</span><span id="line-462"> } else {</span> |
| <span class="source-line-no">463</span><span id="line-463"> handleDirectory(conf, fs, baseArchiveDir, failures, file, start);</span> |
| <span class="source-line-no">464</span><span id="line-464"> }</span> |
| <span class="source-line-no">465</span><span id="line-465"> }</span> |
| <span class="source-line-no">466</span><span id="line-466"></span> |
| <span class="source-line-no">467</span><span id="line-467"> // Archive files concurrently</span> |
| <span class="source-line-no">468</span><span id="line-468"> archiveFilesConcurrently(conf, baseArchiveDir, filesOnly, failures, startTime);</span> |
| <span class="source-line-no">469</span><span id="line-469"></span> |
| <span class="source-line-no">470</span><span id="line-470"> return new ArrayList<>(failures); // Convert to a List for the return value</span> |
| <span class="source-line-no">471</span><span id="line-471"> }</span> |
| <span class="source-line-no">472</span><span id="line-472"></span> |
| <span class="source-line-no">473</span><span id="line-473"> private static void ensureArchiveDirectoryExists(FileSystem fs, Path baseArchiveDir)</span> |
| <span class="source-line-no">474</span><span id="line-474"> throws IOException {</span> |
| <span class="source-line-no">475</span><span id="line-475"> if (!fs.exists(baseArchiveDir) && !fs.mkdirs(baseArchiveDir)) {</span> |
| <span class="source-line-no">476</span><span id="line-476"> throw new IOException("Failed to create the archive directory: " + baseArchiveDir);</span> |
| <span class="source-line-no">477</span><span id="line-477"> }</span> |
| <span class="source-line-no">478</span><span id="line-478"> LOG.trace("Archive directory ready: {}", baseArchiveDir);</span> |
| <span class="source-line-no">479</span><span id="line-479"> }</span> |
| <span class="source-line-no">480</span><span id="line-480"></span> |
| <span class="source-line-no">481</span><span id="line-481"> private static void handleDirectory(Configuration conf, FileSystem fs, Path baseArchiveDir,</span> |
| <span class="source-line-no">482</span><span id="line-482"> Queue<File> failures, File directory, long start) {</span> |
| <span class="source-line-no">483</span><span id="line-483"> LOG.trace("Processing directory: {}, archiving its children.", directory);</span> |
| <span class="source-line-no">484</span><span id="line-484"> Path subArchiveDir = new Path(baseArchiveDir, directory.getName());</span> |
| <span class="source-line-no">485</span><span id="line-485"></span> |
| <span class="source-line-no">486</span><span id="line-486"> try {</span> |
| <span class="source-line-no">487</span><span id="line-487"> Collection<File> children = directory.getChildren();</span> |
| <span class="source-line-no">488</span><span id="line-488"> failures.addAll(resolveAndArchive(conf, fs, subArchiveDir, children, start));</span> |
| <span class="source-line-no">489</span><span id="line-489"> } catch (IOException e) {</span> |
| <span class="source-line-no">490</span><span id="line-490"> LOG.warn("Failed to archive directory: {}", directory, e);</span> |
| <span class="source-line-no">491</span><span id="line-491"> failures.add(directory);</span> |
| <span class="source-line-no">492</span><span id="line-492"> }</span> |
| <span class="source-line-no">493</span><span id="line-493"> }</span> |
| <span class="source-line-no">494</span><span id="line-494"></span> |
| <span class="source-line-no">495</span><span id="line-495"> private static void archiveFilesConcurrently(Configuration conf, Path baseArchiveDir,</span> |
| <span class="source-line-no">496</span><span id="line-496"> List<File> files, Queue<File> failures, String startTime) {</span> |
| <span class="source-line-no">497</span><span id="line-497"> LOG.trace("Archiving {} files concurrently into directory: {}", files.size(), baseArchiveDir);</span> |
| <span class="source-line-no">498</span><span id="line-498"> Map<File, Future<Boolean>> futureMap = new HashMap<>();</span> |
| <span class="source-line-no">499</span><span id="line-499"> // Submit file archiving tasks</span> |
| <span class="source-line-no">500</span><span id="line-500"> // default is 16 which comes equal hbase.hstore.blockingStoreFiles default value</span> |
| <span class="source-line-no">501</span><span id="line-501"> int maxThreads = conf.getInt("hbase.hfilearchiver.per.region.thread.pool.max", 16);</span> |
| <span class="source-line-no">502</span><span id="line-502"> ThreadPoolExecutor hfilesArchiveExecutor = Threads.getBoundedCachedThreadPool(maxThreads, 30L,</span> |
| <span class="source-line-no">503</span><span id="line-503"> TimeUnit.SECONDS, getThreadFactory("HFileArchiverPerRegion-"));</span> |
| <span class="source-line-no">504</span><span id="line-504"> try {</span> |
| <span class="source-line-no">505</span><span id="line-505"> for (File file : files) {</span> |
| <span class="source-line-no">506</span><span id="line-506"> Future<Boolean> future = hfilesArchiveExecutor</span> |
| <span class="source-line-no">507</span><span id="line-507"> .submit(() -> resolveAndArchiveFile(baseArchiveDir, file, startTime));</span> |
| <span class="source-line-no">508</span><span id="line-508"> futureMap.put(file, future);</span> |
| <span class="source-line-no">509</span><span id="line-509"> }</span> |
| <span class="source-line-no">510</span><span id="line-510"></span> |
| <span class="source-line-no">511</span><span id="line-511"> // Process results of each task</span> |
| <span class="source-line-no">512</span><span id="line-512"> for (Map.Entry<File, Future<Boolean>> entry : futureMap.entrySet()) {</span> |
| <span class="source-line-no">513</span><span id="line-513"> File file = entry.getKey();</span> |
| <span class="source-line-no">514</span><span id="line-514"> try {</span> |
| <span class="source-line-no">515</span><span id="line-515"> if (!entry.getValue().get()) {</span> |
| <span class="source-line-no">516</span><span id="line-516"> LOG.warn("Failed to archive file: {} into directory: {}", file, baseArchiveDir);</span> |
| <span class="source-line-no">517</span><span id="line-517"> failures.add(file);</span> |
| <span class="source-line-no">518</span><span id="line-518"> }</span> |
| <span class="source-line-no">519</span><span id="line-519"> } catch (InterruptedException e) {</span> |
| <span class="source-line-no">520</span><span id="line-520"> LOG.error("Archiving interrupted for file: {}", file, e);</span> |
| <span class="source-line-no">521</span><span id="line-521"> Thread.currentThread().interrupt(); // Restore interrupt status</span> |
| <span class="source-line-no">522</span><span id="line-522"> failures.add(file);</span> |
| <span class="source-line-no">523</span><span id="line-523"> } catch (ExecutionException e) {</span> |
| <span class="source-line-no">524</span><span id="line-524"> LOG.error("Archiving failed for file: {}", file, e);</span> |
| <span class="source-line-no">525</span><span id="line-525"> failures.add(file);</span> |
| <span class="source-line-no">526</span><span id="line-526"> }</span> |
| <span class="source-line-no">527</span><span id="line-527"> }</span> |
| <span class="source-line-no">528</span><span id="line-528"> } finally {</span> |
| <span class="source-line-no">529</span><span id="line-529"> hfilesArchiveExecutor.shutdown();</span> |
| <span class="source-line-no">530</span><span id="line-530"> }</span> |
| <span class="source-line-no">531</span><span id="line-531"> }</span> |
| <span class="source-line-no">532</span><span id="line-532"></span> |
| <span class="source-line-no">533</span><span id="line-533"> /**</span> |
| <span class="source-line-no">534</span><span id="line-534"> * Attempt to archive the passed in file to the archive directory.</span> |
| <span class="source-line-no">535</span><span id="line-535"> * <p></span> |
| <span class="source-line-no">536</span><span id="line-536"> * If the same file already exists in the archive, it is moved to a timestamped directory under</span> |
| <span class="source-line-no">537</span><span id="line-537"> * the archive directory and the new file is put in its place.</span> |
| <span class="source-line-no">538</span><span id="line-538"> * @param archiveDir {@link Path} to the directory that stores the archives of the hfiles</span> |
| <span class="source-line-no">539</span><span id="line-539"> * @param currentFile {@link Path} to the original HFile that will be archived</span> |
| <span class="source-line-no">540</span><span id="line-540"> * @param archiveStartTime time the archiving started, to resolve naming conflicts</span> |
| <span class="source-line-no">541</span><span id="line-541"> * @return <tt>true</tt> if the file is successfully archived. <tt>false</tt> if there was a</span> |
| <span class="source-line-no">542</span><span id="line-542"> * problem, but the operation still completed.</span> |
| <span class="source-line-no">543</span><span id="line-543"> * @throws IOException on failure to complete {@link FileSystem} operations.</span> |
| <span class="source-line-no">544</span><span id="line-544"> */</span> |
| <span class="source-line-no">545</span><span id="line-545"> private static boolean resolveAndArchiveFile(Path archiveDir, File currentFile,</span> |
| <span class="source-line-no">546</span><span id="line-546"> String archiveStartTime) throws IOException {</span> |
| <span class="source-line-no">547</span><span id="line-547"> // build path as it should be in the archive</span> |
| <span class="source-line-no">548</span><span id="line-548"> String filename = currentFile.getName();</span> |
| <span class="source-line-no">549</span><span id="line-549"> Path archiveFile = new Path(archiveDir, filename);</span> |
| <span class="source-line-no">550</span><span id="line-550"> FileSystem fs = currentFile.getFileSystem();</span> |
| <span class="source-line-no">551</span><span id="line-551"></span> |
| <span class="source-line-no">552</span><span id="line-552"> // An existing destination file in the archive is unexpected, but we handle it here.</span> |
| <span class="source-line-no">553</span><span id="line-553"> if (fs.exists(archiveFile)) {</span> |
| <span class="source-line-no">554</span><span id="line-554"> if (!fs.exists(currentFile.getPath())) {</span> |
| <span class="source-line-no">555</span><span id="line-555"> // If the file already exists in the archive, and there is no current file to archive, then</span> |
| <span class="source-line-no">556</span><span id="line-556"> // assume that the file in archive is correct. This is an unexpected situation, suggesting a</span> |
| <span class="source-line-no">557</span><span id="line-557"> // race condition or split brain.</span> |
| <span class="source-line-no">558</span><span id="line-558"> // In HBASE-26718 this was found when compaction incorrectly happened during warmupRegion.</span> |
| <span class="source-line-no">559</span><span id="line-559"> LOG.warn("{} exists in archive. Attempted to archive nonexistent file {}.", archiveFile,</span> |
| <span class="source-line-no">560</span><span id="line-560"> currentFile);</span> |
| <span class="source-line-no">561</span><span id="line-561"> // We return success to match existing behavior in this method, where FileNotFoundException</span> |
| <span class="source-line-no">562</span><span id="line-562"> // in moveAndClose is ignored.</span> |
| <span class="source-line-no">563</span><span id="line-563"> return true;</span> |
| <span class="source-line-no">564</span><span id="line-564"> }</span> |
| <span class="source-line-no">565</span><span id="line-565"> // There is a conflict between the current file and the already existing archived file.</span> |
| <span class="source-line-no">566</span><span id="line-566"> // Move the archived file to a timestamped backup. This is a really, really unlikely</span> |
| <span class="source-line-no">567</span><span id="line-567"> // situation, where we get the same name for the existing file, but is included just for that</span> |
| <span class="source-line-no">568</span><span id="line-568"> // 1 in trillion chance. We are potentially incurring data loss in the archive directory if</span> |
| <span class="source-line-no">569</span><span id="line-569"> // the files are not identical. The timestamped backup will be cleaned by HFileCleaner as it</span> |
| <span class="source-line-no">570</span><span id="line-570"> // has no references.</span> |
| <span class="source-line-no">571</span><span id="line-571"> FileStatus curStatus = fs.getFileStatus(currentFile.getPath());</span> |
| <span class="source-line-no">572</span><span id="line-572"> FileStatus archiveStatus = fs.getFileStatus(archiveFile);</span> |
| <span class="source-line-no">573</span><span id="line-573"> long curLen = curStatus.getLen();</span> |
| <span class="source-line-no">574</span><span id="line-574"> long archiveLen = archiveStatus.getLen();</span> |
| <span class="source-line-no">575</span><span id="line-575"> long curMtime = curStatus.getModificationTime();</span> |
| <span class="source-line-no">576</span><span id="line-576"> long archiveMtime = archiveStatus.getModificationTime();</span> |
| <span class="source-line-no">577</span><span id="line-577"> if (curLen != archiveLen) {</span> |
| <span class="source-line-no">578</span><span id="line-578"> LOG.error(</span> |
| <span class="source-line-no">579</span><span id="line-579"> "{} already exists in archive with different size than current {}."</span> |
| <span class="source-line-no">580</span><span id="line-580"> + " archiveLen: {} currentLen: {} archiveMtime: {} currentMtime: {}",</span> |
| <span class="source-line-no">581</span><span id="line-581"> archiveFile, currentFile, archiveLen, curLen, archiveMtime, curMtime);</span> |
| <span class="source-line-no">582</span><span id="line-582"> throw new IOException(</span> |
| <span class="source-line-no">583</span><span id="line-583"> archiveFile + " already exists in archive with different size" + " than " + currentFile);</span> |
| <span class="source-line-no">584</span><span id="line-584"> }</span> |
| <span class="source-line-no">585</span><span id="line-585"></span> |
| <span class="source-line-no">586</span><span id="line-586"> LOG.error(</span> |
| <span class="source-line-no">587</span><span id="line-587"> "{} already exists in archive, moving to timestamped backup and overwriting"</span> |
| <span class="source-line-no">588</span><span id="line-588"> + " current {}. archiveLen: {} currentLen: {} archiveMtime: {} currentMtime: {}",</span> |
| <span class="source-line-no">589</span><span id="line-589"> archiveFile, currentFile, archiveLen, curLen, archiveMtime, curMtime);</span> |
| <span class="source-line-no">590</span><span id="line-590"></span> |
| <span class="source-line-no">591</span><span id="line-591"> // move the archive file to the stamped backup</span> |
| <span class="source-line-no">592</span><span id="line-592"> Path backedupArchiveFile = new Path(archiveDir, filename + SEPARATOR + archiveStartTime);</span> |
| <span class="source-line-no">593</span><span id="line-593"> if (!fs.rename(archiveFile, backedupArchiveFile)) {</span> |
| <span class="source-line-no">594</span><span id="line-594"> LOG.error("Could not rename archive file to backup: " + backedupArchiveFile</span> |
| <span class="source-line-no">595</span><span id="line-595"> + ", deleting existing file in favor of newer.");</span> |
| <span class="source-line-no">596</span><span id="line-596"> // try to delete the existing file, if we can't rename it</span> |
| <span class="source-line-no">597</span><span id="line-597"> if (!fs.delete(archiveFile, false)) {</span> |
| <span class="source-line-no">598</span><span id="line-598"> throw new IOException("Couldn't delete existing archive file (" + archiveFile</span> |
| <span class="source-line-no">599</span><span id="line-599"> + ") or rename it to the backup file (" + backedupArchiveFile</span> |
| <span class="source-line-no">600</span><span id="line-600"> + ") to make room for similarly named file.");</span> |
| <span class="source-line-no">601</span><span id="line-601"> }</span> |
| <span class="source-line-no">602</span><span id="line-602"> } else {</span> |
| <span class="source-line-no">603</span><span id="line-603"> LOG.info("Backed up archive file from {} to {}.", archiveFile, backedupArchiveFile);</span> |
| <span class="source-line-no">604</span><span id="line-604"> }</span> |
| <span class="source-line-no">605</span><span id="line-605"> }</span> |
| <span class="source-line-no">606</span><span id="line-606"></span> |
| <span class="source-line-no">607</span><span id="line-607"> LOG.trace("No existing file in archive for {}, free to archive original file.", archiveFile);</span> |
| <span class="source-line-no">608</span><span id="line-608"></span> |
| <span class="source-line-no">609</span><span id="line-609"> // at this point, we should have a free spot for the archive file</span> |
| <span class="source-line-no">610</span><span id="line-610"> boolean success = false;</span> |
| <span class="source-line-no">611</span><span id="line-611"> for (int i = 0; !success && i < DEFAULT_RETRIES_NUMBER; ++i) {</span> |
| <span class="source-line-no">612</span><span id="line-612"> if (i > 0) {</span> |
| <span class="source-line-no">613</span><span id="line-613"> // Ensure that the archive directory exists.</span> |
| <span class="source-line-no">614</span><span id="line-614"> // The previous "move to archive" operation has failed probably because</span> |
| <span class="source-line-no">615</span><span id="line-615"> // the cleaner has removed our archive directory (HBASE-7643).</span> |
| <span class="source-line-no">616</span><span id="line-616"> // (we're in a retry loop, so don't worry too much about the exception)</span> |
| <span class="source-line-no">617</span><span id="line-617"> try {</span> |
| <span class="source-line-no">618</span><span id="line-618"> if (!fs.exists(archiveDir)) {</span> |
| <span class="source-line-no">619</span><span id="line-619"> if (fs.mkdirs(archiveDir)) {</span> |
| <span class="source-line-no">620</span><span id="line-620"> LOG.debug("Created archive directory {}", archiveDir);</span> |
| <span class="source-line-no">621</span><span id="line-621"> }</span> |
| <span class="source-line-no">622</span><span id="line-622"> }</span> |
| <span class="source-line-no">623</span><span id="line-623"> } catch (IOException e) {</span> |
| <span class="source-line-no">624</span><span id="line-624"> LOG.warn("Failed to create directory {}", archiveDir, e);</span> |
| <span class="source-line-no">625</span><span id="line-625"> }</span> |
| <span class="source-line-no">626</span><span id="line-626"> }</span> |
| <span class="source-line-no">627</span><span id="line-627"></span> |
| <span class="source-line-no">628</span><span id="line-628"> try {</span> |
| <span class="source-line-no">629</span><span id="line-629"> success = currentFile.moveAndClose(archiveFile);</span> |
| <span class="source-line-no">630</span><span id="line-630"> } catch (FileNotFoundException fnfe) {</span> |
| <span class="source-line-no">631</span><span id="line-631"> LOG.warn("Failed to archive " + currentFile</span> |
| <span class="source-line-no">632</span><span id="line-632"> + " because it does not exist! Skipping and continuing on.", fnfe);</span> |
| <span class="source-line-no">633</span><span id="line-633"> success = true;</span> |
| <span class="source-line-no">634</span><span id="line-634"> } catch (IOException e) {</span> |
| <span class="source-line-no">635</span><span id="line-635"> success = false;</span> |
| <span class="source-line-no">636</span><span id="line-636"> // When HFiles are placed on a filesystem other than HDFS a rename operation can be a</span> |
| <span class="source-line-no">637</span><span id="line-637"> // non-atomic file copy operation. It can take a long time to copy a large hfile and if</span> |
| <span class="source-line-no">638</span><span id="line-638"> // interrupted there may be a partially copied file present at the destination. We must</span> |
| <span class="source-line-no">639</span><span id="line-639"> // remove the partially copied file, if any, or otherwise the archive operation will fail</span> |
| <span class="source-line-no">640</span><span id="line-640"> // indefinitely from this point.</span> |
| <span class="source-line-no">641</span><span id="line-641"> LOG.warn("Failed to archive " + currentFile + " on try #" + i, e);</span> |
| <span class="source-line-no">642</span><span id="line-642"> try {</span> |
| <span class="source-line-no">643</span><span id="line-643"> fs.delete(archiveFile, false);</span> |
| <span class="source-line-no">644</span><span id="line-644"> } catch (FileNotFoundException fnfe) {</span> |
| <span class="source-line-no">645</span><span id="line-645"> // This case is fine.</span> |
| <span class="source-line-no">646</span><span id="line-646"> } catch (IOException ee) {</span> |
| <span class="source-line-no">647</span><span id="line-647"> // Complain about other IO exceptions</span> |
| <span class="source-line-no">648</span><span id="line-648"> LOG.warn("Failed to clean up from failure to archive " + currentFile + " on try #" + i,</span> |
| <span class="source-line-no">649</span><span id="line-649"> ee);</span> |
| <span class="source-line-no">650</span><span id="line-650"> }</span> |
| <span class="source-line-no">651</span><span id="line-651"> }</span> |
| <span class="source-line-no">652</span><span id="line-652"> }</span> |
| <span class="source-line-no">653</span><span id="line-653"></span> |
| <span class="source-line-no">654</span><span id="line-654"> if (!success) {</span> |
| <span class="source-line-no">655</span><span id="line-655"> LOG.error("Failed to archive " + currentFile);</span> |
| <span class="source-line-no">656</span><span id="line-656"> return false;</span> |
| <span class="source-line-no">657</span><span id="line-657"> }</span> |
| <span class="source-line-no">658</span><span id="line-658"></span> |
| <span class="source-line-no">659</span><span id="line-659"> LOG.debug("Archived from {} to {}", currentFile, archiveFile);</span> |
| <span class="source-line-no">660</span><span id="line-660"> return true;</span> |
| <span class="source-line-no">661</span><span id="line-661"> }</span> |
| <span class="source-line-no">662</span><span id="line-662"></span> |
| <span class="source-line-no">663</span><span id="line-663"> /**</span> |
| <span class="source-line-no">664</span><span id="line-664"> * Without regard for backup, delete a region. Should be used with caution.</span> |
| <span class="source-line-no">665</span><span id="line-665"> * @param regionDir {@link Path} to the region to be deleted.</span> |
| <span class="source-line-no">666</span><span id="line-666"> * @param fs FileSystem from which to delete the region</span> |
| <span class="source-line-no">667</span><span id="line-667"> * @return <tt>true</tt> on successful deletion, <tt>false</tt> otherwise</span> |
| <span class="source-line-no">668</span><span id="line-668"> * @throws IOException on filesystem operation failure</span> |
| <span class="source-line-no">669</span><span id="line-669"> */</span> |
| <span class="source-line-no">670</span><span id="line-670"> private static boolean deleteRegionWithoutArchiving(FileSystem fs, Path regionDir)</span> |
| <span class="source-line-no">671</span><span id="line-671"> throws IOException {</span> |
| <span class="source-line-no">672</span><span id="line-672"> if (fs.delete(regionDir, true)) {</span> |
| <span class="source-line-no">673</span><span id="line-673"> LOG.debug("Deleted {}", regionDir);</span> |
| <span class="source-line-no">674</span><span id="line-674"> return true;</span> |
| <span class="source-line-no">675</span><span id="line-675"> }</span> |
| <span class="source-line-no">676</span><span id="line-676"> LOG.debug("Failed to delete directory {}", regionDir);</span> |
| <span class="source-line-no">677</span><span id="line-677"> return false;</span> |
| <span class="source-line-no">678</span><span id="line-678"> }</span> |
| <span class="source-line-no">679</span><span id="line-679"></span> |
| <span class="source-line-no">680</span><span id="line-680"> /**</span> |
| <span class="source-line-no">681</span><span id="line-681"> * Just do a simple delete of the given store files</span> |
| <span class="source-line-no">682</span><span id="line-682"> * <p></span> |
| <span class="source-line-no">683</span><span id="line-683"> * A best effort is made to delete each of the files, rather than bailing on the first failure.</span> |
| <span class="source-line-no">684</span><span id="line-684"> * <p></span> |
| <span class="source-line-no">685</span><span id="line-685"> * @param compactedFiles store files to delete from the file system.</span> |
| <span class="source-line-no">686</span><span id="line-686"> * @throws IOException if a file cannot be deleted. All files will be attempted to deleted before</span> |
| <span class="source-line-no">687</span><span id="line-687"> * throwing the exception, rather than failing at the first file.</span> |
| <span class="source-line-no">688</span><span id="line-688"> */</span> |
| <span class="source-line-no">689</span><span id="line-689"> private static void deleteStoreFilesWithoutArchiving(Collection<HStoreFile> compactedFiles)</span> |
| <span class="source-line-no">690</span><span id="line-690"> throws IOException {</span> |
| <span class="source-line-no">691</span><span id="line-691"> LOG.debug("Deleting files without archiving.");</span> |
| <span class="source-line-no">692</span><span id="line-692"> List<IOException> errors = new ArrayList<>(0);</span> |
| <span class="source-line-no">693</span><span id="line-693"> for (HStoreFile hsf : compactedFiles) {</span> |
| <span class="source-line-no">694</span><span id="line-694"> try {</span> |
| <span class="source-line-no">695</span><span id="line-695"> hsf.deleteStoreFile();</span> |
| <span class="source-line-no">696</span><span id="line-696"> } catch (IOException e) {</span> |
| <span class="source-line-no">697</span><span id="line-697"> LOG.error("Failed to delete {}", hsf.getPath());</span> |
| <span class="source-line-no">698</span><span id="line-698"> errors.add(e);</span> |
| <span class="source-line-no">699</span><span id="line-699"> }</span> |
| <span class="source-line-no">700</span><span id="line-700"> }</span> |
| <span class="source-line-no">701</span><span id="line-701"> if (errors.size() > 0) {</span> |
| <span class="source-line-no">702</span><span id="line-702"> throw MultipleIOException.createIOException(errors);</span> |
| <span class="source-line-no">703</span><span id="line-703"> }</span> |
| <span class="source-line-no">704</span><span id="line-704"> }</span> |
| <span class="source-line-no">705</span><span id="line-705"></span> |
| <span class="source-line-no">706</span><span id="line-706"> /**</span> |
| <span class="source-line-no">707</span><span id="line-707"> * Adapt a type to match the {@link File} interface, which is used internally for handling</span> |
| <span class="source-line-no">708</span><span id="line-708"> * archival/removal of files</span> |
| <span class="source-line-no">709</span><span id="line-709"> * @param <T> type to adapt to the {@link File} interface</span> |
| <span class="source-line-no">710</span><span id="line-710"> */</span> |
| <span class="source-line-no">711</span><span id="line-711"> private static abstract class FileConverter<T> implements Function<T, File> {</span> |
| <span class="source-line-no">712</span><span id="line-712"> protected final FileSystem fs;</span> |
| <span class="source-line-no">713</span><span id="line-713"></span> |
| <span class="source-line-no">714</span><span id="line-714"> public FileConverter(FileSystem fs) {</span> |
| <span class="source-line-no">715</span><span id="line-715"> this.fs = fs;</span> |
| <span class="source-line-no">716</span><span id="line-716"> }</span> |
| <span class="source-line-no">717</span><span id="line-717"> }</span> |
| <span class="source-line-no">718</span><span id="line-718"></span> |
| <span class="source-line-no">719</span><span id="line-719"> /**</span> |
| <span class="source-line-no">720</span><span id="line-720"> * Convert a FileStatus to something we can manage in the archiving</span> |
| <span class="source-line-no">721</span><span id="line-721"> */</span> |
| <span class="source-line-no">722</span><span id="line-722"> private static class FileStatusConverter extends FileConverter<FileStatus> {</span> |
| <span class="source-line-no">723</span><span id="line-723"> public FileStatusConverter(FileSystem fs) {</span> |
| <span class="source-line-no">724</span><span id="line-724"> super(fs);</span> |
| <span class="source-line-no">725</span><span id="line-725"> }</span> |
| <span class="source-line-no">726</span><span id="line-726"></span> |
| <span class="source-line-no">727</span><span id="line-727"> @Override</span> |
| <span class="source-line-no">728</span><span id="line-728"> public File apply(FileStatus input) {</span> |
| <span class="source-line-no">729</span><span id="line-729"> return new FileablePath(fs, input.getPath());</span> |
| <span class="source-line-no">730</span><span id="line-730"> }</span> |
| <span class="source-line-no">731</span><span id="line-731"> }</span> |
| <span class="source-line-no">732</span><span id="line-732"></span> |
| <span class="source-line-no">733</span><span id="line-733"> /**</span> |
| <span class="source-line-no">734</span><span id="line-734"> * Convert the {@link HStoreFile} into something we can manage in the archive methods</span> |
| <span class="source-line-no">735</span><span id="line-735"> */</span> |
| <span class="source-line-no">736</span><span id="line-736"> private static class StoreToFile extends FileConverter<HStoreFile> {</span> |
| <span class="source-line-no">737</span><span id="line-737"> public StoreToFile(FileSystem fs) {</span> |
| <span class="source-line-no">738</span><span id="line-738"> super(fs);</span> |
| <span class="source-line-no">739</span><span id="line-739"> }</span> |
| <span class="source-line-no">740</span><span id="line-740"></span> |
| <span class="source-line-no">741</span><span id="line-741"> @Override</span> |
| <span class="source-line-no">742</span><span id="line-742"> public File apply(HStoreFile input) {</span> |
| <span class="source-line-no">743</span><span id="line-743"> return new FileableStoreFile(fs, input);</span> |
| <span class="source-line-no">744</span><span id="line-744"> }</span> |
| <span class="source-line-no">745</span><span id="line-745"> }</span> |
| <span class="source-line-no">746</span><span id="line-746"></span> |
| <span class="source-line-no">747</span><span id="line-747"> /**</span> |
| <span class="source-line-no">748</span><span id="line-748"> * Wrapper to handle file operations uniformly</span> |
| <span class="source-line-no">749</span><span id="line-749"> */</span> |
| <span class="source-line-no">750</span><span id="line-750"> private static abstract class File {</span> |
| <span class="source-line-no">751</span><span id="line-751"> protected final FileSystem fs;</span> |
| <span class="source-line-no">752</span><span id="line-752"></span> |
| <span class="source-line-no">753</span><span id="line-753"> public File(FileSystem fs) {</span> |
| <span class="source-line-no">754</span><span id="line-754"> this.fs = fs;</span> |
| <span class="source-line-no">755</span><span id="line-755"> }</span> |
| <span class="source-line-no">756</span><span id="line-756"></span> |
| <span class="source-line-no">757</span><span id="line-757"> /**</span> |
| <span class="source-line-no">758</span><span id="line-758"> * Delete the file</span> |
| <span class="source-line-no">759</span><span id="line-759"> * @throws IOException on failure</span> |
| <span class="source-line-no">760</span><span id="line-760"> */</span> |
| <span class="source-line-no">761</span><span id="line-761"> abstract void delete() throws IOException;</span> |
| <span class="source-line-no">762</span><span id="line-762"></span> |
| <span class="source-line-no">763</span><span id="line-763"> /**</span> |
| <span class="source-line-no">764</span><span id="line-764"> * Check to see if this is a file or a directory</span> |
| <span class="source-line-no">765</span><span id="line-765"> * @return <tt>true</tt> if it is a file, <tt>false</tt> otherwise</span> |
| <span class="source-line-no">766</span><span id="line-766"> * @throws IOException on {@link FileSystem} connection error</span> |
| <span class="source-line-no">767</span><span id="line-767"> */</span> |
| <span class="source-line-no">768</span><span id="line-768"> abstract boolean isFile() throws IOException;</span> |
| <span class="source-line-no">769</span><span id="line-769"></span> |
| <span class="source-line-no">770</span><span id="line-770"> /**</span> |
| <span class="source-line-no">771</span><span id="line-771"> * @return if this is a directory, returns all the children in the directory, otherwise returns</span> |
| <span class="source-line-no">772</span><span id="line-772"> * an empty list</span> |
| <span class="source-line-no">773</span><span id="line-773"> */</span> |
| <span class="source-line-no">774</span><span id="line-774"> abstract Collection<File> getChildren() throws IOException;</span> |
| <span class="source-line-no">775</span><span id="line-775"></span> |
| <span class="source-line-no">776</span><span id="line-776"> /**</span> |
| <span class="source-line-no">777</span><span id="line-777"> * close any outside readers of the file</span> |
| <span class="source-line-no">778</span><span id="line-778"> */</span> |
| <span class="source-line-no">779</span><span id="line-779"> abstract void close() throws IOException;</span> |
| <span class="source-line-no">780</span><span id="line-780"></span> |
| <span class="source-line-no">781</span><span id="line-781"> /** Returns the name of the file (not the full fs path, just the individual file name) */</span> |
| <span class="source-line-no">782</span><span id="line-782"> abstract String getName();</span> |
| <span class="source-line-no">783</span><span id="line-783"></span> |
| <span class="source-line-no">784</span><span id="line-784"> /** Returns the path to this file */</span> |
| <span class="source-line-no">785</span><span id="line-785"> abstract Path getPath();</span> |
| <span class="source-line-no">786</span><span id="line-786"></span> |
| <span class="source-line-no">787</span><span id="line-787"> /**</span> |
| <span class="source-line-no">788</span><span id="line-788"> * Move the file to the given destination</span> |
| <span class="source-line-no">789</span><span id="line-789"> * @return <tt>true</tt> on success</span> |
| <span class="source-line-no">790</span><span id="line-790"> */</span> |
| <span class="source-line-no">791</span><span id="line-791"> public boolean moveAndClose(Path dest) throws IOException {</span> |
| <span class="source-line-no">792</span><span id="line-792"> this.close();</span> |
| <span class="source-line-no">793</span><span id="line-793"> Path p = this.getPath();</span> |
| <span class="source-line-no">794</span><span id="line-794"> return CommonFSUtils.renameAndSetModifyTime(fs, p, dest);</span> |
| <span class="source-line-no">795</span><span id="line-795"> }</span> |
| <span class="source-line-no">796</span><span id="line-796"></span> |
| <span class="source-line-no">797</span><span id="line-797"> /** Returns the {@link FileSystem} on which this file resides */</span> |
| <span class="source-line-no">798</span><span id="line-798"> public FileSystem getFileSystem() {</span> |
| <span class="source-line-no">799</span><span id="line-799"> return this.fs;</span> |
| <span class="source-line-no">800</span><span id="line-800"> }</span> |
| <span class="source-line-no">801</span><span id="line-801"></span> |
| <span class="source-line-no">802</span><span id="line-802"> @Override</span> |
| <span class="source-line-no">803</span><span id="line-803"> public String toString() {</span> |
| <span class="source-line-no">804</span><span id="line-804"> return this.getClass().getSimpleName() + ", " + getPath().toString();</span> |
| <span class="source-line-no">805</span><span id="line-805"> }</span> |
| <span class="source-line-no">806</span><span id="line-806"> }</span> |
| <span class="source-line-no">807</span><span id="line-807"></span> |
| <span class="source-line-no">808</span><span id="line-808"> /**</span> |
| <span class="source-line-no">809</span><span id="line-809"> * A {@link File} that wraps a simple {@link Path} on a {@link FileSystem}.</span> |
| <span class="source-line-no">810</span><span id="line-810"> */</span> |
| <span class="source-line-no">811</span><span id="line-811"> private static class FileablePath extends File {</span> |
| <span class="source-line-no">812</span><span id="line-812"> private final Path file;</span> |
| <span class="source-line-no">813</span><span id="line-813"> private final FileStatusConverter getAsFile;</span> |
| <span class="source-line-no">814</span><span id="line-814"></span> |
| <span class="source-line-no">815</span><span id="line-815"> public FileablePath(FileSystem fs, Path file) {</span> |
| <span class="source-line-no">816</span><span id="line-816"> super(fs);</span> |
| <span class="source-line-no">817</span><span id="line-817"> this.file = file;</span> |
| <span class="source-line-no">818</span><span id="line-818"> this.getAsFile = new FileStatusConverter(fs);</span> |
| <span class="source-line-no">819</span><span id="line-819"> }</span> |
| <span class="source-line-no">820</span><span id="line-820"></span> |
| <span class="source-line-no">821</span><span id="line-821"> @Override</span> |
| <span class="source-line-no">822</span><span id="line-822"> public void delete() throws IOException {</span> |
| <span class="source-line-no">823</span><span id="line-823"> if (!fs.delete(file, true)) throw new IOException("Failed to delete:" + this.file);</span> |
| <span class="source-line-no">824</span><span id="line-824"> }</span> |
| <span class="source-line-no">825</span><span id="line-825"></span> |
| <span class="source-line-no">826</span><span id="line-826"> @Override</span> |
| <span class="source-line-no">827</span><span id="line-827"> public String getName() {</span> |
| <span class="source-line-no">828</span><span id="line-828"> return file.getName();</span> |
| <span class="source-line-no">829</span><span id="line-829"> }</span> |
| <span class="source-line-no">830</span><span id="line-830"></span> |
| <span class="source-line-no">831</span><span id="line-831"> @Override</span> |
| <span class="source-line-no">832</span><span id="line-832"> public Collection<File> getChildren() throws IOException {</span> |
| <span class="source-line-no">833</span><span id="line-833"> if (fs.isFile(file)) {</span> |
| <span class="source-line-no">834</span><span id="line-834"> return Collections.emptyList();</span> |
| <span class="source-line-no">835</span><span id="line-835"> }</span> |
| <span class="source-line-no">836</span><span id="line-836"> return Stream.of(fs.listStatus(file)).map(getAsFile).collect(Collectors.toList());</span> |
| <span class="source-line-no">837</span><span id="line-837"> }</span> |
| <span class="source-line-no">838</span><span id="line-838"></span> |
| <span class="source-line-no">839</span><span id="line-839"> @Override</span> |
| <span class="source-line-no">840</span><span id="line-840"> public boolean isFile() throws IOException {</span> |
| <span class="source-line-no">841</span><span id="line-841"> return fs.isFile(file);</span> |
| <span class="source-line-no">842</span><span id="line-842"> }</span> |
| <span class="source-line-no">843</span><span id="line-843"></span> |
| <span class="source-line-no">844</span><span id="line-844"> @Override</span> |
| <span class="source-line-no">845</span><span id="line-845"> public void close() throws IOException {</span> |
| <span class="source-line-no">846</span><span id="line-846"> // NOOP - files are implicitly closed on removal</span> |
| <span class="source-line-no">847</span><span id="line-847"> }</span> |
| <span class="source-line-no">848</span><span id="line-848"></span> |
| <span class="source-line-no">849</span><span id="line-849"> @Override</span> |
| <span class="source-line-no">850</span><span id="line-850"> Path getPath() {</span> |
| <span class="source-line-no">851</span><span id="line-851"> return file;</span> |
| <span class="source-line-no">852</span><span id="line-852"> }</span> |
| <span class="source-line-no">853</span><span id="line-853"> }</span> |
| <span class="source-line-no">854</span><span id="line-854"></span> |
| <span class="source-line-no">855</span><span id="line-855"> /**</span> |
| <span class="source-line-no">856</span><span id="line-856"> * {@link File} adapter for a {@link HStoreFile} living on a {@link FileSystem} .</span> |
| <span class="source-line-no">857</span><span id="line-857"> */</span> |
| <span class="source-line-no">858</span><span id="line-858"> private static class FileableStoreFile extends File {</span> |
| <span class="source-line-no">859</span><span id="line-859"> HStoreFile file;</span> |
| <span class="source-line-no">860</span><span id="line-860"></span> |
| <span class="source-line-no">861</span><span id="line-861"> public FileableStoreFile(FileSystem fs, HStoreFile store) {</span> |
| <span class="source-line-no">862</span><span id="line-862"> super(fs);</span> |
| <span class="source-line-no">863</span><span id="line-863"> this.file = store;</span> |
| <span class="source-line-no">864</span><span id="line-864"> }</span> |
| <span class="source-line-no">865</span><span id="line-865"></span> |
| <span class="source-line-no">866</span><span id="line-866"> @Override</span> |
| <span class="source-line-no">867</span><span id="line-867"> public void delete() throws IOException {</span> |
| <span class="source-line-no">868</span><span id="line-868"> file.deleteStoreFile();</span> |
| <span class="source-line-no">869</span><span id="line-869"> }</span> |
| <span class="source-line-no">870</span><span id="line-870"></span> |
| <span class="source-line-no">871</span><span id="line-871"> @Override</span> |
| <span class="source-line-no">872</span><span id="line-872"> public String getName() {</span> |
| <span class="source-line-no">873</span><span id="line-873"> return file.getPath().getName();</span> |
| <span class="source-line-no">874</span><span id="line-874"> }</span> |
| <span class="source-line-no">875</span><span id="line-875"></span> |
| <span class="source-line-no">876</span><span id="line-876"> @Override</span> |
| <span class="source-line-no">877</span><span id="line-877"> public boolean isFile() {</span> |
| <span class="source-line-no">878</span><span id="line-878"> return true;</span> |
| <span class="source-line-no">879</span><span id="line-879"> }</span> |
| <span class="source-line-no">880</span><span id="line-880"></span> |
| <span class="source-line-no">881</span><span id="line-881"> @Override</span> |
| <span class="source-line-no">882</span><span id="line-882"> public Collection<File> getChildren() throws IOException {</span> |
| <span class="source-line-no">883</span><span id="line-883"> // storefiles don't have children</span> |
| <span class="source-line-no">884</span><span id="line-884"> return Collections.emptyList();</span> |
| <span class="source-line-no">885</span><span id="line-885"> }</span> |
| <span class="source-line-no">886</span><span id="line-886"></span> |
| <span class="source-line-no">887</span><span id="line-887"> @Override</span> |
| <span class="source-line-no">888</span><span id="line-888"> public void close() throws IOException {</span> |
| <span class="source-line-no">889</span><span id="line-889"> file.closeStoreFile(true);</span> |
| <span class="source-line-no">890</span><span id="line-890"> }</span> |
| <span class="source-line-no">891</span><span id="line-891"></span> |
| <span class="source-line-no">892</span><span id="line-892"> @Override</span> |
| <span class="source-line-no">893</span><span id="line-893"> Path getPath() {</span> |
| <span class="source-line-no">894</span><span id="line-894"> return file.getPath();</span> |
| <span class="source-line-no">895</span><span id="line-895"> }</span> |
| <span class="source-line-no">896</span><span id="line-896"> }</span> |
| <span class="source-line-no">897</span><span id="line-897">}</span> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| </pre> |
| </div> |
| </main> |
| </body> |
| </html> |