| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.brooklyn.entity.nosql.couchbase; |
| |
| import static java.lang.String.format; |
| |
| import java.net.URI; |
| import java.util.Collection; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.brooklyn.api.entity.Entity; |
| import org.apache.brooklyn.api.location.MachineProvisioningLocation; |
| import org.apache.brooklyn.api.sensor.AttributeSensor; |
| import org.apache.brooklyn.api.sensor.SensorEvent; |
| import org.apache.brooklyn.api.sensor.SensorEventListener; |
| import org.apache.brooklyn.core.effector.EffectorBody; |
| import org.apache.brooklyn.core.entity.Attributes; |
| import org.apache.brooklyn.core.location.access.BrooklynAccessUtils; |
| import org.apache.brooklyn.core.location.cloud.CloudLocationConfig; |
| import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl; |
| import org.apache.brooklyn.feed.http.HttpFeed; |
| import org.apache.brooklyn.feed.http.HttpPollConfig; |
| import org.apache.brooklyn.feed.http.HttpValueFunctions; |
| import org.apache.brooklyn.feed.http.JsonFunctions; |
| import org.apache.http.auth.UsernamePasswordCredentials; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.brooklyn.util.collections.MutableMap; |
| import org.apache.brooklyn.util.collections.MutableSet; |
| import org.apache.brooklyn.util.core.config.ConfigBag; |
| import org.apache.brooklyn.util.http.HttpTool; |
| import org.apache.brooklyn.util.http.HttpToolResponse; |
| import org.apache.brooklyn.util.core.task.Tasks; |
| import org.apache.brooklyn.util.exceptions.Exceptions; |
| import org.apache.brooklyn.util.guava.Functionals; |
| import org.apache.brooklyn.util.guava.MaybeFunctions; |
| import org.apache.brooklyn.util.guava.TypeTokens; |
| import org.apache.brooklyn.util.net.Urls; |
| import org.apache.brooklyn.util.text.Strings; |
| import org.apache.brooklyn.util.time.Duration; |
| |
| import com.google.common.base.Charsets; |
| import com.google.common.base.Function; |
| import com.google.common.base.Functions; |
| import com.google.common.base.Preconditions; |
| import com.google.common.net.HostAndPort; |
| import com.google.common.net.HttpHeaders; |
| import com.google.common.net.MediaType; |
| import com.google.gson.JsonArray; |
| import com.google.gson.JsonElement; |
| |
| public class CouchbaseNodeImpl extends SoftwareProcessImpl implements CouchbaseNode { |
| |
| private static final Logger log = LoggerFactory.getLogger(CouchbaseNodeImpl.class); |
| |
| private volatile HttpFeed httpFeed; |
| |
| @Override |
| public Class<CouchbaseNodeDriver> getDriverInterface() { |
| return CouchbaseNodeDriver.class; |
| } |
| |
| @Override |
| public CouchbaseNodeDriver getDriver() { |
| return (CouchbaseNodeDriver) super.getDriver(); |
| } |
| |
| @Override |
| public void init() { |
| super.init(); |
| |
| subscriptions().subscribe(this, Attributes.SERVICE_UP, new SensorEventListener<Boolean>() { |
| @Override |
| public void onEvent(SensorEvent<Boolean> booleanSensorEvent) { |
| if (Boolean.TRUE.equals(booleanSensorEvent.getValue())) { |
| Integer webPort = getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT); |
| Preconditions.checkNotNull(webPort, CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT+" not set for %s; is an acceptable port available?", this); |
| String hostAndPort = BrooklynAccessUtils.getBrooklynAccessibleAddress(CouchbaseNodeImpl.this, webPort).toString(); |
| sensors().set(CouchbaseNode.COUCHBASE_WEB_ADMIN_URL, URI.create(format("http://%s", hostAndPort))); |
| } |
| } |
| }); |
| |
| getMutableEntityType().addEffector(ADD_REPLICATION_RULE, new EffectorBody<Void>() { |
| @Override |
| public Void call(ConfigBag parameters) { |
| addReplicationRule(parameters); |
| return null; |
| } |
| }); |
| } |
| |
| @Override |
| protected Map<String, Object> obtainProvisioningFlags(@SuppressWarnings("rawtypes") MachineProvisioningLocation location) { |
| ConfigBag result = ConfigBag.newInstance(super.obtainProvisioningFlags(location)); |
| result.configure(CloudLocationConfig.OS_64_BIT, true); |
| return result.getAllConfig(); |
| } |
| |
| @Override |
| protected Collection<Integer> getRequiredOpenPorts() { |
| // TODO this creates a huge list of inbound ports; much better to define on a security group using range syntax! |
| int erlangRangeStart = getConfig(NODE_DATA_EXCHANGE_PORT_RANGE_START).iterator().next(); |
| int erlangRangeEnd = getConfig(NODE_DATA_EXCHANGE_PORT_RANGE_END).iterator().next(); |
| |
| Set<Integer> newPorts = MutableSet.<Integer>copyOf(super.getRequiredOpenPorts()); |
| newPorts.remove(erlangRangeStart); |
| newPorts.remove(erlangRangeEnd); |
| for (int i = erlangRangeStart; i <= erlangRangeEnd; i++) |
| newPorts.add(i); |
| return newPorts; |
| } |
| |
| @Override |
| public void serverAdd(String serverToAdd, String username, String password) { |
| getDriver().serverAdd(serverToAdd, username, password); |
| } |
| |
| @Override |
| public void serverAddAndRebalance(String serverToAdd, String username, String password) { |
| getDriver().serverAddAndRebalance(serverToAdd, username, password); |
| } |
| |
| @Override |
| public void rebalance() { |
| getDriver().rebalance(); |
| } |
| |
| protected final static Function<HttpToolResponse, JsonElement> GET_THIS_NODE_STATS = Functionals.chain( |
| HttpValueFunctions.jsonContents(), |
| JsonFunctions.walk("nodes"), |
| new Function<JsonElement, JsonElement>() { |
| @Override public JsonElement apply(JsonElement input) { |
| JsonArray nodes = input.getAsJsonArray(); |
| for (JsonElement element : nodes) { |
| JsonElement thisNode = element.getAsJsonObject().get("thisNode"); |
| if (thisNode!=null && Boolean.TRUE.equals(thisNode.getAsBoolean())) { |
| return element.getAsJsonObject().get("interestingStats"); |
| } |
| } |
| return null; |
| }} |
| ); |
| |
| protected final static <T> HttpPollConfig<T> getSensorFromNodeStat(AttributeSensor<T> sensor, String ...jsonPath) { |
| return new HttpPollConfig<T>(sensor) |
| .onSuccess(Functionals.chain(GET_THIS_NODE_STATS, |
| MaybeFunctions.<JsonElement>wrap(), |
| JsonFunctions.walkM(jsonPath), |
| JsonFunctions.castM(TypeTokens.getRawRawType(sensor.getTypeToken()), null))) |
| .onFailureOrException(Functions.<T>constant(null)); |
| } |
| |
| @Override |
| protected void postStart() { |
| super.postStart(); |
| renameServerToPublicHostname(); |
| } |
| |
| protected void renameServerToPublicHostname() { |
| // http://docs.couchbase.com/couchbase-manual-2.5/cb-install/#couchbase-getting-started-hostnames |
| URI apiUri = null; |
| try { |
| HostAndPort accessible = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, getAttribute(COUCHBASE_WEB_ADMIN_PORT)); |
| apiUri = URI.create(String.format("http://%s:%d/node/controller/rename", accessible.getHostText(), accessible.getPort())); |
| UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(getConfig(COUCHBASE_ADMIN_USERNAME), getConfig(COUCHBASE_ADMIN_PASSWORD)); |
| HttpToolResponse response = HttpTool.httpPost( |
| // the uri is required by the HttpClientBuilder in order to set the AuthScope of the credentials |
| HttpTool.httpClientBuilder().uri(apiUri).credentials(credentials).build(), |
| apiUri, |
| MutableMap.of( |
| HttpHeaders.CONTENT_TYPE, MediaType.FORM_DATA.toString(), |
| HttpHeaders.ACCEPT, "*/*", |
| // this appears needed; without it we get org.apache.http.NoHttpResponseException !? |
| HttpHeaders.AUTHORIZATION, HttpTool.toBasicAuthorizationValue(credentials)), |
| Charsets.UTF_8.encode("hostname="+Urls.encode(accessible.getHostText())).array()); |
| log.debug("Renamed Couchbase server "+this+" via "+apiUri+": "+response); |
| if (!HttpTool.isStatusCodeHealthy(response.getResponseCode())) { |
| log.warn("Invalid response code, renaming {} ({}): {}", |
| new Object[]{apiUri, response.getResponseCode(), response.getContentAsString()}); |
| } |
| } catch (Exception e) { |
| Exceptions.propagateIfFatal(e); |
| log.warn("Error renaming server, using "+apiUri+": "+e, e); |
| } |
| } |
| |
| @Override |
| public void connectSensors() { |
| super.connectSensors(); |
| connectServiceUpIsRunning(); |
| |
| HostAndPort hostAndPort = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, this.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT)); |
| httpFeed = HttpFeed.builder() |
| .entity(this) |
| .period(Duration.seconds(3)) |
| .baseUri("http://" + hostAndPort + "/pools/nodes/") |
| .credentialsIfNotNull(getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD)) |
| .poll(getSensorFromNodeStat(CouchbaseNode.OPS, "ops")) |
| .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_DOCS_DATA_SIZE, "couch_docs_data_size")) |
| .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_DOCS_ACTUAL_DISK_SIZE, "couch_docs_actual_disk_size")) |
| .poll(getSensorFromNodeStat(CouchbaseNode.EP_BG_FETCHED, "ep_bg_fetched")) |
| .poll(getSensorFromNodeStat(CouchbaseNode.MEM_USED, "mem_used")) |
| .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_VIEWS_ACTUAL_DISK_SIZE, "couch_views_actual_disk_size")) |
| .poll(getSensorFromNodeStat(CouchbaseNode.CURR_ITEMS, "curr_items")) |
| .poll(getSensorFromNodeStat(CouchbaseNode.VB_REPLICA_CURR_ITEMS, "vb_replica_curr_items")) |
| .poll(getSensorFromNodeStat(CouchbaseNode.COUCH_VIEWS_DATA_SIZE, "couch_views_data_size")) |
| .poll(getSensorFromNodeStat(CouchbaseNode.GET_HITS, "get_hits")) |
| .poll(getSensorFromNodeStat(CouchbaseNode.CMD_GET, "cmd_get")) |
| .poll(getSensorFromNodeStat(CouchbaseNode.CURR_ITEMS_TOT, "curr_items_tot")) |
| .poll(new HttpPollConfig<String>(CouchbaseNode.REBALANCE_STATUS) |
| .onSuccess(HttpValueFunctions.jsonContents("rebalanceStatus", String.class)) |
| .onFailureOrException(Functions.constant("Could not retrieve"))) |
| .build(); |
| } |
| |
| @Override |
| public void disconnectSensors() { |
| super.disconnectSensors(); |
| disconnectServiceUpIsRunning(); |
| if (httpFeed != null) { |
| httpFeed.stop(); |
| } |
| } |
| |
| @Override |
| public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica) { |
| if (Strings.isBlank(bucketType)) bucketType = "couchbase"; |
| if (bucketRamSize==null || bucketRamSize<=0) bucketRamSize = 200; |
| if (bucketReplica==null || bucketReplica<0) bucketReplica = 1; |
| |
| getDriver().bucketCreate(bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica); |
| } |
| |
| /** exposed through {@link CouchbaseNode#ADD_REPLICATION_RULE} */ |
| protected void addReplicationRule(ConfigBag ruleArgs) { |
| Object toClusterO = Preconditions.checkNotNull(ruleArgs.getStringKey("toCluster"), "toCluster must not be null"); |
| if (toClusterO instanceof String) { |
| toClusterO = getManagementContext().lookup((String)toClusterO); |
| } |
| Entity toCluster = Tasks.resolving(toClusterO, Entity.class).context(this).get(); |
| |
| String fromBucket = Preconditions.checkNotNull( (String)ruleArgs.getStringKey("fromBucket"), "fromBucket must be specified" ); |
| |
| String toBucket = (String)ruleArgs.getStringKey("toBucket"); |
| if (toBucket==null) toBucket = fromBucket; |
| |
| if (!ruleArgs.getUnusedConfig().isEmpty()) { |
| throw new IllegalArgumentException("Unsupported replication rule data: "+ruleArgs.getUnusedConfig()); |
| } |
| |
| getDriver().addReplicationRule(toCluster, fromBucket, toBucket); |
| } |
| |
| } |