// blob: a711a9eb4472ce985c0ed22c29a000f1e1fefe4c (VCS viewer artifact; not part of the source)
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.bad;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.api.http.server.ResultUtil;
import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.lang.BADParserFactory;
import org.apache.asterix.bad.lang.BADStatementExecutor;
import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.statement.Query;
import org.apache.asterix.lang.common.statement.SetStatement;
import org.apache.asterix.lang.sqlpp.visitor.SqlppDeleteRewriteVisitor;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.result.IResultSet;
import org.apache.hyracks.api.result.ResultSetId;
/**
 * Provides functionality for channel jobs: deploying job specifications to the
 * cluster, running deployed jobs on a fixed period, and recompiling/redeploying
 * channel and procedure jobs when their definitions change.
 */
public class BADJobService {

    private static final Logger LOGGER = Logger.getLogger(BADJobService.class.getName());

    //pool size one (only running one thread at a time)
    private static final int POOL_SIZE = 1;

    // Multiply as long (1000L) so a large EXECUTOR_TIMEOUT cannot overflow in int
    // before the widening conversion. NOTE(review): field appears unused within
    // this class — presumably referenced by a future/removed code path; verify.
    private static final long millisecondTimeout = BADConstants.EXECUTOR_TIMEOUT * 1000L;

