| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.io; |
| |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.SequenceFile.CompressionType; |
| import org.apache.hadoop.io.compress.CompressionCodec; |
| import org.apache.hadoop.io.serial.Serialization; |
| import org.apache.hadoop.util.Progressable; |
| import org.apache.hadoop.util.bloom.DynamicBloomFilter; |
| import org.apache.hadoop.util.bloom.Filter; |
| import org.apache.hadoop.util.bloom.Key; |
| import org.apache.hadoop.util.hash.Hash; |
| |
| /** |
| * This class extends {@link MapFile} and provides very much the same |
| * functionality. However, it uses dynamic Bloom filters to provide |
| * quick membership test for keys, and it offers a fast version of |
| * {@link Reader#get(Object, Object)} operation, especially in |
| * case of sparsely populated MapFile-s. |
| */ |
| @InterfaceAudience.Public |
| @InterfaceStability.Stable |
| public class BloomMapFile { |
| private static final Log LOG = LogFactory.getLog(BloomMapFile.class); |
| public static final String BLOOM_FILE_NAME = "bloom"; |
| public static final int HASH_COUNT = 5; |
| |
| public static void delete(FileSystem fs, String name) throws IOException { |
| Path dir = new Path(name); |
| Path data = new Path(dir, MapFile.DATA_FILE_NAME); |
| Path index = new Path(dir, MapFile.INDEX_FILE_NAME); |
| Path bloom = new Path(dir, BLOOM_FILE_NAME); |
| |
| fs.delete(data, true); |
| fs.delete(index, true); |
| fs.delete(bloom, true); |
| fs.delete(dir, true); |
| } |
| |
| private static byte[] byteArrayForBloomKey(DataOutputBuffer buf) { |
| int cleanLength = buf.getLength(); |
| byte [] ba = buf.getData(); |
| if (cleanLength != ba.length) { |
| ba = new byte[cleanLength]; |
| System.arraycopy(buf.getData(), 0, ba, 0, cleanLength); |
| } |
| return ba; |
| } |
| |
| public static class Writer extends MapFile.Writer { |
| private DynamicBloomFilter bloomFilter; |
| private int numKeys; |
| private int vectorSize; |
| private Key bloomKey = new Key(); |
| private DataOutputBuffer buf = new DataOutputBuffer(); |
| private FileSystem fs; |
| private Path dir; |
| private final Serialization<Object> keySerialization; |
| |
| @SuppressWarnings("unchecked") |
| @Deprecated |
| public Writer(Configuration conf, FileSystem fs, String dirName, |
| Class<? extends WritableComparable> keyClass, |
| Class<? extends Writable> valClass, CompressionType compress, |
| CompressionCodec codec, Progressable progress) throws IOException { |
| this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), |
| compression(compress, codec), progressable(progress)); |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Deprecated |
| public Writer(Configuration conf, FileSystem fs, String dirName, |
| Class<? extends WritableComparable> keyClass, |
| Class valClass, CompressionType compress, |
| Progressable progress) throws IOException { |
| this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), |
| compression(compress), progressable(progress)); |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Deprecated |
| public Writer(Configuration conf, FileSystem fs, String dirName, |
| Class<? extends WritableComparable> keyClass, |
| Class valClass, CompressionType compress) |
| throws IOException { |
| this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), |
| compression(compress)); |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Deprecated |
| public Writer(Configuration conf, FileSystem fs, String dirName, |
| WritableComparator comparator, Class valClass, |
| CompressionType compress, CompressionCodec codec, Progressable progress) |
| throws IOException { |
| this(conf, new Path(dirName), comparator(comparator), |
| valueClass(valClass), compression(compress, codec), |
| progressable(progress)); |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Deprecated |
| public Writer(Configuration conf, FileSystem fs, String dirName, |
| WritableComparator comparator, Class valClass, |
| CompressionType compress, Progressable progress) throws IOException { |
| this(conf, new Path(dirName), comparator(comparator), |
| valueClass(valClass), compression(compress), |
| progressable(progress)); |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Deprecated |
| public Writer(Configuration conf, FileSystem fs, String dirName, |
| WritableComparator comparator, Class valClass, CompressionType compress) |
| throws IOException { |
| this(conf, new Path(dirName), comparator(comparator), |
| valueClass(valClass), compression(compress)); |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Deprecated |
| public Writer(Configuration conf, FileSystem fs, String dirName, |
| WritableComparator comparator, Class valClass) throws IOException { |
| this(conf, new Path(dirName), comparator(comparator), |
| valueClass(valClass)); |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Deprecated |
| public Writer(Configuration conf, FileSystem fs, String dirName, |
| Class<? extends WritableComparable> keyClass, |
| Class valClass) throws IOException { |
| this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass)); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public Writer(Configuration conf, Path dir, |
| SequenceFile.Writer.Option... options) throws IOException { |
| super(conf, dir, options); |
| this.fs = dir.getFileSystem(conf); |
| this.dir = dir; |
| initBloomFilter(conf); |
| keySerialization = (Serialization<Object>) getKeySerialization(); |
| } |
| |
| private synchronized void initBloomFilter(Configuration conf) { |
| numKeys = conf.getInt("io.mapfile.bloom.size", 1024 * 1024); |
| // vector size should be <code>-kn / (ln(1 - c^(1/k)))</code> bits for |
| // single key, where <code> is the number of hash functions, |
| // <code>n</code> is the number of keys and <code>c</code> is the desired |
| // max. error rate. |
| // Our desired error rate is by default 0.005, i.e. 0.5% |
| float errorRate = conf.getFloat("io.mapfile.bloom.error.rate", 0.005f); |
| vectorSize = (int)Math.ceil((double)(-HASH_COUNT * numKeys) / |
| Math.log(1.0 - Math.pow(errorRate, 1.0/HASH_COUNT))); |
| bloomFilter = new DynamicBloomFilter(vectorSize, HASH_COUNT, |
| Hash.getHashType(conf), numKeys); |
| } |
| |
| @Override |
| public synchronized void append(Object key, Object val) throws IOException { |
| super.append(key, val); |
| buf.reset(); |
| keySerialization.serialize(buf, key); |
| bloomKey.set(byteArrayForBloomKey(buf), 1.0); |
| bloomFilter.add(bloomKey); |
| } |
| |
| @Override |
| public synchronized void close() throws IOException { |
| super.close(); |
| DataOutputStream out = fs.create(new Path(dir, BLOOM_FILE_NAME), true); |
| bloomFilter.write(out); |
| out.flush(); |
| out.close(); |
| } |
| |
| } |
| |
| public static class Reader extends MapFile.Reader { |
| private DynamicBloomFilter bloomFilter; |
| private DataOutputBuffer buf = new DataOutputBuffer(); |
| private Key bloomKey = new Key(); |
| private final Serialization<Object> keySerialization; |
| |
| @SuppressWarnings("unchecked") |
| public Reader(Path dir, Configuration conf, |
| SequenceFile.Reader.Option... options) throws IOException { |
| super(dir, conf, options); |
| initBloomFilter(dir, conf); |
| keySerialization = (Serialization<Object>) getKeySerialization(); |
| } |
| |
| @Deprecated |
| public Reader(FileSystem fs, String dirName, Configuration conf) |
| throws IOException { |
| this(new Path(dirName), conf); |
| } |
| |
| @Deprecated |
| public Reader(FileSystem fs, String dirName, WritableComparator comparator, |
| Configuration conf, boolean open) throws IOException { |
| this(new Path(dirName), conf, comparator(comparator)); |
| } |
| |
| @Deprecated |
| public Reader(FileSystem fs, String dirName, WritableComparator comparator, |
| Configuration conf) throws IOException { |
| this(new Path(dirName), conf, comparator(comparator)); |
| } |
| |
| private void initBloomFilter(Path dirName, |
| Configuration conf) { |
| try { |
| FileSystem fs = dirName.getFileSystem(conf); |
| DataInputStream in = fs.open(new Path(dirName, BLOOM_FILE_NAME)); |
| bloomFilter = new DynamicBloomFilter(); |
| bloomFilter.readFields(in); |
| in.close(); |
| } catch (IOException ioe) { |
| LOG.warn("Can't open BloomFilter: " + ioe + " - fallback to MapFile."); |
| bloomFilter = null; |
| } |
| } |
| |
| /** |
| * Checks if this MapFile has the indicated key. The membership test is |
| * performed using a Bloom filter, so the result has always non-zero |
| * probability of false positives. |
| * @param key key to check |
| * @return false iff key doesn't exist, true if key probably exists. |
| * @throws IOException |
| */ |
| public boolean probablyHasKey(Object key) throws IOException { |
| if (bloomFilter == null) { |
| return true; |
| } |
| buf.reset(); |
| keySerialization.serialize(buf, key); |
| bloomKey.set(byteArrayForBloomKey(buf), 1.0); |
| return bloomFilter.membershipTest(bloomKey); |
| } |
| |
| /** |
| * Fast version of the |
| * {@link MapFile.Reader#get(Object, Object)} method. First |
| * it checks the Bloom filter for the existence of the key, and only if |
| * present it performs the real get operation. This yields significant |
| * performance improvements for get operations on sparsely populated files. |
| */ |
| @SuppressWarnings("unchecked") |
| @Deprecated |
| @Override |
| public synchronized Writable get(WritableComparable key, |
| Writable value) throws IOException { |
| return (Writable) get((Object) key, (Object) value); |
| } |
| |
| /** |
| * Fast version of the |
| * {@link MapFile.Reader#get(Object, Object)} method. First |
| * it checks the Bloom filter for the existence of the key, and only if |
| * present it performs the real get operation. This yields significant |
| * performance improvements for get operations on sparsely populated files. |
| */ |
| @Override |
| public synchronized Object get(Object key, Object val) throws IOException { |
| if (!probablyHasKey(key)) { |
| return null; |
| } |
| return super.get(key, val); |
| } |
| |
| /** |
| * Retrieve the Bloom filter used by this instance of the Reader. |
| * @return a Bloom filter (see {@link Filter}) |
| */ |
| public Filter getBloomFilter() { |
| return bloomFilter; |
| } |
| } |
| } |