blob: 8a6d601062ee69a57d0ccae3723b9fefda4508bf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.scheduler.client;
import java.net.HttpURLConnection;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.heron.proto.scheduler.Scheduler;
import org.apache.heron.proto.system.Common;
import org.apache.heron.scheduler.Command;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.utils.NetworkUtils;
/**
* This class manages topology by sending request
* to appropriate HTTP endpoint of the topology scheduler service.
*/
public class HttpServiceSchedulerClient implements ISchedulerClient {
private static final Logger LOG = Logger.getLogger(HttpServiceSchedulerClient.class.getName());
private final Config config;
private final Config runtime;
private final String schedulerHttpEndpoint;
public HttpServiceSchedulerClient(Config config, Config runtime,
String schedulerHttpEndpoint) {
this.config = config;
this.runtime = runtime;
this.schedulerHttpEndpoint = schedulerHttpEndpoint;
}
@Override
public boolean restartTopology(Scheduler.RestartTopologyRequest restartTopologyRequest) {
return requestSchedulerService(Command.RESTART, restartTopologyRequest.toByteArray());
}
@Override
public boolean killTopology(Scheduler.KillTopologyRequest killTopologyRequest) {
return requestSchedulerService(Command.KILL, killTopologyRequest.toByteArray());
}
@Override
public boolean updateTopology(Scheduler.UpdateTopologyRequest updateTopologyRequest) {
return requestSchedulerService(Command.UPDATE, updateTopologyRequest.toByteArray());
}
/**
* Send payload to target HTTP connection to request a service
*
* @param data the byte[] to send
* @return true if got OK response successfully
*/
protected boolean requestSchedulerService(Command command, byte[] data) {
String endpoint = getCommandEndpoint(schedulerHttpEndpoint, command);
final HttpURLConnection connection = NetworkUtils.getHttpConnection(endpoint);
if (connection == null) {
LOG.severe("Scheduler not found.");
return false;
}
// now, we have a valid connection
try {
// send the actual http request
if (!NetworkUtils.sendHttpPostRequest(connection, NetworkUtils.URL_ENCODE_TYPE, data)) {
LOG.log(Level.SEVERE, "Failed to send http request to scheduler");
return false;
}
// receive the response for manage topology
Common.StatusCode statusCode;
LOG.fine("Receiving response from scheduler...");
try {
statusCode = Scheduler.SchedulerResponse.newBuilder()
.mergeFrom(NetworkUtils.readHttpResponse(connection))
.build().getStatus().getStatus();
} catch (InvalidProtocolBufferException e) {
LOG.log(Level.SEVERE, "Failed to parse response", e);
return false;
}
if (!statusCode.equals(Common.StatusCode.OK)) {
LOG.severe("Received not OK response from scheduler");
return false;
}
} finally {
connection.disconnect();
}
return true;
}
/**
* Construct the endpoint to send http request for a particular command
* Make sure the construction matches server sides.
*
* @param schedulerEndpoint The scheduler http endpoint
* @param command The command to request
* @return The http endpoint for particular command
*/
protected String getCommandEndpoint(String schedulerEndpoint, Command command) {
// Currently the server side receives command request in lower case
return String.format("http://%s/%s", schedulerEndpoint, command.name().toLowerCase());
}
}