blob: d55080c9a02a37eaa8d3c25efbe5fc285dd9b066 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.bad.runtime;
import java.io.DataInputStream;
import java.nio.ByteBuffer;
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.bad.ChannelJobService;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
import org.apache.asterix.om.base.ADateTime;
import org.apache.asterix.om.base.AOrderedList;
import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.types.AOrderedListType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
private final ByteBufferInputStream bbis = new ByteBufferInputStream();
private final DataInputStream di = new DataInputStream(bbis);
private final AOrderedListSerializerDeserializer subSerDes = new AOrderedListSerializerDeserializer(
new AOrderedListType(BuiltinType.AUUID, null));
private IPointable inputArg0 = new VoidPointable();
private IPointable inputArg1 = new VoidPointable();
private IPointable inputArg2 = new VoidPointable();
private IScalarEvaluator eval0;
private IScalarEvaluator eval1;
private IScalarEvaluator eval2;
private final ActiveManager activeManager;
private final EntityId entityId;
private ChannelJobService channelJobService;
public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brokerEvalFactory,
IScalarEvaluatorFactory subEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
EntityId activeJobId) throws AlgebricksException {
this.tRef = new FrameTupleReference();
eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
eval1 = subEvalFactory.createScalarEvaluator(ctx);
eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
this.activeManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
.getApplicationObject()).getActiveManager();
this.entityId = activeJobId;
channelJobService = new ChannelJobService();
}
@Override
public void open() throws HyracksDataException {
return;
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
tAccess.reset(buffer);
int nTuple = tAccess.getTupleCount();
for (int t = 0; t < nTuple; t++) {
tRef.reset(tAccess, t);
try {
eval0.evaluate(tRef, inputArg0);
eval1.evaluate(tRef, inputArg1);
eval2.evaluate(tRef, inputArg2);
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
int serBrokerOffset = inputArg0.getStartOffset();
bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serBrokerOffset + 1);
AString endpoint = AStringSerializerDeserializer.INSTANCE.deserialize(di);
int serSubOffset = inputArg1.getStartOffset();
bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serSubOffset + 1);
AOrderedList subs = subSerDes.deserialize(di);
int resultSetOffset = inputArg2.getStartOffset();
bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), resultSetOffset + 1);
ADateTime executionTime = ADateTimeSerializerDeserializer.INSTANCE.deserialize(di);
String executionTimeString = executionTime.toSimpleString();
channelJobService.sendBrokerNotificationsForChannel(entityId, endpoint.getStringValue(), subs,
executionTimeString);
}
}
@Override
public void close() throws HyracksDataException {
return;
}
@Override
public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
this.inputRecordDesc = recordDescriptor;
this.tAccess = new FrameTupleAccessor(inputRecordDesc);
}
@Override
public void flush() throws HyracksDataException {
return;
}
@Override
public void fail() throws HyracksDataException {
failed = true;
}
}