blob: c53bcc458401ad3e53cf852bd8af1458dd1d3a90 [file] [log] [blame]
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.elasticjob.cloud.scheduler.mesos;
import io.elasticjob.cloud.scheduler.ha.FrameworkIDService;
import io.elasticjob.cloud.reg.base.CoordinatorRegistryCenter;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.sun.jersey.api.client.Client;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import java.util.Collection;
import java.util.List;
/**
* Mesos状态服务.
*
* @author gaohongtao
*/
@Slf4j
public class MesosStateService {
private static String stateUrl;
private final FrameworkIDService frameworkIDService;
public MesosStateService(final CoordinatorRegistryCenter regCenter) {
frameworkIDService = new FrameworkIDService(regCenter);
}
/**
* 注册Mesos的Master信息.
*
* @param hostName Master的主机名
* @param port Master端口
*/
public static synchronized void register(final String hostName, final int port) {
stateUrl = String.format("http://%s:%d/state", hostName, port);
}
/**
* 注销Mesos的Master信息.
*/
public static synchronized void deregister() {
stateUrl = null;
}
/**
* 获取沙箱信息.
*
* @param appName 作业云配置App的名字
* @return 沙箱信息
* @throws JSONException 解析JSON格式异常
*/
public JsonArray sandbox(final String appName) throws JSONException {
JSONObject state = fetch(stateUrl);
JsonArray result = new JsonArray();
for (JSONObject each : findExecutors(state.getJSONArray("frameworks"), appName)) {
JSONArray slaves = state.getJSONArray("slaves");
String slaveHost = null;
for (int i = 0; i < slaves.length(); i++) {
JSONObject slave = slaves.getJSONObject(i);
if (each.getString("slave_id").equals(slave.getString("id"))) {
slaveHost = slave.getString("pid").split("@")[1];
}
}
Preconditions.checkNotNull(slaveHost);
JSONObject slaveState = fetch(String.format("http://%s/state", slaveHost));
String workDir = slaveState.getJSONObject("flags").getString("work_dir");
Collection<JSONObject> executorsOnSlave = findExecutors(slaveState.getJSONArray("frameworks"), appName);
for (JSONObject executorOnSlave : executorsOnSlave) {
JsonObject r = new JsonObject();
r.addProperty("hostname", slaveState.getString("hostname"));
r.addProperty("path", executorOnSlave.getString("directory").replace(workDir, ""));
result.add(r);
}
}
return result;
}
/**
* 查找执行器信息.
*
* @param appName 作业云配置App的名字
* @return 执行器信息
* @throws JSONException 解析JSON格式异常
*/
public Collection<ExecutorStateInfo> executors(final String appName) throws JSONException {
return Collections2.transform(findExecutors(fetch(stateUrl).getJSONArray("frameworks"), appName), new Function<JSONObject, ExecutorStateInfo>() {
@Override
public ExecutorStateInfo apply(final JSONObject input) {
try {
return ExecutorStateInfo.builder().id(getExecutorId(input)).slaveId(input.getString("slave_id")).build();
} catch (final JSONException ex) {
throw new RuntimeException(ex);
}
}
});
}
/**
* 获取所有执行器.
*
* @return 执行器信息
* @throws JSONException 解析JSON格式异常
*/
public Collection<ExecutorStateInfo> executors() throws JSONException {
return executors(null);
}
private JSONObject fetch(final String url) {
Preconditions.checkState(!Strings.isNullOrEmpty(url));
return Client.create().resource(url).get(JSONObject.class);
}
private Collection<JSONObject> findExecutors(final JSONArray frameworks, final String appName) throws JSONException {
List<JSONObject> result = Lists.newArrayList();
Optional<String> frameworkIDOptional = frameworkIDService.fetch();
String frameworkID;
if (frameworkIDOptional.isPresent()) {
frameworkID = frameworkIDOptional.get();
} else {
return result;
}
for (int i = 0; i < frameworks.length(); i++) {
JSONObject framework = frameworks.getJSONObject(i);
if (!framework.getString("id").equals(frameworkID)) {
continue;
}
JSONArray executors = framework.getJSONArray("executors");
for (int j = 0; j < executors.length(); j++) {
JSONObject executor = executors.getJSONObject(j);
if (null == appName || appName.equals(getExecutorId(executor).split("@-@")[0])) {
result.add(executor);
}
}
}
return result;
}
private String getExecutorId(final JSONObject executor) throws JSONException {
return executor.has("id") ? executor.getString("id") : executor.getString("executor_id");
}
@Builder
@Getter
public static final class ExecutorStateInfo {
private final String id;
private final String slaveId;
}
}