blob: 7c1bb9e03564f07a109916ace1b94d26ba7502e8 [file] [log] [blame]
<!DOCTYPE HTML>
<html lang="en">
<head>
<!-- Generated by javadoc (17) -->
<title>Source code</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="description" content="source: package: org.apache.hadoop.hbase.mapreduce, class: TableSnapshotInputFormatImpl">
<meta name="generator" content="javadoc/SourceToHTMLConverter">
<link rel="stylesheet" type="text/css" href="../../../../../../stylesheet.css" title="Style">
</head>
<body class="source-page">
<main role="main">
<div class="source-container">
<pre><span class="source-line-no">001</span><span id="line-1">/*</span>
<span class="source-line-no">002</span><span id="line-2"> * Licensed to the Apache Software Foundation (ASF) under one</span>
<span class="source-line-no">003</span><span id="line-3"> * or more contributor license agreements. See the NOTICE file</span>
<span class="source-line-no">004</span><span id="line-4"> * distributed with this work for additional information</span>
<span class="source-line-no">005</span><span id="line-5"> * regarding copyright ownership. The ASF licenses this file</span>
<span class="source-line-no">006</span><span id="line-6"> * to you under the Apache License, Version 2.0 (the</span>
<span class="source-line-no">007</span><span id="line-7"> * "License"); you may not use this file except in compliance</span>
<span class="source-line-no">008</span><span id="line-8"> * with the License. You may obtain a copy of the License at</span>
<span class="source-line-no">009</span><span id="line-9"> *</span>
<span class="source-line-no">010</span><span id="line-10"> * http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="source-line-no">011</span><span id="line-11"> *</span>
<span class="source-line-no">012</span><span id="line-12"> * Unless required by applicable law or agreed to in writing, software</span>
<span class="source-line-no">013</span><span id="line-13"> * distributed under the License is distributed on an "AS IS" BASIS,</span>
<span class="source-line-no">014</span><span id="line-14"> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="source-line-no">015</span><span id="line-15"> * See the License for the specific language governing permissions and</span>
<span class="source-line-no">016</span><span id="line-16"> * limitations under the License.</span>
<span class="source-line-no">017</span><span id="line-17"> */</span>
<span class="source-line-no">018</span><span id="line-18">package org.apache.hadoop.hbase.mapreduce;</span>
<span class="source-line-no">019</span><span id="line-19"></span>
<span class="source-line-no">020</span><span id="line-20">import java.io.ByteArrayOutputStream;</span>
<span class="source-line-no">021</span><span id="line-21">import java.io.DataInput;</span>
<span class="source-line-no">022</span><span id="line-22">import java.io.DataOutput;</span>
<span class="source-line-no">023</span><span id="line-23">import java.io.IOException;</span>
<span class="source-line-no">024</span><span id="line-24">import java.lang.reflect.InvocationTargetException;</span>
<span class="source-line-no">025</span><span id="line-25">import java.util.ArrayList;</span>
<span class="source-line-no">026</span><span id="line-26">import java.util.List;</span>
<span class="source-line-no">027</span><span id="line-27">import java.util.UUID;</span>
<span class="source-line-no">028</span><span id="line-28">import org.apache.hadoop.conf.Configuration;</span>
<span class="source-line-no">029</span><span id="line-29">import org.apache.hadoop.fs.FileSystem;</span>
<span class="source-line-no">030</span><span id="line-30">import org.apache.hadoop.fs.Path;</span>
<span class="source-line-no">031</span><span id="line-31">import org.apache.hadoop.hbase.HDFSBlocksDistribution;</span>
<span class="source-line-no">032</span><span id="line-32">import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;</span>
<span class="source-line-no">033</span><span id="line-33">import org.apache.hadoop.hbase.HRegionLocation;</span>
<span class="source-line-no">034</span><span id="line-34">import org.apache.hadoop.hbase.PrivateCellUtil;</span>
<span class="source-line-no">035</span><span id="line-35">import org.apache.hadoop.hbase.client.ClientSideRegionScanner;</span>
<span class="source-line-no">036</span><span id="line-36">import org.apache.hadoop.hbase.client.Connection;</span>
<span class="source-line-no">037</span><span id="line-37">import org.apache.hadoop.hbase.client.ConnectionFactory;</span>
<span class="source-line-no">038</span><span id="line-38">import org.apache.hadoop.hbase.client.IsolationLevel;</span>
<span class="source-line-no">039</span><span id="line-39">import org.apache.hadoop.hbase.client.RegionInfo;</span>
<span class="source-line-no">040</span><span id="line-40">import org.apache.hadoop.hbase.client.RegionLocator;</span>
<span class="source-line-no">041</span><span id="line-41">import org.apache.hadoop.hbase.client.Result;</span>
<span class="source-line-no">042</span><span id="line-42">import org.apache.hadoop.hbase.client.Scan;</span>
<span class="source-line-no">043</span><span id="line-43">import org.apache.hadoop.hbase.client.Scan.ReadType;</span>
<span class="source-line-no">044</span><span id="line-44">import org.apache.hadoop.hbase.client.TableDescriptor;</span>
<span class="source-line-no">045</span><span id="line-45">import org.apache.hadoop.hbase.io.ImmutableBytesWritable;</span>
<span class="source-line-no">046</span><span id="line-46">import org.apache.hadoop.hbase.regionserver.HRegion;</span>
<span class="source-line-no">047</span><span id="line-47">import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;</span>
<span class="source-line-no">048</span><span id="line-48">import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;</span>
<span class="source-line-no">049</span><span id="line-49">import org.apache.hadoop.hbase.snapshot.SnapshotManifest;</span>
<span class="source-line-no">050</span><span id="line-50">import org.apache.hadoop.hbase.util.Bytes;</span>
<span class="source-line-no">051</span><span id="line-51">import org.apache.hadoop.hbase.util.CommonFSUtils;</span>
<span class="source-line-no">052</span><span id="line-52">import org.apache.hadoop.hbase.util.RegionSplitter;</span>
<span class="source-line-no">053</span><span id="line-53">import org.apache.hadoop.io.Writable;</span>
<span class="source-line-no">054</span><span id="line-54">import org.apache.hadoop.mapreduce.Job;</span>
<span class="source-line-no">055</span><span id="line-55">import org.apache.yetus.audience.InterfaceAudience;</span>
<span class="source-line-no">056</span><span id="line-56">import org.slf4j.Logger;</span>
<span class="source-line-no">057</span><span id="line-57">import org.slf4j.LoggerFactory;</span>
<span class="source-line-no">058</span><span id="line-58"></span>
<span class="source-line-no">059</span><span id="line-59">import org.apache.hbase.thirdparty.com.google.common.collect.Lists;</span>
<span class="source-line-no">060</span><span id="line-60"></span>
<span class="source-line-no">061</span><span id="line-61">import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;</span>
<span class="source-line-no">062</span><span id="line-62">import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;</span>
<span class="source-line-no">063</span><span id="line-63">import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;</span>
<span class="source-line-no">064</span><span id="line-64">import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;</span>
<span class="source-line-no">065</span><span id="line-65"></span>
<span class="source-line-no">066</span><span id="line-66">/**</span>
<span class="source-line-no">067</span><span id="line-67"> * Hadoop MR API-agnostic implementation for mapreduce over table snapshots.</span>
<span class="source-line-no">068</span><span id="line-68"> */</span>
<span class="source-line-no">069</span><span id="line-69">@InterfaceAudience.Private</span>
<span class="source-line-no">070</span><span id="line-70">public class TableSnapshotInputFormatImpl {</span>
<span class="source-line-no">071</span><span id="line-71"> // TODO: Snapshot files are owned in the fs by the hbase user. There is no</span>
<span class="source-line-no">072</span><span id="line-72"> // easy way to delegate access.</span>
<span class="source-line-no">073</span><span id="line-73"></span>
<span class="source-line-no">074</span><span id="line-74"> public static final Logger LOG = LoggerFactory.getLogger(TableSnapshotInputFormatImpl.class);</span>
<span class="source-line-no">075</span><span id="line-75"></span>
<span class="source-line-no">076</span><span id="line-76"> private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";</span>
<span class="source-line-no">077</span><span id="line-77"> // key for specifying the root dir of the restored snapshot</span>
<span class="source-line-no">078</span><span id="line-78"> protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";</span>
<span class="source-line-no">079</span><span id="line-79"></span>
<span class="source-line-no">080</span><span id="line-80"> /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution, int)} */</span>
<span class="source-line-no">081</span><span id="line-81"> private static final String LOCALITY_CUTOFF_MULTIPLIER =</span>
<span class="source-line-no">082</span><span id="line-82"> "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";</span>
<span class="source-line-no">083</span><span id="line-83"> private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;</span>
<span class="source-line-no">084</span><span id="line-84"></span>
<span class="source-line-no">085</span><span id="line-85"> /**</span>
<span class="source-line-no">086</span><span id="line-86"> * For MapReduce jobs running multiple mappers per region, determines what split algorithm we</span>
<span class="source-line-no">087</span><span id="line-87"> * should be using to find split points for scanners.</span>
<span class="source-line-no">088</span><span id="line-88"> */</span>
<span class="source-line-no">089</span><span id="line-89"> public static final String SPLIT_ALGO = "hbase.mapreduce.split.algorithm";</span>
<span class="source-line-no">090</span><span id="line-90"> /**</span>
<span class="source-line-no">091</span><span id="line-91"> * For MapReduce jobs running multiple mappers per region, determines number of splits to generate</span>
<span class="source-line-no">092</span><span id="line-92"> * per region.</span>
<span class="source-line-no">093</span><span id="line-93"> */</span>
<span class="source-line-no">094</span><span id="line-94"> public static final String NUM_SPLITS_PER_REGION = "hbase.mapreduce.splits.per.region";</span>
<span class="source-line-no">095</span><span id="line-95"></span>
<span class="source-line-no">096</span><span id="line-96"> /**</span>
<span class="source-line-no">097</span><span id="line-97"> * Whether to calculate the block locations for splits. Defaults to true. If the computing layer</span>
<span class="source-line-no">098</span><span id="line-98"> * runs outside of the HBase cluster, block locality does not matter; setting this value to false</span>
<span class="source-line-no">099</span><span id="line-99"> * skips the calculation and saves some time. The access modifier is "public" so that these</span>
<span class="source-line-no">100</span><span id="line-100"> * could be accessed by test classes of both org.apache.hadoop.hbase.mapred and</span>
<span class="source-line-no">101</span><span id="line-101"> * org.apache.hadoop.hbase.mapreduce.</span>
<span class="source-line-no">102</span><span id="line-102"> */</span>
<span class="source-line-no">103</span><span id="line-103"> public static final String SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY =</span>
<span class="source-line-no">104</span><span id="line-104"> "hbase.TableSnapshotInputFormat.locality.enabled";</span>
<span class="source-line-no">105</span><span id="line-105"> public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT = true;</span>
<span class="source-line-no">106</span><span id="line-106"></span>
<span class="source-line-no">107</span><span id="line-107"> /**</span>
<span class="source-line-no">108</span><span id="line-108"> * Whether to calculate the snapshot region locations from the region locations in meta. It is much</span>
<span class="source-line-no">109</span><span id="line-109"> * faster than computing block locations for splits.</span>
<span class="source-line-no">110</span><span id="line-110"> */</span>
<span class="source-line-no">111</span><span id="line-111"> public static final String SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION =</span>
<span class="source-line-no">112</span><span id="line-112"> "hbase.TableSnapshotInputFormat.locality.by.region.location";</span>
<span class="source-line-no">113</span><span id="line-113"></span>
<span class="source-line-no">114</span><span id="line-114"> public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT = false;</span>
<span class="source-line-no">115</span><span id="line-115"></span>
<span class="source-line-no">116</span><span id="line-116"> /**</span>
<span class="source-line-no">117</span><span id="line-117"> * Maximum number of rows to scan on each InputSplit, e.g. for sampling; non-positive means no limit.</span>
<span class="source-line-no">118</span><span id="line-118"> */</span>
<span class="source-line-no">119</span><span id="line-119"> public static final String SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT =</span>
<span class="source-line-no">120</span><span id="line-120"> "hbase.TableSnapshotInputFormat.row.limit.per.inputsplit";</span>
<span class="source-line-no">121</span><span id="line-121"></span>
<span class="source-line-no">122</span><span id="line-122"> /**</span>
<span class="source-line-no">123</span><span id="line-123"> * Whether to enable scan metrics on the Scan; defaults to true.</span>
<span class="source-line-no">124</span><span id="line-124"> */</span>
<span class="source-line-no">125</span><span id="line-125"> public static final String SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED =</span>
<span class="source-line-no">126</span><span id="line-126"> "hbase.TableSnapshotInputFormat.scan_metrics.enabled";</span>
<span class="source-line-no">127</span><span id="line-127"></span>
<span class="source-line-no">128</span><span id="line-128"> public static final boolean SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED_DEFAULT = true;</span>
<span class="source-line-no">129</span><span id="line-129"></span>
<span class="source-line-no">130</span><span id="line-130"> /**</span>
<span class="source-line-no">131</span><span id="line-131"> * The {@link ReadType} which should be set on the {@link Scan} to read the HBase Snapshot,</span>
<span class="source-line-no">132</span><span id="line-132"> * defaults to STREAM.</span>
<span class="source-line-no">133</span><span id="line-133"> */</span>
<span class="source-line-no">134</span><span id="line-134"> public static final String SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE =</span>
<span class="source-line-no">135</span><span id="line-135"> "hbase.TableSnapshotInputFormat.scanner.readtype";</span>
<span class="source-line-no">136</span><span id="line-136"> public static final ReadType SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT = ReadType.STREAM;</span>
<span class="source-line-no">137</span><span id="line-137"></span>
<span class="source-line-no">138</span><span id="line-138"> /**</span>
<span class="source-line-no">139</span><span id="line-139"> * Implementation class for InputSplit logic common between mapred and mapreduce.</span>
<span class="source-line-no">140</span><span id="line-140"> */</span>
<span class="source-line-no">141</span><span id="line-141"> public static class InputSplit implements Writable {</span>
<span class="source-line-no">142</span><span id="line-142"></span>
<span class="source-line-no">143</span><span id="line-143"> private TableDescriptor htd;</span>
<span class="source-line-no">144</span><span id="line-144"> private RegionInfo regionInfo;</span>
<span class="source-line-no">145</span><span id="line-145"> private String[] locations;</span>
<span class="source-line-no">146</span><span id="line-146"> private String scan;</span>
<span class="source-line-no">147</span><span id="line-147"> private String restoreDir;</span>
<span class="source-line-no">148</span><span id="line-148"></span>
<span class="source-line-no">149</span><span id="line-149"> // constructor for mapreduce framework / Writable</span>
<span class="source-line-no">150</span><span id="line-150"> public InputSplit() {</span>
<span class="source-line-no">151</span><span id="line-151"> }</span>
<span class="source-line-no">152</span><span id="line-152"></span>
<span class="source-line-no">153</span><span id="line-153"> public InputSplit(TableDescriptor htd, RegionInfo regionInfo, List&lt;String&gt; locations, Scan scan,</span>
<span class="source-line-no">154</span><span id="line-154"> Path restoreDir) {</span>
<span class="source-line-no">155</span><span id="line-155"> this.htd = htd;</span>
<span class="source-line-no">156</span><span id="line-156"> this.regionInfo = regionInfo;</span>
<span class="source-line-no">157</span><span id="line-157"> if (locations == null || locations.isEmpty()) {</span>
<span class="source-line-no">158</span><span id="line-158"> this.locations = new String[0];</span>
<span class="source-line-no">159</span><span id="line-159"> } else {</span>
<span class="source-line-no">160</span><span id="line-160"> this.locations = locations.toArray(new String[locations.size()]);</span>
<span class="source-line-no">161</span><span id="line-161"> }</span>
<span class="source-line-no">162</span><span id="line-162"> try {</span>
<span class="source-line-no">163</span><span id="line-163"> this.scan = scan != null ? TableMapReduceUtil.convertScanToString(scan) : "";</span>
<span class="source-line-no">164</span><span id="line-164"> } catch (IOException e) {</span>
<span class="source-line-no">165</span><span id="line-165"> LOG.warn("Failed to convert Scan to String", e);</span>
<span class="source-line-no">166</span><span id="line-166"> }</span>
<span class="source-line-no">167</span><span id="line-167"></span>
<span class="source-line-no">168</span><span id="line-168"> this.restoreDir = restoreDir.toString();</span>
<span class="source-line-no">169</span><span id="line-169"> }</span>
<span class="source-line-no">170</span><span id="line-170"></span>
<span class="source-line-no">171</span><span id="line-171"> public TableDescriptor getHtd() {</span>
<span class="source-line-no">172</span><span id="line-172"> return htd;</span>
<span class="source-line-no">173</span><span id="line-173"> }</span>
<span class="source-line-no">174</span><span id="line-174"></span>
<span class="source-line-no">175</span><span id="line-175"> public String getScan() {</span>
<span class="source-line-no">176</span><span id="line-176"> return scan;</span>
<span class="source-line-no">177</span><span id="line-177"> }</span>
<span class="source-line-no">178</span><span id="line-178"></span>
<span class="source-line-no">179</span><span id="line-179"> public String getRestoreDir() {</span>
<span class="source-line-no">180</span><span id="line-180"> return restoreDir;</span>
<span class="source-line-no">181</span><span id="line-181"> }</span>
<span class="source-line-no">182</span><span id="line-182"></span>
<span class="source-line-no">183</span><span id="line-183"> public long getLength() {</span>
<span class="source-line-no">184</span><span id="line-184"> // TODO: We can obtain the file sizes of the snapshot here.</span>
<span class="source-line-no">185</span><span id="line-185"> return 0;</span>
<span class="source-line-no">186</span><span id="line-186"> }</span>
<span class="source-line-no">187</span><span id="line-187"></span>
<span class="source-line-no">188</span><span id="line-188"> public String[] getLocations() {</span>
<span class="source-line-no">189</span><span id="line-189"> return locations;</span>
<span class="source-line-no">190</span><span id="line-190"> }</span>
<span class="source-line-no">191</span><span id="line-191"></span>
<span class="source-line-no">192</span><span id="line-192"> public TableDescriptor getTableDescriptor() {</span>
<span class="source-line-no">193</span><span id="line-193"> return htd;</span>
<span class="source-line-no">194</span><span id="line-194"> }</span>
<span class="source-line-no">195</span><span id="line-195"></span>
<span class="source-line-no">196</span><span id="line-196"> public RegionInfo getRegionInfo() {</span>
<span class="source-line-no">197</span><span id="line-197"> return regionInfo;</span>
<span class="source-line-no">198</span><span id="line-198"> }</span>
<span class="source-line-no">199</span><span id="line-199"></span>
<span class="source-line-no">200</span><span id="line-200"> // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of</span>
<span class="source-line-no">201</span><span id="line-201"> // doing this wrapping with Writables.</span>
<span class="source-line-no">202</span><span id="line-202"> @Override</span>
<span class="source-line-no">203</span><span id="line-203"> public void write(DataOutput out) throws IOException {</span>
<span class="source-line-no">204</span><span id="line-204"> TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder()</span>
<span class="source-line-no">205</span><span id="line-205"> .setTable(ProtobufUtil.toTableSchema(htd)).setRegion(ProtobufUtil.toRegionInfo(regionInfo));</span>
<span class="source-line-no">206</span><span id="line-206"></span>
<span class="source-line-no">207</span><span id="line-207"> for (String location : locations) {</span>
<span class="source-line-no">208</span><span id="line-208"> builder.addLocations(location);</span>
<span class="source-line-no">209</span><span id="line-209"> }</span>
<span class="source-line-no">210</span><span id="line-210"></span>
<span class="source-line-no">211</span><span id="line-211"> TableSnapshotRegionSplit split = builder.build();</span>
<span class="source-line-no">212</span><span id="line-212"></span>
<span class="source-line-no">213</span><span id="line-213"> ByteArrayOutputStream baos = new ByteArrayOutputStream();</span>
<span class="source-line-no">214</span><span id="line-214"> split.writeTo(baos);</span>
<span class="source-line-no">215</span><span id="line-215"> baos.close();</span>
<span class="source-line-no">216</span><span id="line-216"> byte[] buf = baos.toByteArray();</span>
<span class="source-line-no">217</span><span id="line-217"> out.writeInt(buf.length);</span>
<span class="source-line-no">218</span><span id="line-218"> out.write(buf);</span>
<span class="source-line-no">219</span><span id="line-219"></span>
<span class="source-line-no">220</span><span id="line-220"> Bytes.writeByteArray(out, Bytes.toBytes(scan));</span>
<span class="source-line-no">221</span><span id="line-221"> Bytes.writeByteArray(out, Bytes.toBytes(restoreDir));</span>
<span class="source-line-no">222</span><span id="line-222"></span>
<span class="source-line-no">223</span><span id="line-223"> }</span>
<span class="source-line-no">224</span><span id="line-224"></span>
<span class="source-line-no">225</span><span id="line-225"> @Override</span>
<span class="source-line-no">226</span><span id="line-226"> public void readFields(DataInput in) throws IOException {</span>
<span class="source-line-no">227</span><span id="line-227"> int len = in.readInt();</span>
<span class="source-line-no">228</span><span id="line-228"> byte[] buf = new byte[len];</span>
<span class="source-line-no">229</span><span id="line-229"> in.readFully(buf);</span>
<span class="source-line-no">230</span><span id="line-230"> TableSnapshotRegionSplit split = TableSnapshotRegionSplit.parser().parseFrom(buf);</span>
<span class="source-line-no">231</span><span id="line-231"> this.htd = ProtobufUtil.toTableDescriptor(split.getTable());</span>
<span class="source-line-no">232</span><span id="line-232"> this.regionInfo = ProtobufUtil.toRegionInfo(split.getRegion());</span>
<span class="source-line-no">233</span><span id="line-233"> List&lt;String&gt; locationsList = split.getLocationsList();</span>
<span class="source-line-no">234</span><span id="line-234"> this.locations = locationsList.toArray(new String[locationsList.size()]);</span>
<span class="source-line-no">235</span><span id="line-235"></span>
<span class="source-line-no">236</span><span id="line-236"> this.scan = Bytes.toString(Bytes.readByteArray(in));</span>
<span class="source-line-no">237</span><span id="line-237"> this.restoreDir = Bytes.toString(Bytes.readByteArray(in));</span>
<span class="source-line-no">238</span><span id="line-238"> }</span>
<span class="source-line-no">239</span><span id="line-239"> }</span>
<span class="source-line-no">240</span><span id="line-240"></span>
<span class="source-line-no">241</span><span id="line-241"> /**</span>
<span class="source-line-no">242</span><span id="line-242"> * Implementation class for RecordReader logic common between mapred and mapreduce.</span>
<span class="source-line-no">243</span><span id="line-243"> */</span>
<span class="source-line-no">244</span><span id="line-244"> public static class RecordReader {</span>
<span class="source-line-no">245</span><span id="line-245"> private InputSplit split;</span>
<span class="source-line-no">246</span><span id="line-246"> private Scan scan;</span>
<span class="source-line-no">247</span><span id="line-247"> private Result result = null;</span>
<span class="source-line-no">248</span><span id="line-248"> private ImmutableBytesWritable row = null;</span>
<span class="source-line-no">249</span><span id="line-249"> private ClientSideRegionScanner scanner;</span>
<span class="source-line-no">250</span><span id="line-250"> private int numOfCompleteRows = 0;</span>
<span class="source-line-no">251</span><span id="line-251"> private int rowLimitPerSplit;</span>
<span class="source-line-no">252</span><span id="line-252"></span>
<span class="source-line-no">253</span><span id="line-253"> public ClientSideRegionScanner getScanner() {</span>
<span class="source-line-no">254</span><span id="line-254"> return scanner;</span>
<span class="source-line-no">255</span><span id="line-255"> }</span>
<span class="source-line-no">256</span><span id="line-256"></span>
<span class="source-line-no">257</span><span id="line-257"> public void initialize(InputSplit split, Configuration conf) throws IOException {</span>
<span class="source-line-no">258</span><span id="line-258"> this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());</span>
<span class="source-line-no">259</span><span id="line-259"> this.split = split;</span>
<span class="source-line-no">260</span><span id="line-260"> this.rowLimitPerSplit = conf.getInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 0);</span>
<span class="source-line-no">261</span><span id="line-261"> TableDescriptor htd = split.htd;</span>
<span class="source-line-no">262</span><span id="line-262"> RegionInfo hri = this.split.getRegionInfo();</span>
<span class="source-line-no">263</span><span id="line-263"> FileSystem fs = CommonFSUtils.getCurrentFileSystem(conf);</span>
<span class="source-line-no">264</span><span id="line-264"></span>
<span class="source-line-no">265</span><span id="line-265"> // region is immutable, this should be fine,</span>
<span class="source-line-no">266</span><span id="line-266"> // otherwise we have to set the thread read point</span>
<span class="source-line-no">267</span><span id="line-267"> scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);</span>
<span class="source-line-no">268</span><span id="line-268"> // disable caching of data blocks</span>
<span class="source-line-no">269</span><span id="line-269"> scan.setCacheBlocks(false);</span>
<span class="source-line-no">270</span><span id="line-270"></span>
<span class="source-line-no">271</span><span id="line-271"> scanner =</span>
<span class="source-line-no">272</span><span id="line-272"> new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null);</span>
<span class="source-line-no">273</span><span id="line-273"> }</span>
<span class="source-line-no">274</span><span id="line-274"></span>
<span class="source-line-no">275</span><span id="line-275"> public boolean nextKeyValue() throws IOException {</span>
<span class="source-line-no">276</span><span id="line-276"> result = scanner.next();</span>
<span class="source-line-no">277</span><span id="line-277"> if (result == null) {</span>
<span class="source-line-no">278</span><span id="line-278"> // we are done</span>
<span class="source-line-no">279</span><span id="line-279"> return false;</span>
<span class="source-line-no">280</span><span id="line-280"> }</span>
<span class="source-line-no">281</span><span id="line-281"></span>
<span class="source-line-no">282</span><span id="line-282"> if (rowLimitPerSplit &gt; 0 &amp;&amp; ++this.numOfCompleteRows &gt; rowLimitPerSplit) {</span>
<span class="source-line-no">283</span><span id="line-283"> return false;</span>
<span class="source-line-no">284</span><span id="line-284"> }</span>
<span class="source-line-no">285</span><span id="line-285"> if (this.row == null) {</span>
<span class="source-line-no">286</span><span id="line-286"> this.row = new ImmutableBytesWritable();</span>
<span class="source-line-no">287</span><span id="line-287"> }</span>
<span class="source-line-no">288</span><span id="line-288"> this.row.set(result.getRow());</span>
<span class="source-line-no">289</span><span id="line-289"> return true;</span>
<span class="source-line-no">290</span><span id="line-290"> }</span>
<span class="source-line-no">291</span><span id="line-291"></span>
<span class="source-line-no">292</span><span id="line-292"> public ImmutableBytesWritable getCurrentKey() {</span>
<span class="source-line-no">293</span><span id="line-293"> return row;</span>
<span class="source-line-no">294</span><span id="line-294"> }</span>
<span class="source-line-no">295</span><span id="line-295"></span>
<span class="source-line-no">296</span><span id="line-296"> public Result getCurrentValue() {</span>
<span class="source-line-no">297</span><span id="line-297"> return result;</span>
<span class="source-line-no">298</span><span id="line-298"> }</span>
<span class="source-line-no">299</span><span id="line-299"></span>
<span class="source-line-no">300</span><span id="line-300"> public long getPos() {</span>
<span class="source-line-no">301</span><span id="line-301"> return 0;</span>
<span class="source-line-no">302</span><span id="line-302"> }</span>
<span class="source-line-no">303</span><span id="line-303"></span>
<span class="source-line-no">304</span><span id="line-304"> public float getProgress() {</span>
<span class="source-line-no">305</span><span id="line-305"> return 0; // TODO: use total bytes to estimate</span>
<span class="source-line-no">306</span><span id="line-306"> }</span>
<span class="source-line-no">307</span><span id="line-307"></span>
<span class="source-line-no">308</span><span id="line-308"> public void close() {</span>
<span class="source-line-no">309</span><span id="line-309"> if (this.scanner != null) {</span>
<span class="source-line-no">310</span><span id="line-310"> this.scanner.close();</span>
<span class="source-line-no">311</span><span id="line-311"> }</span>
<span class="source-line-no">312</span><span id="line-312"> }</span>
<span class="source-line-no">313</span><span id="line-313"> }</span>
<span class="source-line-no">314</span><span id="line-314"></span>
<span class="source-line-no">315</span><span id="line-315"> public static List&lt;InputSplit&gt; getSplits(Configuration conf) throws IOException {</span>
<span class="source-line-no">316</span><span id="line-316"> String snapshotName = getSnapshotName(conf);</span>
<span class="source-line-no">317</span><span id="line-317"></span>
<span class="source-line-no">318</span><span id="line-318"> Path rootDir = CommonFSUtils.getRootDir(conf);</span>
<span class="source-line-no">319</span><span id="line-319"> FileSystem fs = rootDir.getFileSystem(conf);</span>
<span class="source-line-no">320</span><span id="line-320"></span>
<span class="source-line-no">321</span><span id="line-321"> SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs);</span>
<span class="source-line-no">322</span><span id="line-322"></span>
<span class="source-line-no">323</span><span id="line-323"> List&lt;RegionInfo&gt; regionInfos = getRegionInfosFromManifest(manifest);</span>
<span class="source-line-no">324</span><span id="line-324"></span>
<span class="source-line-no">325</span><span id="line-325"> // TODO: mapred does not support scan as input API. Work around for now.</span>
<span class="source-line-no">326</span><span id="line-326"> Scan scan = extractScanFromConf(conf);</span>
<span class="source-line-no">327</span><span id="line-327"> // the temp dir where the snapshot is restored</span>
<span class="source-line-no">328</span><span id="line-328"> Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));</span>
<span class="source-line-no">329</span><span id="line-329"></span>
<span class="source-line-no">330</span><span id="line-330"> RegionSplitter.SplitAlgorithm splitAlgo = getSplitAlgo(conf);</span>
<span class="source-line-no">331</span><span id="line-331"></span>
<span class="source-line-no">332</span><span id="line-332"> int numSplits = conf.getInt(NUM_SPLITS_PER_REGION, 1);</span>
<span class="source-line-no">333</span><span id="line-333"></span>
<span class="source-line-no">334</span><span id="line-334"> return getSplits(scan, manifest, regionInfos, restoreDir, conf, splitAlgo, numSplits);</span>
<span class="source-line-no">335</span><span id="line-335"> }</span>
<span class="source-line-no">336</span><span id="line-336"></span>
<span class="source-line-no">337</span><span id="line-337"> public static RegionSplitter.SplitAlgorithm getSplitAlgo(Configuration conf) throws IOException {</span>
<span class="source-line-no">338</span><span id="line-338"> String splitAlgoClassName = conf.get(SPLIT_ALGO);</span>
<span class="source-line-no">339</span><span id="line-339"> if (splitAlgoClassName == null) {</span>
<span class="source-line-no">340</span><span id="line-340"> return null;</span>
<span class="source-line-no">341</span><span id="line-341"> }</span>
<span class="source-line-no">342</span><span id="line-342"> try {</span>
<span class="source-line-no">343</span><span id="line-343"> return Class.forName(splitAlgoClassName).asSubclass(RegionSplitter.SplitAlgorithm.class)</span>
<span class="source-line-no">344</span><span id="line-344"> .getDeclaredConstructor().newInstance();</span>
<span class="source-line-no">345</span><span id="line-345"> } catch (ClassNotFoundException | InstantiationException | IllegalAccessException</span>
<span class="source-line-no">346</span><span id="line-346"> | NoSuchMethodException | InvocationTargetException e) {</span>
<span class="source-line-no">347</span><span id="line-347"> throw new IOException("SplitAlgo class " + splitAlgoClassName + " is not found", e);</span>
<span class="source-line-no">348</span><span id="line-348"> }</span>
<span class="source-line-no">349</span><span id="line-349"> }</span>
<span class="source-line-no">350</span><span id="line-350"></span>
<span class="source-line-no">351</span><span id="line-351"> public static List&lt;RegionInfo&gt; getRegionInfosFromManifest(SnapshotManifest manifest) {</span>
<span class="source-line-no">352</span><span id="line-352"> List&lt;SnapshotRegionManifest&gt; regionManifests = manifest.getRegionManifests();</span>
<span class="source-line-no">353</span><span id="line-353"> if (regionManifests == null) {</span>
<span class="source-line-no">354</span><span id="line-354"> throw new IllegalArgumentException("Snapshot seems empty");</span>
<span class="source-line-no">355</span><span id="line-355"> }</span>
<span class="source-line-no">356</span><span id="line-356"></span>
<span class="source-line-no">357</span><span id="line-357"> List&lt;RegionInfo&gt; regionInfos = Lists.newArrayListWithCapacity(regionManifests.size());</span>
<span class="source-line-no">358</span><span id="line-358"></span>
<span class="source-line-no">359</span><span id="line-359"> for (SnapshotRegionManifest regionManifest : regionManifests) {</span>
<span class="source-line-no">360</span><span id="line-360"> RegionInfo hri = ProtobufUtil.toRegionInfo(regionManifest.getRegionInfo());</span>
<span class="source-line-no">361</span><span id="line-361"> if (hri.isOffline() &amp;&amp; (hri.isSplit() || hri.isSplitParent())) {</span>
<span class="source-line-no">362</span><span id="line-362"> continue;</span>
<span class="source-line-no">363</span><span id="line-363"> }</span>
<span class="source-line-no">364</span><span id="line-364"> regionInfos.add(hri);</span>
<span class="source-line-no">365</span><span id="line-365"> }</span>
<span class="source-line-no">366</span><span id="line-366"> return regionInfos;</span>
<span class="source-line-no">367</span><span id="line-367"> }</span>
<span class="source-line-no">368</span><span id="line-368"></span>
<span class="source-line-no">369</span><span id="line-369"> public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName,</span>
<span class="source-line-no">370</span><span id="line-370"> Path rootDir, FileSystem fs) throws IOException {</span>
<span class="source-line-no">371</span><span id="line-371"> Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);</span>
<span class="source-line-no">372</span><span id="line-372"> SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);</span>
<span class="source-line-no">373</span><span id="line-373"> return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);</span>
<span class="source-line-no">374</span><span id="line-374"> }</span>
<span class="source-line-no">375</span><span id="line-375"></span>
<span class="source-line-no">376</span><span id="line-376"> public static Scan extractScanFromConf(Configuration conf) throws IOException {</span>
<span class="source-line-no">377</span><span id="line-377"> Scan scan = null;</span>
<span class="source-line-no">378</span><span id="line-378"> if (conf.get(TableInputFormat.SCAN) != null) {</span>
<span class="source-line-no">379</span><span id="line-379"> scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));</span>
<span class="source-line-no">380</span><span id="line-380"> } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {</span>
<span class="source-line-no">381</span><span id="line-381"> String[] columns =</span>
<span class="source-line-no">382</span><span id="line-382"> conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");</span>
<span class="source-line-no">383</span><span id="line-383"> scan = new Scan();</span>
<span class="source-line-no">384</span><span id="line-384"> for (String col : columns) {</span>
<span class="source-line-no">385</span><span id="line-385"> scan.addFamily(Bytes.toBytes(col));</span>
<span class="source-line-no">386</span><span id="line-386"> }</span>
<span class="source-line-no">387</span><span id="line-387"> } else {</span>
<span class="source-line-no">388</span><span id="line-388"> throw new IllegalArgumentException("Unable to create scan");</span>
<span class="source-line-no">389</span><span id="line-389"> }</span>
<span class="source-line-no">390</span><span id="line-390"></span>
<span class="source-line-no">391</span><span id="line-391"> if (scan.getReadType() == ReadType.DEFAULT) {</span>
<span class="source-line-no">392</span><span id="line-392"> LOG.info(</span>
<span class="source-line-no">393</span><span id="line-393"> "Provided Scan has DEFAULT ReadType," + " updating STREAM for Snapshot-based InputFormat");</span>
<span class="source-line-no">394</span><span id="line-394"> // Update the "DEFAULT" ReadType to be "STREAM" to try to improve the default case.</span>
<span class="source-line-no">395</span><span id="line-395"> scan.setReadType(conf.getEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE,</span>
<span class="source-line-no">396</span><span id="line-396"> SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT));</span>
<span class="source-line-no">397</span><span id="line-397"> }</span>
<span class="source-line-no">398</span><span id="line-398"></span>
<span class="source-line-no">399</span><span id="line-399"> return scan;</span>
<span class="source-line-no">400</span><span id="line-400"> }</span>
<span class="source-line-no">401</span><span id="line-401"></span>
<span class="source-line-no">402</span><span id="line-402"> public static List&lt;InputSplit&gt; getSplits(Scan scan, SnapshotManifest manifest,</span>
<span class="source-line-no">403</span><span id="line-403"> List&lt;RegionInfo&gt; regionManifests, Path restoreDir, Configuration conf) throws IOException {</span>
<span class="source-line-no">404</span><span id="line-404"> return getSplits(scan, manifest, regionManifests, restoreDir, conf, null, 1);</span>
<span class="source-line-no">405</span><span id="line-405"> }</span>
<span class="source-line-no">406</span><span id="line-406"></span>
<span class="source-line-no">407</span><span id="line-407"> public static List&lt;InputSplit&gt; getSplits(Scan scan, SnapshotManifest manifest,</span>
<span class="source-line-no">408</span><span id="line-408"> List&lt;RegionInfo&gt; regionManifests, Path restoreDir, Configuration conf,</span>
<span class="source-line-no">409</span><span id="line-409"> RegionSplitter.SplitAlgorithm sa, int numSplits) throws IOException {</span>
<span class="source-line-no">410</span><span id="line-410"> // load table descriptor</span>
<span class="source-line-no">411</span><span id="line-411"> TableDescriptor htd = manifest.getTableDescriptor();</span>
<span class="source-line-no">412</span><span id="line-412"></span>
<span class="source-line-no">413</span><span id="line-413"> Path tableDir = CommonFSUtils.getTableDir(restoreDir, htd.getTableName());</span>
<span class="source-line-no">414</span><span id="line-414"></span>
<span class="source-line-no">415</span><span id="line-415"> boolean localityEnabled = conf.getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,</span>
<span class="source-line-no">416</span><span id="line-416"> SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);</span>
<span class="source-line-no">417</span><span id="line-417"></span>
<span class="source-line-no">418</span><span id="line-418"> boolean scanMetricsEnabled = conf.getBoolean(SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED,</span>
<span class="source-line-no">419</span><span id="line-419"> SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED_DEFAULT);</span>
<span class="source-line-no">420</span><span id="line-420"> scan.setScanMetricsEnabled(scanMetricsEnabled);</span>
<span class="source-line-no">421</span><span id="line-421"></span>
<span class="source-line-no">422</span><span id="line-422"> boolean useRegionLoc = conf.getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,</span>
<span class="source-line-no">423</span><span id="line-423"> SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT);</span>
<span class="source-line-no">424</span><span id="line-424"></span>
<span class="source-line-no">425</span><span id="line-425"> Connection connection = null;</span>
<span class="source-line-no">426</span><span id="line-426"> RegionLocator regionLocator = null;</span>
<span class="source-line-no">427</span><span id="line-427"> if (localityEnabled &amp;&amp; useRegionLoc) {</span>
<span class="source-line-no">428</span><span id="line-428"> Configuration newConf = new Configuration(conf);</span>
<span class="source-line-no">429</span><span id="line-429"> newConf.setInt("hbase.hconnection.threads.max", 1);</span>
<span class="source-line-no">430</span><span id="line-430"> try {</span>
<span class="source-line-no">431</span><span id="line-431"> connection = ConnectionFactory.createConnection(newConf);</span>
<span class="source-line-no">432</span><span id="line-432"> regionLocator = connection.getRegionLocator(htd.getTableName());</span>
<span class="source-line-no">433</span><span id="line-433"></span>
<span class="source-line-no">434</span><span id="line-434"> /* Get all locations for the table and cache it */</span>
<span class="source-line-no">435</span><span id="line-435"> regionLocator.getAllRegionLocations();</span>
<span class="source-line-no">436</span><span id="line-436"> } finally {</span>
<span class="source-line-no">437</span><span id="line-437"> if (connection != null) {</span>
<span class="source-line-no">438</span><span id="line-438"> connection.close();</span>
<span class="source-line-no">439</span><span id="line-439"> }</span>
<span class="source-line-no">440</span><span id="line-440"> }</span>
<span class="source-line-no">441</span><span id="line-441"> }</span>
<span class="source-line-no">442</span><span id="line-442"></span>
<span class="source-line-no">443</span><span id="line-443"> List&lt;InputSplit&gt; splits = new ArrayList&lt;&gt;();</span>
<span class="source-line-no">444</span><span id="line-444"> for (RegionInfo hri : regionManifests) {</span>
<span class="source-line-no">445</span><span id="line-445"> // load region descriptor</span>
<span class="source-line-no">446</span><span id="line-446"> List&lt;String&gt; hosts = null;</span>
<span class="source-line-no">447</span><span id="line-447"> if (localityEnabled) {</span>
<span class="source-line-no">448</span><span id="line-448"> if (regionLocator != null) {</span>
<span class="source-line-no">449</span><span id="line-449"> /* Get Location from the local cache */</span>
<span class="source-line-no">450</span><span id="line-450"> HRegionLocation location = regionLocator.getRegionLocation(hri.getStartKey(), false);</span>
<span class="source-line-no">451</span><span id="line-451"></span>
<span class="source-line-no">452</span><span id="line-452"> hosts = new ArrayList&lt;&gt;(1);</span>
<span class="source-line-no">453</span><span id="line-453"> hosts.add(location.getHostname());</span>
<span class="source-line-no">454</span><span id="line-454"> } else {</span>
<span class="source-line-no">455</span><span id="line-455"> hosts = calculateLocationsForInputSplit(conf, htd, hri, tableDir);</span>
<span class="source-line-no">456</span><span id="line-456"> }</span>
<span class="source-line-no">457</span><span id="line-457"> }</span>
<span class="source-line-no">458</span><span id="line-458"></span>
<span class="source-line-no">459</span><span id="line-459"> if (numSplits &gt; 1) {</span>
<span class="source-line-no">460</span><span id="line-460"> byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true);</span>
<span class="source-line-no">461</span><span id="line-461"> for (int i = 0; i &lt; sp.length - 1; i++) {</span>
<span class="source-line-no">462</span><span id="line-462"> if (</span>
<span class="source-line-no">463</span><span id="line-463"> PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i], sp[i + 1])</span>
<span class="source-line-no">464</span><span id="line-464"> ) {</span>
<span class="source-line-no">465</span><span id="line-465"></span>
<span class="source-line-no">466</span><span id="line-466"> Scan boundedScan = new Scan(scan);</span>
<span class="source-line-no">467</span><span id="line-467"> if (scan.getStartRow().length == 0) {</span>
<span class="source-line-no">468</span><span id="line-468"> boundedScan.withStartRow(sp[i]);</span>
<span class="source-line-no">469</span><span id="line-469"> } else {</span>
<span class="source-line-no">470</span><span id="line-470"> boundedScan.withStartRow(</span>
<span class="source-line-no">471</span><span id="line-471"> Bytes.compareTo(scan.getStartRow(), sp[i]) &gt; 0 ? scan.getStartRow() : sp[i]);</span>
<span class="source-line-no">472</span><span id="line-472"> }</span>
<span class="source-line-no">473</span><span id="line-473"></span>
<span class="source-line-no">474</span><span id="line-474"> if (scan.getStopRow().length == 0) {</span>
<span class="source-line-no">475</span><span id="line-475"> boundedScan.withStopRow(sp[i + 1]);</span>
<span class="source-line-no">476</span><span id="line-476"> } else {</span>
<span class="source-line-no">477</span><span id="line-477"> boundedScan.withStopRow(</span>
<span class="source-line-no">478</span><span id="line-478"> Bytes.compareTo(scan.getStopRow(), sp[i + 1]) &lt; 0 ? scan.getStopRow() : sp[i + 1]);</span>
<span class="source-line-no">479</span><span id="line-479"> }</span>
<span class="source-line-no">480</span><span id="line-480"></span>
<span class="source-line-no">481</span><span id="line-481"> splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));</span>
<span class="source-line-no">482</span><span id="line-482"> }</span>
<span class="source-line-no">483</span><span id="line-483"> }</span>
<span class="source-line-no">484</span><span id="line-484"> } else {</span>
<span class="source-line-no">485</span><span id="line-485"> if (</span>
<span class="source-line-no">486</span><span id="line-486"> PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),</span>
<span class="source-line-no">487</span><span id="line-487"> hri.getEndKey())</span>
<span class="source-line-no">488</span><span id="line-488"> ) {</span>
<span class="source-line-no">489</span><span id="line-489"></span>
<span class="source-line-no">490</span><span id="line-490"> splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));</span>
<span class="source-line-no">491</span><span id="line-491"> }</span>
<span class="source-line-no">492</span><span id="line-492"> }</span>
<span class="source-line-no">493</span><span id="line-493"> }</span>
<span class="source-line-no">494</span><span id="line-494"></span>
<span class="source-line-no">495</span><span id="line-495"> return splits;</span>
<span class="source-line-no">496</span><span id="line-496"> }</span>
<span class="source-line-no">497</span><span id="line-497"></span>
<span class="source-line-no">498</span><span id="line-498"> /**</span>
<span class="source-line-no">499</span><span id="line-499"> * Compute block locations for snapshot files (which will get the locations for referred hfiles)</span>
<span class="source-line-no">500</span><span id="line-500"> * only when localityEnabled is true.</span>
<span class="source-line-no">501</span><span id="line-501"> */</span>
<span class="source-line-no">502</span><span id="line-502"> private static List&lt;String&gt; calculateLocationsForInputSplit(Configuration conf,</span>
<span class="source-line-no">503</span><span id="line-503"> TableDescriptor htd, RegionInfo hri, Path tableDir) throws IOException {</span>
<span class="source-line-no">504</span><span id="line-504"> return getBestLocations(conf, HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));</span>
<span class="source-line-no">505</span><span id="line-505"> }</span>
<span class="source-line-no">506</span><span id="line-506"></span>
<span class="source-line-no">507</span><span id="line-507"> /**</span>
<span class="source-line-no">508</span><span id="line-508"> * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers does not take</span>
<span class="source-line-no">509</span><span id="line-509"> * weights into account, thus will treat every location passed from the input split as equal. We</span>
<span class="source-line-no">510</span><span id="line-510"> * do not want to blindly pass all the locations, since we are creating one split per region, and</span>
<span class="source-line-no">511</span><span id="line-511"> * the region's blocks are all distributed throughout the cluster unless favorite node assignment</span>
<span class="source-line-no">512</span><span id="line-512"> * is used. On the expected stable case, only one location will contain most of the blocks as</span>
<span class="source-line-no">513</span><span id="line-513"> * local. On the other hand, in favored node assignment, 3 nodes will contain highly local blocks.</span>
<span class="source-line-no">514</span><span id="line-514"> * Here we are doing a simple heuristic, where we will pass all hosts which have at least 80%</span>
<span class="source-line-no">515</span><span id="line-515"> * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top</span>
<span class="source-line-no">516</span><span id="line-516"> * host with the best locality. Return at most numTopsAtMost locations if there are more than</span>
<span class="source-line-no">517</span><span id="line-517"> * that.</span>
<span class="source-line-no">518</span><span id="line-518"> */</span>
<span class="source-line-no">519</span><span id="line-519"> private static List&lt;String&gt; getBestLocations(Configuration conf,</span>
<span class="source-line-no">520</span><span id="line-520"> HDFSBlocksDistribution blockDistribution, int numTopsAtMost) {</span>
<span class="source-line-no">521</span><span id="line-521"> HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();</span>
<span class="source-line-no">522</span><span id="line-522"></span>
<span class="source-line-no">523</span><span id="line-523"> if (hostAndWeights.length == 0) { // no matter what numTopsAtMost is</span>
<span class="source-line-no">524</span><span id="line-524"> return null;</span>
<span class="source-line-no">525</span><span id="line-525"> }</span>
<span class="source-line-no">526</span><span id="line-526"></span>
<span class="source-line-no">527</span><span id="line-527"> if (numTopsAtMost &lt; 1) { // invalid if numTopsAtMost &lt; 1, correct it to be 1</span>
<span class="source-line-no">528</span><span id="line-528"> numTopsAtMost = 1;</span>
<span class="source-line-no">529</span><span id="line-529"> }</span>
<span class="source-line-no">530</span><span id="line-530"> int top = Math.min(numTopsAtMost, hostAndWeights.length);</span>
<span class="source-line-no">531</span><span id="line-531"> List&lt;String&gt; locations = new ArrayList&lt;&gt;(top);</span>
<span class="source-line-no">532</span><span id="line-532"> HostAndWeight topHost = hostAndWeights[0];</span>
<span class="source-line-no">533</span><span id="line-533"> locations.add(topHost.getHost());</span>
<span class="source-line-no">534</span><span id="line-534"></span>
<span class="source-line-no">535</span><span id="line-535"> if (top == 1) { // only care about the top host</span>
<span class="source-line-no">536</span><span id="line-536"> return locations;</span>
<span class="source-line-no">537</span><span id="line-537"> }</span>
<span class="source-line-no">538</span><span id="line-538"></span>
<span class="source-line-no">539</span><span id="line-539"> // When top &gt;= 2,</span>
<span class="source-line-no">540</span><span id="line-540"> // do the heuristic: filter all hosts which have at least cutoffMultiplier % of block locality</span>
<span class="source-line-no">541</span><span id="line-541"> double cutoffMultiplier =</span>
<span class="source-line-no">542</span><span id="line-542"> conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);</span>
<span class="source-line-no">543</span><span id="line-543"></span>
<span class="source-line-no">544</span><span id="line-544"> double filterWeight = topHost.getWeight() * cutoffMultiplier;</span>
<span class="source-line-no">545</span><span id="line-545"></span>
<span class="source-line-no">546</span><span id="line-546"> for (int i = 1; i &lt;= top - 1; i++) {</span>
<span class="source-line-no">547</span><span id="line-547"> if (hostAndWeights[i].getWeight() &gt;= filterWeight) {</span>
<span class="source-line-no">548</span><span id="line-548"> locations.add(hostAndWeights[i].getHost());</span>
<span class="source-line-no">549</span><span id="line-549"> } else {</span>
<span class="source-line-no">550</span><span id="line-550"> // As hostAndWeights is in descending order,</span>
<span class="source-line-no">551</span><span id="line-551"> // we could break the loop as long as we meet a weight which is less than filterWeight.</span>
<span class="source-line-no">552</span><span id="line-552"> break;</span>
<span class="source-line-no">553</span><span id="line-553"> }</span>
<span class="source-line-no">554</span><span id="line-554"> }</span>
<span class="source-line-no">555</span><span id="line-555"></span>
<span class="source-line-no">556</span><span id="line-556"> return locations;</span>
<span class="source-line-no">557</span><span id="line-557"> }</span>
<span class="source-line-no">558</span><span id="line-558"></span>
<span class="source-line-no">559</span><span id="line-559"> public static List&lt;String&gt; getBestLocations(Configuration conf,</span>
<span class="source-line-no">560</span><span id="line-560"> HDFSBlocksDistribution blockDistribution) {</span>
<span class="source-line-no">561</span><span id="line-561"> // 3 nodes will contain highly local blocks. So default to 3.</span>
<span class="source-line-no">562</span><span id="line-562"> return getBestLocations(conf, blockDistribution, 3);</span>
<span class="source-line-no">563</span><span id="line-563"> }</span>
<span class="source-line-no">564</span><span id="line-564"></span>
<span class="source-line-no">565</span><span id="line-565"> private static String getSnapshotName(Configuration conf) {</span>
<span class="source-line-no">566</span><span id="line-566"> String snapshotName = conf.get(SNAPSHOT_NAME_KEY);</span>
<span class="source-line-no">567</span><span id="line-567"> if (snapshotName == null) {</span>
<span class="source-line-no">568</span><span id="line-568"> throw new IllegalArgumentException("Snapshot name must be provided");</span>
<span class="source-line-no">569</span><span id="line-569"> }</span>
<span class="source-line-no">570</span><span id="line-570"> return snapshotName;</span>
<span class="source-line-no">571</span><span id="line-571"> }</span>
<span class="source-line-no">572</span><span id="line-572"></span>
<span class="source-line-no">573</span><span id="line-573"> /**</span>
<span class="source-line-no">574</span><span id="line-574"> * Configures the job to use TableSnapshotInputFormat to read from a snapshot.</span>
<span class="source-line-no">575</span><span id="line-575"> * @param conf the job to configuration</span>
<span class="source-line-no">576</span><span id="line-576"> * @param snapshotName the name of the snapshot to read from</span>
<span class="source-line-no">577</span><span id="line-577"> * @param restoreDir a temporary directory to restore the snapshot into. Current user should</span>
<span class="source-line-no">578</span><span id="line-578"> * have write permissions to this directory, and this should not be a</span>
<span class="source-line-no">579</span><span id="line-579"> * subdirectory of rootdir. After the job is finished, restoreDir can be</span>
<span class="source-line-no">580</span><span id="line-580"> * deleted.</span>
<span class="source-line-no">581</span><span id="line-581"> * @throws IOException if an error occurs</span>
<span class="source-line-no">582</span><span id="line-582"> */</span>
<span class="source-line-no">583</span><span id="line-583"> public static void setInput(Configuration conf, String snapshotName, Path restoreDir)</span>
<span class="source-line-no">584</span><span id="line-584"> throws IOException {</span>
<span class="source-line-no">585</span><span id="line-585"> setInput(conf, snapshotName, restoreDir, null, 1);</span>
<span class="source-line-no">586</span><span id="line-586"> }</span>
<span class="source-line-no">587</span><span id="line-587"></span>
<span class="source-line-no">588</span><span id="line-588"> /**</span>
<span class="source-line-no">589</span><span id="line-589"> * Configures the job to use TableSnapshotInputFormat to read from a snapshot.</span>
<span class="source-line-no">590</span><span id="line-590"> * @param conf the job to configure</span>
<span class="source-line-no">591</span><span id="line-591"> * @param snapshotName the name of the snapshot to read from</span>
<span class="source-line-no">592</span><span id="line-592"> * @param restoreDir a temporary directory to restore the snapshot into. Current user</span>
<span class="source-line-no">593</span><span id="line-593"> * should have write permissions to this directory, and this should not</span>
<span class="source-line-no">594</span><span id="line-594"> * be a subdirectory of rootdir. After the job is finished, restoreDir</span>
<span class="source-line-no">595</span><span id="line-595"> * can be deleted.</span>
<span class="source-line-no">596</span><span id="line-596"> * @param numSplitsPerRegion how many input splits to generate per one region</span>
<span class="source-line-no">597</span><span id="line-597"> * @param splitAlgo SplitAlgorithm to be used when generating InputSplits</span>
<span class="source-line-no">598</span><span id="line-598"> * @throws IOException if an error occurs</span>
<span class="source-line-no">599</span><span id="line-599"> */</span>
<span class="source-line-no">600</span><span id="line-600"> public static void setInput(Configuration conf, String snapshotName, Path restoreDir,</span>
<span class="source-line-no">601</span><span id="line-601"> RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) throws IOException {</span>
<span class="source-line-no">602</span><span id="line-602"> conf.set(SNAPSHOT_NAME_KEY, snapshotName);</span>
<span class="source-line-no">603</span><span id="line-603"> if (numSplitsPerRegion &lt; 1) {</span>
<span class="source-line-no">604</span><span id="line-604"> throw new IllegalArgumentException(</span>
<span class="source-line-no">605</span><span id="line-605"> "numSplits must be &gt;= 1, " + "illegal numSplits : " + numSplitsPerRegion);</span>
<span class="source-line-no">606</span><span id="line-606"> }</span>
<span class="source-line-no">607</span><span id="line-607"> if (splitAlgo == null &amp;&amp; numSplitsPerRegion &gt; 1) {</span>
<span class="source-line-no">608</span><span id="line-608"> throw new IllegalArgumentException("Split algo can't be null when numSplits &gt; 1");</span>
<span class="source-line-no">609</span><span id="line-609"> }</span>
<span class="source-line-no">610</span><span id="line-610"> if (splitAlgo != null) {</span>
<span class="source-line-no">611</span><span id="line-611"> conf.set(SPLIT_ALGO, splitAlgo.getClass().getName());</span>
<span class="source-line-no">612</span><span id="line-612"> }</span>
<span class="source-line-no">613</span><span id="line-613"> conf.setInt(NUM_SPLITS_PER_REGION, numSplitsPerRegion);</span>
<span class="source-line-no">614</span><span id="line-614"> Path rootDir = CommonFSUtils.getRootDir(conf);</span>
<span class="source-line-no">615</span><span id="line-615"> FileSystem fs = rootDir.getFileSystem(conf);</span>
<span class="source-line-no">616</span><span id="line-616"></span>
<span class="source-line-no">617</span><span id="line-617"> restoreDir = new Path(restoreDir, UUID.randomUUID().toString());</span>
<span class="source-line-no">618</span><span id="line-618"></span>
<span class="source-line-no">619</span><span id="line-619"> RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);</span>
<span class="source-line-no">620</span><span id="line-620"> conf.set(RESTORE_DIR_KEY, restoreDir.toString());</span>
<span class="source-line-no">621</span><span id="line-621"> }</span>
<span class="source-line-no">622</span><span id="line-622"></span>
<span class="source-line-no">623</span><span id="line-623"> /**</span>
<span class="source-line-no">624</span><span id="line-624"> * Clean the restore directory after a snapshot scan job.</span>
<span class="source-line-no">625</span><span id="line-625"> * @param job the snapshot scan job; its configuration must contain {@code RESTORE_DIR_KEY}</span>
<span class="source-line-no">626</span><span id="line-626"> * @param snapshotName the name of the snapshot to read from, used in log and error messages</span>
<span class="source-line-no">627</span><span id="line-627"> * @throws IOException if checking or deleting the restore directory fails</span>
<span class="source-line-no">628</span><span id="line-628"> * @throws IllegalArgumentException if {@code RESTORE_DIR_KEY} is not set in the job</span>
<span class="source-line-no">629</span><span id="line-629"> * configuration</span>
<span class="source-line-no">630</span><span id="line-630"> */</span>
<span class="source-line-no">631</span><span id="line-631"> public static void cleanRestoreDir(Job job, String snapshotName) throws IOException {</span>
<span class="source-line-no">632</span><span id="line-632"> Configuration conf = job.getConfiguration();</span>
<span class="source-line-no">633</span><span id="line-633"> String restoreDirStr = conf.get(RESTORE_DIR_KEY);</span>
<span class="source-line-no">634</span><span id="line-634"> if (restoreDirStr == null) {</span>
<span class="source-line-no">635</span><span id="line-635"> // Fail fast with a message naming the missing key; new Path(null) would reject</span>
<span class="source-line-no">636</span><span id="line-636"> // the null with a generic error that does not say what is misconfigured.</span>
<span class="source-line-no">637</span><span id="line-637"> throw new IllegalArgumentException(</span>
<span class="source-line-no">638</span><span id="line-638"> RESTORE_DIR_KEY + " is not set, cannot clean restore dir for snapshot " + snapshotName);</span>
<span class="source-line-no">639</span><span id="line-639"> }</span>
<span class="source-line-no">640</span><span id="line-640"> Path restoreDir = new Path(restoreDirStr);</span>
<span class="source-line-no">641</span><span id="line-641"> FileSystem fs = restoreDir.getFileSystem(conf);</span>
<span class="source-line-no">642</span><span id="line-642"> if (!fs.exists(restoreDir)) {</span>
<span class="source-line-no">643</span><span id="line-643"> LOG.warn("{} doesn't exist on file system, maybe it's already been cleaned", restoreDir);</span>
<span class="source-line-no">644</span><span id="line-644"> return;</span>
<span class="source-line-no">645</span><span id="line-645"> }</span>
<span class="source-line-no">646</span><span id="line-646"> if (!fs.delete(restoreDir, true)) {</span>
<span class="source-line-no">647</span><span id="line-647"> LOG.warn("Failed clean restore dir {} for snapshot {}", restoreDir, snapshotName);</span>
<span class="source-line-no">648</span><span id="line-648"> // Deletion did not succeed, so do not fall through to the success log below.</span>
<span class="source-line-no">649</span><span id="line-649"> return;</span>
<span class="source-line-no">650</span><span id="line-650"> }</span>
<span class="source-line-no">651</span><span id="line-651"> LOG.debug("Clean restore directory {} for {}", restoreDir, snapshotName);</span>
<span class="source-line-no">652</span><span id="line-652"> }</span>
<span class="source-line-no">642</span><span id="line-642">}</span>
</pre>
</div>
</main>
</body>
</html>