blob: 15d9272f624862ca9a70279232895c795c4e7e31 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.blur.manager.writer;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.blur.log.Log;
import org.apache.blur.log.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;
/**
 * An {@link IndexDeletionPolicy} that deletes every commit except the newest one, unless the
 * commit's generation has been pinned by one or more named snapshots.
 * <p>
 * The snapshot name to generation mapping is persisted in the directory given to the constructor
 * as a Hadoop {@link SequenceFile} of ({@link Text} name, {@link LongWritable} generation) pairs.
 * Every change writes a new file named with the next zero-padded sequence number and then deletes
 * the older files; on construction only the newest file is read.
 */
public class SnapshotIndexDeletionPolicy extends IndexDeletionPolicy {

  private static final Log LOG = LogFactory.getLog(SnapshotIndexDeletionPolicy.class);

  private final Configuration _configuration;
  private final Path _path;
  // Snapshot name -> commit generation that the snapshot pins.
  private final Map<String, Long> _namesToGenerations = new ConcurrentHashMap<String, Long>();
  // Commit generation -> names of all snapshots pinning it (inverse of _namesToGenerations).
  private final Map<Long, Set<String>> _generationsToNames = new ConcurrentHashMap<Long, Set<String>>();
  // Serializes commit pruning (onCommit) against snapshot creation/removal.
  private final WriteLock _writeLock = new ReentrantReadWriteLock().writeLock();

  /**
   * Creates the policy, creating the snapshot directory if necessary and loading any snapshots
   * previously persisted there.
   *
   * @param configuration Hadoop configuration used to resolve the file system of {@code path}.
   * @param path directory that holds the persisted snapshot files.
   * @throws IOException if the directory cannot be listed/created or the snapshot file cannot be read.
   */
  public SnapshotIndexDeletionPolicy(Configuration configuration, Path path) throws IOException {
    _configuration = configuration;
    _path = path;
    FileSystem fileSystem = _path.getFileSystem(configuration);
    fileSystem.mkdirs(path);
    loadGenerations();
  }

  /** Applies the same pruning as {@link #onCommit(List)} to the commits found at startup. */
  @Override
  public void onInit(List<? extends IndexCommit> commits) throws IOException {
    onCommit(commits);
  }

  /**
   * Deletes every commit except the last one in {@code commits}, skipping commits whose generation
   * is referenced by a snapshot.
   */
  @Override
  public void onCommit(List<? extends IndexCommit> commits) throws IOException {
    _writeLock.lock();
    try {
      // Lucene orders the list oldest first, so the final element (the newest commit) is never deleted.
      int size = commits.size();
      for (int i = 0; i < size - 1; i++) {
        IndexCommit indexCommit = commits.get(i);
        long generation = indexCommit.getGeneration();
        if (!_generationsToNames.containsKey(generation)) {
          indexCommit.delete();
        }
      }
    } finally {
      _writeLock.unlock();
    }
  }

  /**
   * Persists the current snapshot map to a new file and then removes all older snapshot files.
   *
   * @throws IOException if the new file cannot be written or the old files cannot be deleted.
   */
  private synchronized void storeGenerations() throws IOException {
    FileSystem fileSystem = _path.getFileSystem(_configuration);
    FileStatus[] listStatus = fileSystem.listStatus(_path);
    SortedSet<FileStatus> existing = new TreeSet<FileStatus>(Arrays.asList(listStatus));
    long currentFile;
    if (!existing.isEmpty()) {
      FileStatus last = existing.last();
      currentFile = Long.parseLong(last.getPath().getName());
    } else {
      currentFile = 0;
    }
    Path path = new Path(_path, buffer(currentFile + 1));
    LOG.info("Creating new snapshot file [{0}]", path);
    writeGenerations(fileSystem, path);
    // Old files are removed only after the new one is completely written, so a failed write
    // leaves the previous file as the newest one on disk.
    cleanupOldFiles(fileSystem, existing);
  }

  /**
   * Writes the current snapshot map to {@code path}. If anything fails, the stream is closed and the
   * partially written file is deleted so it cannot be picked up as the newest file on the next load.
   */
  private void writeGenerations(FileSystem fileSystem, Path path) throws IOException {
    FSDataOutputStream outputStream = fileSystem.create(path, false);
    boolean success = false;
    try {
      Writer writer = SequenceFile.createWriter(_configuration, outputStream, Text.class, LongWritable.class,
          CompressionType.NONE, null);
      try {
        for (Entry<String, Long> e : _namesToGenerations.entrySet()) {
          writer.append(new Text(e.getKey()), new LongWritable(e.getValue()));
        }
      } finally {
        writer.close();
      }
      // The writer was created over a caller-supplied stream, so the stream is closed separately.
      outputStream.close();
      success = true;
    } finally {
      if (!success) {
        discardPartialFile(fileSystem, outputStream, path);
      }
    }
  }

