blob: 41e358e8317f4424975a47a845c1b66051602ef0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.log.LogFileInformation;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.ProtocolSignature;
import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode.Controller;
import com.datatorrent.api.Operator;
import com.datatorrent.bufferserver.server.Server;
import com.datatorrent.bufferserver.storage.DiskStorage;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
import com.datatorrent.stram.StreamingContainerManager.ContainerResource;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StreamingContainerContext;
import com.datatorrent.stram.engine.Node;
import com.datatorrent.stram.engine.OperatorContext;
import com.datatorrent.stram.engine.StreamingContainer;
import com.datatorrent.stram.engine.WindowGenerator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
import com.datatorrent.stram.plan.physical.PTOperator;
/**
 * Launcher for topologies in embedded mode within a single process.
 * Child containers are mapped to threads.
 * <p>
 * Unless {@link #setPerContainerBufferServer(boolean)} is enabled, one buffer server (and the shared
 * {@code StreamingContainer.eventloop}) is started by {@link #run(long)} and its address is handed to every
 * container; otherwise the init context sent to each container has {@code deployBufferServer} set.
 *
 * @since 0.3.2
 */
public class StramLocalCluster implements Runnable, Controller
{
  private static final Logger LOG = LoggerFactory.getLogger(StramLocalCluster.class);
  // assumes execution as unit test
  private static final File DEFAULT_APP_DIR = new File("target", StramLocalCluster.class.getName());
  /** System property that overrides the host name used in local buffer server addresses. */
  private static final String LOCALHOST_PROPERTY_KEY = "org.apache.apex.stram.StramLocalCluster.hostname";
  private static final String LOCALHOST = System.getProperty(LOCALHOST_PROPERTY_KEY, "localhost");
  protected final StreamingContainerManager dnmgr;
  private final UmbilicalProtocolLocalImpl umbilical;
  private InetSocketAddress bufferServerAddress;
  private boolean perContainerBufferServer;
  private Server bufferServer = null;
  /** Running child containers keyed by container id; accessed from master and container threads. */
  private final Map<String, LocalStreamingContainer> childContainers = new ConcurrentHashMap<>();
  private int containerSeq = 0;
  /**
   * Written by {@link #shutdown()} (caller thread) and the run loop, read by {@link #isFinished()} from
   * arbitrary threads, hence volatile.
   */
  private volatile boolean appDone = false;
  /** Containers whose next heartbeat must be answered with an ABORT shutdown. */
  private final Map<String, StreamingContainer> injectShutdown = new ConcurrentHashMap<>();
  private boolean heartbeatMonitoringEnabled = true;
  private Callable<Boolean> exitCondition;
  private Thread master;

  /**
   * Hook for tests to replace components of the local containers.
   */
  public interface MockComponentFactory
  {
    WindowGenerator setupWindowGenerator();
  }

  private MockComponentFactory mockComponentFactory;

  /**
   * In-process umbilical: container threads call straight into the {@link StreamingContainerManager}
   * instead of going through RPC.
   */
  private class UmbilicalProtocolLocalImpl implements StreamingContainerUmbilicalProtocol
  {
    @Override
    public long getProtocolVersion(String protocol, long clientVersion)
        throws IOException
    {
      throw new UnsupportedOperationException("not implemented in local mode");
    }

    @Override
    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException
    {
      throw new UnsupportedOperationException("not implemented in local mode");
    }

    @Override
    public void reportError(String containerId, int[] operators, String msg, LogFileInformation logFileInfo) throws IOException
    {
      log(containerId, msg);
    }

    @Override
    public void log(String containerId, String msg) throws IOException
    {
      LOG.info("{} msg: {}", containerId, msg);
    }

    @Override
    public StreamingContainerContext getInitContext(String containerId)
        throws IOException
    {
      StreamingContainerAgent sca = dnmgr.getContainerAgent(containerId);
      StreamingContainerContext scc = sca.getInitContext();
      scc.deployBufferServer = perContainerBufferServer;
      return scc;
    }

