blob: a6a0db752608cba8caee2fcfac4136ee4ffdcc73 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.relp.frame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
/**
* Decodes a RELP frame by maintaining a state based on each byte that has been processed. This class
* should not be shared by multiple threads.
*/
public class RELPDecoder {
static final Logger logger = LoggerFactory.getLogger(RELPDecoder.class);
private RELPFrame.Builder frameBuilder;
private RELPState currState = RELPState.TXNR;
private final Charset charset;
private final ByteArrayOutputStream currBytes;
/**
* @param charset the charset to decode bytes from the RELP frame
*/
public RELPDecoder(final Charset charset) {
this(charset, new ByteArrayOutputStream(4096));
}
/**
* @param charset the charset to decode bytes from the RELP frame
*/
public RELPDecoder(final Charset charset, final int bufferSize) {
this(charset, new ByteArrayOutputStream(bufferSize));
}
/**
*
* @param charset the charset to decode bytes from the RELP frame
* @param buffer a buffer to use while processing the bytes
*/
public RELPDecoder(final Charset charset, final ByteArrayOutputStream buffer) {
this.charset = charset;
this.currBytes = buffer;
this.frameBuilder = new RELPFrame.Builder();
}
/**
* Resets this decoder back to it's initial state.
*/
public void reset() {
frameBuilder = new RELPFrame.Builder();
currState = RELPState.TXNR;
currBytes.reset();
}
/**
* Process the next byte from the channel, updating the builder and state accordingly.
*
* @param currByte the next byte to process
* @preturn true if a frame is ready to be retrieved, false otherwise
*/
public boolean process(final byte currByte) throws RELPFrameException {
try {
switch (currState) {
case TXNR:
processTXNR(currByte);
break;
case COMMAND:
processCOMMAND(currByte);
break;
case LENGTH:
processLENGTH(currByte);
// if jumped from length to trailer we need to return true here
// because there might not be another byte to process
if (currState == RELPState.TRAILER) {
return true;
}
break;
case DATA:
processDATA(currByte);
break;
case TRAILER:
return true;
default:
break;
}
return false;
} catch (Exception e) {
throw new RELPFrameException("Error decoding RELP frame: " + e.getMessage(), e);
}
}
/**
* Returns the decoded frame and resets the decoder for the next frame.
* This method should be called after checking isComplete().
*
* @return the RELPFrame that was decoded
*/
public RELPFrame getFrame() throws RELPFrameException {
if (currState != RELPState.TRAILER) {
throw new RELPFrameException("Must be at the trailer of a frame");
}
try {
final RELPFrame frame = frameBuilder.build();
processTRAILER(RELPFrame.DELIMITER);
return frame;
} catch (Exception e) {
throw new RELPFrameException("Error decoding RELP frame: " + e.getMessage(), e);
}
}
private void processTXNR(final byte b) {
if (b == RELPFrame.SEPARATOR) {
if (currBytes.size() > 0) {
final long txnr = Long.parseLong(new String(currBytes.toByteArray(), charset));
frameBuilder.txnr(txnr);
logger.debug("Transaction number is {}", new Object[]{txnr});
currBytes.reset();
currState = RELPState.COMMAND;
}
} else {
currBytes.write(b);
}
}
private void processCOMMAND(final byte b) {
if (b == RELPFrame.SEPARATOR) {
final String command = new String(currBytes.toByteArray(), charset);
frameBuilder.command(command);
logger.debug("Command is {}", command);
currBytes.reset();
currState = RELPState.LENGTH;
} else {
currBytes.write(b);
}
}
private void processLENGTH(final byte b) {
if (b == RELPFrame.SEPARATOR || (currBytes.size() > 0 && b == RELPFrame.DELIMITER)) {
final int dataLength = Integer.parseInt(new String(currBytes.toByteArray(), charset));
frameBuilder.dataLength(dataLength);
logger.debug("Length is {}", dataLength);
currBytes.reset();
// if at a separator then data is going to follow, but if at a separator there is no data
if (b == RELPFrame.SEPARATOR) {
currState = RELPState.DATA;
} else {
frameBuilder.data(new byte[0]);
currState = RELPState.TRAILER;
}
} else {
currBytes.write(b);
}
}
private void processDATA(final byte b) {
currBytes.write(b);
logger.trace("Data size is {}", new Object[] {currBytes.size()});
if (currBytes.size() >= frameBuilder.dataLength) {
final byte[] data = currBytes.toByteArray();
frameBuilder.data(data);
logger.debug("Reached expected data size of {}", new Object[] {frameBuilder.dataLength});
currBytes.reset();
currState = RELPState.TRAILER;
}
}
private void processTRAILER(final byte b) {
if (b != RELPFrame.DELIMITER) {
logger.warn("Expected RELP trailing LF, but found another byte");
}
currBytes.reset();
frameBuilder.reset();
currState = RELPState.TXNR;
}
}