blob: 325eff401830a9f4bf8ff724f01094f3e35d8f1f [file] [log] [blame]
/*
* Copyright 2009-2011 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.asterix.external.data.parser;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import edu.uci.ics.asterix.builders.IARecordBuilder;
import edu.uci.ics.asterix.builders.RecordBuilder;
import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter.OperationState;
import edu.uci.ics.asterix.om.base.AMutableString;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.runtime.operators.file.DelimitedDataTupleParser;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParser;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
public class ManagedDelimitedDataTupleParser extends DelimitedDataTupleParser implements IManagedTupleParser {
private List<OperationState> nextState;
private IManagedFeedAdapter adapter;
private long tupleInterval;
public static final String TUPLE_INTERVAL_KEY = "tuple-interval";
public ManagedDelimitedDataTupleParser(IHyracksTaskContext ctx, ARecordType recType, IManagedFeedAdapter adapter,
IValueParserFactory[] valueParserFactories, char fieldDelimter) {
super(ctx, recType, valueParserFactories, fieldDelimter);
this.adapter = adapter;
nextState = new ArrayList<OperationState>();
tupleInterval = adapter.getAdapterProperty(TUPLE_INTERVAL_KEY) == null ? 0 : Long.parseLong(adapter
.getAdapterProperty(TUPLE_INTERVAL_KEY));
}
@Override
public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
try {
IValueParser[] valueParsers = new IValueParser[valueParserFactories.length];
for (int i = 0; i < valueParserFactories.length; ++i) {
valueParsers[i] = valueParserFactories[i].createValueParser();
}
appender.reset(frame, true);
tb = new ArrayTupleBuilder(1);
recDos = tb.getDataOutput();
ArrayBackedValueStorage fieldValueBuffer = new ArrayBackedValueStorage();
DataOutput fieldValueBufferOutput = fieldValueBuffer.getDataOutput();
IARecordBuilder recBuilder = new RecordBuilder();
recBuilder.reset(recType);
recBuilder.init();
int n = recType.getFieldNames().length;
byte[] fieldTypeTags = new byte[n];
for (int i = 0; i < n; i++) {
ATypeTag tag = recType.getFieldTypes()[i].getTypeTag();
fieldTypeTags[i] = tag.serialize();
}
int[] fldIds = new int[n];
ArrayBackedValueStorage[] nameBuffers = new ArrayBackedValueStorage[n];
AMutableString str = new AMutableString(null);
for (int i = 0; i < n; i++) {
String name = recType.getFieldNames()[i];
fldIds[i] = recBuilder.getFieldId(name);
if (fldIds[i] < 0) {
if (!recType.isOpen()) {
throw new HyracksDataException("Illegal field " + name + " in closed type " + recType);
} else {
nameBuffers[i] = new ArrayBackedValueStorage();
fieldNameToBytes(name, str, nameBuffers[i]);
}
}
}
FieldCursor cursor = new FieldCursor(new InputStreamReader(in));
while (cursor.nextRecord()) {
tb.reset();
recBuilder.reset(recType);
recBuilder.init();
for (int i = 0; i < valueParsers.length; ++i) {
if (!cursor.nextField()) {
break;
}
fieldValueBuffer.reset();
fieldValueBufferOutput.writeByte(fieldTypeTags[i]);
valueParsers[i].parse(cursor.getBuffer(), cursor.getfStart(),
cursor.getfEnd() - cursor.getfStart(), fieldValueBufferOutput);
if (fldIds[i] < 0) {
recBuilder.addField(nameBuffers[i], fieldValueBuffer);
} else {
recBuilder.addField(fldIds[i], fieldValueBuffer);
}
}
recBuilder.write(recDos, true);
processNextTuple(nextState.isEmpty() ? null : nextState.get(0), writer);
Thread.currentThread().sleep(tupleInterval);
}
if (appender.getTupleCount() > 0) {
FrameUtils.flushFrame(frame, writer);
}
} catch (IOException e) {
throw new HyracksDataException(e);
} catch (InterruptedException ie) {
throw new HyracksDataException(ie);
}
}
private void addTupleToFrame(IFrameWriter writer, boolean forceFlush) throws HyracksDataException {
tb.addFieldEndOffset();
boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
if (!success) {
FrameUtils.flushFrame(frame, writer);
appender.reset(frame, true);
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
throw new IllegalStateException();
}
}
if (forceFlush) {
FrameUtils.flushFrame(frame, writer);
}
}
private void processNextTuple(OperationState feedState, IFrameWriter writer) throws HyracksDataException {
try {
if (feedState != null) {
switch (feedState) {
case SUSPENDED:
suspendOperation(writer);
break;
case STOPPED:
stopOperation(writer);
break;
}
} else {
addTupleToFrame(writer, false);
}
} catch (HyracksDataException hde) {
throw hde;
} catch (Exception e) {
throw new HyracksDataException(e);
}
}
private void suspendOperation(IFrameWriter writer) throws HyracksDataException, Exception {
nextState.remove(0);
addTupleToFrame(writer, false);
adapter.beforeSuspend();
synchronized (this) {
this.wait();
adapter.beforeResume();
}
}
private void stopOperation(IFrameWriter writer) throws HyracksDataException, Exception {
nextState.remove(0);
addTupleToFrame(writer, false);
adapter.beforeStop();
adapter.stop();
}
@Override
public void suspend() throws Exception {
nextState.add(OperationState.SUSPENDED);
}
@Override
public void resume() throws Exception {
synchronized (this) {
this.notifyAll();
}
}
@Override
public void stop() throws Exception {
nextState.add(OperationState.STOPPED);
}
@Override
public void alter(Map<String, String> alterParams) throws Exception {
if (alterParams.get(TUPLE_INTERVAL_KEY) != null) {
tupleInterval = Long.parseLong(alterParams.get(TUPLE_INTERVAL_KEY));
}
}
}