blob: 80c4b2e2fef173bb507b934e149e55b47dd22a77 [file] [log] [blame]
/*
* FILE: BoundaryRecordReader
* Copyright (c) 2015 - 2019 GeoSpark Development Team
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.datasyslab.geospark.formatMapper.shapefileParser.boundary;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
public class BoundaryRecordReader
extends RecordReader<Long, BoundBox>
{
/**
* paths of files to be read
*/
Path[] paths = null;
/**
* fixed key value for reduce all results together
*/
long KEY_VALUE = 0;
/**
* input stream
*/
FSDataInputStream inputStream = null;
/**
* task context
*/
Configuration configuration = null;
/**
* index of current file to be read
*/
int id = -1;
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException
{
CombineFileSplit split = (CombineFileSplit) inputSplit;
paths = split.getPaths();
configuration = taskAttemptContext.getConfiguration();
id = -1;
}
@Override
public boolean nextKeyValue()
throws IOException, InterruptedException
{
id++;
return id < paths.length;
}
@Override
public Long getCurrentKey()
throws IOException, InterruptedException
{
return KEY_VALUE;
}
@Override
public BoundBox getCurrentValue()
throws IOException, InterruptedException
{
// open id file
FileSystem fs = paths[id].getFileSystem(configuration);
inputStream = fs.open(paths[id]);
// read header into memory
byte[] bytes = new byte[100];
inputStream.readFully(bytes);
inputStream.close();
// use byte buffer to abstract 8 parameters of bound box.
ByteBuffer buffer = ByteBuffer.wrap(bytes);
//skip first 36 bytes
buffer.position(buffer.position() + 36);
buffer.order(ByteOrder.LITTLE_ENDIAN);
double[] bounds = new double[8];
buffer.asDoubleBuffer().get(bounds);
return new BoundBox(bounds);
}
@Override
public float getProgress()
throws IOException, InterruptedException
{
return (float) id / (float) paths.length;
}
@Override
public void close()
throws IOException
{
// input stream already closed every time getCurrentKey()
}
}