    /**
     * Deploys the given channel job specification and schedules it for repetitive
     * execution with the period parsed from {@code duration}. No-op when
     * {@code channeljobSpec} is {@code null}.
     *
     * @param entityId       identity of the channel entity
     * @param channeljobSpec the compiled channel job, possibly {@code null}
     * @param hcc            cluster connection used to deploy and start the job
     * @param listener       listener that records the deployed-job id and executor
     * @param txnIdFactory   factory producing a transaction id per execution
     * @param duration       ISO 8601 duration string (time part only) giving the period
     * @throws Exception if deployment fails
     */
    public static void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec,
            IHyracksClientConnection hcc, DeployedJobSpecEventListener listener, ITxnIdFactory txnIdFactory,
            String duration) throws Exception {
        if (channeljobSpec != null) {
            channeljobSpec.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
            DeployedJobSpecId deployedId = hcc.deployJobSpec(channeljobSpec);
            ScheduledExecutorService ses = startRepetitiveDeployedJobSpec(deployedId, hcc, findPeriod(duration),
                    new HashMap<>(), entityId, txnIdFactory, listener);
            listener.setDeployedJobSpecId(deployedId);
            listener.setExecutorService(ses);
        }
    }

    /**
     * Starts running a deployed job specification periodically with an interval of
     * {@code period} <em>milliseconds</em> (the schedule uses
     * {@link TimeUnit#MILLISECONDS}). The schedule shuts itself down when a single
     * execution takes longer than the period.
     *
     * @return the executor service driving the repetitive schedule
     */
    public static ScheduledExecutorService startRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId,
            IHyracksClientConnection hcc, long period, Map<byte[], byte[]> jobParameters, EntityId entityId,
            ITxnIdFactory txnIdFactory, DeployedJobSpecEventListener listener) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(POOL_SIZE);
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                if (!runDeployedJobSpecCheckPeriod(distributedId, hcc, jobParameters, period, entityId,
                        txnIdFactory, listener)) {
                    // Execution overran the period: stop repeating this job.
                    scheduledExecutorService.shutdown();
                }
            } catch (Exception e) {
                LOGGER.log(Level.SEVERE, "Job Failed to run for " + entityId.getExtensionName() + " "
                        + entityId.getDataverse() + "." + entityId.getEntityName() + ".", e);
            }
        }, period, period, TimeUnit.MILLISECONDS);
        return scheduledExecutorService;
    }

    /**
     * Runs one execution of the deployed job and checks that it finished within the
     * configured period.
     *
     * @return {@code true} if the execution met the period, {@code false} if it
     *         overran (caller is expected to stop the schedule)
     * @throws Exception if the job execution fails
     */
    public static boolean runDeployedJobSpecCheckPeriod(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
            Map<byte[], byte[]> jobParameters, long period, EntityId entityId, ITxnIdFactory txnIdFactory,
            DeployedJobSpecEventListener listener) throws Exception {
        long executionMilliseconds = runDeployedJobSpec(distributedId, hcc, null, jobParameters, entityId, txnIdFactory,
                null, listener, null);
        if (executionMilliseconds > period) {
            // Fixed log message: separate the timestamp from the text (was fused
            // directly onto "shutdown" with no space or punctuation).
            LOGGER.log(Level.SEVERE,
                    "Periodic job for " + entityId.getExtensionName() + " " + entityId.getDataverse() + "."
                            + entityId.getEntityName() + " was unable to meet the required period of " + period
                            + " milliseconds. Actually took " + executionMilliseconds
                            + ". Execution will shutdown at " + new Date());
            return false;
        }
        return true;
    }

    /**
     * Runs a single execution of a deployed job spec, tagging it with a fresh
     * transaction id, and waits for completion. For QUERY-type listeners the job's
     * result set 0 is printed through {@code statementExecutor}'s session output.
     *
     * @return wall-clock execution time in milliseconds
     * @throws Exception if starting or waiting on the job fails
     */
    public static long runDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
            IResultSet resultSet, Map<byte[], byte[]> jobParameters, EntityId entityId, ITxnIdFactory txnIdFactory,
            ICcApplicationContext appCtx, DeployedJobSpecEventListener listener, QueryTranslator statementExecutor)
            throws Exception {
        // Block while the entity is suspended (e.g. mid-redeploy).
        listener.waitWhileAtState(ActivityState.SUSPENDED);
        //Add the Asterix Transaction Id to the map
        // Explicit charset: the id is ASCII digits, but getBytes() without a
        // charset depends on the platform default.
        jobParameters.put(BADConstants.TRANSACTION_ID_PARAMETER_NAME,
                String.valueOf(txnIdFactory.create().getId()).getBytes(StandardCharsets.UTF_8));
        long startTime = Instant.now().toEpochMilli();
        JobId jobId = hcc.startJob(distributedId, jobParameters);
        hcc.waitForCompletion(jobId);
        long executionMilliseconds = Instant.now().toEpochMilli() - startTime;
        if (listener.getType() == DeployedJobSpecEventListener.PrecompiledType.QUERY) {
            ResultReader resultReader = new ResultReader(resultSet, jobId, new ResultSetId(0));
            ResultUtil.printResults(appCtx, resultReader, statementExecutor.getSessionOutput(),
                    new IStatementExecutor.Stats(), null);
        }
        LOGGER.log(Level.SEVERE,
                "Deployed Job execution completed for " + entityId.getExtensionName() + " " + entityId.getDataverse()
                        + "." + entityId.getEntityName() + ". Took " + executionMilliseconds + " milliseconds ");
        return executionMilliseconds;
    }

    /**
     * Converts the time portion of an ISO 8601 duration string (e.g. "PT1H30M10S")
     * into milliseconds. Fractional components are honored; a duration with no
     * 'T' section (e.g. "P1D") yields 0.
     *
     * @param duration ISO 8601 duration string
     * @return the period in milliseconds
     */
    public static long findPeriod(String duration) {
        //TODO: Allow Repetitive Channels to use YMD durations
        String hoursMinutesSeconds = "";
        int timeStart = duration.indexOf('T');
        if (timeStart != -1) {
            hoursMinutesSeconds = duration.substring(timeStart + 1);
        }
        double seconds = 0;
        // BUGFIX: the original used `hoursMinutesSeconds != ""`, a reference
        // comparison that only worked by accident of string interning.
        if (!hoursMinutesSeconds.isEmpty()) {
            int pos = 0;
            int hourMark = hoursMinutesSeconds.indexOf('H');
            if (hourMark != -1) {
                double hours = Double.parseDouble(hoursMinutesSeconds.substring(pos, hourMark));
                seconds += (hours * 60 * 60);
                pos = hourMark + 1;
            }
            int minuteMark = hoursMinutesSeconds.indexOf('M');
            if (minuteMark != -1) {
                double minutes = Double.parseDouble(hoursMinutesSeconds.substring(pos, minuteMark));
                seconds += (minutes * 60);
                pos = minuteMark + 1;
            }
            int secondMark = hoursMinutesSeconds.indexOf('S');
            if (secondMark != -1) {
                double s = Double.parseDouble(hoursMinutesSeconds.substring(pos, secondMark));
                seconds += (s);
            }
        }
        return (long) (seconds * 1000);
    }

    /**
     * Compiles a push-channel query inside its own metadata transaction, aborting
     * the transaction on failure and always releasing the metadata locks.
     *
     * @return the compiled job specification
     * @throws Exception if compilation fails (after aborting the metadata txn)
     */
    public static JobSpecification compilePushChannel(IStatementExecutor statementExecutor,
            MetadataProvider metadataProvider, IHyracksClientConnection hcc, Query q) throws Exception {
        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
        boolean bActiveTxn = true;
        metadataProvider.setMetadataTxnContext(mdTxnCtx);
        JobSpecification jobSpec = null;
        try {
            jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null, null,
                    null);
            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
            bActiveTxn = false;
        } catch (Exception e) {
            LOGGER.log(Level.INFO, e.getMessage(), e);
            if (bActiveTxn) {
                ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
            }
            throw e;
        } finally {
            metadataProvider.getLocks().unlock();
        }
        return jobSpec;
    }

    /**
     * Recompiles the stored body of a channel or procedure and redeploys the
     * resulting job spec, either under a fresh deployed-job id ({@code useNewId})
     * or in place of the listener's existing one. Resumes the listener afterwards.
     *
     * @throws Exception if parsing, compilation, or (re)deployment fails
     */
    public static void redeployJobSpec(EntityId entityId, String queryBodyString, MetadataProvider metadataProvider,
            BADStatementExecutor badStatementExecutor, IHyracksClientConnection hcc,
            IRequestParameters requestParameters, boolean useNewId) throws Exception {
        ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
        ActiveNotificationHandler activeEventHandler =
                (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
        DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
        if (listener == null) {
            LOGGER.severe("Tried to redeploy the job for " + entityId + " but no listener exists.");
            return;
        }
        BADParserFactory factory = new BADParserFactory();
        List<Statement> fStatements = factory.createParser(new StringReader(queryBodyString)).parse();
        JobSpecification jobSpec = null;
        if (listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.PUSH_CHANNEL)
                || listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.CHANNEL)) {
            //Channels
            // Stored body is [SetStatement, channel statement] — apply the SET first.
            SetStatement ss = (SetStatement) fStatements.get(0);
            metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());
            if (listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.PUSH_CHANNEL)) {
                jobSpec = compilePushChannel(badStatementExecutor, metadataProvider, hcc, (Query) fStatements.get(1));
            } else {
                jobSpec = badStatementExecutor.handleInsertUpsertStatement(metadataProvider, fStatements.get(1), hcc,
                        null, null, null, null, true, null, null, null);
            }
        } else {
            //Procedures
            metadataProvider.setResultSetId(new ResultSetId(0));
            IStatementExecutor.ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery();
            IResultSet hdc = requestParameters.getResultSet();
            IStatementExecutor.Stats stats = requestParameters.getStats();
            boolean resultsAsync = resultDelivery == IStatementExecutor.ResultDelivery.ASYNC
                    || resultDelivery == IStatementExecutor.ResultDelivery.DEFERRED;
            metadataProvider.setResultAsyncMode(resultsAsync);
            metadataProvider.setMaxResultReads(1);
            jobSpec = compileProcedureJob(badStatementExecutor, metadataProvider, hcc, hdc, stats, fStatements.get(1));
        }
        if (useNewId) {
            DeployedJobSpecId id = hcc.deployJobSpec(jobSpec);
            listener.setDeployedJobSpecId(id);
        } else {
            hcc.redeployJobSpec(listener.getDeployedJobSpecId(), jobSpec);
        }
        listener.resume();
    }

    /**
     * Compiles a query inside its own metadata transaction, aborting on failure.
     * NOTE(review): unlike {@link #compilePushChannel}, this does not unlock
     * {@code metadataProvider.getLocks()} — presumably the caller does; verify.
     *
     * @return the compiled job specification
     * @throws Exception if compilation fails (after aborting the metadata txn)
     */
    public static JobSpecification compileQueryJob(IStatementExecutor statementExecutor,
            MetadataProvider metadataProvider, IHyracksClientConnection hcc, Query q) throws Exception {
        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
        metadataProvider.setMetadataTxnContext(mdTxnCtx);
        JobSpecification jobSpec;
        try {
            jobSpec = statementExecutor.rewriteCompileQuery(hcc, metadataProvider, q, null, null, null);
            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
        } catch (Exception e) {
            ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
            throw e;
        }
        return jobSpec;
    }

    /**
     * Dispatches compilation of a procedure body by statement kind: INSERT/UPSERT,
     * QUERY, or (by default) DELETE, which is first rewritten to SQL++ form.
     */
    private static JobSpecification compileProcedureJob(IStatementExecutor statementExecutor,
            MetadataProvider metadataProvider, IHyracksClientConnection hcc, IResultSet resultSet,
            IStatementExecutor.Stats stats, Statement procedureStatement) throws Exception {
        if (procedureStatement.getKind() == Statement.Kind.INSERT) {
            return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
                    procedureStatement, hcc, resultSet, IStatementExecutor.ResultDelivery.ASYNC, null, stats, true,
                    null, null, null);
        } else if (procedureStatement.getKind() == Statement.Kind.QUERY) {
            return compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) procedureStatement);
        } else {
            SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
            procedureStatement.accept(visitor, null);
            return ((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, procedureStatement,
                    hcc, true, null, null);
        }
    }

    @Override
    public String toString() {
        return "BADJobService";
    }
}