blob: 479c3a96382bf47b1cfdb4f907717c0fff3f3dae [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.livy.rsc.driver;
import java.util.List;
import org.apache.livy.Job;
import org.apache.livy.JobHandle;
import org.apache.livy.rsc.BypassJobStatus;
import org.apache.livy.rsc.Utils;
public class BypassJobWrapper extends JobWrapper<byte[]> {
private volatile byte[] result;
private volatile Throwable error;
private volatile JobHandle.State state;
private volatile List<Integer> newSparkJobs;
public BypassJobWrapper(RSCDriver driver, String jobId, Job<byte[]> serializedJob) {
super(driver, jobId, serializedJob);
state = JobHandle.State.QUEUED;
}
@Override
public Void call() throws Exception {
// we ignore the return value here, because super.call() will detect cancellation
// and still needs to perform some cleanup
tryTransitionToState(JobHandle.State.STARTED);
return super.call();
}
@Override
protected synchronized void finished(byte[] result, Throwable error) {
if (error == null && tryTransitionToState(JobHandle.State.SUCCEEDED)) {
this.result = result;
} else if (error != null && tryTransitionToState(JobHandle.State.FAILED)) {
this.error = error;
}
}
@Override
synchronized boolean cancel() {
return tryTransitionToState(JobHandle.State.CANCELLED) && super.cancel();
}
private synchronized boolean tryTransitionToState(JobHandle.State newState) {
boolean success = false;
switch (this.state) {
case QUEUED:
if (newState == JobHandle.State.STARTED || newState == JobHandle.State.CANCELLED) {
this.state = newState;
success = true;
}
break;
case STARTED:
if (newState == JobHandle.State.CANCELLED || newState == JobHandle.State.SUCCEEDED ||
newState == JobHandle.State.FAILED) {
this.state = newState;
success = true;
}
break;
default:
break;
}
return success;
}
@Override
protected void jobStarted() {
// Do nothing; just avoid sending data back to the driver.
}
synchronized BypassJobStatus getStatus() {
String stackTrace = error != null ? Utils.stackTraceAsString(error) : null;
return new BypassJobStatus(state, result, stackTrace);
}
}