| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.jackrabbit.oak.segment; |
| |
| import static org.apache.jackrabbit.guava.common.base.Preconditions.checkNotNull; |
| import static org.apache.jackrabbit.guava.common.base.Preconditions.checkState; |
| import static org.apache.jackrabbit.guava.common.collect.Lists.newArrayList; |
| import static org.apache.jackrabbit.guava.common.collect.Maps.newHashMap; |
| import static org.apache.jackrabbit.guava.common.collect.Sets.newHashSet; |
| import static java.lang.Thread.currentThread; |
| |
| import java.io.IOException; |
| import java.util.AbstractMap.SimpleImmutableEntry; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.jackrabbit.guava.common.base.Supplier; |
| import org.apache.jackrabbit.guava.common.util.concurrent.Monitor; |
| import org.apache.jackrabbit.guava.common.util.concurrent.Monitor.Guard; |
| import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration; |
| import org.jetbrains.annotations.NotNull; |
| |
| /** |
| * This {@link WriteOperationHandler} uses a pool of {@link SegmentBufferWriter}s, |
| * which it passes to its {@link #execute(GCGeneration, WriteOperation) execute} method. |
| * <p> |
| * Instances of this class are thread safe. |
| */ |
| public class SegmentBufferWriterPool implements WriteOperationHandler { |
| |
| /** |
| * Monitor protecting the state of this pool. Neither of {@link #writers}, |
| * {@link #borrowed} and {@link #disposed} must be modified without owning |
| * this monitor. |
| */ |
| private final Monitor poolMonitor = new Monitor(true); |
| |
| /** |
| * Pool of current writers that are not in use |
| */ |
| private final Map<Object, SegmentBufferWriter> writers = newHashMap(); |
| |
| /** |
| * Writers that are currently in use |
| */ |
| private final Set<SegmentBufferWriter> borrowed = newHashSet(); |
| |
| /** |
| * Retired writers that have not yet been flushed |
| */ |
| private final Set<SegmentBufferWriter> disposed = newHashSet(); |
| |
| @NotNull |
| private final SegmentIdProvider idProvider; |
| |
| @NotNull |
| private final SegmentReader reader; |
| |
| @NotNull |
| private final Supplier<GCGeneration> gcGeneration; |
| |
| @NotNull |
| private final String wid; |
| |
| private short writerId = -1; |
| |
| public SegmentBufferWriterPool( |
| @NotNull SegmentIdProvider idProvider, |
| @NotNull SegmentReader reader, |
| @NotNull String wid, |
| @NotNull Supplier<GCGeneration> gcGeneration) { |
| this.idProvider = checkNotNull(idProvider); |
| this.reader = checkNotNull(reader); |
| this.wid = checkNotNull(wid); |
| this.gcGeneration = checkNotNull(gcGeneration); |
| } |
| |
| @Override |
| @NotNull |
| public GCGeneration getGCGeneration() { |
| return gcGeneration.get(); |
| } |
| |
| @NotNull |
| @Override |
| public RecordId execute(@NotNull GCGeneration gcGeneration, |
| @NotNull WriteOperation writeOperation) |
| throws IOException { |
| SimpleImmutableEntry<?,?> key = new SimpleImmutableEntry<>(currentThread(), gcGeneration); |
| SegmentBufferWriter writer = borrowWriter(key, gcGeneration); |
| try { |
| return writeOperation.execute(writer); |
| } finally { |
| returnWriter(key, writer); |
| } |
| } |
| |
| @Override |
| public void flush(@NotNull SegmentStore store) throws IOException { |
| List<SegmentBufferWriter> toFlush = newArrayList(); |
| List<SegmentBufferWriter> toReturn = newArrayList(); |
| |
| poolMonitor.enter(); |
| try { |
| // Collect all writers that are not currently in use and clear |
| // the list so they won't get re-used anymore. |
| toFlush.addAll(writers.values()); |
| writers.clear(); |
| |
| // Collect all borrowed writers, which we need to wait for. |
| // Clear the list so they will get disposed once returned. |
| toReturn.addAll(borrowed); |
| borrowed.clear(); |
| } finally { |
| poolMonitor.leave(); |
| } |
| |
| // Wait for the return of the borrowed writers. This is the |
| // case once all of them appear in the disposed set. |
| if (safeEnterWhen(poolMonitor, allReturned(toReturn))) { |
| try { |
| // Collect all disposed writers and clear the list to mark them |
| // as flushed. |
| toFlush.addAll(toReturn); |
| disposed.removeAll(toReturn); |
| } finally { |
| poolMonitor.leave(); |
| } |
| } |
| |
| // Call flush from outside the pool monitor to avoid potential |
| // deadlocks of that method calling SegmentStore.writeSegment |
| for (SegmentBufferWriter writer : toFlush) { |
| writer.flush(store); |
| } |
| } |
| |
| /** |
| * Create a {@code Guard} that is satisfied if and only if {@link #disposed} |
| * contains all items in {@code toReturn} |
| */ |
| @NotNull |
| private Guard allReturned(final List<SegmentBufferWriter> toReturn) { |
| return new Guard(poolMonitor) { |
| |
| @Override |
| public boolean isSatisfied() { |
| return disposed.containsAll(toReturn); |
| } |
| |
| }; |
| } |
| |
| /** |
| * Same as {@code monitor.enterWhen(guard)} but copes with that pesky {@code |
| * InterruptedException} by catching it and setting this thread's |
| * interrupted flag. |
| */ |
| private static boolean safeEnterWhen(Monitor monitor, Guard guard) { |
| try { |
| monitor.enterWhen(guard); |
| return true; |
| } catch (InterruptedException ignore) { |
| currentThread().interrupt(); |
| return false; |
| } |
| } |
| |
| /** |
| * Return a writer from the pool by its {@code key}. This method may return |
| * a fresh writer at any time. Callers need to return a writer before |
| * borrowing it again. Failing to do so leads to undefined behaviour. |
| */ |
| private SegmentBufferWriter borrowWriter(@NotNull Object key, @NotNull GCGeneration gcGeneration) { |
| poolMonitor.enter(); |
| try { |
| SegmentBufferWriter writer = writers.remove(key); |
| if (writer == null) { |
| writer = new SegmentBufferWriter( |
| idProvider, |
| reader, |
| getWriterId(wid), |
| gcGeneration |
| ); |
| } |
| borrowed.add(writer); |
| return writer; |
| } finally { |
| poolMonitor.leave(); |
| } |
| } |
| |
| /** |
| * Return a writer to the pool using the {@code key} that was used to borrow |
| * it. |
| */ |
| private void returnWriter(Object key, SegmentBufferWriter writer) { |
| poolMonitor.enter(); |
| try { |
| if (borrowed.remove(writer)) { |
| checkState(writers.put(key, writer) == null); |
| } else { |
| // Defer flush this writer as it was borrowed while flush() was called. |
| disposed.add(writer); |
| } |
| } finally { |
| poolMonitor.leave(); |
| } |
| } |
| |
| private String getWriterId(String wid) { |
| if (++writerId > 9999) { |
| writerId = 0; |
| } |
| // Manual padding seems to be fastest here |
| if (writerId < 10) { |
| return wid + ".000" + writerId; |
| } else if (writerId < 100) { |
| return wid + ".00" + writerId; |
| } else if (writerId < 1000) { |
| return wid + ".0" + writerId; |
| } else { |
| return wid + "." + writerId; |
| } |
| } |
| } |