blob: 1993b2149a0dc21de54c83538bb642c9bc0fe36d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.io;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.io.IOUtils;
import com.sun.jersey.api.client.ClientResponse;
/**
* This operator reads in JSON data and outputs it as a map.
* <p>
* (entities on the stream are delimited by a leading length line)
* Incoming data is interpreted as a JSONObject and converted to a {@link java.util.Map}.<br>
* If the second output port, rawOutput, is connected, the content is also streamed to that port as-is.
* <br>
* </p>
* @displayName HTTP JSON Chunks Input
* @category Input
* @tags http
*
* @since 0.9.4
*/
@org.apache.hadoop.classification.InterfaceStability.Evolving
public class HttpJsonChunksInputOperator extends AbstractHttpInputOperator<Map<String, String>>
{
  private static final Logger LOG = LoggerFactory.getLogger(HttpJsonChunksInputOperator.class);

  /** Size, in bytes, of the scratch buffer used for each read from the response stream. */
  private static final int READ_BUFFER_SIZE = 255;

  /**
   * Reads the response entity as a stream of length-delimited JSON chunks until either the
   * terminating zero-length marker is seen or the stream ends.
   * <p>
   * Bytes are drained in bursts: whatever is currently available is collected and handed to
   * {@link #processBytes(byte[])} before the next (potentially blocking) read. The response is
   * closed on every exit path, including when an {@link IOException} propagates.
   *
   * @param response the HTTP response whose entity is consumed and which is closed on return
   * @throws IOException if reading from the entity stream fails
   */
  @Override
  public void processResponse(ClientResponse response) throws IOException
  {
    try {
      InputStream is = response.getEntity(java.io.InputStream.class);
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      byte[] bytes = new byte[READ_BUFFER_SIZE];
      while (true) {
        // start every burst with an empty accumulator
        bos.reset();
        int bytesRead;
        while ((bytesRead = is.read(bytes)) != -1) {
          LOG.debug("read {} bytes", bytesRead);
          bos.write(bytes, 0, bytesRead);
          if (is.available() == 0 && bos.size() > 0) {
            // give chance to process what we have before blocking on read
            break;
          }
        }
        if (processBytes(bos.toByteArray())) {
          LOG.debug("End of chunked input stream.");
          break;
        }
        if (bytesRead == -1) {
          LOG.error("Unexpected end of chunked input stream");
          break;
        }
      }
    } finally {
      // release the connection even when reading or processing fails
      response.close();
    }
  }

  /**
   * Splits one burst of bytes into lines and groups them into chunks.
   * <p>
   * A line that parses as an integer is treated as the length header of the next chunk; a value
   * of {@code 0} marks the end of the stream. Every other line is appended to the current chunk.
   * Hack: the announced length is only used as a "chunk present" flag, it is not used to read
   * exactly that many bytes, so a chunk that is split across two bursts cannot be reassembled.
   * <p>
   * The bytes are decoded as UTF-8.
   *
   * @param bytes the bytes collected from the stream in one burst, possibly empty
   * @return {@code true} if the zero-length end-of-stream marker was encountered
   * @throws IOException if the bytes cannot be decoded into lines
   */
  private boolean processBytes(byte[] bytes) throws IOException
  {
    StringBuilder chunkStr = new StringBuilder();
    // hack: when line is a number we skip to next object instead of using it to read length chunk bytes
    List<String> lines = IOUtils.readLines(new ByteArrayInputStream(bytes), "UTF-8");
    boolean endStream = false;
    int currentChunkLength = 0;
    for (String line : lines) {
      int nextLength;
      try {
        nextLength = Integer.parseInt(line);
      } catch (NumberFormatException e) {
        // not a length header: the line belongs to the current chunk
        chunkStr.append(line).append("\n");
        continue;
      }
      if (nextLength == 0) {
        endStream = true;
        break;
      }
      // switch to next chunk
      flushChunk(chunkStr, currentChunkLength);
      currentChunkLength = nextLength;
    }
    // process current chunk, if any
    flushChunk(chunkStr, currentChunkLength);
    return endStream;
  }

  /**
   * Processes the buffered chunk and then always empties the buffer, so text from one chunk can
   * never leak into the next. A chunk that is not valid JSON is logged and skipped instead of
   * aborting the remaining chunks of the same burst.
   *
   * @param chunk the buffer holding the text of the current chunk; empty after this call
   * @param expectedLength the length announced by the chunk's header, or 0 if no header was seen
   */
  private void flushChunk(StringBuilder chunk, int expectedLength)
  {
    try {
      processChunk(chunk, expectedLength);
    } catch (JSONException ex) {
      LOG.error("Caught JSON error:", ex);
    } finally {
      chunk.setLength(0);
    }
  }

  /**
   * We are expecting a JSON object and converting one level of keys to a map, no further mapping is performed.
   * <p>
   * Nothing happens unless a positive length was announced and the buffer is non-empty. The map
   * is emitted on {@code outputPort} when it has at least one entry; if {@code rawOutput} is
   * connected, the chunk text (lines re-joined with {@code "\n"}) is emitted there as well.
   * The buffer itself is not modified; the caller is responsible for clearing it.
   *
   * @param chunk the text of the chunk to parse
   * @param expectedLength the length announced by the chunk's header; only checked for being positive
   * @throws JSONException if the chunk text is not a valid JSON object
   */
  private void processChunk(StringBuilder chunk, int expectedLength) throws JSONException
  {
    if (expectedLength <= 0 || chunk.length() == 0) {
      return;
    }
    // snapshot the text once so the parsed and the raw emission see the same content
    String raw = chunk.toString();
    JSONObject json = new JSONObject(raw);
    Map<String, String> tuple = new HashMap<String, String>();
    Iterator<?> it = json.keys();
    while (it.hasNext()) {
      String key = (String)it.next();
      Object val = json.get(key);
      if (val != null) {
        tuple.put(key, val.toString());
      }
    }
    if (!tuple.isEmpty()) {
      LOG.debug("Got: {}", tuple);
      outputPort.emit(tuple);
    }
    if (rawOutput.isConnected()) {
      rawOutput.emit(raw);
    }
  }
}