| package edu.uci.ics.hyracks.examples.text; |
| |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.io.Reader; |
| import java.nio.ByteBuffer; |
| import java.util.Arrays; |
| |
| import edu.uci.ics.hyracks.api.comm.IFrameWriter; |
| import edu.uci.ics.hyracks.api.context.IHyracksStageletContext; |
| import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; |
| import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; |
| import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender; |
| import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils; |
| import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParser; |
| import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory; |
| import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser; |
| import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory; |
| |
| public class WordTupleParserFactory implements ITupleParserFactory { |
| private static final long serialVersionUID = 1L; |
| |
| @Override |
| public ITupleParser createTupleParser(final IHyracksStageletContext ctx) { |
| return new ITupleParser() { |
| @Override |
| public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException { |
| try { |
| ByteBuffer frame = ctx.allocateFrame(); |
| FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize()); |
| appender.reset(frame, true); |
| ArrayTupleBuilder tb = new ArrayTupleBuilder(1); |
| DataOutput dos = tb.getDataOutput(); |
| |
| IValueParser utf8StringParser = UTF8StringParserFactory.INSTANCE.createValueParser(); |
| WordCursor cursor = new WordCursor(new InputStreamReader(in)); |
| while (cursor.nextWord()) { |
| tb.reset(); |
| utf8StringParser.parse(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart, dos); |
| tb.addFieldEndOffset(); |
| if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) { |
| FrameUtils.flushFrame(frame, writer); |
| appender.reset(frame, true); |
| if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) { |
| throw new IllegalStateException(); |
| } |
| } |
| } |
| if (appender.getTupleCount() > 0) { |
| FrameUtils.flushFrame(frame, writer); |
| } |
| } catch (IOException e) { |
| throw new HyracksDataException(e); |
| } |
| } |
| }; |
| } |
| |
| private static class WordCursor { |
| private static final int INITIAL_BUFFER_SIZE = 4096; |
| private static final int INCREMENT = 4096; |
| |
| private char[] buffer; |
| |
| private int start; |
| private int end; |
| private boolean eof; |
| |
| private int fStart; |
| private int fEnd; |
| private Reader in; |
| |
| public WordCursor(Reader in) { |
| this.in = in; |
| buffer = new char[INITIAL_BUFFER_SIZE]; |
| start = 0; |
| end = 0; |
| eof = false; |
| } |
| |
| public boolean nextWord() throws IOException { |
| if (eof) { |
| return false; |
| } |
| |
| int p = start; |
| while (true) { |
| if (p >= end) { |
| int s = start; |
| eof = !readMore(); |
| if (eof) { |
| return true; |
| } |
| p -= (s - start); |
| } |
| char ch = buffer[p]; |
| if (Character.isWhitespace(ch)) { |
| fStart = start; |
| fEnd = p; |
| start = p + 1; |
| return true; |
| } |
| ++p; |
| } |
| } |
| |
| private boolean readMore() throws IOException { |
| if (start > 0) { |
| System.arraycopy(buffer, start, buffer, 0, end - start); |
| } |
| end -= start; |
| start = 0; |
| |
| if (end == buffer.length) { |
| buffer = Arrays.copyOf(buffer, buffer.length + INCREMENT); |
| } |
| |
| int n = in.read(buffer, end, buffer.length - end); |
| if (n < 0) { |
| return false; |
| } |
| end += n; |
| return true; |
| } |
| } |
| } |