blob: 227d4d5ce55e8de2bbbf4012e231cbd02c91f834 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.internal.processors.job.GridJobWorker;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
/**
* Remote job context implementation.
*/
public class GridJobContextImpl implements ComputeJobContext, Externalizable {
/** */
private static final long serialVersionUID = 0L;
/** */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
/** Logger. */
private static IgniteLogger log;
/** Kernal context ({@code null} for job result context). */
private GridKernalContext ctx;
/** */
private IgniteUuid jobId;
/** Job worker. */
private GridJobWorker job;
/** Current timeout object. */
private volatile GridTimeoutObject timeoutObj;
/** Attributes mux. Do not use this as object is exposed to user. */
private final Object mux = new Object();
/** */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
@GridToStringInclude
private Map<Object, Object> attrs;
/**
* Empty constructor required by {@link Externalizable}.
*/
public GridJobContextImpl() {
// No-op.
}
/**
* @param ctx Kernal context.
* @param jobId Job ID.
*/
public GridJobContextImpl(@Nullable GridKernalContext ctx, IgniteUuid jobId) {
assert jobId != null;
this.ctx = ctx;
this.jobId = jobId;
attrs = U.newHashMap(1);
}
/**
* @param ctx Kernal context.
* @param jobId Job ID.
* @param attrs Job attributes.
*/
public GridJobContextImpl(GridKernalContext ctx, IgniteUuid jobId,
Map<? extends Serializable, ? extends Serializable> attrs) {
this(ctx, jobId);
synchronized (mux) {
this.attrs.putAll(attrs);
}
}
/**
* @param job Job worker.
*/
public void job(GridJobWorker job) {
assert job != null;
this.job = job;
}
/** {@inheritDoc} */
@Override public IgniteUuid getJobId() {
return jobId;
}
/** {@inheritDoc} */
@Override public void setAttribute(Object key, @Nullable Object val) {
A.notNull(key, "key");
synchronized (mux) {
attrs.put(key, val);
}
}
/** {@inheritDoc} */
@Override public void setAttributes(Map<?, ?> attrs) {
A.notNull(attrs, "attrs");
synchronized (mux) {
this.attrs.putAll(attrs);
}
}
/** {@inheritDoc} */
@Override public <K, V> V getAttribute(K key) {
A.notNull(key, "key");
synchronized (mux) {
return (V)attrs.get(key);
}
}
/** {@inheritDoc} */
@Override public Map<Object, Object> getAttributes() {
synchronized (mux) {
return attrs.isEmpty() ? Collections.emptyMap() : U.sealMap(attrs);
}
}
/** {@inheritDoc} */
@Override public boolean heldcc() {
if (ctx == null)
return false;
if (job == null)
job = ctx.job().activeJob(jobId);
return job != null && job.held();
}
/** {@inheritDoc} */
@SuppressWarnings( {"RedundantTypeArguments"})
@Override public <T> T holdcc() {
return this.<T>holdcc(0);
}
/** {@inheritDoc} */
@Override public <T> T holdcc(long timeout) {
if (ctx != null) {
if (job == null)
job = ctx.job().activeJob(jobId);
if (job != null) {
if (!job.hold())
throw new IllegalStateException("Job has already been hold [ctx=" + this + ']');
assert timeoutObj == null;
if (timeout <= 0)
return null;
final long endTime = U.currentTimeMillis() + timeout;
// Overflow.
if (endTime > 0) {
timeoutObj = new GridTimeoutObject() {
private final IgniteUuid id = IgniteUuid.randomUuid();
@Override public IgniteUuid timeoutId() {
return id;
}
@Override public long endTime() {
return endTime;
}
@Override public void onTimeout() {
try {
synchronized (mux) {
GridTimeoutObject timeoutObj0 = timeoutObj;
if (timeoutObj0 == null || timeoutObj0.timeoutId() != id)
// The timer was canceled by explicit callcc() call.
return;
timeoutObj = null;
}
ExecutorService execSvc = job.isInternal() ?
ctx.pools().getManagementExecutorService() : ctx.pools().getExecutorService();
assert execSvc != null;
execSvc.execute(new Runnable() {
@Override public void run() {
callcc0();
}
});
}
catch (RejectedExecutionException e) {
U.error(log(), "Failed to execute job (will execute synchronously).", e);
callcc0();
}
}
};
ctx.timeout().addTimeoutObject(timeoutObj);
}
}
}
return null;
}
/** {@inheritDoc} */
@Override public void callcc() {
synchronized (mux) {
GridTimeoutObject timeoutObj0 = timeoutObj;
if (timeoutObj0 != null) {
if (ctx != null)
ctx.timeout().removeTimeoutObject(timeoutObj0);
timeoutObj = null;
}
}
callcc0();
}
/**
* Unholds job.
*/
private void callcc0() {
if (ctx != null) {
if (job == null)
job = ctx.job().activeJob(jobId);
if (job != null) {
// Execute in the same thread.
job.execute();
}
}
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(ctx);
U.writeIgniteUuid(out, jobId);
U.writeMap(out, getAttributes());
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
ctx = (GridKernalContext)in.readObject();
jobId = U.readIgniteUuid(in);
attrs = U.readMap(in);
}
/**
* @return Logger.
*/
private IgniteLogger log() {
assert ctx != null;
if (log == null)
log = U.logger(ctx, logRef, GridJobContextImpl.class);
return log;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridJobContextImpl.class, this);
}
}