| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package com.datatorrent.lib.io; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.codehaus.jettison.json.JSONException; |
| import org.codehaus.jettison.json.JSONObject; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.commons.io.IOUtils; |
| |
| import com.sun.jersey.api.client.ClientResponse; |
| |
| /** |
| * This operator reads in JSON data and outputs it as a map. |
| * <p> |
| * (entities on stream delimited by leading length) |
| * Incoming data is interpreted as JSONObject and converted to {@link java.util.Map}.<br> |
| * If second rawOutput is connected then content is streamed to this port as it is. |
| * <br> |
| * </p> |
| * @displayName HTTP JSON Chunks Input |
| * @category Input |
| * @tags http |
| * |
| * @since 0.9.4 |
| */ |
| @org.apache.hadoop.classification.InterfaceStability.Evolving |
| public class HttpJsonChunksInputOperator extends AbstractHttpInputOperator<Map<String, String>> |
| { |
| private static final Logger LOG = LoggerFactory.getLogger(HttpJsonChunksInputOperator.class); |
| |
| @Override |
| public void processResponse(ClientResponse response) throws IOException |
| { |
| InputStream is = response.getEntity(java.io.InputStream.class); |
| |
| while (true) { |
| ByteArrayOutputStream bos = new ByteArrayOutputStream(); |
| byte[] bytes = new byte[255]; |
| int bytesRead; |
| while ((bytesRead = is.read(bytes)) != -1) { |
| LOG.debug("read {} bytes", bytesRead); |
| bos.write(bytes, 0, bytesRead); |
| if (is.available() == 0 && bos.size() > 0) { |
| // give chance to process what we have before blocking on read |
| break; |
| } |
| } |
| try { |
| if (processBytes(bos.toByteArray())) { |
| LOG.debug("End of chunked input stream."); |
| response.close(); |
| break; |
| } |
| } catch (JSONException ex) { |
| LOG.error("Caught JSON error:", ex); |
| } |
| if (bytesRead == -1) { |
| LOG.error("Unexpected end of chunked input stream"); |
| response.close(); |
| break; |
| } |
| |
| bos.reset(); |
| } |
| } |
| |
| private boolean processBytes(byte[] bytes) throws IOException, JSONException |
| { |
| StringBuilder chunkStr = new StringBuilder(); |
| // hack: when line is a number we skip to next object instead of using it to read length chunk bytes |
| List<String> lines = IOUtils.readLines(new ByteArrayInputStream(bytes)); |
| boolean endStream = false; |
| int currentChunkLength = 0; |
| for (String line : lines) { |
| try { |
| int nextLength = Integer.parseInt(line); |
| if (nextLength == 0) { |
| endStream = true; |
| break; |
| } |
| // switch to next chunk |
| processChunk(chunkStr, currentChunkLength); |
| currentChunkLength = nextLength; |
| |
| //LOG.debug("chunk length: " + line); |
| } catch (NumberFormatException e) { |
| // add to chunk |
| chunkStr.append(line); |
| chunkStr.append("\n"); |
| } |
| } |
| // process current chunk, if any |
| processChunk(chunkStr, currentChunkLength); |
| return endStream; |
| } |
| |
| /** |
| * We are expecting a JSON object and converting one level of keys to a map, no further mapping is performed |
| * |
| * @param chunk |
| * @param expectedLength |
| * @throws JSONException |
| */ |
| private void processChunk(StringBuilder chunk, int expectedLength) throws JSONException |
| { |
| if (expectedLength > 0 && chunk.length() > 0) { |
| //LOG.debug("completed chunk: " + chunkStr); |
| JSONObject json = new JSONObject(chunk.toString()); |
| chunk = new StringBuilder(); |
| Map<String, String> tuple = new HashMap<String, String>(); |
| Iterator<?> it = json.keys(); |
| while (it.hasNext()) { |
| String key = (String)it.next(); |
| Object val = json.get(key); |
| if (val != null) { |
| String vstr = val.toString(); |
| tuple.put(key, vstr); |
| } |
| } |
| if (!tuple.isEmpty()) { |
| LOG.debug("Got: " + tuple); |
| outputPort.emit(tuple); |
| chunk.setLength(0); |
| } |
| } |
| if (rawOutput.isConnected()) { |
| rawOutput.emit(chunk.toString()); |
| } |
| } |
| |
| } |