blob: 284aefb3f8282e25c8cae5e2440983b73b1d20e2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram.engine;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.datatorrent.api.Attribute.AttributeMap;
import com.datatorrent.api.Context;
import com.datatorrent.api.StatsListener.OperatorRequest;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.netlet.util.CircularBuffer;
import com.datatorrent.stram.api.BaseContext;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats;
/**
* The for context for all of the operators<p>
* <br>
*
* @since 0.3.2
*/
public class OperatorContext extends BaseContext implements Context.OperatorContext
{
private Thread thread;
private long lastProcessedWindowId;
private final int id;
private final String name;
// the size of the circular queue should be configurable. hardcoded to 1024 for now.
private final CircularBuffer<ContainerStats.OperatorStats> statsBuffer = new CircularBuffer<>(1024);
private final CircularBuffer<OperatorRequest> requests = new CircularBuffer<>(1024);
public final boolean stateless;
private int windowsFromCheckpoint;
/**
* The operator to which this context is passed, will timeout after the following milliseconds if no new tuple has been received by it.
*/
// we should make it configurable somehow.
private long idleTimeout = 1000L;
@SuppressWarnings("ReturnOfCollectionOrArrayField")
public BlockingQueue<OperatorRequest> getRequests()
{
return requests;
}
/**
* @return the idleTimeout
*/
public long getIdleTimeout()
{
return idleTimeout;
}
/**
* @param idleTimeout the idleTimeout to set
*/
public void setIdleTimeout(long idleTimeout)
{
this.idleTimeout = idleTimeout;
}
/**
*
* @param id the value of id
* @param name name of the operator
* @param attributes the value of attributes
* @param parentContext
*/
public OperatorContext(int id, @NotNull String name, AttributeMap attributes, Context parentContext)
{
super(attributes, parentContext);
this.lastProcessedWindowId = Stateless.WINDOW_ID;
this.id = id;
this.name = Preconditions.checkNotNull(name, "operator name");
this.stateless = super.getValue(OperatorContext.STATELESS);
}
@Override
public int getId()
{
return id;
}
@Override
public String getName()
{
return name;
}
@Override
public int getWindowsFromCheckpoint()
{
return windowsFromCheckpoint;
}
public void setWindowsFromCheckpoint(int windowsFromCheckpoint)
{
this.windowsFromCheckpoint = windowsFromCheckpoint;
}
/**
* Reset counts for next heartbeat interval and return current counts. This is called as part of the heartbeat processing.
*
* @param stats
* @return int
*/
public final synchronized int drainStats(Collection<? super ContainerStats.OperatorStats> stats)
{
//logger.debug("{} draining {}", counters);
return statsBuffer.drainTo(stats);
}
public final synchronized long getLastProcessedWindowId()
{
return lastProcessedWindowId;
}
public void report(ContainerStats.OperatorStats stats, long windowId)
{
lastProcessedWindowId = windowId;
stats.windowId = windowId;
stats.counters = this.counters;
this.counters = null;
if (!statsBuffer.offer(stats)) {
statsBuffer.poll();
statsBuffer.offer(stats);
}
}
public void request(OperatorRequest request)
{
//logger.debug("Received request {} for (node={})", request, id);
requests.add(request);
}
public Thread getThread()
{
return thread;
}
public void setThread(Thread thread)
{
this.thread = thread;
}
@SuppressWarnings("FieldNameHidesFieldInSuperclass")
private static final long serialVersionUID = 2013060671427L;
@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(OperatorContext.class);
}