    /**
     * Forwards the heartbeat to the container manager (or answers ABORT for containers marked for
     * shutdown) and then wakes up threads blocked in {@link LocalStreamingContainer#waitForHeartbeat(int)}.
     */
    @Override
    public ContainerHeartbeatResponse processHeartbeat(ContainerHeartbeat msg) throws IOException
    {
      if (injectShutdown.containsKey(msg.getContainerId())) {
        ContainerHeartbeatResponse r = new ContainerHeartbeatResponse();
        r.shutdown = ShutdownType.ABORT;
        return r;
      }
      try {
        ContainerHeartbeatResponse rsp = dnmgr.processHeartbeat(msg);
        if (rsp != null) {
          // clone to not share attributes (stream codec etc.) between threads.
          rsp = SerializationUtils.clone(rsp);
        }
        return rsp;
      } finally {
        // The container can be removed concurrently (failContainer, launcher exit) while this heartbeat
        // is in flight; without the null check the finally block would throw and mask the response.
        LocalStreamingContainer c = childContainers.get(msg.getContainerId());
        if (c != null) {
          synchronized (c.heartbeatCount) {
            c.heartbeatCount.incrementAndGet();
            c.heartbeatCount.notifyAll();
          }
        }
      }
    }
  }

  /**
   * A {@link StreamingContainer} that runs on a thread of this process.
   */
  public static class LocalStreamingContainer extends StreamingContainer
  {
    /**
     * Count heartbeat from container and allow other threads to wait for it.
     */
    private final AtomicInteger heartbeatCount = new AtomicInteger();
    /** Optional replacement window generator supplied by a {@link MockComponentFactory}; may be null. */
    private final WindowGenerator windowGenerator;

    public LocalStreamingContainer(String containerId, StreamingContainerUmbilicalProtocol umbilical, WindowGenerator winGen)
    {
      super(containerId, umbilical);
      this.windowGenerator = winGen;
    }

    /**
     * Sets up the container, runs the heartbeat loop on the calling thread and tears down afterwards.
     * A teardown failure is rethrown only when the heartbeat loop completed normally; otherwise the
     * heartbeat loop's exception propagates and the teardown failure is logged.
     */
    public void run(StreamingContainerContext ctx) throws Exception
    {
      LOG.debug("container {} context {}", getContainerId(), ctx);
      setup(ctx);
      if (bufferServerAddress != null && !bufferServerAddress.getAddress().isLoopbackAddress()) {
        bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, bufferServerAddress.getPort());
      }
      boolean hasError = true;
      try {
        // main thread enters heartbeat loop
        heartbeatLoop();
        hasError = false;
      } finally {
        // teardown
        try {
          teardown();
        } catch (Exception e) {
          if (!hasError) {
            throw e;
          }
          // do not mask the heartbeat loop failure, but do not lose this one either
          LOG.warn("Exception during teardown of container {}", getContainerId(), e);
        }
      }
    }

    /**
     * Blocks until the next heartbeat from this container is processed or {@code waitMillis} elapses.
     */
    public void waitForHeartbeat(int waitMillis) throws InterruptedException
    {
      synchronized (heartbeatCount) {
        heartbeatCount.wait(waitMillis);
      }
    }

    @Override
    public void teardown()
    {
      super.teardown();
    }

    @Override
    protected WindowGenerator setupWindowGenerator(long smallestWindowId)
    {
      if (windowGenerator != null) {
        return windowGenerator;
      }
      return super.setupWindowGenerator(smallestWindowId);
    }

    /**
     * @return the context of the deployed operator with the given id, or null if it is not deployed here
     */
    OperatorContext getNodeContext(int id)
    {
      Node<?> node = nodes.get(id);
      return node == null ? null : node.context;
    }

    /**
     * @return the deployed operator with the given id, or null if it is not deployed here
     */
    Operator getOperator(int id)
    {
      Node<?> node = nodes.get(id);
      return node == null ? null : node.getOperator();
    }

