blob: 386c8b2380b9f86bc4520f00ee2164aa67faa282 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.util;
import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import org.apache.hadoop.io.Writable;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.conf.DMLConfig;
import org.apache.sysds.lops.Lop;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
import org.apache.sysds.runtime.io.IOUtilFunctions;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.matrix.data.MatrixValue;
import org.apache.sysds.runtime.matrix.data.Pair;
public class LocalFileUtils
{
public static final int BUFFER_SIZE = 8192;
//unique IDs per JVM for tmp files
private static IDSequence _seq = null;
private static String _workingDir = null;
//categories of temp files under process-specific working dir
public static final String CATEGORY_CACHE = "cache";
public static final String CATEGORY_PARTITIONING = "partitioning";
public static final String CATEGORY_RESULTMERGE = "resultmerge";
public static final String CATEGORY_WORK = "work";
public static final String CATEGORY_CODEGEN = "codegen";
public static final String CATEGORY_LINEAGE = "lineage";
static {
_seq = new IDSequence();
}
/**
 * Loads a matrix block from a file in the local file system.
 *
 * @param fname name of the file to read
 * @return newly allocated matrix block populated from the file
 * @throws IOException if the file cannot be opened or read
 */
public static MatrixBlock readMatrixBlockFromLocal(String fname) throws IOException {
	MatrixBlock target = new MatrixBlock();
	return (MatrixBlock) readWritableFromLocal(fname, target);
}
/**
 * Loads a matrix block from a file in the local file system into a
 * caller-provided block.
 *
 * @param fname name of the file to read
 * @param reuse matrix block that is populated from the file
 * @return the populated matrix block
 * @throws IOException if the file cannot be opened or read
 */
public static MatrixBlock readMatrixBlockFromLocal(String fname, MatrixBlock reuse)
	throws IOException
{
	MatrixBlock populated = (MatrixBlock) readWritableFromLocal(fname, reuse);
	return populated;
}
/**
 * Loads a frame block from a file in the local file system.
 *
 * @param fname name of the file to read
 * @return newly allocated frame block populated from the file
 * @throws IOException if the file cannot be opened or read
 */
public static FrameBlock readFrameBlockFromLocal(String fname) throws IOException {
	FrameBlock target = new FrameBlock();
	return (FrameBlock) readWritableFromLocal(fname, target);
}
/**
 * Loads either a matrix block or a frame block from a file in the local file system.
 *
 * @param fname name of the file to read
 * @param matrix true to read into a new matrix block, false to read into a new frame block
 * @return cache block (common interface to MatrixBlock and FrameBlock)
 * @throws IOException if the file cannot be opened or read
 */
public static CacheBlock readCacheBlockFromLocal(String fname, boolean matrix) throws IOException {
	if( matrix )
		return (CacheBlock) readWritableFromLocal(fname, new MatrixBlock());
	return (CacheBlock) readWritableFromLocal(fname, new FrameBlock());
}
/**
 * Populates an arbitrary writable from a local file. Matrix blocks are read
 * through the fused buffered data input stream, all other writables through
 * a buffered JDK data input stream.
 *
 * @param fname name of the file to read
 * @param ret hadoop writable to populate
 * @return the populated writable (same instance as {@code ret})
 * @throws IOException if the file cannot be opened or read
 */
public static Writable readWritableFromLocal(String fname, Writable ret)
	throws IOException
{
	FileInputStream fileIn = new FileInputStream(fname);
	DataInput dataIn;
	if( ret instanceof MatrixBlock )
		dataIn = new FastBufferedDataInputStream(fileIn, BUFFER_SIZE);
	else
		dataIn = new DataInputStream(new BufferedInputStream(fileIn, BUFFER_SIZE));
	try {
		ret.readFields(dataIn);
	}
	finally {
		//both stream implementations are input streams, hence the cast
		IOUtilFunctions.closeSilently((InputStream)dataIn);
		IOUtilFunctions.closeSilently(fileIn);
	}
	return ret;
}
/**
 * Loads a matrix block from the given input stream.
 *
 * @param is input stream to read from
 * @return newly allocated matrix block populated from the stream
 * @throws IOException if reading from the stream fails
 */
public static MatrixBlock readMatrixBlockFromStream(InputStream is) throws IOException {
	MatrixBlock target = new MatrixBlock();
	return (MatrixBlock) readWritableFromStream(is, target);
}
/**
 * Loads a frame block from the given input stream.
 *
 * @param is input stream to read from
 * @return newly allocated frame block populated from the stream
 * @throws IOException if reading from the stream fails
 */
public static FrameBlock readFrameBlockFromStream(InputStream is) throws IOException {
	FrameBlock target = new FrameBlock();
	return (FrameBlock) readWritableFromStream(is, target);
}
/**
 * Populates an arbitrary writable from an input stream. Matrix blocks are
 * read through the fused buffered data input stream, all other writables
 * through a buffered JDK data input stream. The given stream is closed
 * before this method returns, whether or not reading succeeded.
 *
 * @param is input stream to read from
 * @param ret hadoop writable to populate
 * @return the populated writable (same instance as {@code ret})
 * @throws IOException if reading from the stream fails
 */
public static Writable readWritableFromStream(InputStream is, Writable ret)
	throws IOException
{
	DataInput dataIn;
	if( ret instanceof MatrixBlock )
		dataIn = new FastBufferedDataInputStream(is, BUFFER_SIZE);
	else
		dataIn = new DataInputStream(new BufferedInputStream(is, BUFFER_SIZE));
	try {
		ret.readFields(dataIn);
	}
	finally {
		//both stream implementations are input streams, hence the cast
		IOUtilFunctions.closeSilently((InputStream)dataIn);
		IOUtilFunctions.closeSilently(is);
	}
	return ret;
}
/**
 * Stores a matrix block in a file in the local file system.
 *
 * @param fname name of the file to write
 * @param mb matrix block to store
 * @throws IOException if the file cannot be opened or written
 */
public static void writeMatrixBlockToLocal(String fname, MatrixBlock mb)
	throws IOException
{
	//delegate to the generic writable writer
	writeWritableToLocal(fname, mb);
}
/**
 * Stores a frame block in a file in the local file system.
 *
 * @param fname name of the file to write
 * @param fb frame block to store
 * @throws IOException if the file cannot be opened or written
 */
public static void writeFrameBlockToLocal(String fname, FrameBlock fb)
	throws IOException
{
	//delegate to the generic writable writer
	writeWritableToLocal(fname, fb);
}
/**
 * Stores a matrix or frame block in a file in the local file system.
 *
 * @param fname name of the file to write
 * @param cb cache block (common interface to matrix block and frame block)
 * @throws IOException if the file cannot be opened or written
 */
public static void writeCacheBlockToLocal(String fname, CacheBlock cb)
	throws IOException
{
	//delegate to the generic writable writer
	writeWritableToLocal(fname, cb);
}
/**
 * Writes an arbitrary writable to local file system, using a fused buffered writer
 * with special support for matrix blocks.
 * <p>
 * The buffered stream is flushed explicitly after the writable has been
 * serialized, so that a failure while writing the remaining buffered bytes
 * is reported to the caller as an IOException rather than being lost in the
 * silent close.
 *
 * @param fname file name to write
 * @param mb Hadoop writable
 * @throws IOException if the file cannot be opened, or if writing or flushing fails
 */
public static void writeWritableToLocal(String fname, Writable mb)
	throws IOException
{
	FileOutputStream fos = new FileOutputStream( fname );
	FastBufferedDataOutputStream out = new FastBufferedDataOutputStream(fos, BUFFER_SIZE);
	try {
		mb.write(out);
		//flush before the silent close (same pattern as writeTextFile),
		//otherwise errors of the final buffer write would go unnoticed
		out.flush();
	}
	finally {
		IOUtilFunctions.closeSilently(out);
		IOUtilFunctions.closeSilently(fos);
	}
}
/**
 * Writes a byte array to a file in the local file system. The file is
 * created if it does not exist and truncated if it does.
 *
 * @param fname file name to write
 * @param data bytes to write
 * @throws IOException if the file cannot be opened, written, or closed
 */
public static void writeByteArrayToLocal( String fname, byte[] data )
	throws IOException
{
	//byte array write via java.nio file channel ~10-15% faster than java.io
	Path path = Paths.get(fname);
	try( FileChannel channel = FileChannel.open(path, StandardOpenOption.CREATE,
		StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE) )
	{
		ByteBuffer buffer = ByteBuffer.wrap(data);
		//a single write call is not guaranteed to consume the entire buffer
		while( buffer.hasRemaining() )
			channel.write(buffer);
	}
}
/**
 * Reads a sequence of (index, value) blocks from a local file into the
 * pre-allocated pairs of the given array. The file starts with the number
 * of blocks, followed by the key and value of each block.
 *
 * @param fname file name to read
 * @param outValues pre-allocated pairs whose keys and values are populated in place
 * @param outMap optional map (may be null) that receives, per read key, its position in outValues
 * @return number of blocks read
 * @throws IOException if the file cannot be opened or read, or if the stored
 *         length is negative or exceeds the capacity of outValues
 */
public static int readBlockSequenceFromLocal(String fname,
	Pair<MatrixIndexes,MatrixValue>[] outValues, HashMap<MatrixIndexes, Integer> outMap)
	throws IOException
{
	FileInputStream fis = new FileInputStream(fname);
	FastBufferedDataInputStream in = new FastBufferedDataInputStream(fis, BUFFER_SIZE);
	try {
		int len = in.readInt();
		//guard against corrupted files or undersized buffers (mirrors the check on write)
		if( len < 0 || len > outValues.length )
			throw new IOException("Invalid length of block sequence: len="+len+" vs data="+outValues.length);
		for( int i=0; i<len; i++ ) {
			outValues[i].getKey().readFields(in);
			outValues[i].getValue().readFields(in);
			if( outMap!=null )
				outMap.put( outValues[i].getKey(), i );
		}
		return len;
	}
	finally {
		IOUtilFunctions.closeSilently(in);
		IOUtilFunctions.closeSilently(fis);
	}
}
/**
 * Writes the first len (index, value) blocks of the given array to a local
 * file: first the number of blocks, then the key and value of each block.
 * <p>
 * The buffered stream is flushed explicitly after the last block, so that a
 * failure while writing the remaining buffered bytes is reported to the
 * caller as an IOException rather than being lost in the silent close.
 *
 * @param fname file name to write
 * @param inValues blocks to write
 * @param len number of leading blocks of inValues to write
 * @throws IOException if len exceeds the array length, if the file cannot be
 *         opened, or if writing or flushing fails
 */
public static void writeBlockSequenceToLocal(String fname, Pair<MatrixIndexes,MatrixValue>[] inValues, int len)
	throws IOException
{
	if( len > inValues.length )
		throw new IOException("Invalid length of block sequence: len="+len+" vs data="+inValues.length);
	FileOutputStream fos = new FileOutputStream(fname);
	FastBufferedDataOutputStream out = new FastBufferedDataOutputStream(fos, BUFFER_SIZE);
	try
	{
		out.writeInt(len);
		for( int i=0; i<len; i++ ) {
			inValues[i].getKey().write(out);
			inValues[i].getValue().write(out);
		}
		//flush before the silent close (same pattern as writeTextFile),
		//otherwise errors of the final buffer write would go unnoticed
		out.flush();
	}
	finally{
		IOUtilFunctions.closeSilently(out);
		IOUtilFunctions.closeSilently(fos);
	}
}
/**
 * Ensures that the given local directory exists, creating it together with
 * any missing parent directories if necessary.
 *
 * @param dir directory path
 * @return true if the path already existed or was created, false if creation failed
 */
public static boolean createLocalFileIfNotExist(String dir) {
	File target = new File(dir);
	if( target.exists() )
		return true;
	return target.mkdirs();
}
/**
 * Deletes the given file or directory if it exists; directories are
 * deleted recursively.
 *
 * @param dir path of the file or directory to delete
 */
public static void deleteFileIfExists(String dir) {
	final boolean fileOnly = false;
	deleteFileIfExists(dir, fileOnly);
}
/**
 * Deletes the given path if it exists.
 *
 * @param dir path of the file or directory to delete
 * @param fileOnly if true, only the path itself is deleted (single file);
 *        if false, the path is deleted recursively
 */
public static void deleteFileIfExists(String dir, boolean fileOnly)
{
	File target = new File(dir);
	if( !target.exists() )
		return; //nothing to delete
	if( !fileOnly )
		rDelete(target); //recursively delete entire directory
	else
		target.delete(); //delete single file
}
/**
 * Checks whether the given path exists in the local file system.
 *
 * @param dir path of a file or directory
 * @return true if the path exists, false otherwise
 */
public static boolean isExisting(String dir) {
	return new File(dir).exists();
}
/**
 * Ensures that the given local directory exists. A newly created directory
 * gets the requested permissions; an already existing directory is left
 * untouched.
 *
 * @param dir directory path
 * @param permission permission string applied to a newly created directory;
 *        if null, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION is used
 * @return true if the path already existed or was created, false if creation failed
 */
public static boolean createLocalFileIfNotExist( String dir, String permission )
{
	boolean ret = true;
	File fdir = new File(dir);
	if( !fdir.exists() ) {
		ret = fdir.mkdirs();
		//honor the requested permissions instead of always applying the default
		String perm = (permission != null) ?
			permission : DMLConfig.DEFAULT_SHARED_DIR_PERMISSION;
		setLocalFilePermissions(fdir, perm);
	}
	return ret;
}
/**
 * Applies a permission string to a local file or directory. The first
 * character is read as the octal digit for the owner and the third character
 * as the octal digit for others; the second character is not evaluated.
 * For each of execute (1), write (2) and read (4), the permission is turned
 * on or off according to the owner digit, and the change is restricted to the
 * owner only if the corresponding bit of the others digit is not set.
 *
 * @param file file or directory to modify
 * @param permissions permission string such as "777" or "755"
 * @throws IllegalArgumentException if the string is null, shorter than three
 *         characters, or its first or third character is not an octal digit
 */
public static void setLocalFilePermissions(File file, String permissions)
{
	if( permissions == null || permissions.length() < 3 )
		throw new IllegalArgumentException("Invalid permission string (expected three octal digits): "+permissions);
	//note: user and group treated the same way
	int sU = permissions.charAt(0) - '0';
	int sO = permissions.charAt(2) - '0';
	if( sU < 0 || sU > 7 || sO < 0 || sO > 7 )
		throw new IllegalArgumentException("Invalid permission string (expected three octal digits): "+permissions);
	file.setExecutable( (sU&1)==1, (sO&1)==0 );
	file.setWritable( (sU&2)==2, (sO&2)==0 );
	file.setReadable( (sU&4)==4, (sO&4)==0 );
}
///////////
// working dir handling
///
/**
 * Creates the given staging directory (including missing parents) if it
 * does not exist yet. The outcome of the creation is not checked.
 *
 * @param dir staging directory path
 * @return the given directory path, unchanged
 */
public static String checkAndCreateStagingDir(String dir) {
	File staging = new File(dir);
	boolean missing = !staging.exists();
	if( missing )
		staging.mkdirs();
	return dir;
}
/**
 * Creates the process-specific working directory, named after the UUID
 * obtained from DMLScript.
 *
 * @return path of the working directory
 */
public static String createWorkingDirectory() {
	String uuid = DMLScript.getUUID();
	return createWorkingDirectoryWithUUID( uuid );
}
/**
 * Creates the process-specific working directory for the given UUID below
 * the configured local tmp directory and remembers it as the current
 * working directory of this class.
 *
 * @param uuid identifier appended to the process prefix of the directory name
 * @return path of the working directory (with trailing file separator)
 * @throws DMLRuntimeException if the local tmp directory or the working directory cannot be created
 */
public static String createWorkingDirectoryWithUUID(String uuid)
{
	//resolve local tmp dir from the active config, or from the defaults if none is set
	DMLConfig conf = ConfigurationManager.getDMLConfig();
	String dirRoot = (conf != null) ?
		conf.getTextValue(DMLConfig.LOCAL_TMP_DIR) :
		DMLConfig.getDefaultTextValue(DMLConfig.LOCAL_TMP_DIR);

	//create shared staging dir if not existing
	boolean rootOk = LocalFileUtils.createLocalFileIfNotExist(dirRoot, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
	if( !rootOk )
		throw new DMLRuntimeException("Failed to create non-existing local working directory: "+dirRoot);

	//derive process specific sub tmp dir: <root>/<prefix><uuid>/
	_workingDir = dirRoot + Lop.FILE_SEPARATOR
		+ Lop.PROCESS_PREFIX + uuid + Lop.FILE_SEPARATOR;

	//create process-specific staging dir if not existing
	boolean workOk = LocalFileUtils.createLocalFileIfNotExist(_workingDir);
	if( !workOk )
		throw new DMLRuntimeException("Failed to create local working directory: "+_workingDir);

	return _workingDir;
}
/**
 * Recursively deletes the current working directory, if one has been created.
 */
public static void cleanupWorkingDirectory() {
	if( _workingDir == null )
		return; //no working directory created so far
	cleanupWorkingDirectory( _workingDir );
}
/**
 * Recursively deletes the given directory if it exists.
 *
 * @param dir directory to delete
 */
public static void cleanupWorkingDirectory(String dir) {
	File target = new File(dir);
	if( !target.exists() )
		return;
	rDelete(target);
}
/**
 * Recursively deletes the given directory if it exists and reports how
 * many files and directories were deleted.
 *
 * @param dir directory to delete
 * @return number of deleted files and directories, 0 if the path does not exist
 */
public static int cleanupRcWorkingDirectory(String dir) {
	File target = new File(dir);
	return target.exists() ? rcDelete(target) : 0;
}
/**
 * Recursively deletes an entire local file system directory. If the
 * contents of a directory cannot be listed, only the deletion of the
 * directory itself is attempted. Failed deletions are not reported.
 *
 * @param dir file or directory to delete recursively
 */
public static void rDelete(File dir)
{
	//recursively delete files if required
	if( dir.isDirectory() )
	{
		File[] files = dir.listFiles();
		//listFiles returns null if the directory cannot be read
		if( files != null )
			for( File f : files )
				rDelete( f );
	}
	//delete file/dir itself
	dir.delete();
}
/**
 * Recursively deletes an entire local file system directory
 * and returns the number of files deleted. If the contents of a
 * directory cannot be listed, only the deletion of the directory itself
 * is attempted.
 *
 * @param dir file or directory to delete recursively
 * @return number of successfully deleted files and directories
 */
public static int rcDelete(File dir)
{
	int count = 0;
	//recursively delete files if required
	if( dir.isDirectory() )
	{
		File[] files = dir.listFiles();
		//listFiles returns null if the directory cannot be read
		if( files != null )
			for( File f : files )
				count += rcDelete( f );
	}
	//delete file/dir itself
	count += dir.delete() ? 1 : 0;
	return count;
}
/**
 * Returns the path of the given category below the working directory,
 * creating the working directory first if none exists yet. The category
 * directory itself is not created here.
 *
 * @param category category name of the temp files
 * @return path of the form working dir + separator + category + separator
 */
public static String getWorkingDir(String category) {
	if( _workingDir == null )
		createWorkingDirectory();
	return _workingDir + Lop.FILE_SEPARATOR
		+ category + Lop.FILE_SEPARATOR;
}
/**
 * Returns a new path below the given category of the working directory,
 * creating the working directory first if none exists yet. The path ends
 * in "tmp" followed by the next ID of the internal ID sequence; nothing is
 * created at that path here.
 *
 * @param category category name of the temp files
 * @return path of the form working dir + separator + category + separator + tmp&lt;id&gt;
 */
public static String getUniqueWorkingDir(String category) {
	if( _workingDir == null )
		createWorkingDirectory();
	String categoryDir = _workingDir + Lop.FILE_SEPARATOR
		+ category + Lop.FILE_SEPARATOR;
	return categoryDir + "tmp" + _seq.getNextID();
}
/**
 * Stores the given text in a file of the local file system. The content
 * is flushed explicitly before the writer is closed silently.
 *
 * @param file output file
 * @param text content of the text file
 * @throws IOException if the file cannot be opened or if writing or flushing fails
 */
public static void writeTextFile(File file, String text)
	throws IOException
{
	Writer out = null;
	try {
		out = new FileWriter(file);
		out.write(text);
		//surface write errors here, the close below ignores failures
		out.flush();
	}
	finally {
		IOUtilFunctions.closeSilently(out);
	}
}
}