blob: b08fc7251a46b29e2e3fb8d491bbceb01fa628b3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.hints;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.SyncUtil;
/**
* Encapsulates the state of a peer's hints: the queue of hints files for dispatch, and the current writer (if any).
*
* The queue for dispatch is multi-threading safe.
*
* The writer MUST only be accessed by {@link HintsWriteExecutor}.
*/
final class HintsStore
{
private static final Logger logger = LoggerFactory.getLogger(HintsStore.class);
public final UUID hostId;
private final File hintsDirectory;
private final ImmutableMap<String, Object> writerParams;
private final Map<HintsDescriptor, InputPosition> dispatchPositions;
private final Deque<HintsDescriptor> dispatchDequeue;
private final Queue<HintsDescriptor> corruptedFiles;
// last timestamp used in a descriptor; make sure to not reuse the same timestamp for new descriptors.
private volatile long lastUsedTimestamp;
private volatile HintsWriter hintsWriter;
private HintsStore(UUID hostId, File hintsDirectory, ImmutableMap<String, Object> writerParams, List<HintsDescriptor> descriptors)
{
this.hostId = hostId;
this.hintsDirectory = hintsDirectory;
this.writerParams = writerParams;
dispatchPositions = new ConcurrentHashMap<>();
dispatchDequeue = new ConcurrentLinkedDeque<>(descriptors);
corruptedFiles = new ConcurrentLinkedQueue<>();
//noinspection resource
lastUsedTimestamp = descriptors.stream().mapToLong(d -> d.timestamp).max().orElse(0L);
}
static HintsStore create(UUID hostId, File hintsDirectory, ImmutableMap<String, Object> writerParams, List<HintsDescriptor> descriptors)
{
descriptors.sort((d1, d2) -> Long.compare(d1.timestamp, d2.timestamp));
return new HintsStore(hostId, hintsDirectory, writerParams, descriptors);
}
@VisibleForTesting
int getDispatchQueueSize()
{
return dispatchDequeue.size();
}
InetAddress address()
{
return StorageService.instance.getEndpointForHostId(hostId);
}
boolean isLive()
{
InetAddress address = address();
return address != null && FailureDetector.instance.isAlive(address);
}
HintsDescriptor poll()
{
return dispatchDequeue.poll();
}
void offerFirst(HintsDescriptor descriptor)
{
dispatchDequeue.offerFirst(descriptor);
}
void offerLast(HintsDescriptor descriptor)
{
dispatchDequeue.offerLast(descriptor);
}
void deleteAllHints()
{
HintsDescriptor descriptor;
while ((descriptor = poll()) != null)
{
cleanUp(descriptor);
delete(descriptor);
}
while ((descriptor = corruptedFiles.poll()) != null)
{
cleanUp(descriptor);
delete(descriptor);
}
}
void delete(HintsDescriptor descriptor)
{
File hintsFile = new File(hintsDirectory, descriptor.fileName());
if (hintsFile.delete())
logger.info("Deleted hint file {}", descriptor.fileName());
else
logger.error("Failed to delete hint file {}", descriptor.fileName());
//noinspection ResultOfMethodCallIgnored
new File(hintsDirectory, descriptor.checksumFileName()).delete();
}
boolean hasFiles()
{
return !dispatchDequeue.isEmpty();
}
InputPosition getDispatchOffset(HintsDescriptor descriptor)
{
return dispatchPositions.get(descriptor);
}
void markDispatchOffset(HintsDescriptor descriptor, InputPosition inputPosition)
{
dispatchPositions.put(descriptor, inputPosition);
}
void cleanUp(HintsDescriptor descriptor)
{
dispatchPositions.remove(descriptor);
}
void markCorrupted(HintsDescriptor descriptor)
{
corruptedFiles.add(descriptor);
}
/*
* Methods dealing with HintsWriter.
*
* All of these, with the exception of isWriting(), are for exclusively single-threaded use by HintsWriteExecutor.
*/
boolean isWriting()
{
return hintsWriter != null;
}
HintsWriter getOrOpenWriter()
{
if (hintsWriter == null)
hintsWriter = openWriter();
return hintsWriter;
}
HintsWriter getWriter()
{
return hintsWriter;
}
private HintsWriter openWriter()
{
lastUsedTimestamp = Math.max(System.currentTimeMillis(), lastUsedTimestamp + 1);
HintsDescriptor descriptor = new HintsDescriptor(hostId, lastUsedTimestamp, writerParams);
try
{
return HintsWriter.create(hintsDirectory, descriptor);
}
catch (IOException e)
{
throw new FSWriteError(e, descriptor.fileName());
}
}
void closeWriter()
{
if (hintsWriter != null)
{
hintsWriter.close();
offerLast(hintsWriter.descriptor());
hintsWriter = null;
SyncUtil.trySyncDir(hintsDirectory);
}
}
void fsyncWriter()
{
if (hintsWriter != null)
hintsWriter.fsync();
}
}