blob: 46e8940e004dcd865ac1a6ac231dfe42de4526f5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog.service.placement;
import com.twitter.distributedlog.client.routing.RoutingService;
import com.twitter.distributedlog.namespace.DistributedLogNamespace;
import com.twitter.distributedlog.service.DLSocketAddress;
import com.twitter.util.Duration;
import com.twitter.util.Function0;
import com.twitter.util.Future;
import com.twitter.util.ScheduledThreadPoolTimer;
import com.twitter.util.Time;
import com.twitter.util.Timer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.BoxedUnit;
/**
* A PlacementPolicy assigns streams to servers given an appraisal of the load that the stream contains.
*
* <p>The load of a stream is determined by the LoadAppraiser used. The PlacementPolicy will
* then distributed these StreamLoads to the available servers in a manner defined by the
* implementation creating ServerLoad objects. It then saves this assignment via the
* PlacementStateManager.
*/
public abstract class PlacementPolicy {
private static final Logger logger = LoggerFactory.getLogger(PlacementPolicy.class);
protected final LoadAppraiser loadAppraiser;
protected final RoutingService routingService;
protected final DistributedLogNamespace namespace;
protected final PlacementStateManager placementStateManager;
private final Duration refreshInterval;
protected final OpStatsLogger placementCalcStats;
private Timer placementRefreshTimer;
public PlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
Duration refreshInterval, StatsLogger statsLogger) {
this.loadAppraiser = loadAppraiser;
this.routingService = routingService;
this.namespace = namespace;
this.placementStateManager = placementStateManager;
this.refreshInterval = refreshInterval;
placementCalcStats = statsLogger.getOpStatsLogger("placement");
}
public Set<String> getServers() {
Set<SocketAddress> hosts = routingService.getHosts();
Set<String> servers = new HashSet<String>(hosts.size());
for (SocketAddress address : hosts) {
servers.add(DLSocketAddress.toString((InetSocketAddress) address));
}
return servers;
}
public Set<String> getStreams() {
Set<String> streams = new HashSet<String>();
try {
Iterator<String> logs = namespace.getLogs();
while (logs.hasNext()) {
streams.add(logs.next());
}
} catch (IOException e) {
logger.error("Could not get streams for placement policy.", e);
}
return streams;
}
public void start(boolean leader) {
logger.info("Starting placement policy");
TreeSet<ServerLoad> emptyServerLoads = new TreeSet<ServerLoad>();
for (String server : getServers()) {
emptyServerLoads.add(new ServerLoad(server));
}
load(emptyServerLoads); //Pre-Load so streams don't NPE
if (leader) { //this is the leader shard
logger.info("Shard is leader. Scheduling timed refresh.");
placementRefreshTimer = new ScheduledThreadPoolTimer(1, "timer", true);
placementRefreshTimer.schedule(Time.now(), refreshInterval, new Function0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
refresh();
return BoxedUnit.UNIT;
}
});
} else {
logger.info("Shard is not leader. Watching for server load changes.");
placementStateManager.watch(new PlacementStateManager.PlacementCallback() {
@Override
public void callback(TreeSet<ServerLoad> serverLoads) {
if (!serverLoads.isEmpty()) {
load(serverLoads);
}
}
});
}
}
public void close() {
if (placementRefreshTimer != null) {
placementRefreshTimer.stop();
}
}
/**
* Places the stream on a server according to the policy.
*
* <p>It returns a future containing the host that owns the stream upon completion
*/
public abstract Future<String> placeStream(String stream);
/**
* Recalculates the entire placement mapping and updates stores it using the PlacementStateManager.
*/
public abstract void refresh();
/**
* Loads the placement mapping into the node from a TreeSet of ServerLoads.
*/
public abstract void load(TreeSet<ServerLoad> serverLoads);
}