blob: 752cbce10bb378836de740b40d9cb0a31d06dfe4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.recon;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Timestamp;
import java.util.zip.GZIPOutputStream;
import com.google.inject.Singleton;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import static org.apache.hadoop.hdds.server.ServerUtils.getDirectoryFromConfig;
import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DB_DIR;
import static org.jooq.impl.DSL.currentTimestamp;
import static org.jooq.impl.DSL.select;
import static org.jooq.impl.DSL.using;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
import org.hadoop.ozone.recon.schema.tables.pojos.GlobalStats;
import org.jooq.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Recon Utility class.
*/
@Singleton
public class ReconUtils {
private final static int WRITE_BUFFER = 1048576; //1MB
public ReconUtils() {
}
private static final Logger LOG = LoggerFactory.getLogger(
ReconUtils.class);
public static File getReconScmDbDir(ConfigurationSource conf) {
return new ReconUtils().getReconDbDir(conf, OZONE_RECON_SCM_DB_DIR);
}
/**
* Get configured Recon DB directory value based on config. If not present,
* fallback to ozone.metadata.dirs
*
* @param conf configuration bag
* @param dirConfigKey key to check
* @return Return File based on configured or fallback value.
*/
public File getReconDbDir(ConfigurationSource conf, String dirConfigKey) {
File metadataDir = getDirectoryFromConfig(conf, dirConfigKey,
"Recon");
if (metadataDir != null) {
return metadataDir;
}
LOG.warn("{} is not configured. We recommend adding this setting. " +
"Falling back to {} instead.",
dirConfigKey, HddsConfigKeys.OZONE_METADATA_DIRS);
return getOzoneMetaDirPath(conf);
}
/**
* Given a source directory, create a tar.gz file from it.
*
* @param sourcePath the path to the directory to be archived.
* @return tar.gz file
* @throws IOException
*/
public static File createTarFile(Path sourcePath) throws IOException {
TarArchiveOutputStream tarOs = null;
FileOutputStream fileOutputStream = null;
GZIPOutputStream gzipOutputStream = null;
try {
String sourceDir = sourcePath.toString();
String fileName = sourceDir.concat(".tar.gz");
fileOutputStream = new FileOutputStream(fileName);
gzipOutputStream =
new GZIPOutputStream(new BufferedOutputStream(fileOutputStream));
tarOs = new TarArchiveOutputStream(gzipOutputStream);
File folder = new File(sourceDir);
File[] filesInDir = folder.listFiles();
if (filesInDir != null) {
for (File file : filesInDir) {
addFilesToArchive(file.getName(), file, tarOs);
}
}
return new File(fileName);
} finally {
try {
org.apache.hadoop.io.IOUtils.closeStream(tarOs);
org.apache.hadoop.io.IOUtils.closeStream(fileOutputStream);
org.apache.hadoop.io.IOUtils.closeStream(gzipOutputStream);
} catch (Exception e) {
LOG.error("Exception encountered when closing " +
"TAR file output stream: " + e);
}
}
}
private static void addFilesToArchive(String source, File file,
TarArchiveOutputStream
tarFileOutputStream)
throws IOException {
tarFileOutputStream.putArchiveEntry(new TarArchiveEntry(file, source));
if (file.isFile()) {
try (FileInputStream fileInputStream = new FileInputStream(file)) {
BufferedInputStream bufferedInputStream =
new BufferedInputStream(fileInputStream);
org.apache.commons.compress.utils.IOUtils.copy(bufferedInputStream,
tarFileOutputStream);
tarFileOutputStream.closeArchiveEntry();
}
} else if (file.isDirectory()) {
tarFileOutputStream.closeArchiveEntry();
File[] filesInDir = file.listFiles();
if (filesInDir != null) {
for (File cFile : filesInDir) {
addFilesToArchive(cFile.getAbsolutePath(), cFile,
tarFileOutputStream);
}
}
}
}
/**
* Untar DB snapshot tar file to recon OM snapshot directory.
*
* @param tarFile source tar file
* @param destPath destination path to untar to.
* @throws IOException ioException
*/
public void untarCheckpointFile(File tarFile, Path destPath)
throws IOException {
FileInputStream fileInputStream = null;
BufferedInputStream buffIn = null;
GzipCompressorInputStream gzIn = null;
try {
fileInputStream = new FileInputStream(tarFile);
buffIn = new BufferedInputStream(fileInputStream);
gzIn = new GzipCompressorInputStream(buffIn);
//Create Destination directory if it does not exist.
if (!destPath.toFile().exists()) {
boolean success = destPath.toFile().mkdirs();
if (!success) {
throw new IOException("Unable to create Destination directory.");
}
}
try (TarArchiveInputStream tarInStream =
new TarArchiveInputStream(gzIn)) {
TarArchiveEntry entry;
while ((entry = (TarArchiveEntry) tarInStream.getNextEntry()) != null) {
Path path = Paths.get(destPath.toString(), entry.getName());
HddsUtils.validatePath(path, destPath);
File f = path.toFile();
//If directory, create a directory.
if (entry.isDirectory()) {
boolean success = f.mkdirs();
if (!success) {
LOG.error("Unable to create directory found in tar.");
}
} else {
//Write contents of file in archive to a new file.
int count;
byte[] data = new byte[WRITE_BUFFER];
FileOutputStream fos = new FileOutputStream(f);
try (BufferedOutputStream dest =
new BufferedOutputStream(fos, WRITE_BUFFER)) {
while ((count =
tarInStream.read(data, 0, WRITE_BUFFER)) != -1) {
dest.write(data, 0, count);
}
}
}
}
}
} finally {
IOUtils.closeStream(gzIn);
IOUtils.closeStream(buffIn);
IOUtils.closeStream(fileInputStream);
}
}
/**
* Make HTTP GET call on the URL and return HttpURLConnection instance.
* @param connectionFactory URLConnectionFactory to use.
* @param url url to call
* @param isSpnego is SPNEGO enabled
* @return HttpURLConnection instance of the HTTP call.
* @throws IOException, AuthenticationException While reading the response.
*/
public HttpURLConnection makeHttpCall(URLConnectionFactory connectionFactory,
String url, boolean isSpnego)
throws IOException, AuthenticationException {
HttpURLConnection urlConnection = (HttpURLConnection)
connectionFactory.openConnection(new URL(url), isSpnego);
urlConnection.connect();
return urlConnection;
}
/**
* Load last known DB in Recon.
* @param reconDbDir
* @param fileNamePrefix
* @return
*/
public File getLastKnownDB(File reconDbDir, String fileNamePrefix) {
String lastKnownSnapshotFileName = null;
long lastKnonwnSnapshotTs = Long.MIN_VALUE;
if (reconDbDir != null) {
File[] snapshotFiles = reconDbDir.listFiles((dir, name) ->
name.startsWith(fileNamePrefix));
if (snapshotFiles != null) {
for (File snapshotFile : snapshotFiles) {
String fileName = snapshotFile.getName();
try {
String[] fileNameSplits = fileName.split("_");
if (fileNameSplits.length <= 1) {
continue;
}
long snapshotTimestamp = Long.parseLong(fileNameSplits[1]);
if (lastKnonwnSnapshotTs < snapshotTimestamp) {
lastKnonwnSnapshotTs = snapshotTimestamp;
lastKnownSnapshotFileName = fileName;
}
} catch (NumberFormatException nfEx) {
LOG.warn("Unknown file found in Recon DB dir : {}", fileName);
}
}
}
}
return lastKnownSnapshotFileName == null ? null :
new File(reconDbDir.getPath(), lastKnownSnapshotFileName);
}
/**
* Upsert row in GlobalStats table.
*
* @param sqlConfiguration
* @param globalStatsDao
* @param key
* @param count
*/
public static void upsertGlobalStatsTable(Configuration sqlConfiguration,
GlobalStatsDao globalStatsDao,
String key,
Long count) {
// Get the current timestamp
Timestamp now =
using(sqlConfiguration).fetchValue(select(currentTimestamp()));
GlobalStats record = globalStatsDao.fetchOneByKey(key);
GlobalStats newRecord = new GlobalStats(key, count, now);
// Insert a new record for key if it does not exist
if (record == null) {
globalStatsDao.insert(newRecord);
} else {
globalStatsDao.update(newRecord);
}
}
}