    Map<Integer, Node<?>> getNodes()
    {
      return Collections.unmodifiableMap(nodes);
    }
  }

  /**
   * Starts the child "container" as thread.
   */
  private class LocalStreamingContainerLauncher implements Runnable
  {
    final String containerId;
    final LocalStreamingContainer child;

    /**
     * Assigns a new container to the request and, if the container manager accepts it, registers the
     * child and starts its thread (the thread is also added to {@code containerThreads}).
     */
    @SuppressWarnings("CallToThreadStartDuringObjectConstruction")
    private LocalStreamingContainerLauncher(ContainerStartRequest cdr, List<Thread> containerThreads)
    {
      this.containerId = "container-" + containerSeq++;
      WindowGenerator wingen = null;
      if (mockComponentFactory != null) {
        wingen = mockComponentFactory.setupWindowGenerator();
      }
      this.child = new LocalStreamingContainer(containerId, umbilical, wingen);
      ContainerResource cr = new ContainerResource(cdr.container.getResourceRequestPriority(), containerId, "localhost", cdr.container.getRequiredMemoryMB(), cdr.container.getRequiredVCores(), null);
      StreamingContainerAgent sca = dnmgr.assignContainer(cr, perContainerBufferServer ? null : bufferServerAddress);
      if (sca != null) {
        childContainers.put(containerId, child);
        Thread launchThread = new Thread(this, containerId);
        containerThreads.add(launchThread);
        launchThread.start();
      }
    }

    @Override
    public void run()
    {
      try {
        StreamingContainerContext ctx = umbilical.getInitContext(containerId);
        LOG.info("Started container {}", containerId);
        child.run(ctx);
      } catch (Exception | Error e) {
        LOG.error("Fatal {} in container {}", e instanceof Error ? "error" : "exception", containerId, e);
      } finally {
        childContainers.remove(containerId);
        LOG.info("Container {} terminating.", containerId);
      }
    }
  }

  /**
   * Prepares a local run of the given plan: validates it, checks it is serializable, deletes and
   * (re)assigns the application directory, sets an application id and a default storage agent.
   */
  public StramLocalCluster(LogicalPlan dag) throws IOException, ClassNotFoundException
  {
    dag.validate();
    // ensure plan can be serialized
    cloneLogicalPlan(dag);
    final Path pathUri;
    String appPath = dag.getAttributes().get(LogicalPlan.APPLICATION_PATH);
    if (appPath == null) {
      // convert to URI so we always write to local file system,
      // even when the environment has a default HDFS location.
      pathUri = new Path(DEFAULT_APP_DIR.toURI());
      dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, pathUri.toString());
    } else {
      // should accept any valid path URI (or relative path) provided by user
      Path tmp = new Path(appPath);
      if (!tmp.isAbsolute()) {
        pathUri = new Path(new File(appPath).toURI());
      } else {
        pathUri = tmp;
      }
    }
    try {
      FileContext.getLocalFSFileContext().delete(pathUri, true);
    } catch (IOException e) {
      // IllegalArgumentException (e.g. a non-local path) is unchecked and propagates as is
      throw new RuntimeException("could not cleanup test dir", e);
    }
    dag.getAttributes().put(LogicalPlan.APPLICATION_ID, "app_local_" + System.currentTimeMillis());
    if (dag.getAttributes().get(OperatorContext.STORAGE_AGENT) == null) {
      dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(new Path(pathUri, LogicalPlan.SUBDIR_CHECKPOINTS).toString(), null));
    }
    this.dnmgr = new StreamingContainerManager(dag);
    this.umbilical = new UmbilicalProtocolLocalImpl();
  }

  /**
   * Creates a copy of the plan by writing it with {@link LogicalPlan#write} and reading it back.
   */
  public static LogicalPlan cloneLogicalPlan(LogicalPlan lp) throws IOException, ClassNotFoundException
  {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    LogicalPlan.write(lp, bos);
    bos.flush();
    // size() avoids copying the buffer just to log its length
    LOG.debug("serialized size: {}", bos.size());
    ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
    return LogicalPlan.read(bis);
  }

  LocalStreamingContainer getContainer(String id)
  {
    return this.childContainers.get(id);
  }

  public StreamingContainerManager getStreamingContainerManager()
  {
    return dnmgr;
  }

  public DAG getDAG()
  {
    return dnmgr.getPhysicalPlan().getLogicalPlan();
  }

  public StramLocalCluster(LogicalPlan dag, MockComponentFactory mcf) throws Exception
  {
    this(dag);
    this.mockComponentFactory = mcf;
  }

  /**
   * Simulate container failure for testing purposes.
   *
   * @param c the container to fail; its next heartbeat is answered with ABORT and a restart is scheduled
   */
  void failContainer(StreamingContainer c)
  {
    injectShutdown.put(c.getContainerId(), c);
    c.triggerHeartbeat();
    LOG.info("Container {} failed, launching new container.", c.getContainerId());
    dnmgr.scheduleContainerRestart(c.getContainerId());
    // simplify testing: remove immediately rather than waiting for thread to exit
    this.childContainers.remove(c.getContainerId());
  }

  /**
   * @return the first physical operator of the given logical operator, or null if there is none
   */
  public PTOperator findByLogicalNode(OperatorMeta logicalNode)
  {
    List<PTOperator> nodes = dnmgr.getPhysicalPlan().getOperators(logicalNode);
    if (nodes.isEmpty()) {
      return null;
    }
    return nodes.get(0);
  }

  List<PTOperator> getPlanOperators(OperatorMeta logicalNode)
  {
    return dnmgr.getPhysicalPlan().getOperators(logicalNode);
  }

  /**
   * Return the container that has the given operator deployed.
   * Returns null if the specified operator is not deployed.
   *
   * @param planOperator the physical operator to look up
   * @return the running local container hosting the operator, or null
   */
  public LocalStreamingContainer getContainer(PTOperator planOperator)
  {
    LocalStreamingContainer container;
    String cid = planOperator.getContainer().getExternalId();
    if (cid != null) {
      if ((container = getContainer(cid)) != null) {
        if (container.getNodeContext(planOperator.getId()) != null) {
          return container;
        }
      }
    }
    return null;
  }

  StreamingContainerAgent getContainerAgent(StreamingContainer c)
  {
    return this.dnmgr.getContainerAgent(c.getContainerId());
  }

  /**
   * Runs the application on a new thread named "master".
   */
  @Override
  public void runAsync()
  {
    master = new Thread(this, "master");
    master.start();
  }

  /**
   * Flags the application as done and, if started with {@link #runAsync()}, interrupts the master
   * thread and waits for it to exit.
   */
  @Override
  public void shutdown()
  {
    appDone = true;
    awaitTermination(0);
  }

  /**
   * Interrupts the master thread and joins it for up to {@code millis} ms (0 waits without limit).
   */
  private void awaitTermination(long millis)
  {
    if (master != null) {
      try {
        master.interrupt();
        master.join(millis);
      } catch (InterruptedException e) {
        // preserve the caller's interrupt status before converting to an unchecked exception
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      } finally {
        if (master.isAlive()) {
          LOG.warn("{} {} did not terminate.", this.getClass().getSimpleName(), master.getName());
        }
        master = null;
      }
    }
  }

  public boolean isFinished()
  {
    return appDone;
  }

  @Override
  public void setHeartbeatMonitoringEnabled(boolean enabled)
  {
    this.heartbeatMonitoringEnabled = enabled;
  }

  public void setPerContainerBufferServer(boolean perContainerBufferServer)
  {
    this.perContainerBufferServer = perContainerBufferServer;
  }

  /**
   * @param exitCondition polled once per loop iteration of {@link #run(long)}; returning true ends the run
   */
  public void setExitCondition(Callable<Boolean> exitCondition)
  {
    this.exitCondition = exitCondition;
  }

  @Override
  public void run()
  {
    run(0);
  }

  /**
   * Runs the application on the calling thread until there are no containers left, {@code runMillis}
   * elapses (when positive), the exit condition returns true, or the thread is interrupted.
   * On exit all containers are asked to abort, and the container manager and (shared) buffer server
   * are shut down.
   */
  @Override
  @SuppressWarnings({"SleepWhileInLoop", "ResultOfObjectAllocationIgnored"})
  public void run(long runMillis)
  {
    Thread eventLoopThread = null;
    List<Thread> containerThreads = new LinkedList<>();
    try {
      if (!perContainerBufferServer) {
        eventLoopThread = StreamingContainer.eventloop.start();
        bufferServer = new Server(StreamingContainer.eventloop, 0, 1024 * 1024, 8);
        try {
          bufferServer.setSpoolStorage(new DiskStorage());
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
        bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, bufferServer.run().getPort());
        LOG.info("Buffer server started: {}", bufferServerAddress);
      }
      long endMillis = System.currentTimeMillis() + runMillis;
      while (!appDone) {
        for (String containerIdStr : dnmgr.containerStopRequests.values()) {
          // teardown child thread
          StreamingContainer c = childContainers.get(containerIdStr);
          if (c != null) {
            ContainerHeartbeatResponse r = new ContainerHeartbeatResponse();
            r.shutdown = StreamingContainerUmbilicalProtocol.ShutdownType.ABORT;
            c.processHeartbeatResponse(r);
          }
          dnmgr.containerStopRequests.remove(containerIdStr);
          LOG.info("Container {} restart.", containerIdStr);
          dnmgr.scheduleContainerRestart(containerIdStr);
        }
        // start containers
        while (!dnmgr.containerStartRequests.isEmpty()) {
          ContainerStartRequest cdr = dnmgr.containerStartRequests.poll();
          if (cdr != null) {
            new LocalStreamingContainerLauncher(cdr, containerThreads);
          }
        }
        if (heartbeatMonitoringEnabled) {
          // monitor child containers
          dnmgr.monitorHeartbeat(false);
        }
        if (childContainers.isEmpty() && dnmgr.containerStartRequests.isEmpty()) {
          appDone = true;
        }
        if (runMillis > 0 && System.currentTimeMillis() > endMillis) {
          appDone = true;
        }
        try {
          if (exitCondition != null && exitCondition.call()) {
            appDone = true;
          }
        } catch (Exception ex) {
          // a failing exit condition aborts the run loop; record why
          LOG.warn("Exit condition failed, stopping application", ex);
          break;
        }
        if (Thread.interrupted()) {
          break;
        }
        if (!appDone) {
          try {
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            LOG.debug("Sleep interrupted", e);
            break;
          }
        }
      }
    } finally {
      for (LocalStreamingContainer lsc : childContainers.values()) {
        injectShutdown.put(lsc.getContainerId(), lsc);
        lsc.triggerHeartbeat();
      }
      for (Thread thread : containerThreads) {
        try {
          thread.join(1000);
        } catch (InterruptedException e) {
          LOG.debug("Wait for {} to terminate interrupted", thread, e);
        }
        if (thread.isAlive()) {
          LOG.warn("Container thread {} is still alive", thread.getName());
        }
      }
      try {
        dnmgr.teardown();
      } catch (RuntimeException e) {
        LOG.warn("Exception during StreamingContainerManager teardown", e);
      }
      if (bufferServerAddress != null) {
        try {
          bufferServer.stop();
        } catch (RuntimeException e) {
          LOG.warn("Exception during BufferServer stop", e);
        }
      }
      if (eventLoopThread != null) {
        try {
          StreamingContainer.eventloop.stop();
          eventLoopThread.join(1000);
        } catch (InterruptedException ie) {
          LOG.debug("Wait for {} to terminate interrupted", eventLoopThread.getName(), ie);
        } catch (RuntimeException e) {
          LOG.warn("Exception during {} stop", StreamingContainer.eventloop, e);
        }
        if (StreamingContainer.eventloop.isActive()) {
          LOG.warn("Event loop {} is still active", StreamingContainer.eventloop);
        }
      }
    }
    LOG.info("Application finished.");
  }
}