blob: 6ffb244b0f76e9f9437deef04719b8810bf16e2a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.bad.runtime;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
import org.apache.asterix.om.base.ADateTime;
import org.apache.asterix.om.base.AOrderedList;
import org.apache.asterix.om.base.ARecord;
import org.apache.asterix.om.base.AUUID;
import org.apache.asterix.om.types.AOrderedListType;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
private static final Logger LOGGER = Logger.getLogger(NotifyBrokerRuntime.class.getName());
private final ByteBufferInputStream bbis = new ByteBufferInputStream();
private final DataInputStream di = new DataInputStream(bbis);
private final AOrderedListSerializerDeserializer subSerDes =
new AOrderedListSerializerDeserializer(new AOrderedListType(BuiltinType.AUUID, null));
private final ARecordSerializerDeserializer recordSerDes;
private IPointable inputArg0 = new VoidPointable();
private IPointable inputArg1 = new VoidPointable();
private IPointable inputArg2 = new VoidPointable();
private IScalarEvaluator eval0;
private IScalarEvaluator eval1;
private IScalarEvaluator eval2;
private final ActiveManager activeManager;
private final EntityId entityId;
private final boolean push;
private AOrderedList pushList;
private ARecord pushRecord;
private final IAType recordType;
private final Map<String, HashSet<String>> sendData = new HashMap<>();
private String executionTimeString;
public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brokerEvalFactory,
IScalarEvaluatorFactory pushListEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
EntityId activeJobId, boolean push, IAType recordType) throws HyracksDataException {
this.tRef = new FrameTupleReference();
eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
eval1 = pushListEvalFactory.createScalarEvaluator(ctx);
eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
this.activeManager = (ActiveManager) ((INcApplicationContext) ctx.getJobletContext().getServiceContext()
.getApplicationContext()).getActiveManager();
this.entityId = activeJobId;
this.push = push;
this.pushList = null;
this.pushRecord = null;
this.recordType = recordType;
recordSerDes = new ARecordSerializerDeserializer((ARecordType) recordType);
executionTimeString = null;
}
@Override
public void open() throws HyracksDataException {
return;
}
private void addSubscriptions(String endpoint, AOrderedList subscriptionIds) {
for (int i = 0; i < subscriptionIds.size(); i++) {
AUUID subId = (AUUID) subscriptionIds.getItem(i);
String subscriptionString = subId.toString();
//Broker code currently cannot handle the "uuid {}" part of the string, so we parse just the value
subscriptionString = subscriptionString.substring(8, subscriptionString.length() - 2);
subscriptionString = "\"" + subscriptionString + "\"";
sendData.get(endpoint).add(subscriptionString);
}
}
public String createData(String endpoint) {
String JSON = "{ \"dataverseName\":\"" + entityId.getDataverse() + "\", \"channelName\":\""
+ entityId.getEntityName() + "\", \"" + BADConstants.ChannelExecutionTime + "\":\""
+ executionTimeString + "\", \"subscriptionIds\":[";
for (String value : sendData.get(endpoint)) {
JSON += value;
JSON += ",";
}
JSON = JSON.substring(0, JSON.length() - 1);
JSON += "]}";
return JSON;
}
private void sendGroupOfResults(String endpoint) {
String urlParameters = createData(endpoint);
try {
//Create connection
URL url = new URL(endpoint);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
connection.setRequestProperty("Content-Length", Integer.toString(urlParameters.getBytes().length));
connection.setRequestProperty("Content-Language", "en-US");
connection.setUseCaches(false);
connection.setDoOutput(true);
connection.setConnectTimeout(500);
DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
wr.writeBytes(urlParameters);
if (LOGGER.isLoggable(Level.INFO)) {
int responseCode = connection.getResponseCode();
LOGGER.info("\nSending 'POST' request to URL : " + url);
LOGGER.info("Post parameters : " + urlParameters);
LOGGER.info("Response Code : " + responseCode);
}
wr.close();
connection.disconnect();
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Channel Failed to connect to Broker.");
}
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
tAccess.reset(buffer);
int nTuple = tAccess.getTupleCount();
for (int t = 0; t < nTuple; t++) {
tRef.reset(tAccess, t);
eval0.evaluate(tRef, inputArg0);
eval1.evaluate(tRef, inputArg1);
eval2.evaluate(tRef, inputArg2);
if (executionTimeString == null) {
int resultSetOffset = inputArg2.getStartOffset();
bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), resultSetOffset + 1);
ADateTime executionTime = ADateTimeSerializerDeserializer.INSTANCE.deserialize(di);
try {
executionTimeString = executionTime.toSimpleString();
} catch (IOException e) {
throw HyracksDataException.create(e);
}
}
int serBrokerOffset = inputArg0.getStartOffset();
bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serBrokerOffset + 1);
String endpoint = AStringSerializerDeserializer.INSTANCE.deserialize(di).getStringValue();
sendData.putIfAbsent(endpoint, new HashSet<>());
if (push) {
int pushOffset = inputArg1.getStartOffset();
bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), pushOffset + 1);
//TODO: Right now this creates an object per channel result. Need to find a better way to deserialize
pushRecord = recordSerDes.deserialize(di);
sendData.get(endpoint).add(pushRecord.toString());
} else {
int serSubOffset = inputArg1.getStartOffset();
bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serSubOffset + 1);
pushList = subSerDes.deserialize(di);
addSubscriptions(endpoint, pushList);
}
}
}
@Override
public void close() throws HyracksDataException {
for (String endpoint : sendData.keySet()) {
if (sendData.get(endpoint).size() > 0) {
sendGroupOfResults(endpoint);
sendData.get(endpoint).clear();
}
}
return;
}
@Override
public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
this.inputRecordDesc = recordDescriptor;
this.tAccess = new FrameTupleAccessor(inputRecordDesc);
}
@Override
public void flush() throws HyracksDataException {
return;
}
@Override
public void fail() throws HyracksDataException {
failed = true;
}
}