| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.accumulo.tserver.log; |
| |
| import java.io.EOFException; |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.UUID; |
| |
| import org.apache.accumulo.core.conf.AccumuloConfiguration; |
| import org.apache.accumulo.core.conf.Property; |
| import org.apache.accumulo.core.conf.SiteConfiguration; |
| import org.apache.accumulo.server.ServerConstants; |
| import org.apache.accumulo.server.fs.VolumeManagerImpl; |
| import org.apache.accumulo.server.security.SecurityUtil; |
| import org.apache.accumulo.tserver.logger.LogFileKey; |
| import org.apache.accumulo.tserver.logger.LogFileValue; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.SequenceFile; |
| import org.apache.hadoop.io.SequenceFile.Reader; |
| import org.apache.hadoop.io.WritableName; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.beust.jcommander.JCommander; |
| import com.beust.jcommander.Parameter; |
| import com.beust.jcommander.ParameterException; |
| import com.google.common.annotations.VisibleForTesting; |
| |
| /** |
| * This class will attempt to rewrite any local WALs to HDFS. |
| */ |
| public class LocalWALRecovery implements Runnable { |
| private static final Logger log = LoggerFactory.getLogger(LocalWALRecovery.class); |
| |
| static { |
| WritableName.addName(LogFileKey.class, "org.apache.accumulo.server.logger.LogFileKey"); |
| WritableName.addName(LogFileValue.class, "org.apache.accumulo.server.logger.LogFileValue"); |
| } |
| |
| public static void main(String[] args) throws IOException { |
| AccumuloConfiguration configuration = SiteConfiguration.getInstance(SiteConfiguration.getDefaultConfiguration()); |
| |
| LocalWALRecovery main = new LocalWALRecovery(configuration); |
| main.parseArgs(args); |
| main.run(); |
| } |
| |
| public final class Options { |
| @Parameter(names = "--delete-local", description = "Specify whether to delete the local WAL files after they have been re-written in HDFS.") |
| public boolean deleteLocal = false; |
| |
| @Parameter(names = "--local-wal-directories", |
| description = "Comma separated list of local directories containing WALs, default is set according to the logger.dir.walog property.") |
| public List<String> directories = getDefaultDirectories(); |
| |
| @Parameter(names = "--dfs-wal-directory", |
| description = "The directory that WALs will be copied into. Will default to the first configured base dir + '/wal'") |
| public String destination = null; |
| |
| private List<String> getDefaultDirectories() { |
| String property = configuration.get(Property.LOGGER_DIR); |
| return Arrays.asList(property.split(",")); |
| } |
| } |
| |
| private final AccumuloConfiguration configuration; |
| private final Options options; |
| |
| /** |
| * Create a WAL recovery tool for the given instance. |
| */ |
| public LocalWALRecovery(AccumuloConfiguration configuration) { |
| this.configuration = configuration; |
| this.options = new Options(); |
| } |
| |
| @VisibleForTesting |
| public void parseArgs(String... args) { |
| JCommander jcommander = new JCommander(); |
| jcommander.addObject(options); |
| |
| try { |
| jcommander.parse(args); |
| } catch (ParameterException e) { |
| jcommander.usage(); |
| } |
| } |
| |
| @Override |
| public void run() { |
| SecurityUtil.serverLogin(SiteConfiguration.getInstance()); |
| |
| try { |
| recoverLocalWriteAheadLogs(VolumeManagerImpl.get().getDefaultVolume().getFileSystem()); |
| } catch (IOException e) { |
| log.error("Error while recovering WAL files.", e); |
| } |
| } |
| |
| public void recoverLocalWriteAheadLogs(FileSystem fs) throws IOException { |
| for (String directory : options.directories) { |
| File localDirectory = new File(directory); |
| if (!localDirectory.isAbsolute()) { |
| localDirectory = new File(System.getenv("ACCUMULO_HOME"), directory); |
| } |
| |
| if (!localDirectory.isDirectory()) { |
| log.warn("Local walog dir " + localDirectory.getAbsolutePath() + " does not exist or is not a directory."); |
| continue; |
| } |
| |
| if (options.destination == null) { |
| // Defer loading the default value until now because it might require talking to zookeeper. |
| options.destination = ServerConstants.getWalDirs()[0]; |
| } |
| log.info("Copying WALs to " + options.destination); |
| |
| File[] files = localDirectory.listFiles(); |
| if (files != null) { |
| for (File file : files) { |
| String name = file.getName(); |
| try { |
| UUID.fromString(name); |
| } catch (IllegalArgumentException ex) { |
| log.info("Ignoring non-log file " + file.getAbsolutePath()); |
| continue; |
| } |
| |
| LogFileKey key = new LogFileKey(); |
| LogFileValue value = new LogFileValue(); |
| |
| log.info("Openning local log " + file.getAbsolutePath()); |
| |
| Path localWal = new Path(file.toURI()); |
| FileSystem localFs = FileSystem.getLocal(fs.getConf()); |
| |
| Reader reader = new SequenceFile.Reader(localFs.getConf(), SequenceFile.Reader.file(localWal.makeQualified(localWal.toUri(), |
| localFs.getWorkingDirectory()))); |
| Path tmp = new Path(options.destination + "/" + name + ".copy"); |
| FSDataOutputStream writer = fs.create(tmp); |
| while (reader.next(key, value)) { |
| try { |
| key.write(writer); |
| value.write(writer); |
| } catch (EOFException ex) { |
| break; |
| } |
| } |
| writer.close(); |
| reader.close(); |
| fs.rename(tmp, new Path(tmp.getParent(), name)); |
| |
| if (options.deleteLocal) { |
| if (file.delete()) { |
| log.info("Copied and deleted: " + name); |
| } else { |
| log.info("Failed to delete: " + name + " (but it is safe for you to delete it manually)."); |
| } |
| } else { |
| log.info("Safe to delete: " + name); |
| } |
| } |
| } |
| } |
| } |
| |
| } |