blob: c7b3b160025074964c2e62019c60695ed1962a05 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop;
import java.io.BufferedInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processors.hadoop.util.InputStreamWritable;
import org.apache.nifi.util.FlowFilePackagerV3;
import org.slf4j.LoggerFactory;
public class FlowFileStreamUnpackerSequenceFileWriter extends SequenceFileWriterImpl {
static {
logger = LoggerFactory.getLogger(FlowFileStreamUnpackerSequenceFileWriter.class);
}
@Override
protected void processInputStream(final InputStream stream, final FlowFile flowFileStreamPackedFlowFile, final Writer writer) throws IOException {
final FlowFileUnpackager unpackager = new FlowFileUnpackager();
try (final InputStream in = new BufferedInputStream(stream)) {
while (unpackager.hasMoreData()) {
unpackager.unpackageFlowFile(stream, writer);
}
}
}
private static class FlowFileUnpackager {
private byte[] nextHeader = null;
private boolean haveReadSomething = false;
private final byte readBuffer[] = new byte[8192];
public boolean hasMoreData() {
return nextHeader != null || !haveReadSomething;
}
private byte[] readHeader(final InputStream in) throws IOException {
final byte[] header = new byte[FlowFilePackagerV3.MAGIC_HEADER.length];
for (int i = 0; i < header.length; i++) {
final int next = in.read();
if (next < 0) {
if (i == 0) {
return null;
}
throw new IOException("Not in FlowFile-v3 format");
}
header[i] = (byte) (next & 0xFF);
}
return header;
}
public void unpackageFlowFile(final InputStream in, final Writer writer) throws IOException {
final byte[] header = (nextHeader == null) ? readHeader(in) : nextHeader;
if (!Arrays.equals(header, FlowFilePackagerV3.MAGIC_HEADER)) {
throw new IOException("Not in FlowFile-v3 format");
}
final Map<String, String> attributes = readAttributes(in);
final long expectedNumBytes = readLong(in); // read length of payload
final InputStreamWritable inStreamWritable = new InputStreamWritable(in, (int) expectedNumBytes);
String fileName = attributes.get(CoreAttributes.FILENAME.key());
Text key = new Text(fileName);
writer.append(key, inStreamWritable);
nextHeader = readHeader(in);
haveReadSomething = true;
}
protected Map<String, String> readAttributes(final InputStream in) throws IOException {
final Map<String, String> attributes = new HashMap<>();
final Integer numAttributes = readFieldLength(in); // read number of attributes
if (numAttributes == null) {
return null;
}
if (numAttributes == 0) {
throw new IOException("flow files cannot have zero attributes");
}
for (int i = 0; i < numAttributes; i++) { // read each attribute key/value pair
final String key = readString(in);
final String value = readString(in);
attributes.put(key, value);
}
return attributes;
}
protected String readString(final InputStream in) throws IOException {
final Integer numBytes = readFieldLength(in);
if (numBytes == null) {
throw new EOFException();
}
final byte[] bytes = new byte[numBytes];
fillBuffer(in, bytes, numBytes);
return new String(bytes, "UTF-8");
}
private void fillBuffer(final InputStream in, final byte[] buffer, final int length) throws IOException {
int bytesRead;
int totalBytesRead = 0;
while ((bytesRead = in.read(buffer, totalBytesRead, length - totalBytesRead)) > 0) {
totalBytesRead += bytesRead;
}
if (totalBytesRead != length) {
throw new EOFException();
}
}
protected long readLong(final InputStream in) throws IOException {
fillBuffer(in, readBuffer, 8);
return (((long) readBuffer[0] << 56)
+ ((long) (readBuffer[1] & 255) << 48)
+ ((long) (readBuffer[2] & 255) << 40)
+ ((long) (readBuffer[3] & 255) << 32)
+ ((long) (readBuffer[4] & 255) << 24)
+ ((readBuffer[5] & 255) << 16)
+ ((readBuffer[6] & 255) << 8)
+ ((readBuffer[7] & 255)));
}
private Integer readFieldLength(final InputStream in) throws IOException {
final int firstValue = in.read();
final int secondValue = in.read();
if (firstValue < 0) {
return null;
}
if (secondValue < 0) {
throw new EOFException();
}
if (firstValue == 0xff && secondValue == 0xff) {
int ch1 = in.read();
int ch2 = in.read();
int ch3 = in.read();
int ch4 = in.read();
if ((ch1 | ch2 | ch3 | ch4) < 0) {
throw new EOFException();
}
return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4));
}
return ((firstValue << 8) + (secondValue));
}
}
}