Initial import
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..9f9a480
--- /dev/null
+++ b/README.md
@@ -0,0 +1,140 @@
+Smoosh
+======
+
+Smoosh is CouchDB's auto-compaction daemon. It is notified when
+databases and views are updated and may then elect to enqueue them for
+compaction.
+
+API
+---
+
+All API functions are in smoosh.erl and only the exported functions in
+this module should be called from outside of the smoosh application.
+
+Additionally, smoosh responds to config changes dynamically and these
+changes are the principal means of interacting with smoosh.
+
+Top-Level Settings
+------------------
+
+The main settings one interacts with are:
+
+<dl>
+<dt>db_channels<dd>A comma-separated list of channel names for
+databases.
+<dt>view_channels<dd>A comma-separated list of channel names for
+views.
+<dt>staleness<dd>The number of minutes that the (expensive) priority
+calculation can be stale for before it is recalculated. Defaults to 5.
+</dl>
+
+Sometimes it's necessary to use the following:
+
+<dl>
+<dt>cleanup_index_files</dt><dd>Whether smoosh cleans up the files
+for indexes that have been deleted. Defaults to false and probably
+shouldn't be changed unless the cluster is running low on disk space,
+and only after considering the ramifications.</dd>
+<dt>wait_secs</dt><dd>The time (in seconds) that a channel waits before starting
+compactions, to allow time to observe the system and make a smarter decision
+about what to compact first. Hardly ever changed from the default of 30 seconds.
+</dd>
+</dl>
+
+Channel Settings
+----------------
+
+A channel has several important settings that control runtime
+behavior; a brief config example follows the list.
+
+<dl>
+<dt>capacity<dd>The maximum number of items the channel can hold (lowest priority item is removed to make room for new items). Defaults to 9999.
+<dt>concurrency<dd>The maximum number of jobs that can run concurrently. Defaults to 1.
+<dt>max_priority<dd>The item must have a priority lower than this to be enqueued. Defaults to infinity.
+<dt>max_size<dd>The item must be no larger than this many bytes in length to be enqueued. Defaults to infinity.
+<dt>min_priority<dd>The item must have a priority at least this high to be enqueued. Defaults to 5.0 for ratio and 16 MB for slack.
+<dt>min_changes<dd>The minimum number of changes since last compaction before the item will be enqueued. Defaults to 0. Currently only works for databases.
+<dt>min_size<dd>The item must be at least this many bytes in length to be enqueued. Defaults to 1 MB (1048576 bytes).
+<dt>priority<dd>The method used to calculate priority. Can be ratio (calculated as disk_size/data_size) or slack (calculated as disk_size-data_size). Defaults to ratio.
+</dl>
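+
+Channel settings live in the `smoosh.<channel_name>` config section. As an
+illustration only (the values below are arbitrary), tightening the ratio_dbs
+channel so that only items with a ratio above 3.0 and a file of at least
+256 MB are enqueued would look like:
+
+    config:set("smoosh.ratio_dbs", "min_priority", "3.0").
+    config:set("smoosh.ratio_dbs", "min_size", "268435456").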
+
+Structure
+---------
+
+Smoosh consists of a central gen_server (smoosh_server) which manages
+a number of subordinate smoosh_channel gen_servers. This is not
+properly managed by OTP yet.
+
+Compaction Scheduling Algorithm
+-------------------------------
+
+Smoosh decides whether to compact a database or view by evaluating the
+item against the selection criteria of each _channel_ in the order
+they are configured. By default there are two channels for databases
+("ratio_dbs" and "slack_dbs"), and two channels for views ("ratio_views"
+and "slack_views")
+
+Smoosh will enqueue the new item to the first channel that accepts
+it. If none accept it, the item is not enqueued for compaction.
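+
+A minimal sketch of that selection loop (illustrative only; the real logic
+lives in smoosh_server's find_channel, which also applies a staleness check
+and the ignore list):
+
+    find_first_channel([], _Object) ->
+        false;
+    find_first_channel([Channel | Rest], Object) ->
+        case get_priority(Channel, Object) of
+            0 -> find_first_channel(Rest, Object);
+            Priority -> {ok, Channel, Priority}
+        end.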
+
+Notes on the data_size value
+----------------------------
+
+Every database and view shard has a data_size value. In CouchDB this
+accurately reflects the post-compaction file size. In DbCore, it is
+the size of the file that we bill for. It excludes the b+tree and
+database footer overhead. We also bill customers for the uncompressed
+size of their documents, though we store them compressed on disk.
+These two systems were developed independently (ours predates
+CouchDB's) and DbCore only calculates the billing size value.
+
+Because of the way our data_size is currently calculated, it can
+sometimes be necessary to enqueue databases and views with very low
+ratios. Due to this, it is also currently impossible to tell how
+optimally compacted a cluster is.
+
+Example config commands
+-----------------------
+
+Change the set of database channels:
+
+    config:set("smoosh", "db_channels", "small_dbs,medium_dbs,large_dbs").
+
+Change the set of database channels on all live nodes in the cluster:
+
+    rpc:multicall(config, set, ["smoosh", "db_channels", "small_dbs,medium_dbs,large_dbs"]).
+
+Change the concurrency of the ratio_dbs database channel to 2:
+
+    config:set("smoosh.ratio_dbs", "concurrency", "2").
+
+Change it on all live nodes in the cluster:
+
+    rpc:multicall(config, set, ["smoosh.ratio_dbs", "concurrency", "2"]).
+
+Example API commands
+--------------------
+
+smoosh:status()
+
+This prints the state of each channel; how many jobs they are
+currently running and how many jobs are enqueued (as well as the
+lowest and highest priority of those enqueued items). The idea is to
+provide, at a glance, sufficient insight into smoosh that an operator
+can assess whether smoosh is adequately targeting the reclaimable
+space in the cluster. In general, a healthy status output will have
+items in the ratio_dbs and ratio_views channels. Owing to the default
+settings, the slack_dbs and slack_views channels will almost certainly have
+items in them. Historically, we've not found that the slack channels,
+on their own, are particularly adept at keeping things well compacted.
+
+smoosh:enqueue_all_dbs(), smoosh:enqueue_all_views()
+
+These functions do just what they say, but they should not generally need to
+be called; smoosh is supposed to be autonomous. Call them if you get
+alerted to a disk space issue, as they might well help. If they do, that
+indicates a bug in smoosh, as it should already have enqueued eligible
+shards once they met the configured settings.
+
+
+
diff --git a/operator_guide.md b/operator_guide.md
new file mode 100644
index 0000000..a0c9810
--- /dev/null
+++ b/operator_guide.md
@@ -0,0 +1,396 @@
+# An operator's guide to smoosh
+
+Smoosh is the auto-compactor for the databases. It automatically selects and
+processes the compacting of database shards on each node.
+
+## Smoosh Channels
+
+Smoosh works using the concept of channels. A channel is essentially a queue of pending
+compactions. There are separate sets of channels for database and view compactions. Each
+channel is assigned a configuration which defines whether a compaction ends up in
+the channel's queue and how compactions are prioritised within that queue.
+
+Smoosh takes each channel and works through the compactions queued in each in priority
+order. Each channel is processed concurrently, so the priority levels only matter within
+a given channel.
+
+Finally, each channel has an assigned number of active compactions, which defines how
+many compactions happen for that channel in parallel. For example, a cluster with
+a lot of database churn but few views might require more active compactions
+for the database channel(s).
+
+It's important to remember that a channel is local to a dbcore node; that is,
+each node maintains and processes an independent set of compactions.
+
+### Channel configuration options
+
+#### Channel types
+
+Each channel has a basic type for the algorithm it uses to select pending
+compactions for its queue and how it prioritises them.
+
+The two queue types are:
+
+* **ratio**: this uses the ratio `total_bytes / user_bytes` as its driving
+calculation. The result _X_ must be greater than some configurable value _Y_ for a
+compaction to be added to the queue. Compactions are then prioritised for
+higher values of _X_.
+
+* **slack**: this uses `total_bytes - user_bytes` as its driving calculation.
+The result _X_ must be greater than some configurable value _Y_ for a compaction
+to be added to the queue. Compactions are prioritised for higher values of _X_.
+
+In both cases, _Y_ is set using the `min_priority` configuration variable. The
+calculation of _X_ is described in [Priority calculation](#priority-calculation), below.
+
+Both algorithms operate on two main measures:
+
+* **user_bytes**: this is the amount of data the user has in the file. It
+doesn't include storage overhead: old revisions, on-disk btree structure and
+so on.
+
+* **total_bytes**: the size of the file on disk.
+
+Channel type is set using the `priority` configuration setting.
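+
+As a sketch of the two driving calculations (function names here are
+illustrative, not part of smoosh's API):
+
+```erlang
+%% TotalBytes is the file size on disk; UserBytes is the data the user has in
+%% the file. The result is compared against the channel's min_priority.
+ratio_priority(TotalBytes, UserBytes) when UserBytes > 0 ->
+    TotalBytes / UserBytes.
+
+slack_priority(TotalBytes, UserBytes) ->
+    TotalBytes - UserBytes.
+```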
+
+#### Further configuration options
+
+Beyond its basic type, there are several other configuration options which
+can be applied to a queue.
+
+*All options MUST be set as strings.* See the [smoosh readme][srconfig] for
+all settings and their defaults.
+
+#### Priority calculation
+
+The algorithm type and certain configuration options feed into the priority
+calculation.
+
+The priority is calculated when a compaction is enqueued. As each channel
+has a different configuration, each channel will end up with a different
+priority value. The enqueue code checks each channel in turn to see whether the
+compaction passes its configured priority threshold (`min_priority`). Once
+a channel is found that can accept the compaction, the compaction is added
+to that channel's queue and the enqueue process stops. Therefore the
+ordering of channels has a bearing on which channel a compaction ends up in.
+
+If you want to follow this along, the call order is all in `smoosh_server`,
+`enqueue_request -> find_channel -> get_priority`.
+
+The priority calculation is probably the easiest way to understand the effects
+of configuration variables. It's defined in `smoosh_server#get_priority/3`,
+currently [here][ss].
+
+[ss]: https://github.com/apache/couchdb-smoosh/blob/master/src/smoosh_server.erl#L277
+[srconfig]: https://github.com/apache/couchdb-smoosh#channel-settings
+
+#### Background Detail
+
+`user_bytes` is called `data_size` in `db_info` blocks. It is the total of all bytes
+that are used to store docs and their attachments.
+
+Since `.couch` files are append-only, every update adds data to the file. When
+you update a btree, a new leaf node is written, along with new copies of all
+the nodes back up to the root. Old data is never overwritten, so the
+superseded parts of the file (old btree nodes and document bodies) are no
+longer live. Compaction takes this file and writes a new file that only
+contains live data.
+
+`total_bytes` is the number of bytes in the file as reported by `ls -al filename`.
+
+#### Flaws
+
+An important flaw in this calculation is that `total_bytes` takes into account
+the compression of data on disk, whereas `user_bytes` does not. This can give
+unexpected results to calculations, as the values are not directly comparable.
+
+However, it's the best measure we currently have.
+
+[Even more info](https://github.com/apache/couchdb-smoosh#notes-on-the-data_size-value).
+
+
+### Defining a channel
+
+Defining a channel is done via normal dbcore configuration, with some
+convention as to the parameter names.
+
+Channel configuration is defined using `smoosh.<channel_name>` top-level config
+options. Defining a channel is just setting the various options you want
+for the channel, then bringing it into smoosh's sets of active channels by
+adding it to either `db_channels` or `view_channels`.
+
+This means that smoosh channels can be defined either for a single node or
+globally across a cluster, by setting the configuration either globally or
+locally. In the example below, we set up a new global channel.
+
+It's important to choose good channel names. There are some conventional ones:
+
+* `ratio_dbs`: a ratio channel for dbs, usually using the default settings.
+* `slack_dbs`: a slack channel for dbs, usually using the default settings.
+* `ratio_views`: a ratio channel for views, usually using the default settings.
+* `slack_views`: a slack channel for views, usually using the default settings.
+
+These four are defined by default if there are no others set ([source][source1]).
+
+[source1]: https://github.com/apache/couchdb-smoosh/blob/master/src/smoosh_server.erl#L75
+
+And some standard names for ones we often have to add:
+
+* `big_dbs`: a ratio channel for only enqueuing large database shards. What
+  _large_ means is very workload specific.
+
+Channels have certain defaults for their configuration, defined in the
+[smoosh readme][srconfig]. It's only necessary to set up how this channel
+differs from those defaults. Below, we just need to set the `min_size` and
+`concurrency` settings, and allow the `priority` to default to `ratio`
+along with the other defaults.
+
+```bash
+# Define the new channel
+(couchdb@db1.foo.bar)3> s:set_config("smoosh.big_dbs", "min_size", "20000000000", global).
+{[ok,ok,ok],[]}
+(couchdb@db1.foo.bar)3> s:set_config("smoosh.big_dbs", "concurrency", "2", global).
+{[ok,ok,ok],[]}
+
+# Add the channel to the db_channels set -- note we need to get the original
+# value first so we can add the new one to the existing list!
+(couchdb@db1.foo.bar)5> s:get_config("smoosh", "db_channels", global).
+{[{'couchdb@db1.foo.bar',"ratio_dbs"},
+{'couchdb@db3.foo.bar',"ratio_dbs"},
+{'couchdb@db2.foo.bar',"ratio_dbs"}],
+[]}
+(couchdb@db1.foo.bar)6> s:set_config("smoosh", "db_channels", "ratio_dbs,big_dbs", global).
+{[ok,ok,ok],[]}
+```
+
+### Viewing active channels
+
+```bash
+(couchdb@db3.foo.bar)3> s:get_config("smoosh", "db_channels", global).
+{[{'couchdb@db3.foo.bar',"ratio_dbs,big_dbs"},
+  {'couchdb@db1.foo.bar',"ratio_dbs,big_dbs"},
+  {'couchdb@db2.foo.bar',"ratio_dbs,big_dbs"}],
+ []}
+(couchdb@db3.foo.bar)4> s:get_config("smoosh", "view_channels", global).
+{[{'couchdb@db3.foo.bar',"ratio_views"},
+  {'couchdb@db1.foo.bar',"ratio_views"},
+  {'couchdb@db2.foo.bar',"ratio_views"}],
+ []}
+```
+
+### Removing a channel
+
+```bash
+# Remove it from the active set
+(couchdb@db1.foo.bar)5> s:get_config("smoosh", "db_channels", global).
+{[{'couchdb@db1.foo.bar',"ratio_dbs,big_dbs"},
+{'couchdb@db3.foo.bar',"ratio_dbs,big_dbs"},
+{'couchdb@db2.foo.bar',"ratio_dbs,big_dbs"}],
+[]}
+(couchdb@db1.foo.bar)6> s:set_config("smoosh", "db_channels", "ratio_dbs", global).
+{[ok,ok,ok],[]}
+
+# Delete the config -- you need to do each value
+(couchdb@db1.foo.bar)3> rpc:multicall(config, delete, ["smoosh.big_dbs", "concurrency"]).
+{[ok,ok,ok],[]}
+(couchdb@db1.foo.bar)3> rpc:multicall(config, delete, ["smoosh.big_dbs", "min_size"]).
+{[ok,ok,ok],[]}
+```
+
+### Getting channel configuration
+
+As far as I know, you have to get each setting separately:
+
+```
+(couchdb@db1.foo.bar)1> s:get_config("smoosh.big_dbs", "concurrency", global).
+{[{'couchdb@db3.foo.bar',"2"},
+  {'couchdb@db1.foo.bar',"2"},
+  {'couchdb@db2.foo.bar',"2"}],
+ []}
+
+```
+
+### Setting channel configuration
+
+As when defining a channel, you just need to set the new value:
+
+```
+(couchdb@db1.foo.bar)2> s:set_config("smoosh.ratio_dbs", "concurrency", "1", global).
+{[ok,ok,ok],[]}
+```
+
+It sometimes takes a little while to take effect.
+
+
+
+## Standard operating procedures
+
+There are a few standard things that operators often have to do when responding
+to pages.
+
+In addition to the below, in some circumstances it's useful to define new
+channels with certain properties (`big_dbs` is a common one) if smoosh isn't
+selecting and prioritising compactions that well.
+
+### Checking smoosh's status
+
+You can see the queued items for each channel by going into `remsh` on a node
+and using:
+
+```
+> smoosh:status().
+{ok,[{"ratio_dbs",
+      [{active,1},
+       {starting,0},
+       {waiting,[{size,522},
+                 {min,{5.001569007970237,{1378,394651,323864}}},
+                 {max,{981756.5441159063,{1380,370286,655752}}}]}]},
+     {"slack_views",
+      [{active,1},
+       {starting,0},
+       {waiting,[{size,819},
+                 {min,{16839814,{1375,978920,326458}}},
+                 {max,{1541336279,{1380,370205,709896}}}]}]},
+     {"slack_dbs",
+      [{active,1},
+       {starting,0},
+       {waiting,[{size,286},
+                 {min,{19004944,{1380,295245,887295}}},
+                 {max,{48770817098,{1380,370185,876596}}}]}]},
+     {"ratio_views",
+      [{active,1},
+       {starting,0},
+       {waiting,[{size,639},
+                 {min,{5.0126340031149335,{1380,186581,445489}}},
+                 {max,{10275.555632057285,{1380,370411,421477}}}]}]}]}
+```
+
+This gives you the node-local status for each queue.
+
+Under each channel there is some information about the channel:
+
+* `active`: number of current compactions in the channel.
+* `starting`: number of compactions starting-up.
+* `waiting`: number of queued compactions.
+  * `min` and `max` give an idea of the queued jobs' effectiveness. Each is a
+    `{Priority, Timestamp}` pair, and what counts as a good priority obviously
+    depends on whether the queue is ratio or slack.
+
+For ratio queues, the default minimum for smoosh to enqueue a compaction is 5. In
+the example above, we can guess that 981,756 is quite high. This could be a
+small database, however, so it doesn't necessarily mean useful compactions
+from the point of view of reclaiming disk space.
+
+For this example, we can see that there are quite a lot of queued compactions,
+but we don't know which would be most effective to run to reclaim disk space.
+It's also worth noting that the waiting queue sizes are only meaningful
+relative to other factors on the cluster (e.g., the number and size of
+databases).
+
+
+### Smoosh IOQ priority
+
+This is a global setting which affects all channels. Increasing it allows each
+active compaction to (hopefully) proceed faster as the compaction work is of
+a higher priority relative to other jobs. Decreasing it (hopefully) has the
+converse effect.
+
+By this point you'll [know whether smoosh is backing up](#checking-smooshs-status).
+If it's falling behind (big queues), try increasing compaction priority.
+
+Smoosh's IOQ priority is controlled via the `ioq` -> `compaction` queue.
+
+```
+> s:get_config("ioq", "compaction", global).
+{[{'couchdb@db1.foo.bar',undefined},
+  {'couchdb@db2.foo.bar',undefined},
+  {'couchdb@db3.foo.bar',undefined}],
+ []}
+
+```
+
+Priority by convention runs 0 to 1, though the priority can be any positive
+number. The default for compaction is 0.01; pretty low.
+
+If it looks like smoosh has a bunch of work that it's not getting
+through, priority can be increased. However, be careful that this
+doesn't adversely impact the customer experience. If it will, and
+it's urgent, at least drop them a warning.
+
+```
+> s:set_config("ioq", "compaction", "0.5", global).
+{[ok,ok,ok],[]}
+```
+
+In general, this should be a temporary measure. For some clusters,
+a change from the default may be required to help smoosh keep up
+with particular workloads.
+
+### Granting specific channels more workers
+
+Giving smoosh a higher concurrency for a given channel can allow a backlog
+in that channel to catch up.
+
+Again, some clusters run best with specific channels having more workers.
+
+From [assessing disk space](#assess-the-space-on-the-disk), you should
+know whether the biggest offenders are db or view files. From this,
+you can infer whether it's worth giving a specific smoosh channel a
+higher concurrency.
+
+The current setting can be seen for a channel like so:
+
+```
+> s:get_config("smoosh.ratio_dbs", "concurrency", global).
+{[{'couchdb@db1.foo.bar',undefined},
+  {'couchdb@db2.foo.bar',undefined},
+  {'couchdb@db3.foo.bar',undefined}],
+ []}
+```
+
+`undefined` means the default is used.
+
+If we knew that database files were the major consumer of disk space, we might
+want to increase the concurrency of a `_dbs` channel. Experience shows
+`ratio_dbs` is often best, but evaluate this based on the current status.
+
+If we want to increase the ratio_dbs setting:
+
+```
+> s:set_config("smoosh.ratio_dbs", "concurrency", "2", global).
+{[ok,ok,ok],[]}
+```
+
+### Suspending smoosh
+
+If smoosh itself is causing issues, it's possible to suspend its operation.
+This differs from either `application:stop(smoosh).` or setting every channel's
+concurrency to zero, because it both pauses ongoing compactions and keeps
+the channel queues intact.
+
+If, for example, a node's compactions are causing disk space issues, smoosh
+could be suspended while working out which channel is causing the problem. A
+big_dbs channel, say, might be creating huge compaction-in-progress files
+if there's not much in the shard to compact away.
+
+Suspending is therefore a useful way to test whether smoosh is causing a
+problem.
+
+```
+# suspend
+smoosh:suspend().
+
+# resume a suspended smoosh
+smoosh:resume().
+```
+
+Suspend is currently pretty literal: `erlang:suspend_process(Pid, [unless_suspending])`
+is called for each compaction process in each channel; `erlang:resume_process/1`
+is called on resume.
+
+### Restarting Smoosh
+
+Restarting Smoosh is a long shot: a brute-force approach in the hope that,
+when Smoosh rescans the DBs, it makes the right decisions. If you need to take
+this step, contact rnewson or davisp so that they can inspect Smoosh and see the bug.
+
+```
+> exit(whereis(smoosh_server), kill), smoosh:enqueue_all_dbs(), smoosh:enqueue_all_views().
+```
diff --git a/src/smoosh.app.src b/src/smoosh.app.src
new file mode 100644
index 0000000..a6cdb7f
--- /dev/null
+++ b/src/smoosh.app.src
@@ -0,0 +1,29 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application, smoosh,
+ [
+  {description, "Auto-compaction daemon"},
+  {vsn, git},
+  {registered, [smoosh_server]},
+  {applications, [
+                  kernel,
+                  stdlib,
+                  couch_log,
+                  config,
+                  couch_event,
+                  couch,
+                  mem3
+                 ]},
+  {mod, { smoosh_app, []}},
+  {env, []}
+ ]}.
diff --git a/src/smoosh.erl b/src/smoosh.erl
new file mode 100644
index 0000000..676e7fa
--- /dev/null
+++ b/src/smoosh.erl
@@ -0,0 +1,69 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(smoosh).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+-export([suspend/0, resume/0, enqueue/1, status/0]).
+-export([enqueue_all_dbs/0, enqueue_all_dbs/1, enqueue_all_views/0]).
+
+suspend() ->
+    smoosh_server:suspend().
+
+resume() ->
+    smoosh_server:resume().
+
+enqueue(Object) ->
+    smoosh_server:enqueue(Object).
+
+sync_enqueue(Object) ->
+    smoosh_server:sync_enqueue(Object).
+
+sync_enqueue(Object, Timeout) ->
+    smoosh_server:sync_enqueue(Object, Timeout).
+
+status() ->
+    smoosh_server:status().
+
+enqueue_all_dbs() ->
+    fold_local_shards(fun(#shard{name=Name}, _Acc) ->
+        sync_enqueue(Name) end, ok).
+
+enqueue_all_dbs(Timeout) ->
+    fold_local_shards(fun(#shard{name=Name}, _Acc) ->
+        sync_enqueue(Name, Timeout) end, ok).
+
+enqueue_all_views() ->
+    fold_local_shards(fun(#shard{name=Name}, _Acc) ->
+        catch enqueue_views(Name) end, ok).
+
+fold_local_shards(Fun, Acc0) ->
+    mem3:fold_shards(fun(Shard, Acc1) ->
+        case node() == Shard#shard.node of
+            true ->
+                Fun(Shard, Acc1);
+            false ->
+                Acc1
+        end
+    end, Acc0).
+
+enqueue_views(ShardName) ->
+    DbName = mem3:dbname(ShardName),
+    {ok, DDocs} = fabric:design_docs(DbName),
+    [sync_enqueue({ShardName, id(DDoc)}) || DDoc <- DDocs].
+
+id(#doc{id=Id}) ->
+    Id;
+id({Props}) ->
+    couch_util:get_value(<<"_id">>, Props).
diff --git a/src/smoosh_app.erl b/src/smoosh_app.erl
new file mode 100644
index 0000000..eba3579
--- /dev/null
+++ b/src/smoosh_app.erl
@@ -0,0 +1,28 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(smoosh_app).
+
+-behaviour(application).
+
+%% Application callbacks
+-export([start/2, stop/1]).
+
+%% ===================================================================
+%% Application callbacks
+%% ===================================================================
+
+start(_StartType, _StartArgs) ->
+    smoosh_sup:start_link().
+
+stop(_State) ->
+    ok.
diff --git a/src/smoosh_channel.erl b/src/smoosh_channel.erl
new file mode 100644
index 0000000..58d3ce7
--- /dev/null
+++ b/src/smoosh_channel.erl
@@ -0,0 +1,279 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(smoosh_channel).
+-behaviour(gen_server).
+-vsn(1).
+-include_lib("couch/include/couch_db.hrl").
+
+% public api.
+-export([start_link/1, close/1, suspend/1, resume/1, get_status/1]).
+-export([enqueue/3, last_updated/2, flush/1]).
+
+% gen_server api.
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+    code_change/3, terminate/2]).
+
+% records.
+
+-record(state, {
+    active=[],
+    name,
+    waiting=smoosh_priority_queue:new(),
+    paused=true,
+    starting=[]
+}).
+
+% public functions.
+
+start_link(Name) ->
+    gen_server:start_link(?MODULE, Name, []).
+
+suspend(ServerRef) ->
+    gen_server:call(ServerRef, suspend).
+
+resume(ServerRef) ->
+    gen_server:call(ServerRef, resume).
+
+enqueue(ServerRef, Object, Priority) ->
+    gen_server:cast(ServerRef, {enqueue, Object, Priority}).
+
+last_updated(ServerRef, Object) ->
+    gen_server:call(ServerRef, {last_updated, Object}).
+
+get_status(ServerRef) ->
+    gen_server:call(ServerRef, status).
+
+close(ServerRef) ->
+    gen_server:call(ServerRef, close).
+
+flush(ServerRef) ->
+    gen_server:call(ServerRef, flush).
+
+% gen_server functions.
+
+init(Name) ->
+    schedule_unpause(),
+    {ok, #state{name=Name}}.
+
+handle_call({last_updated, Object}, _From, State0) ->
+    {ok, State} = code_change(nil, State0, nil),
+    LastUpdated = smoosh_priority_queue:last_updated(Object, State#state.waiting),
+    {reply, LastUpdated, State};
+
+handle_call(suspend, _From, State0) ->
+    {ok, State} = code_change(nil, State0, nil),
+    #state{active = Active} = State,
+    [catch erlang:suspend_process(Pid, [unless_suspending])
+        || {_,Pid} <- Active],
+    {reply, ok, State#state{paused=true}};
+
+handle_call(resume, _From, State0) ->
+    {ok, State} = code_change(nil, State0, nil),
+    #state{active = Active} = State,
+    [catch erlang:resume_process(Pid) || {_,Pid} <- Active],
+    {reply, ok, State#state{paused=false}};
+
+handle_call(status, _From, State0) ->
+    {ok, State} = code_change(nil, State0, nil),
+    {reply, {ok, [
+        {active, length(State#state.active)},
+        {starting, length(State#state.starting)},
+        {waiting, smoosh_priority_queue:info(State#state.waiting)}
+    ]}, State};
+
+handle_call(close, _From, State0) ->
+    {ok, State} = code_change(nil, State0, nil),
+    {stop, normal, ok, State};
+
+handle_call(flush, _From, State0) ->
+    {ok, State} = code_change(nil, State0, nil),
+    {reply, ok, State#state{waiting=smoosh_priority_queue:new()}}.
+
+handle_cast({enqueue, _Object, 0}, State0) ->
+    {ok, State} = code_change(nil, State0, nil),
+    {noreply, State};
+handle_cast({enqueue, Object, Priority}, State0) ->
+    {ok, State} = code_change(nil, State0, nil),
+    {noreply, maybe_start_compaction(add_to_queue(Object, Priority, State))}.
+
+% We accept noproc here due to possibly having monitored a restarted compaction
+% pid after it finished.
+handle_info({'DOWN', Ref, _, Job, Reason}, State0) when Reason == normal;
+        Reason == noproc ->
+    {ok, State} = code_change(nil, State0, nil),
+    #state{active=Active, starting=Starting} = State,
+    {noreply, maybe_start_compaction(
+                State#state{active=lists:keydelete(Job, 2, Active),
+                            starting=lists:keydelete(Ref, 1, Starting)})};
+
+handle_info({'DOWN', Ref, _, Job, Reason}, State0) ->
+    {ok, State} = code_change(nil, State0, nil),
+    #state{active=Active0, starting=Starting0} = State,
+    case lists:keytake(Job, 2, Active0) of
+        {value, {Key, _Pid}, Active1} ->
+            couch_log:warning("exit for compaction of ~p: ~p", [
+                smoosh_utils:stringify(Key), Reason]),
+            {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]),
+            {noreply, maybe_start_compaction(State#state{active=Active1})};
+        false ->
+            case lists:keytake(Ref, 1, Starting0) of
+                {value, {_, Key}, Starting1} ->
+                    couch_log:warning("failed to start compaction of ~p: ~p", [
+                        smoosh_utils:stringify(Key), Reason]),
+                    {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]),
+                    {noreply, maybe_start_compaction(State#state{starting=Starting1})};
+                false ->
+                    {noreply, State}
+            end
+    end;
+
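+% start_compact/2 sends the compact request as a raw '$gen_call' message and
+% temporarily monitors the db/index pid under Ref. The {Ref, {ok, Pid}} reply
+% carries the compactor pid, so monitor that instead and drop the temporary
+% monitor.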
+handle_info({Ref, {ok, Pid}}, State0) when is_reference(Ref) ->
+    {ok, State} = code_change(nil, State0, nil),
+    case lists:keytake(Ref, 1, State#state.starting) of
+        {value, {_, Key}, Starting1} ->
+            couch_log:notice("~s: Started compaction for ~s",
+                     [State#state.name, smoosh_utils:stringify(Key)]),
+            erlang:monitor(process, Pid),
+            erlang:demonitor(Ref, [flush]),
+            {noreply, State#state{active=[{Key, Pid}|State#state.active],
+                                  starting=Starting1}};
+        false ->
+            {noreply, State}
+    end;
+
+handle_info(pause, State0) ->
+    {ok, State} = code_change(nil, State0, nil),
+    {noreply, State#state{paused=true}};
+handle_info(unpause, State0) ->
+    {ok, State} = code_change(nil, State0, nil),
+    {noreply, maybe_start_compaction(State#state{paused=false})}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, #state{}=State, _Extra) ->
+    {ok, State}.
+
+% private functions.
+
+add_to_queue(Key, Priority, State) ->
+    #state{active=Active,waiting=Q} = State,
+    case lists:keymember(Key, 1, Active) of
+    true ->
+        State;
+    false ->
+        Capacity = list_to_integer(smoosh_utils:get(State#state.name, "capacity", "9999")),
+        couch_log:notice(
+            "~s: adding ~p to internal compactor queue with priority ~p",
+                 [State#state.name, Key, Priority]),
+        State#state{
+            waiting=smoosh_priority_queue:in(Key, Priority, Priority, Capacity, Q)
+        }
+    end.
+
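+% Pop items off the waiting queue and start their compactions until the
+% channel's configured concurrency is reached. Nothing starts while the
+% channel is paused (initially, and while suspended).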
+maybe_start_compaction(#state{paused=true}=State) ->
+    State;
+maybe_start_compaction(State) ->
+    Concurrency = list_to_integer(smoosh_utils:get(State#state.name,
+        "concurrency", "1")),
+    if length(State#state.active) + length(State#state.starting) < Concurrency ->
+        case smoosh_priority_queue:out(State#state.waiting) of
+        false ->
+            State;
+        {Key, Priority, Q} ->
+            try
+                State2 = case start_compact(State, Key) of
+                false ->
+                    State;
+                State1 ->
+                    couch_log:notice(
+                        "~s: Starting compaction for ~s (priority ~p)",
+                        [State#state.name, smoosh_utils:stringify(Key), Priority]),
+                    State1
+                end,
+                maybe_start_compaction(State2#state{waiting=Q})
+            catch Class:Exception ->
+                couch_log:notice("~s: ~p ~p for ~s",
+                    [State#state.name, Class, Exception,
+                        smoosh_utils:stringify(Key)]),
+                maybe_start_compaction(State#state{waiting=Q})
+            end
+        end;
+    true ->
+        State
+    end.
+
+start_compact(State, {schema, DbName, GroupId}) ->
+    case smoosh_utils:ignore_db({DbName, GroupId}) of
+        false ->
+            {ok, Pid} = couch_md_index_manager:get_group_pid(DbName,
+                GroupId),
+            Ref = erlang:monitor(process, Pid),
+            Pid ! {'$gen_call', {self(), Ref}, compact},
+            State#state{starting=[{Ref, {schema, DbName,
+                GroupId}} | State#state.starting]};
+        _ ->
+            false
+    end;
+
+start_compact(State, DbName) when is_list(DbName) ->
+    start_compact(State, ?l2b(DbName));
+start_compact(State, DbName) when is_binary(DbName) ->
+    {ok, Db} = couch_db:open_int(DbName, []),
+    try start_compact(State, Db) after couch_db:close(Db) end;
+start_compact(State, {Shard,GroupId}) ->
+    case smoosh_utils:ignore_db({Shard, GroupId}) of
+    false ->
+        DbName = mem3:dbname(Shard),
+        {ok, Pid} = couch_index_server:get_index(
+                couch_mrview_index, Shard, GroupId),
+        spawn(fun() -> cleanup_index_files(DbName, Shard) end),
+        Ref = erlang:monitor(process, Pid),
+        Pid ! {'$gen_call', {self(), Ref}, compact},
+        State#state{starting=[{Ref, {Shard, GroupId}}|State#state.starting]};
+    _ ->
+        false
+    end;
+start_compact(State, Db) ->
+    case smoosh_utils:ignore_db(Db) of
+    false ->
+        DbPid = couch_db:get_pid(Db),
+        Key = couch_db:name(Db),
+        case couch_db:get_compactor_pid(Db) of
+            nil ->
+                Ref = erlang:monitor(process, DbPid),
+                DbPid ! {'$gen_call', {self(), Ref}, start_compact},
+                State#state{starting=[{Ref, Key}|State#state.starting]};
+            % database is still compacting so we can just monitor the existing
+            % compaction pid
+            CPid ->
+                couch_log:notice("Db ~s continuing compaction",
+                    [smoosh_utils:stringify(Key)]),
+                erlang:monitor(process, CPid),
+                State#state{active=[{Key, CPid}|State#state.active]}
+        end;
+    _ ->
+        false
+    end.
+
+schedule_unpause() ->
+    WaitSecs = list_to_integer(config:get("smoosh", "wait_secs", "30")),
+    erlang:send_after(WaitSecs * 1000, self(), unpause).
+
+cleanup_index_files(DbName, _Shard) ->
+    case config:get("smoosh", "cleanup_index_files", "false") of
+    "true" ->
+        fabric:cleanup_index_files(DbName);
+    _ ->
+        ok
+    end.
diff --git a/src/smoosh_priority_queue.erl b/src/smoosh_priority_queue.erl
new file mode 100644
index 0000000..b7ede55
--- /dev/null
+++ b/src/smoosh_priority_queue.erl
@@ -0,0 +1,85 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(smoosh_priority_queue).
+
+-export([new/0, last_updated/2, is_key/2, in/4, in/5, out/1, size/1, info/1]).
+
+-record(priority_queue, {
+    dict=dict:new(),
+    tree=gb_trees:empty()
+}).
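+
+% Two views of the same items are kept: `dict` maps Key -> TreeKey so a
+% re-enqueued key replaces its previous entry, and `tree` is a gb_tree keyed
+% by {Priority, Timestamp} so out/1 pops the highest-priority item and
+% truncate/3 drops the lowest-priority items once capacity is exceeded.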
+
+new() ->
+    #priority_queue{}.
+
+last_updated(Key, #priority_queue{dict=Dict}) ->
+    case dict:find(Key, Dict) of
+        {ok, {_, LastUpdated}} ->
+            LastUpdated;
+        error ->
+            false
+    end.
+
+is_key(Key, #priority_queue{dict=Dict}) ->
+    dict:is_key(Key, Dict).
+
+in(Key, Value, Priority, Q) ->
+    in(Key, Value, Priority, infinity, Q).
+
+in(Key, Value, Priority, Capacity, #priority_queue{dict=Dict, tree=Tree}) ->
+    Tree1 = case dict:find(Key, Dict) of
+        {ok, TreeKey} ->
+            gb_trees:delete_any(TreeKey, Tree);
+        error ->
+            Tree
+    end,
+    TreeKey1 = {Priority, now()},
+    Tree2 = gb_trees:enter(TreeKey1, {Key, Value}, Tree1),
+    Dict1 = dict:store(Key, TreeKey1, Dict),
+    truncate(Capacity, #priority_queue{dict=Dict1, tree=Tree2}).
+
+out(#priority_queue{dict=Dict,tree=Tree}) ->
+    case gb_trees:is_empty(Tree) of
+    true ->
+        false;
+    false ->
+        {_, {Key, Value}, Tree1} = gb_trees:take_largest(Tree),
+        Dict1 = dict:erase(Key, Dict),
+        {Key, Value, #priority_queue{dict=Dict1, tree=Tree1}}
+    end.
+
+size(#priority_queue{tree=Tree}) ->
+    gb_trees:size(Tree).
+
+info(#priority_queue{tree=Tree}=Q) ->
+    [{size, ?MODULE:size(Q)}|
+     case gb_trees:is_empty(Tree) of
+         true ->
+             [];
+         false ->
+             {Min, _, _} = gb_trees:take_smallest(Tree),
+             {Max, _, _} = gb_trees:take_largest(Tree),
+             [{min, Min}, {max, Max}]
+     end].
+
+truncate(infinity, Q) ->
+    Q;
+truncate(Capacity, Q) when Capacity > 0 ->
+    truncate(Capacity, ?MODULE:size(Q), Q).
+
+truncate(Capacity, Size, Q) when Size =< Capacity ->
+    Q;
+truncate(Capacity, Size, #priority_queue{dict=Dict, tree=Tree}) when Size > 0 ->
+    {_, {Key, _}, Tree1} = gb_trees:take_smallest(Tree),
+    Q1 = #priority_queue{dict=dict:erase(Key, Dict), tree=Tree1},
+    truncate(Capacity, ?MODULE:size(Q1), Q1).
diff --git a/src/smoosh_server.erl b/src/smoosh_server.erl
new file mode 100644
index 0000000..d0e064b
--- /dev/null
+++ b/src/smoosh_server.erl
@@ -0,0 +1,545 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(smoosh_server).
+-behaviour(gen_server).
+-vsn(3).
+-behaviour(config_listener).
+-include_lib("couch/include/couch_db.hrl").
+
+% public api.
+-export([
+    start_link/0,
+    suspend/0,
+    resume/0,
+    enqueue/1,
+    sync_enqueue/1,
+    sync_enqueue/2,
+    handle_db_event/3,
+    status/0
+]).
+
+% gen_server api.
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+    code_change/3, terminate/2]).
+
+% config_listener api
+-export([handle_config_change/5, handle_config_terminate/3]).
+
+% exported but for internal use.
+-export([enqueue_request/2]).
+
+% private records.
+
+-record(state, {
+    db_channels=[],
+    view_channels=[],
+    schema_channels=[],
+    tab,
+    event_listener,
+    waiting=dict:new()
+}).
+
+-record(channel, {
+    name,
+    pid
+}).
+
+% public functions.
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+suspend() ->
+    gen_server:call(?MODULE, suspend).
+
+resume() ->
+    gen_server:call(?MODULE, resume).
+
+status() ->
+    gen_server:call(?MODULE, status).
+
+enqueue(Object) ->
+    gen_server:cast(?MODULE, {enqueue, Object}).
+
+sync_enqueue(Object) ->
+    gen_server:call(?MODULE, {enqueue, Object}).
+
+sync_enqueue(Object, Timeout) ->
+    gen_server:call(?MODULE, {enqueue, Object}, Timeout).
+
+handle_db_event(DbName, local_updated, St) ->
+    smoosh_server:enqueue(DbName),
+    {ok, St};
+handle_db_event(DbName, updated, St) ->
+    smoosh_server:enqueue(DbName),
+    {ok, St};
+handle_db_event(DbName, {index_commit, IdxName}, St) ->
+    smoosh_server:enqueue({DbName, IdxName}),
+    {ok, St};
+handle_db_event(DbName, {schema_updated, DDocId}, St) ->
+    smoosh_server:enqueue({schema, DbName, DDocId}),
+    {ok, St};
+handle_db_event(_DbName, _Event, St) ->
+    {ok, St}.
+
+% gen_server functions.
+
+init([]) ->
+    process_flag(trap_exit, true),
+    ok = config:listen_for_changes(?MODULE, nil),
+    {ok, Pid} = start_event_listener(),
+    DbChannels = smoosh_utils:split(
+                   config:get("smoosh", "db_channels", "upgrade_dbs,ratio_dbs,slack_dbs")),
+    ViewChannels = smoosh_utils:split(
+                     config:get("smoosh", "view_channels", "upgrade_views,ratio_views,slack_views")),
+    SchemaChannels = smoosh_utils:split(config:get("smoosh",
+        "schema_channels", "ratio_schemas,slack_schemas")),
+    Tab = ets:new(channels, [{keypos, #channel.name}]),
+    {ok, create_missing_channels(#state{
+        db_channels=DbChannels,
+        view_channels=ViewChannels,
+        schema_channels=SchemaChannels,
+        event_listener=Pid,
+        tab=Tab
+    })}.
+
+handle_config_change("smoosh", "db_channels", L, _, _) ->
+    {ok, gen_server:cast(?MODULE, {new_db_channels, smoosh_utils:split(L)})};
+handle_config_change("smoosh", "view_channels", L, _, _) ->
+    {ok, gen_server:cast(?MODULE, {new_view_channels, smoosh_utils:split(L)})};
+handle_config_change("smoosh", "schema_channels", L, _, _) ->
+    {ok, gen_server:cast(?MODULE, {new_schema_channels, smoosh_utils:split(L)})};
+handle_config_change(_, _, _, _, _) ->
+    {ok, nil}.
+
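+% If the config listener terminates for any reason other than a normal stop,
+% wait briefly and re-register so smoosh keeps reacting to config changes.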
+handle_config_terminate(_, stop, _) -> ok;
+handle_config_terminate(_, _, _) ->
+    spawn(fun() ->
+        timer:sleep(5000),
+        config:listen_for_changes(?MODULE, nil)
+    end).
+
+handle_call(status, _From, State) ->
+    Acc = ets:foldl(fun get_channel_status/2, [], State#state.tab),
+    {reply, {ok, Acc}, State};
+
+handle_call({enqueue, Object}, _From, State) ->
+    {noreply, NewState} = handle_cast({enqueue, Object}, State),
+    {reply, ok, NewState};
+
+handle_call(suspend, _From, State) ->
+    ets:foldl(fun(#channel{name=Name, pid=P}, _) ->
+        couch_log:notice("Suspending ~p", [Name]),
+        smoosh_channel:suspend(P) end, 0,
+        State#state.tab),
+    {reply, ok, State};
+
+handle_call(resume, _From, State) ->
+    ets:foldl(fun(#channel{name=Name, pid=P}, _) ->
+        couch_log:notice("Resuming ~p", [Name]),
+        smoosh_channel:resume(P) end, 0,
+        State#state.tab),
+    {reply, ok, State}.
+
+handle_cast({new_db_channels, Channels}, State) ->
+    [smoosh_channel:close(channel_pid(State#state.tab, C)) ||
+        C <- State#state.db_channels -- Channels],
+    {noreply, create_missing_channels(State#state{db_channels=Channels})};
+
+handle_cast({new_view_channels, Channels}, State) ->
+    [smoosh_channel:close(channel_pid(State#state.tab, C)) ||
+        C <- State#state.view_channels -- Channels],
+    {noreply, create_missing_channels(State#state{view_channels=Channels})};
+
+handle_cast({new_schema_channels, Channels}, State) ->
+    [smoosh_channel:close(channel_pid(State#state.tab, C)) ||
+        C <- State#state.schema_channels -- Channels],
+    {noreply, create_missing_channels(State#state{schema_channels=Channels})};
+
+handle_cast({enqueue, Object}, State) ->
+    #state{waiting=Waiting}=State,
+    case dict:is_key(Object, Waiting) of
+        true ->
+            {noreply, State};
+        false ->
+            {_Pid, Ref} = spawn_monitor(?MODULE, enqueue_request, [State, Object]),
+            {noreply, State#state{waiting=dict:store(Object, Ref, Waiting)}}
+    end.
+
+handle_info({'EXIT', Pid, Reason}, #state{event_listener=Pid}=State) ->
+        couch_log:notice("update notifier died ~p", [Reason]),
+        {ok, Pid1} = start_event_listener(),
+        {noreply, State#state{event_listener=Pid1}};
+handle_info({'EXIT', Pid, Reason}, State) ->
+    couch_log:notice("~p ~p died ~p", [?MODULE, Pid, Reason]),
+    case ets:match_object(State#state.tab, #channel{pid=Pid, _='_'}) of
+    [#channel{name=Name}] ->
+        ets:delete(State#state.tab, Name);
+    _ ->
+        ok
+    end,
+    {noreply, create_missing_channels(State)};
+
+handle_info({'DOWN', Ref, _, _, _}, State) ->
+    Waiting = dict:filter(fun(_Key, Value) -> Value =/= Ref end,
+                          State#state.waiting),
+    {noreply, State#state{waiting=Waiting}};
+
+handle_info(_Msg, State) ->
+    {noreply, State}.
+
+terminate(_Reason, State) ->
+    ets:foldl(fun(#channel{pid=P}, _) -> smoosh_channel:close(P) end, 0,
+        State#state.tab),
+    ok.
+
+code_change(_OldVsn, {state, DbChannels, ViewChannels, Tab,
+        EventListener, Waiting}, _Extra) ->
+    {ok, #state{db_channels=DbChannels, view_channels=ViewChannels,
+        schema_channels=[], tab=Tab, event_listener = EventListener,
+            waiting=Waiting}};
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+% private functions.
+
+get_channel_status(#channel{name=Name, pid=P}, Acc0) when is_pid(P) ->
+    try gen_server:call(P, status) of
+    {ok, Status} ->
+        [{Name, Status} | Acc0];
+    _ ->
+        Acc0
+    catch _:_ ->
+        Acc0
+    end;
+get_channel_status(_, Acc0) ->
+    Acc0.
+
+start_event_listener() ->
+    couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]).
+
+enqueue_request(State, Object) ->
+    try
+        case find_channel(State, Object) of
+        false ->
+            ok;
+        {ok, Pid, Priority} ->
+            smoosh_channel:enqueue(Pid, Object, Priority)
+        end
+    catch Class:Exception ->
+        Stack = erlang:get_stacktrace(),
+        couch_log:notice("~s: ~p ~p for ~s : ~p",
+            [?MODULE, Class, Exception,
+                smoosh_utils:stringify(Object), Stack])
+    end.
+
+find_channel(#state{}=State, {schema, DbName, GroupId}) ->
+    find_channel(State#state.tab, State#state.schema_channels, {schema, DbName, GroupId});
+find_channel(#state{}=State, {Shard, GroupId}) ->
+    find_channel(State#state.tab, State#state.view_channels, {Shard, GroupId});
+find_channel(#state{}=State, DbName) ->
+    find_channel(State#state.tab, State#state.db_channels, DbName).
+
+find_channel(_Tab, [], _Object) ->
+    false;
+find_channel(Tab, [Channel|Rest], Object) ->
+    Pid = channel_pid(Tab, Channel),
+    LastUpdated = smoosh_channel:last_updated(Pid, Object),
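+    % "staleness" is configured in minutes; timer:now_diff/2 returns
+    % microseconds, so convert with 6.0e7 microseconds per minute.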
+    Staleness = 6.0e7 * list_to_integer(config:get("smoosh", "staleness", "5")),
+    case LastUpdated =:= false orelse
+        timer:now_diff(now(), LastUpdated) > Staleness of
+    true ->
+        case smoosh_utils:ignore_db(Object) of
+        true ->
+            find_channel(Tab, Rest, Object);
+        _ ->
+            case get_priority(Channel, Object) of
+            0 ->
+                find_channel(Tab, Rest, Object);
+            Priority ->
+                {ok, Pid, Priority}
+            end
+        end;
+    false ->
+        find_channel(Tab, Rest, Object)
+    end.
+
+channel_pid(Tab, Channel) ->
+    [#channel{pid=Pid}] = ets:lookup(Tab, Channel),
+    Pid.
+
+create_missing_channels(State) ->
+    create_missing_channels(State#state.tab, State#state.db_channels),
+    create_missing_channels(State#state.tab, State#state.view_channels),
+    create_missing_channels(State#state.tab, State#state.schema_channels),
+    State.
+
+create_missing_channels(_Tab, []) ->
+    ok;
+create_missing_channels(Tab, [Channel|Rest]) ->
+    case ets:lookup(Tab, Channel) of
+        [] ->
+            {ok, Pid} = smoosh_channel:start_link(Channel),
+            true = ets:insert(Tab, [#channel{name=Channel, pid=Pid}]);
+        _ ->
+            ok
+    end,
+    create_missing_channels(Tab, Rest).
+
+get_priority(Channel, {Shard, GroupId}) ->
+    case couch_index_server:get_index(couch_mrview_index, Shard, GroupId) of
+    {ok, Pid} ->
+        try
+            {ok, ViewInfo} = couch_index:get_info(Pid),
+            {SizeInfo} = couch_util:get_value(sizes, ViewInfo),
+            DiskSize = couch_util:get_value(file, SizeInfo),
+            ActiveSize = couch_util:get_value(active, SizeInfo),
+            NeedsUpgrade = needs_upgrade(ViewInfo),
+            get_priority(Channel, DiskSize, ActiveSize, NeedsUpgrade)
+        catch
+            exit:{timeout, _} ->
+                0
+        end;
+    {not_found, _Reason} ->
+        0;
+    {error, Reason} ->
+        couch_log:warning("Failed to get group_pid for ~p ~p ~p: ~p",
+            [Channel, Shard, GroupId, Reason]),
+        0
+    end;
+
+get_priority(Channel, {schema, DbName, DDocId}) ->
+    case couch_md_index_manager:get_group_pid(DbName, DDocId) of
+    {ok, Pid} ->
+        {ok, SchemaInfo} = couch_md_index:get_info(Pid),
+        DiskSize = couch_util:get_value(disk_size, SchemaInfo),
+        DataSize = couch_util:get_value(data_size, SchemaInfo),
+        get_priority(Channel, DiskSize, DataSize, false);
+    {error, Reason} ->
+        couch_log:warning("Failed to get group_pid for ~p ~p ~p: ~p",
+            [Channel, DbName, DDocId, Reason]),
+        0
+    end;
+
+get_priority(Channel, DbName) when is_list(DbName) ->
+    get_priority(Channel, ?l2b(DbName));
+get_priority(Channel, DbName) when is_binary(DbName) ->
+    {ok, Db} = couch_db:open_int(DbName, []),
+    try get_priority(Channel, Db) after couch_db:close(Db) end;
+get_priority(Channel, Db) ->
+    {ok, DocInfo} = couch_db:get_db_info(Db),
+    {SizeInfo} = couch_util:get_value(sizes, DocInfo),
+    DiskSize = couch_util:get_value(file, SizeInfo),
+    ActiveSize = couch_util:get_value(active, SizeInfo),
+    NeedsUpgrade = needs_upgrade(DocInfo),
+    case db_changed(Channel, DocInfo) of
+        true  -> get_priority(Channel, DiskSize, ActiveSize, NeedsUpgrade);
+        false -> 0
+    end.
+
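+% Turn the raw sizes into a channel-specific priority. A result of 0 means
+% "do not enqueue on this channel"; upgrade channels return 1 whenever the
+% on-disk format needs upgrading, regardless of size.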
+get_priority(Channel, DiskSize, DataSize, NeedsUpgrade) ->
+    Priority = get_priority(Channel),
+    MinSize = to_number(Channel, "min_size", "1048576"),
+    MaxSize = to_number(Channel, "max_size", "infinity"),
+    DefaultMinPriority = case Priority of "slack" -> "16777216"; _ -> "5.0" end,
+    MinPriority = to_number(Channel, "min_priority", DefaultMinPriority),
+    MaxPriority = to_number(Channel, "max_priority", "infinity"),
+    if Priority =:= "upgrade", NeedsUpgrade ->
+            1;
+       DiskSize =< MinSize ->
+            0;
+       DiskSize > MaxSize ->
+            0;
+       DataSize =:= 0 ->
+            MinPriority;
+       Priority =:= "ratio", DiskSize/DataSize =< MinPriority ->
+            0;
+       Priority =:= "ratio", DiskSize/DataSize > MaxPriority ->
+            0;
+       Priority =:= "ratio" ->
+            DiskSize/DataSize;
+       Priority =:= "slack", DiskSize-DataSize =< MinPriority ->
+            0;
+       Priority =:= "slack", DiskSize-DataSize > MaxPriority ->
+            0;
+       Priority =:= "slack" ->
+            DiskSize-DataSize;
+       true ->
+            0
+    end.
+
+db_changed(Channel, Info) ->
+    case couch_util:get_value(compacted_seq, Info) of
+        undefined ->
+            true;
+        CompactedSeq ->
+            MinChanges = list_to_integer(
+                smoosh_utils:get(Channel, "min_changes", "0")),
+            UpdateSeq = couch_util:get_value(update_seq, Info),
+            UpdateSeq - CompactedSeq >= MinChanges
+    end.
+
+to_number(Channel, Name, Default) ->
+    case smoosh_utils:get(Channel, Name, Default) of
+        "infinity" -> infinity;
+        Value ->
+            try
+                list_to_float(Value)
+            catch error:badarg ->
+                list_to_integer(Value)
+            end
+    end.
+
+get_priority("ratio_dbs") ->
+    "ratio";
+get_priority("ratio_views") ->
+    "ratio";
+get_priority("ratio_schemas") ->
+    "ratio";
+get_priority("slack_dbs") ->
+    "slack";
+get_priority("slack_views") ->
+    "slack";
+get_priority("slack_schemas") ->
+    "slack";
+get_priority("upgrade_dbs") ->
+    "upgrade";
+get_priority("upgrade_views") ->
+    "upgrade";
+get_priority(Channel) ->
+    smoosh_utils:get(Channel, "priority", "ratio").
+
+needs_upgrade(Props) ->
+    DiskVersion = couch_util:get_value(disk_format_version, Props),
+    case couch_util:get_value(engine, Props) of
+        couch_bt_engine ->
+            (couch_bt_engine_header:latest(DiskVersion) =:= false);
+        _ ->
+            false
+    end.
+
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+
+setup() ->
+    meck:new([config, couch_index, couch_index_server], [passthrough]),
+    Pid = list_to_pid("<0.0.0>"),
+    meck:expect(couch_index_server, get_index, 3, {ok, Pid}),
+    meck:expect(config, get, fun(_, _, Default) -> Default end),
+    Shard = <<"shards/00000000-1fffffff/test.1529510412">>,
+    GroupId = <<"_design/ddoc">>,
+    {ok, Shard, GroupId}.
+
+
+teardown(_) ->
+    meck:unload().
+
+
+get_priority_test_() ->
+    {
+        foreach,
+        fun setup/0,
+        fun teardown/1,
+        [
+            fun t_ratio_view/1,
+            fun t_slack_view/1,
+            fun t_no_data_view/1,
+            fun t_below_min_priority_view/1,
+            fun t_below_min_size_view/1,
+            fun t_timeout_view/1,
+            fun t_missing_view/1,
+            fun t_invalid_view/1
+        ]
+}.
+
+
+t_ratio_view({ok, Shard, GroupId}) ->
+    ?_test(begin
+        meck:expect(couch_index, get_info, fun(_) ->
+            {ok, [{sizes, {[{file, 5242880}, {active, 524288}]}}]}
+        end),
+        ?assertEqual(10.0, get_priority("ratio_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId}))
+    end).
+
+t_slack_view({ok, Shard, GroupId}) ->
+    ?_test(begin
+        meck:expect(couch_index, get_info, fun(_) ->
+            {ok, [{sizes, {[{file, 33554432}, {active, 16777215}]}}]}
+        end),
+        ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})),
+        ?assertEqual(16777217, get_priority("slack_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId}))
+    end).
+
+t_no_data_view({ok, Shard, GroupId}) ->
+    ?_test(begin
+        meck:expect(couch_index, get_info, fun(_) ->
+            {ok, [{sizes, {[{file, 5242880}, {active, 0}]}}]}
+        end),
+        ?assertEqual(5.0, get_priority("ratio_views", {Shard, GroupId})),
+        ?assertEqual(16777216, get_priority("slack_views", {Shard, GroupId})),
+        ?assertEqual(5.0, get_priority("upgrade_views", {Shard, GroupId}))
+    end).
+
+t_below_min_priority_view({ok, Shard, GroupId}) ->
+    ?_test(begin
+        meck:expect(couch_index, get_info, fun(_) ->
+            {ok, [{sizes, {[{file, 5242880}, {active, 1048576}]}}]}
+        end),
+        ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId}))
+    end).
+
+t_below_min_size_view({ok, Shard, GroupId}) ->
+    ?_test(begin
+        meck:expect(couch_index, get_info, fun(_) ->
+            {ok, [{sizes, {[{file, 1048576}, {active, 512000}]}}]}
+        end),
+        ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId}))
+    end).
+
+t_timeout_view({ok, Shard, GroupId}) ->
+    ?_test(begin
+        meck:expect(couch_index, get_info, fun(_) ->
+            exit({timeout, get_info})
+        end),
+        ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId}))
+    end).
+
+t_missing_view({ok, Shard, GroupId}) ->
+    ?_test(begin
+        meck:expect(couch_index_server, get_index, 3, {not_found, missing}),
+        ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId}))
+    end).
+
+t_invalid_view({ok, Shard, GroupId}) ->
+    ?_test(begin
+        meck:expect(couch_index_server, get_index, 3, {error, undef}),
+        ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})),
+        ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId}))
+    end).
+
+
+-endif.
diff --git a/src/smoosh_sup.erl b/src/smoosh_sup.erl
new file mode 100644
index 0000000..158498c
--- /dev/null
+++ b/src/smoosh_sup.erl
@@ -0,0 +1,38 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(smoosh_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%% Helper macro for declaring children of supervisor
+-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
+
+%% ===================================================================
+%% API functions
+%% ===================================================================
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% ===================================================================
+%% Supervisor callbacks
+%% ===================================================================
+
+init([]) ->
+    {ok, { {one_for_one, 5, 10}, [?CHILD(smoosh_server, worker)]} }.
diff --git a/src/smoosh_utils.erl b/src/smoosh_utils.erl
new file mode 100644
index 0000000..78ef83a
--- /dev/null
+++ b/src/smoosh_utils.erl
@@ -0,0 +1,58 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(smoosh_utils).
+-include_lib("couch/include/couch_db.hrl").
+
+-export([get/2, get/3, group_pid/1, split/1, stringify/1, ignore_db/1]).
+
+group_pid({Shard, GroupId}) ->
+    case couch_view_group:open_db_group(Shard, GroupId) of
+    {ok, Group} ->
+        try
+            gen_server:call(couch_view, {get_group_server, Shard, Group})
+        catch _:Error ->
+            {error, Error}
+        end;
+    Else ->
+        Else
+    end.
+
+get(Channel, Key) ->
+    ?MODULE:get(Channel, Key, undefined).
+
+get(Channel, Key, Default) ->
+    config:get("smoosh." ++ Channel, Key, Default).
+
+split(CSV) ->
+    re:split(CSV, "\\s*,\\s*", [{return,list}, trim]).
+
+stringify({DbName, GroupId}) ->
+    io_lib:format("~s ~s", [DbName, GroupId]);
+stringify({schema, DbName, GroupId}) ->
+    io_lib:format("schema: ~s ~s", [DbName, GroupId]);
+stringify(DbName) ->
+    io_lib:format("~s", [DbName]).
+
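+% Databases (and the view groups on them) can be excluded from compaction
+% entirely by setting "true" under the [smoosh.ignore] config section, keyed
+% by database name.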
+ignore_db({DbName, _GroupName}) ->
+    ignore_db(DbName);
+ignore_db(DbName) when is_binary(DbName) ->
+    ignore_db(?b2l(DbName));
+ignore_db(DbName) when is_list(DbName) ->
+    case config:get("smoosh.ignore", DbName, false) of
+    "true" ->
+        true;
+    _ ->
+        false
+    end;
+ignore_db(Db) ->
+    ignore_db(couch_db:name(Db)).