  /** Best-effort close and delete of a snapshot file whose write failed; errors are only logged. */
  private void discardPartialFile(FileSystem fileSystem, FSDataOutputStream outputStream, Path path) {
    try {
      outputStream.close();
    } catch (IOException e) {
      LOG.error("Could not close snapshot file [{0}]", e, path);
    }
    try {
      fileSystem.delete(path, false);
    } catch (IOException e) {
      LOG.error("Could not delete partially written snapshot file [{0}]", e, path);
    }
  }

  /** Deletes each of the given snapshot files (non-recursively). */
  private void cleanupOldFiles(FileSystem fileSystem, SortedSet<FileStatus> existing) throws IOException {
    for (FileStatus fileStatus : existing) {
      fileSystem.delete(fileStatus.getPath(), false);
    }
  }

  /**
   * Left-pads {@code number} with zeros to 12 characters so that file names sort lexicographically
   * in numeric order. Longer numbers are returned unpadded.
   */
  private String buffer(long number) {
    String s = Long.toString(number);
    StringBuilder builder = new StringBuilder();
    for (int i = s.length(); i < 12; i++) {
      builder.append('0');
    }
    return builder.append(s).toString();
  }

  /**
   * Loads snapshots from the newest file in the snapshot directory (if any) and deletes every older
   * file.
   */
  private void loadGenerations() throws IOException {
    FileSystem fileSystem = _path.getFileSystem(_configuration);
    FileStatus[] listStatus = fileSystem.listStatus(_path);
    SortedSet<FileStatus> existing = new TreeSet<FileStatus>(Arrays.asList(listStatus));
    if (existing.isEmpty()) {
      return;
    }
    FileStatus last = existing.last();
    Reader reader = new SequenceFile.Reader(fileSystem, last.getPath(), _configuration);
    try {
      Text key = new Text();
      LongWritable value = new LongWritable();
      while (reader.next(key, value)) {
        register(key.toString(), value.get());
      }
    } finally {
      reader.close();
    }
    existing.remove(last);
    cleanupOldFiles(fileSystem, existing);
  }

  /** Records that snapshot {@code name} pins commit {@code generation}, updating both lookup maps. */
  private void register(String name, long generation) {
    _namesToGenerations.put(name, generation);
    Set<String> names = _generationsToNames.get(generation);
    if (names == null) {
      names = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
      _generationsToNames.put(generation, names);
    }
    names.add(name);
  }

  /**
   * Pins the commit currently visible through {@code reader} under {@code name} and persists the change.
   *
   * @param name unique snapshot name.
   * @param reader reader whose index commit is pinned.
   * @param context description used only in log messages.
   * @throws IOException if a snapshot named {@code name} already exists or persisting fails.
   */
  public void createSnapshot(String name, DirectoryReader reader, String context) throws IOException {
    _writeLock.lock();
    try {
      if (_namesToGenerations.containsKey(name)) {
        throw new IOException("Snapshot [" + name + "] already exists.");
      }
      LOG.info("Creating snapshot [{0}] in [{1}].", name, context);
      IndexCommit indexCommit = reader.getIndexCommit();
      register(name, indexCommit.getGeneration());
      storeGenerations();
    } finally {
      _writeLock.unlock();
    }
  }

  /**
   * Removes snapshot {@code name} and persists the change. Logs and returns if the snapshot does not
   * exist. Once no snapshot references a generation, that commit becomes eligible for deletion on the
   * next {@link #onCommit(List)}.
   *
   * @param name snapshot name.
   * @param context description used only in log messages.
   * @throws IOException if persisting the change fails.
   */
  public void removeSnapshot(String name, String context) throws IOException {
    _writeLock.lock();
    try {
      Long gen = _namesToGenerations.get(name);
      if (gen == null) {
        LOG.info("Snapshot [{0}] does not exist in [{1}].", name, context);
        return;
      }
      LOG.info("Removing snapshot [{0}] from [{1}].", name, context);
      _namesToGenerations.remove(name);
      Set<String> names = _generationsToNames.get(gen);
      if (names != null) {
        names.remove(name);
        if (names.isEmpty()) {
          _generationsToNames.remove(gen);
        }
      }
      storeGenerations();
    } finally {
      _writeLock.unlock();
    }
  }

  /** Returns a mutable copy of the current snapshot names. */
  public Collection<String> getSnapshots() {
    return new HashSet<String>(_namesToGenerations.keySet());
  }

  /** Returns the directory in which snapshot files are stored. */
  public Path getSnapshotsDirectoryPath() {
    return _path;
  }

  /** Returns the generation pinned by snapshot {@code name}, or {@code null} if there is no such snapshot. */
  public Long getGeneration(String name) {
    return _namesToGenerations.get(name);
  }

  /** Returns the {@code generations} child directory of {@code shardDir}. */
  public static Path getGenerationsPath(Path shardDir) {
    return new Path(shardDir, "generations");
  }
}