| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(ddoc_cache_lru). |
| -behaviour(gen_server). |
| -vsn(1). |
| |
| |
| -export([ |
| start_link/0, |
| open/1, |
| insert/2, |
| refresh/2 |
| ]). |
| |
| -export([ |
| init/1, |
| terminate/2, |
| handle_call/3, |
| handle_cast/2, |
| handle_info/2, |
| code_change/3 |
| ]). |
| |
| -export([ |
| handle_db_event/3 |
| ]). |
| |
| |
| -include("ddoc_cache.hrl"). |
| |
| |
| -define(OPENER, ddoc_cache_opener). |
| |
| |
| -record(st, { |
| pids, % pid -> key |
| dbs, % dbname -> docid -> key -> pid |
| evictor |
| }). |
| |
| |
| start_link() -> |
| gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). |
| |
| |
| open(Key) -> |
| try ets:lookup(?CACHE, Key) of |
| [] -> |
| lru_start(Key, true); |
| [#entry{pid = undefined}] -> |
| lru_start(Key, false); |
| [#entry{val = undefined, pid = Pid}] -> |
| couch_stats:increment_counter([ddoc_cache, miss]), |
| ddoc_cache_entry:open(Pid, Key); |
| [#entry{val = Val, pid = Pid}] -> |
| couch_stats:increment_counter([ddoc_cache, hit]), |
| ddoc_cache_entry:accessed(Pid), |
| {ok, Val} |
| catch _:_ -> |
| couch_stats:increment_counter([ddoc_cache, recovery]), |
| ddoc_cache_entry:recover(Key) |
| end. |
| |
| |
| insert(Key, Value) -> |
| case ets:lookup(?CACHE, Key) of |
| [] -> |
| Wrapped = ddoc_cache_value:wrap(Value), |
| gen_server:call(?MODULE, {start, Key, Wrapped}, infinity); |
| [#entry{}] -> |
| ok |
| end. |
| |
| |
| refresh(DbName, DDocIds) -> |
| gen_server:cast(?MODULE, {refresh, DbName, DDocIds}). |
| |
| |
| init(_) -> |
| couch_util:set_mqd_off_heap(), |
| process_flag(trap_exit, true), |
| BaseOpts = [public, named_table], |
| CacheOpts = [ |
| set, |
| {read_concurrency, true}, |
| {keypos, #entry.key} |
| ] ++ BaseOpts, |
| ets:new(?CACHE, CacheOpts), |
| ets:new(?LRU, [ordered_set, {write_concurrency, true}] ++ BaseOpts), |
| {ok, Pids} = khash:new(), |
| {ok, Dbs} = khash:new(), |
| {ok, Evictor} = couch_event:link_listener( |
| ?MODULE, handle_db_event, nil, [all_dbs] |
| ), |
| ?EVENT(lru_init, nil), |
| {ok, #st{ |
| pids = Pids, |
| dbs = Dbs, |
| evictor = Evictor |
| }}. |
| |
| |
| terminate(_Reason, St) -> |
| case is_pid(St#st.evictor) of |
| true -> exit(St#st.evictor, kill); |
| false -> ok |
| end, |
| ok. |
| |
| |
| handle_call({start, Key, Default}, _From, St) -> |
| #st{ |
| pids = Pids, |
| dbs = Dbs |
| } = St, |
| case ets:lookup(?CACHE, Key) of |
| [] -> |
| MaxSize = config:get_integer("ddoc_cache", "max_size", 104857600), |
| case trim(St, max(0, MaxSize)) of |
| ok -> |
| true = ets:insert_new(?CACHE, #entry{key = Key}), |
| {ok, Pid} = ddoc_cache_entry:start_link(Key, Default), |
| true = ets:update_element(?CACHE, Key, {#entry.pid, Pid}), |
| ok = khash:put(Pids, Pid, Key), |
| store_key(Dbs, Key, Pid), |
| {reply, {ok, Pid}, St}; |
| full -> |
| ?EVENT(full, Key), |
| {reply, full, St} |
| end; |
| [#entry{pid = Pid}] -> |
| {reply, {ok, Pid}, St} |
| end; |
| |
| handle_call(Msg, _From, St) -> |
| {stop, {invalid_call, Msg}, {invalid_call, Msg}, St}. |
| |
| |
| handle_cast({evict, DbName}, St) -> |
| gen_server:abcast(mem3:nodes(), ?OPENER, {do_evict, DbName}), |
| {noreply, St}; |
| |
| handle_cast({refresh, DbName, DDocIds}, St) -> |
| gen_server:abcast(mem3:nodes(), ?OPENER, {do_evict, DbName, DDocIds}), |
| {noreply, St}; |
| |
| handle_cast({do_evict, DbName}, St) -> |
| #st{ |
| dbs = Dbs |
| } = St, |
| ToRem = case khash:lookup(Dbs, DbName) of |
| {value, DDocIds} -> |
| AccOut = khash:fold(DDocIds, fun(_, Keys, Acc1) -> |
| khash:to_list(Keys) ++ Acc1 |
| end, []), |
| ?EVENT(evicted, DbName), |
| AccOut; |
| not_found -> |
| ?EVENT(evict_noop, DbName), |
| [] |
| end, |
| lists:foreach(fun({Key, Pid}) -> |
| remove_entry(St, Key, Pid) |
| end, ToRem), |
| khash:del(Dbs, DbName), |
| {noreply, St}; |
| |
| handle_cast({do_refresh, DbName, DDocIdList}, St) -> |
| #st{ |
| dbs = Dbs |
| } = St, |
| % We prepend no_ddocid to the DDocIdList below |
| % so that we refresh all custom and validation |
| % function entries which load data from all |
| % design documents. |
| case khash:lookup(Dbs, DbName) of |
| {value, DDocIds} -> |
| lists:foreach(fun(DDocId) -> |
| case khash:lookup(DDocIds, DDocId) of |
| {value, Keys} -> |
| khash:fold(Keys, fun(_, Pid, _) -> |
| ddoc_cache_entry:refresh(Pid) |
| end, nil); |
| not_found -> |
| ok |
| end |
| end, [no_ddocid | DDocIdList]); |
| not_found -> |
| ok |
| end, |
| {noreply, St}; |
| |
| handle_cast(Msg, St) -> |
| {stop, {invalid_cast, Msg}, St}. |
| |
| |
| handle_info({'EXIT', Pid, Reason}, #st{evictor = Pid} = St) -> |
| {stop, Reason, St}; |
| |
| handle_info({'EXIT', Pid, normal}, St) -> |
| % This clause handles when an entry starts |
| % up but encounters an error or uncacheable |
| % response from its recover call. |
| #st{ |
| pids = Pids |
| } = St, |
| {value, Key} = khash:lookup(Pids, Pid), |
| khash:del(Pids, Pid), |
| remove_key(St, Key), |
| {noreply, St}; |
| |
| handle_info(Msg, St) -> |
| {stop, {invalid_info, Msg}, St}. |
| |
| |
| code_change(_OldVsn, St, _Extra) -> |
| {ok, St}. |
| |
| |
| handle_db_event(ShardDbName, created, St) -> |
| gen_server:cast(?MODULE, {evict, mem3:dbname(ShardDbName)}), |
| {ok, St}; |
| |
| handle_db_event(ShardDbName, deleted, St) -> |
| gen_server:cast(?MODULE, {evict, mem3:dbname(ShardDbName)}), |
| {ok, St}; |
| |
| handle_db_event(_DbName, _Event, St) -> |
| {ok, St}. |
| |
| |
| lru_start(Key, DoInsert) -> |
| case gen_server:call(?MODULE, {start, Key, undefined}, infinity) of |
| {ok, Pid} -> |
| couch_stats:increment_counter([ddoc_cache, miss]), |
| Resp = ddoc_cache_entry:open(Pid, Key), |
| if not DoInsert -> ok; true -> |
| ddoc_cache_entry:insert(Key, Resp) |
| end, |
| Resp; |
| full -> |
| couch_stats:increment_counter([ddoc_cache, recovery]), |
| ddoc_cache_entry:recover(Key) |
| end. |
| |
| |
| trim(_, 0) -> |
| full; |
| |
| trim(St, MaxSize) -> |
| CurSize = ets:info(?CACHE, memory) * erlang:system_info(wordsize), |
| if CurSize =< MaxSize -> ok; true -> |
| case ets:first(?LRU) of |
| {_Ts, Key, Pid} -> |
| remove_entry(St, Key, Pid), |
| trim(St, MaxSize); |
| '$end_of_table' -> |
| full |
| end |
| end. |
| |
| |
| remove_entry(St, Key, Pid) -> |
| #st{ |
| pids = Pids |
| } = St, |
| unlink_and_flush(Pid), |
| ddoc_cache_entry:shutdown(Pid), |
| khash:del(Pids, Pid), |
| remove_key(St, Key). |
| |
| |
| store_key(Dbs, Key, Pid) -> |
| DbName = ddoc_cache_entry:dbname(Key), |
| DDocId = ddoc_cache_entry:ddocid(Key), |
| case khash:lookup(Dbs, DbName) of |
| {value, DDocIds} -> |
| case khash:lookup(DDocIds, DDocId) of |
| {value, Keys} -> |
| khash:put(Keys, Key, Pid); |
| not_found -> |
| {ok, Keys} = khash:from_list([{Key, Pid}]), |
| khash:put(DDocIds, DDocId, Keys) |
| end; |
| not_found -> |
| {ok, Keys} = khash:from_list([{Key, Pid}]), |
| {ok, DDocIds} = khash:from_list([{DDocId, Keys}]), |
| khash:put(Dbs, DbName, DDocIds) |
| end. |
| |
| |
| remove_key(St, Key) -> |
| #st{ |
| dbs = Dbs |
| } = St, |
| DbName = ddoc_cache_entry:dbname(Key), |
| DDocId = ddoc_cache_entry:ddocid(Key), |
| {value, DDocIds} = khash:lookup(Dbs, DbName), |
| {value, Keys} = khash:lookup(DDocIds, DDocId), |
| khash:del(Keys, Key), |
| case khash:size(Keys) of |
| 0 -> khash:del(DDocIds, DDocId); |
| _ -> ok |
| end, |
| case khash:size(DDocIds) of |
| 0 -> khash:del(Dbs, DbName); |
| _ -> ok |
| end. |
| |
| |
| unlink_and_flush(Pid) -> |
| erlang:unlink(Pid), |
| % Its possible that the entry process has already exited before |
| % we unlink it so we have to flush out a possible 'EXIT' |
| % message sitting in our message queue. Notice that we're |
| % maintaining the assertion that all entry processes only |
| % ever exit normally. |
| receive |
| {'EXIT', Pid, normal} -> |
| ok |
| after 0 -> |
| ok |
| end. |