blob: ed7b00939edf5e0468254d0be6e67e37658d2b4f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.segment.azure;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.CopyStatus;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getName;
public class AzureArchiveManager implements SegmentArchiveManager {
private static final Logger log = LoggerFactory.getLogger(AzureSegmentArchiveReader.class);
private final CloudBlobDirectory cloudBlobDirectory;
private final IOMonitor ioMonitor;
private final FileStoreMonitor monitor;
public AzureArchiveManager(CloudBlobDirectory cloudBlobDirectory, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) {
this.cloudBlobDirectory = cloudBlobDirectory;
this.ioMonitor = ioMonitor;
this.monitor = fileStoreMonitor;
}
@Override
public List<String> listArchives() throws IOException {
try {
return StreamSupport.stream(cloudBlobDirectory
.listBlobs(null, false, EnumSet.noneOf(BlobListingDetails.class), null, null)
.spliterator(), false)
.filter(i -> i instanceof CloudBlobDirectory)
.map(i -> (CloudBlobDirectory) i)
.filter(i -> getName(i).endsWith(".tar"))
.map(CloudBlobDirectory::getPrefix)
.map(Paths::get)
.map(Path::getFileName)
.map(Path::toString)
.collect(Collectors.toList());
} catch (URISyntaxException | StorageException e) {
throw new IOException(e);
}
}
@Override
public SegmentArchiveReader open(String archiveName) throws IOException {
try {
CloudBlobDirectory archiveDirectory = getDirectory(archiveName);
if (!archiveDirectory.getBlockBlobReference("closed").exists()) {
throw new IOException("The archive " + archiveName + " hasn't been closed correctly.");
}
return new AzureSegmentArchiveReader(archiveDirectory, ioMonitor);
} catch (StorageException | URISyntaxException e) {
throw new IOException(e);
}
}
@Override
public SegmentArchiveReader forceOpen(String archiveName) throws IOException {
CloudBlobDirectory archiveDirectory = getDirectory(archiveName);
return new AzureSegmentArchiveReader(archiveDirectory, ioMonitor);
}
@Override
public SegmentArchiveWriter create(String archiveName) throws IOException {
return new AzureSegmentArchiveWriter(getDirectory(archiveName), ioMonitor, monitor);
}
@Override
public boolean delete(String archiveName) {
try {
getBlobs(archiveName)
.forEach(cloudBlob -> {
try {
cloudBlob.delete();
} catch (StorageException e) {
log.error("Can't delete segment {}", cloudBlob.getUri().getPath(), e);
}
});
return true;
} catch (IOException e) {
log.error("Can't delete archive {}", archiveName, e);
return false;
}
}
@Override
public boolean renameTo(String from, String to) {
try {
CloudBlobDirectory targetDirectory = getDirectory(to);
getBlobs(from)
.forEach(cloudBlob -> {
try {
renameBlob(cloudBlob, targetDirectory);
} catch (IOException e) {
log.error("Can't rename segment {}", cloudBlob.getUri().getPath(), e);
}
});
return true;
} catch (IOException e) {
log.error("Can't rename archive {} to {}", from, to, e);
return false;
}
}
@Override
public void copyFile(String from, String to) throws IOException {
CloudBlobDirectory targetDirectory = getDirectory(to);
getBlobs(from)
.forEach(cloudBlob -> {
try {
copyBlob(cloudBlob, targetDirectory);
} catch (IOException e) {
log.error("Can't copy segment {}", cloudBlob.getUri().getPath(), e);
}
});
}
@Override
public boolean exists(String archiveName) {
try {
return getBlobs(archiveName).findAny().isPresent();
} catch (IOException e) {
log.error("Can't check the existence of {}", archiveName, e);
return false;
}
}
@Override
public void recoverEntries(String archiveName, LinkedHashMap<UUID, byte[]> entries) throws IOException {
Pattern pattern = Pattern.compile(AzureUtilities.SEGMENT_FILE_NAME_PATTERN);
List<RecoveredEntry> entryList = new ArrayList<>();
for (CloudBlob b : getBlobList(archiveName)) {
String name = getName(b);
Matcher m = pattern.matcher(name);
if (!m.matches()) {
continue;
}
int position = Integer.parseInt(m.group(1), 16);
UUID uuid = UUID.fromString(m.group(2));
long length = b.getProperties().getLength();
if (length > 0) {
byte[] data = new byte[(int) length];
try {
b.downloadToByteArray(data, 0);
} catch (StorageException e) {
throw new IOException(e);
}
entryList.add(new RecoveredEntry(position, uuid, data, name));
}
}
Collections.sort(entryList);
int i = 0;
for (RecoveredEntry e : entryList) {
if (e.position != i) {
log.warn("Missing entry {}.??? when recovering {}. No more segments will be read.", String.format("%04X", i), archiveName);
break;
}
log.info("Recovering segment {}/{}", archiveName, e.fileName);
entries.put(e.uuid, e.data);
i++;
}
}
private CloudBlobDirectory getDirectory(String archiveName) throws IOException {
try {
return cloudBlobDirectory.getDirectoryReference(archiveName);
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
private Stream<CloudBlob> getBlobs(String archiveName) throws IOException {
return AzureUtilities.getBlobs(getDirectory(archiveName));
}
private List<CloudBlob> getBlobList(String archiveName) throws IOException {
return getBlobs(archiveName).collect(Collectors.toList());
}
private void renameBlob(CloudBlob blob, CloudBlobDirectory newParent) throws IOException {
copyBlob(blob, newParent);
try {
blob.delete();
} catch (StorageException e) {
throw new IOException(e);
}
}
private void copyBlob(CloudBlob blob, CloudBlobDirectory newParent) throws IOException {
checkArgument(blob instanceof CloudBlockBlob, "Only page blobs are supported for the rename");
try {
String blobName = getName(blob);
CloudBlockBlob newBlob = newParent.getBlockBlobReference(blobName);
newBlob.startCopy(blob.getUri());
boolean isStatusPending = true;
while (isStatusPending) {
newBlob.downloadAttributes();
if (newBlob.getCopyState().getStatus() == CopyStatus.PENDING) {
Thread.sleep(100);
} else {
isStatusPending = false;
}
}
CopyStatus finalStatus = newBlob.getCopyState().getStatus();
if (newBlob.getCopyState().getStatus() != CopyStatus.SUCCESS) {
throw new IOException("Invalid copy status for " + blob.getUri().getPath() + ": " + finalStatus);
}
} catch (StorageException | InterruptedException | URISyntaxException e) {
throw new IOException(e);
}
}
private static class RecoveredEntry implements Comparable<RecoveredEntry> {
private final byte[] data;
private final UUID uuid;
private final int position;
private final String fileName;
public RecoveredEntry(int position, UUID uuid, byte[] data, String fileName) {
this.data = data;
this.uuid = uuid;
this.position = position;
this.fileName = fileName;
}
@Override
public int compareTo(RecoveredEntry o) {
return Integer.compare(this.position, o.position);
}
}
}