blob: 5a6c39cba3665344907ac69ffc1d7a8d83f3b569 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lens.lib.query;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.ParseException;
import java.util.Map;
import java.util.TreeMap;
import org.apache.lens.server.api.driver.LensResultSetMetadata;
import org.apache.lens.server.api.query.PersistedOutputFormatter;
import org.apache.lens.server.api.query.QueryContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import lombok.extern.slf4j.Slf4j;
/**
* File formatter for {@link PersistedOutputFormatter}
* <p></p>
* This is a {@link WrappedFileFormatter} which can wrap any {@link FileFormatter}.
*/
@Slf4j
public class FilePersistentFormatter extends WrappedFileFormatter implements PersistedOutputFormatter {
/*
* (non-Javadoc)
*
* @see org.apache.lens.lib.query.WrappedFileFormatter#init(org.apache.lens.server.api.query.QueryContext,
* org.apache.lens.server.api.driver.LensResultSetMetadata)
*/
public void init(QueryContext ctx, LensResultSetMetadata metadata) throws IOException {
super.init(ctx, metadata);
}
// File names are of the form 000000_0
/**
* The Class PartFile.
*/
class PartFile implements Comparable<PartFile> {
/**
* The id.
*/
int id;
/**
* Instantiates a new part file.
*
* @param fileName the file name
* @throws ParseException the parse exception
*/
PartFile(String fileName) throws ParseException {
String idStr = fileName.substring(0, fileName.lastIndexOf('_'));
id = Integer.parseInt(idStr);
}
/*
* (non-Javadoc)
*
* @see java.lang.Comparable#compareTo(java.lang.Object)
*/
@Override
public int compareTo(PartFile other) {
if (this.id < other.id) {
return -1;
} else if (this.id > other.id) {
return 1;
}
return 0;
}
}
/*
* (non-Javadoc)
*
* @see org.apache.lens.server.api.query.PersistedOutputFormatter#addRowsFromPersistedPath(org.apache.hadoop.fs.Path)
*/
@Override
public void addRowsFromPersistedPath(final Path persistedDir) throws IOException {
final FileSystem persistFs = persistedDir.getFileSystem(ctx.getConf());
FileStatus[] partFiles = persistFs.listStatus(persistedDir, new PathFilter() {
@Override
public boolean accept(Path path) {
return !path.getName().startsWith("_") && !path.getName().startsWith(".");
}
});
TreeMap<PartFile, FileStatus> partFileMap = new TreeMap<PartFile, FileStatus>();
try {
for (FileStatus file : partFiles) {
partFileMap.put(new PartFile(file.getPath().getName()), file);
}
for (Map.Entry<PartFile, FileStatus> entry : partFileMap.entrySet()) {
log.info("Processing file:{}", entry.getValue().getPath());
BufferedReader in = null;
try {
// default encoding in hadoop filesystem is utf-8
in = new BufferedReader(new InputStreamReader(persistFs.open(entry.getValue().getPath()), "UTF-8"));
String row = in.readLine();
while (row != null) {
writeRow(row);
row = in.readLine();
}
} finally {
if (in != null) {
in.close();
}
}
}
} catch (ParseException e) {
throw new IOException(e);
}
}
}