package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions;

import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.cache.execute.ResultSender;
import com.gemstone.gemfire.cache.query.internal.types.ObjectTypeImpl;
import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl;
import com.gemstone.gemfire.cache.query.types.ObjectType;
import com.gemstone.gemfire.cache.query.types.StructType;
import com.gemstone.gemfire.internal.HeapDataOutputStream;
import com.gemstone.gemfire.internal.cache.CachedDeserializable;
import com.gemstone.gemfire.internal.logging.LogService;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.Iterator;
/**
 * StructStreamingResultSender and StructStreamingResultCollector are paired
 * to transfer a list of `com.gemstone.gemfire.cache.query.Struct` results
 * from a GemFire server to the Spark Connector (the client of the GemFire
 * server) in a streaming fashion: while the sender is still sending the
 * result, the collector can start processing the chunks that have already
 * arrived, without waiting for the full result to become available.
 */
public class StructStreamingResultSender {

  // chunk type markers: the first byte of each chunk identifies its kind
  public static final byte TYPE_CHUNK = 0x30;
  public static final byte DATA_CHUNK = 0x31;
  public static final byte ERROR_CHUNK = 0x32;
  // field data markers: each field within a DATA chunk is prefixed with one of these
  public static final byte SER_DATA = 0x41;
  public static final byte UNSER_DATA = 0x42;
  public static final byte BYTEARR_DATA = 0x43;
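
  // Illustrative wire format (a sketch inferred from the constants above and
  // the send() logic below; actual byte counts depend on DataSerializer
  // encoding):
  //
  //   [TYPE_CHUNK][serialized StructType]            <- at most one, omitted for KeyValueType
  //   [DATA_CHUNK][marker][field]...[marker][field]  <- usually many, ~CHUNK_SIZE bytes each
  //   ...
  //   [ERROR_CHUNK][serialized Exception]            <- at most one, only on failure
  //
  // where each [marker] is SER_DATA, UNSER_DATA, or BYTEARR_DATA and tells
  // the receiver how to decode the [field] bytes that follow.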

  private static final ObjectTypeImpl OBJ_FIELD = new ObjectTypeImpl(Object.class);
  public static final StructTypeImpl KeyValueType = new StructTypeImpl(
      new String[]{"key", "value"}, new ObjectType[]{OBJ_FIELD, OBJ_FIELD});

  private static final Logger logger = LogService.getLogger();
  private static final int CHUNK_SIZE = 4096;

  // Note: the ResultSender returned from GemFire's FunctionContext is always
  // ResultSender<Object>, so ResultSender<byte[]> can't be used here
  private final ResultSender<Object> sender;
  private final StructType structType;
  private final Iterator<Object[]> rows;
  private final String desc;
  private boolean closed = false;

  /**
   * Constructor.
   * @param sender the underlying ResultSender that sends data as byte arrays
   * @param type the StructType of the result records
   * @param rows an iterator over the result rows
   * @param desc description of this result (used for logging)
   */
  public StructStreamingResultSender(
      ResultSender<Object> sender, StructType type, Iterator<Object[]> rows, String desc) {
    if (sender == null || rows == null)
      throw new NullPointerException("sender=" + sender + ", rows=" + rows);
    this.sender = sender;
    this.structType = type;
    this.rows = rows;
    this.desc = desc;
  }

  /** Constructor with a default `desc`. */
  public StructStreamingResultSender(
      ResultSender<Object> sender, StructType type, Iterator<Object[]> rows) {
    this(sender, type, rows, "StructStreamingResultSender");
  }
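
  // A minimal usage sketch: inside the execute() method of a server-side
  // GemFire function, stream key/value rows back to the Spark Connector.
  // `MyFunction` and the way `rows` is produced are hypothetical; only the
  // sender API below comes from this class.
  //
  //   public class MyFunction implements com.gemstone.gemfire.cache.execute.Function {
  //     public void execute(com.gemstone.gemfire.cache.execute.FunctionContext context) {
  //       Iterator<Object[]> rows = ...;  // each Object[] is one {key, value} row
  //       new StructStreamingResultSender(
  //           context.getResultSender(), null /* null type => default KeyValueType */,
  //           rows, "my-query").send();
  //     }
  //     public String getId() { return "MyFunction"; }
  //   }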

  /**
   * Send the result in chunks. There are 3 types of chunk: TYPE, DATA, and
   * ERROR. A TYPE chunk carries the struct type info, a DATA chunk carries
   * rows of data, and an ERROR chunk carries a serialized exception. There is
   * at most one TYPE chunk (omitted for `KeyValueType`) and at most one ERROR
   * chunk (only if an error occurs), but usually there are many DATA chunks,
   * each containing multiple rows. The chunk size is determined by the
   * constant `CHUNK_SIZE`. If an exception is thrown, it is serialized and
   * sent as the last chunk of the result (in the form of an ERROR chunk).
   */
  public void send() {
    if (closed) throw new IllegalStateException("sender is closed.");

    // allocate extra headroom beyond CHUNK_SIZE so the row that pushes the
    // buffer past the threshold still fits before the chunk is flushed
    HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE + 2048, null);
    String dataType = null;
    int typeSize = 0;
    int rowCount = 0;
    int dataSize = 0;
    try {
      if (rows.hasNext()) {
        // Note: only send type info if there's data to go with it
        typeSize = sendType(buf);
        buf.writeByte(DATA_CHUNK);
        int rowSize = structType == null ? 2 : structType.getFieldNames().length;
        while (rows.hasNext()) {
          rowCount++;
          Object[] row = rows.next();
          // record the field types of the first row for the log message below
          if (rowCount == 1) dataType = entryDataType(row);
          if (rowSize != row.length)
            throw new IOException(rowToString("Expected " + rowSize + " columns, but got ", row));
          serializeRowToBuffer(row, buf);
          if (buf.size() > CHUNK_SIZE) {
            dataSize += sendBufferedData(buf, false);
            buf.writeByte(DATA_CHUNK);
          }
        }
      }
      // send the last piece of data, or an empty byte array if there were no rows
      dataSize += sendBufferedData(buf, true);
      logger.info(desc + ": " + rowCount + " rows, type=" + dataType + ", type.size=" +
                  typeSize + ", data.size=" + dataSize + ", row.avg.size=" +
                  (rowCount == 0 ? "NaN" : String.format("%.1f", ((float) dataSize) / rowCount)));
    } catch (IOException | RuntimeException e) {
      sendException(buf, e);
    } finally {
      closed = true;
    }
  }
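
  // For example (a sketch with made-up sizes): streaming 10,000 small
  // key/value rows with the default `KeyValueType` produces no TYPE chunk,
  // a series of DATA chunks each flushed just past 4096 bytes via
  // sendResult(), and a final (possibly smaller) DATA chunk sent via
  // lastResult(). A query with an explicit StructType would prepend one
  // TYPE chunk before the first DATA chunk.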

  private String rowToString(String rowDesc, Object[] row) {
    StringBuilder buf = new StringBuilder();
    buf.append(rowDesc).append("(");
    for (int i = 0; i < row.length; i++) buf.append(i == 0 ? "" : ", ").append(row[i]);
    return buf.append(")").toString();
  }

  /** Describe a row's field types, e.g. "(java.lang.String, java.lang.Integer)". */
  private String entryDataType(Object[] row) {
    StringBuilder buf = new StringBuilder();
    buf.append("(");
    for (int i = 0; i < row.length; i++) {
      if (i != 0) buf.append(", ");
      // guard against null fields, which would otherwise throw NPE here
      buf.append(row[i] == null ? "null" : row[i].getClass().getCanonicalName());
    }
    return buf.append(")").toString();
  }

  /** Serialize one row: each field is prefixed with a one-byte data marker. */
  private void serializeRowToBuffer(Object[] row, HeapDataOutputStream buf) throws IOException {
    for (Object data : row) {
      if (data instanceof CachedDeserializable) {
        // already-serialized value: forward its raw bytes without deserializing
        buf.writeByte(SER_DATA);
        DataSerializer.writeByteArray(((CachedDeserializable) data).getSerializedValue(), buf);
      } else if (data instanceof byte[]) {
        buf.writeByte(BYTEARR_DATA);
        DataSerializer.writeByteArray((byte[]) data, buf);
      } else {
        buf.writeByte(UNSER_DATA);
        DataSerializer.writeObject(data, buf);
      }
    }
  }
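
  // The receiving side must mirror this encoding. A decoding sketch (the
  // collector class is not shown here; `in` is assumed to be a DataInput
  // positioned just past a field's marker byte read into `marker`):
  //
  //   Object field;
  //   switch (marker) {
  //     case SER_DATA:     // byte array holding an already-serialized object
  //       field = DataSerializer.readObject(new java.io.DataInputStream(
  //           new java.io.ByteArrayInputStream(DataSerializer.readByteArray(in))));
  //       break;
  //     case BYTEARR_DATA: // a plain byte[] field
  //       field = DataSerializer.readByteArray(in);
  //       break;
  //     case UNSER_DATA:   // a regular object
  //       field = DataSerializer.readObject(in);
  //       break;
  //     default:
  //       throw new IOException("unknown data marker: " + marker);
  //   }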

  /** Send the struct type info as a TYPE chunk; return the number of bytes sent. */
  private int sendType(HeapDataOutputStream buf) throws IOException {
    // logger.info(desc + " struct type: " + structType);
    if (structType != null) {
      buf.writeByte(TYPE_CHUNK);
      DataSerializer.writeObject(structType, buf);
      return sendBufferedData(buf, false);
    } else {
      return 0; // default KeyValue type, no type info sent
    }
  }

  private int sendBufferedData(HeapDataOutputStream buf, boolean isLast) throws IOException {
    byte[] bytes = buf.toByteArray();
    if (isLast) sender.lastResult(bytes);
    else sender.sendResult(bytes);
    // logData(bytes, desc);
    buf.reset();
    return bytes.length;
  }

  /** Send the exception as the last chunk of the result. */
  private void sendException(HeapDataOutputStream buf, Exception e) {
    // Note: if the exception happened during serialization, `buf` may contain
    // partially serialized data, which could hang or fail deserialization on
    // the receiving side. Therefore, always empty the buffer before sending
    // the exception.
    if (buf.size() > 0) buf.reset();

    try {
      buf.writeByte(ERROR_CHUNK);
      DataSerializer.writeObject(e, buf);
    } catch (IOException ioe) {
      logger.error("StructStreamingResultSender failed to send the result:", e);
      logger.error("StructStreamingResultSender failed to serialize the exception:", ioe);
      buf.reset();
    }
    // Note: if serialization of the exception itself fails, an empty chunk is
    // sent as the last result and the error is logged on the GemFire server side.
    sender.lastResult(buf.toByteArray());
    // logData(buf.toByteArray(), desc);
  }
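
  // On the receiving side (a hedged note, since the collector is not shown
  // here): a chunk whose first byte is ERROR_CHUNK should be deserialized as
  // an Exception and re-thrown to the caller, and an empty last chunk should
  // be treated as a server-side failure whose details are only in the server
  // log, per sendException() above.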

  // private void logData(byte[] data, String desc) {
  //   StringBuilder buf = new StringBuilder();
  //   buf.append(desc);
  //   for (byte b : data) {
  //     buf.append(" ").append(b);
  //   }
  //   logger.info(buf.toString());
  // }

}