| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package org.apache.couchdb.nouveau.core; |
| |
| import static com.codahale.metrics.MetricRegistry.name; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.file.FileAlreadyExistsException; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.time.Duration; |
| import java.util.List; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.stream.Stream; |
| |
| import org.apache.couchdb.nouveau.api.IndexDefinition; |
| import org.eclipse.jetty.io.RuntimeIOException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.codahale.metrics.MetricRegistry; |
| import com.codahale.metrics.caffeine.MetricsStatsCounter; |
| import com.fasterxml.jackson.annotation.JsonProperty; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.github.benmanes.caffeine.cache.Cache; |
| import com.github.benmanes.caffeine.cache.Caffeine; |
| import com.github.benmanes.caffeine.cache.RemovalCause; |
| import com.github.benmanes.caffeine.cache.RemovalListener; |
| import com.github.benmanes.caffeine.cache.Scheduler; |
| |
| import io.dropwizard.lifecycle.Managed; |
| import jakarta.ws.rs.WebApplicationException; |
| import jakarta.ws.rs.core.Response.Status; |
| |
| /** |
| * The central class of Nouveau, responsible for loading and unloading Lucene |
| * indexes and making them available for query. |
| */ |
| |
| public final class IndexManager implements Managed { |
| |
| @FunctionalInterface |
| public interface IndexFunction<V, R> { |
| R apply(final V value) throws IOException; |
| } |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(IndexManager.class); |
| |
| private int maxIndexesOpen; |
| |
| private int commitIntervalSeconds; |
| |
| private int idleSeconds; |
| |
| private Path rootDir; |
| |
| private ObjectMapper objectMapper; |
| |
| private MetricRegistry metricRegistry; |
| |
| private ScheduledExecutorService scheduler; |
| |
| private Cache<String, Index> cache; |
| |
| public <R> R with(final String name, final IndexLoader loader, final IndexFunction<Index, R> indexFun) |
| throws IOException, InterruptedException { |
| while (true) { |
| if (!exists(name)) { |
| throw new WebApplicationException("Index does not exist", Status.NOT_FOUND); |
| } |
| |
| final Index index; |
| try { |
| index = cache.get(name, (n) -> { |
| LOGGER.info("opening {}", n); |
| final Path path = indexPath(n); |
| try { |
| final IndexDefinition indexDefinition = loadIndexDefinition(n); |
| return loader.apply(path, indexDefinition); |
| } catch (final IOException e) { |
| throw new RuntimeIOException(e); |
| } |
| }); |
| } catch (final RuntimeIOException e) { |
| throw (IOException) e.getCause(); |
| } |
| |
| if (index.tryAcquire(1, TimeUnit.SECONDS)) { |
| try { |
| final R result = indexFun.apply(index); |
| if (index.needsCommit(commitIntervalSeconds, TimeUnit.SECONDS)) { |
| scheduler.execute(() -> { |
| if (index.tryAcquire()) { |
| try { |
| LOGGER.debug("committing {}", name); |
| try { |
| index.commit(); |
| } catch (final IOException e) { |
| LOGGER.warn("I/O exception while committing " + name, e); |
| } |
| } finally { |
| index.release(); |
| } |
| } |
| }); |
| } |
| return result; |
| } finally { |
| index.release(); |
| } |
| } |
| } |
| } |
| |
| public void create(final String name, IndexDefinition indexDefinition) throws IOException { |
| if (exists(name)) { |
| throw new WebApplicationException("Index already exists", Status.EXPECTATION_FAILED); |
| } |
| // Validate index definiton |
| // TODO luceneFor(indexDefinition).validate(indexDefinition); |
| |
| // Persist definition |
| final Path path = indexDefinitionPath(name); |
| if (Files.exists(path)) { |
| throw new FileAlreadyExistsException(name + " already exists"); |
| } |
| Files.createDirectories(path.getParent()); |
| objectMapper.writeValue(path.toFile(), indexDefinition); |
| } |
| |
| public boolean exists(final String name) { |
| return Files.exists(indexDefinitionPath(name)); |
| } |
| |
| public void deleteAll(final String path, final List<String> exclusions) throws IOException { |
| LOGGER.info("deleting indexes below {} (excluding {})", path, |
| exclusions == null ? "nothing" : exclusions); |
| |
| final Path indexRootPath = indexRootPath(path); |
| if (!indexRootPath.toFile().exists()) { |
| return; |
| } |
| Stream<Path> stream = Files.find(indexRootPath, 100, |
| (p, attr) -> attr.isDirectory() && isIndex(p)); |
| try { |
| stream.forEach((p) -> { |
| final String relativeToExclusions = indexRootPath.relativize(p).toString(); |
| if (exclusions != null && exclusions.indexOf(relativeToExclusions) != -1) { |
| return; |
| } |
| final String relativeName = rootDir.relativize(p).toString(); |
| try { |
| deleteIndex(relativeName); |
| } catch (final IOException e) { |
| LOGGER.error("I/O exception deleting " + p, e); |
| } |
| // Clean any newly empty directories. |
| do { |
| final File f = p.toFile(); |
| if (f.isDirectory() && f.list().length == 0) { |
| f.delete(); |
| } |
| } while ((p = p.getParent()) != null && !rootDir.equals(p)); |
| }); |
| } finally { |
| stream.close(); |
| } |
| } |
| |
| private void deleteIndex(final String name) throws IOException { |
| final Index index = cache.asMap().remove(name); |
| if (index != null) { |
| index.setDeleteOnClose(true); |
| close(name, index); |
| } else { |
| IOUtils.rm(indexRootPath(name)); |
| } |
| } |
| |
| @JsonProperty |
| public int getMaxIndexesOpen() { |
| return maxIndexesOpen; |
| } |
| |
| public void setMaxIndexesOpen(int maxIndexesOpen) { |
| this.maxIndexesOpen = maxIndexesOpen; |
| } |
| |
| public int getCommitIntervalSeconds() { |
| return commitIntervalSeconds; |
| } |
| |
| public void setCommitIntervalSeconds(int commitIntervalSeconds) { |
| this.commitIntervalSeconds = commitIntervalSeconds; |
| } |
| |
| public int getIdleSeconds() { |
| return idleSeconds; |
| } |
| |
| public void setIdleSeconds(int idleSeconds) { |
| this.idleSeconds = idleSeconds; |
| } |
| |
| public void setScheduler(ScheduledExecutorService scheduler) { |
| this.scheduler = scheduler; |
| } |
| |
| public Path getRootDir() { |
| return rootDir; |
| } |
| |
| public void setRootDir(Path rootDir) { |
| this.rootDir = rootDir; |
| } |
| |
| public void setObjectMapper(final ObjectMapper objectMapper) { |
| this.objectMapper = objectMapper; |
| } |
| |
| public void setMetricRegistry(final MetricRegistry metricRegistry) { |
| this.metricRegistry = metricRegistry; |
| } |
| |
| @Override |
| public void start() throws IOException { |
| cache = Caffeine.newBuilder() |
| .recordStats(() -> new MetricsStatsCounter(metricRegistry, name(IndexManager.class, "cache"))) |
| .initialCapacity(maxIndexesOpen) |
| .maximumSize(maxIndexesOpen) |
| .expireAfterAccess(Duration.ofSeconds(idleSeconds)) |
| .scheduler(Scheduler.systemScheduler()) |
| .evictionListener(new IndexEvictionListener()) |
| .build(); |
| } |
| |
| @Override |
| public void stop() throws IOException, InterruptedException { |
| final var it = cache.asMap().entrySet().iterator(); |
| while (it.hasNext()) { |
| var e = it.next(); |
| LOGGER.info("closing {} during shutdown", e.getKey()); |
| close(e.getKey(), e.getValue()); |
| it.remove(); |
| } |
| } |
| |
| private boolean isIndex(final Path path) { |
| return path.resolve("index_definition.json").toFile().exists(); |
| } |
| |
| private Path indexDefinitionPath(final String name) { |
| return indexRootPath(name).resolve("index_definition.json"); |
| } |
| |
| private Path indexPath(final String name) { |
| return indexRootPath(name).resolve("index"); |
| } |
| |
| private IndexDefinition loadIndexDefinition(final String name) throws IOException { |
| return objectMapper.readValue(indexDefinitionPath(name).toFile(), IndexDefinition.class); |
| } |
| |
| private Path indexRootPath(final String name) { |
| final Path result = rootDir.resolve(name).normalize(); |
| if (result.startsWith(rootDir)) { |
| return result; |
| } |
| throw new WebApplicationException(name + " attempts to escape from index root directory", |
| Status.BAD_REQUEST); |
| } |
| |
| private class IndexEvictionListener implements RemovalListener<String, Index> { |
| |
| public void onRemoval(String name, Index index, RemovalCause cause) { |
| LOGGER.info("closing {} for cause {}", name, cause); |
| try { |
| close(name, index); |
| } catch (final IOException e) { |
| LOGGER.error("I/O exception when evicting " + name, e); |
| } |
| } |
| } |
| |
| private void close(final String name, final Index index) throws IOException { |
| IOUtils.runAll( |
| () -> { |
| if (index.tryAcquire()) { |
| try { |
| if (!index.isDeleteOnClose() && index.commit()) { |
| LOGGER.debug("committed {} before close", name); |
| } |
| } finally { |
| index.release(); |
| } |
| } |
| }, |
| () -> { |
| index.close(); |
| }, |
| () -> { |
| if (index.isDeleteOnClose()) { |
| IOUtils.rm(indexRootPath(name)); |
| } |
| }); |
| } |
| |
| } |