blob: 50bad248da65c37e4e5f4cef3041c80c6ee253f3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.segment.azure;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudAppendBlob;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader;
import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * {@link JournalFile} implementation that stores the journal as a series of
 * Azure append blobs named {@code <prefix>.NNN} inside a blob directory.
 * <p>
 * Lines are appended to the blob with the highest name. Once the number of
 * blocks written to it reaches the configured line limit, a new blob with the
 * next numeric suffix is created. Readers walk the blobs from the highest name
 * to the lowest.
 */
public class AzureJournalFile implements JournalFile {

    private static final Logger log = LoggerFactory.getLogger(AzureJournalFile.class);

    /**
     * Default number of lines written to one blob before rolling over to the
     * next one. Overridable through the system property
     * {@code org.apache.jackrabbit.oak.segment.azure.journal.lines}.
     * NOTE(review): presumably chosen to stay below Azure's per-blob limit on
     * committed append blocks - confirm against the service limits.
     */
    private static final int JOURNAL_LINE_LIMIT = Integer.getInteger("org.apache.jackrabbit.oak.segment.azure.journal.lines", 40_000);

    private final CloudBlobDirectory directory;

    private final String journalNamePrefix;

    private final int lineLimit;

    /**
     * Creates a journal file with an explicit roll-over threshold.
     *
     * @param directory         blob directory holding the journal blobs
     * @param journalNamePrefix common name prefix of all journal blobs
     * @param lineLimit         number of appended lines after which the writer
     *                          switches to a new blob
     */
    AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix, int lineLimit) {
        this.directory = directory;
        this.journalNamePrefix = journalNamePrefix;
        this.lineLimit = lineLimit;
    }

    /**
     * Creates a journal file using the default roll-over threshold.
     *
     * @param directory         blob directory holding the journal blobs
     * @param journalNamePrefix common name prefix of all journal blobs
     */
    public AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix) {
        this(directory, journalNamePrefix, JOURNAL_LINE_LIMIT);
    }

    /**
     * Opens a reader spanning all journal blobs currently listed under the prefix.
     *
     * @return a reader that chains the individual blob readers
     * @throws IOException if the blobs cannot be listed
     */
    @Override
    public JournalFileReader openJournalReader() throws IOException {
        return new CombinedReader(getJournalBlobs());
    }

    /**
     * Opens a writer appending to the journal blob with the highest name,
     * creating the first blob if none exists.
     *
     * @return a new journal writer
     * @throws IOException if the blobs cannot be listed, created or inspected
     */
    @Override
    public JournalFileWriter openJournalWriter() throws IOException {
        return new AzureJournalWriter();
    }

    /**
     * @return the journal name prefix this instance was created with
     */
    @Override
    public String getName() {
        return journalNamePrefix;
    }

    /**
     * Checks whether at least one journal blob is present.
     *
     * @return {@code true} if a journal blob exists; {@code false} if none
     *         exists or if listing the blobs failed (the failure is logged)
     */
    @Override
    public boolean exists() {
        try {
            return !getJournalBlobs().isEmpty();
        } catch (IOException e) {
            // Best effort: a listing failure is reported as "does not exist".
            log.error("Can't check if the file exists", e);
            return false;
        }
    }

    /**
     * Builds the blob name for the given index: prefix, a dot, and the index
     * zero-padded to at least three digits.
     */
    private String getJournalFileName(int index) {
        return String.format("%s.%03d", journalNamePrefix, index);
    }

    /**
     * Lists the append blobs whose name starts with the journal prefix, sorted
     * by name in descending order. Items of any other type are logged and skipped.
     * <p>
     * NOTE(review): the ordering is lexicographic, so it matches the numeric
     * order of the suffixes only while they fit in the three-digit padding
     * produced by {@link #getJournalFileName(int)}.
     *
     * @return the journal blobs, highest name first
     * @throws IOException if listing the directory fails
     */
    private List<CloudAppendBlob> getJournalBlobs() throws IOException {
        try {
            List<CloudAppendBlob> result = new ArrayList<>();
            for (ListBlobItem b : directory.listBlobs(journalNamePrefix)) {
                if (b instanceof CloudAppendBlob) {
                    result.add((CloudAppendBlob) b);
                } else {
                    log.warn("Invalid blob type: {} {}", b.getUri(), b.getClass());
                }
            }
            result.sort(Comparator.<CloudAppendBlob, String>comparing(AzureUtilities::getName).reversed());
            return result;
        } catch (URISyntaxException | StorageException e) {
            throw new IOException(e);
        }
    }

    /**
     * Reader for a single journal blob. The underlying {@code ReverseFileReader}
     * is created lazily on the first {@link #readLine()} call.
     */
    private static class AzureJournalReader implements JournalFileReader {

        private final CloudBlob blob;

        private ReverseFileReader reader;

        private AzureJournalReader(CloudBlob blob) {
            this.blob = blob;
        }

        /**
         * @return the next line as delivered by the underlying reader
         * @throws IOException if the reader cannot be created or reading fails
         */
        @Override
        public String readLine() throws IOException {
            if (reader == null) {
                try {
                    reader = new ReverseFileReader(blob);
                } catch (StorageException e) {
                    throw new IOException(e);
                }
            }
            return reader.readLine();
        }

        @Override
        public void close() throws IOException {
            // nothing to release
        }
    }

    /**
     * Writer that appends lines to the current journal blob and rolls over to
     * a new blob once {@code lineLimit} blocks have been written.
     */
    private class AzureJournalWriter implements JournalFileWriter {

        private CloudAppendBlob currentBlob;

        /** Number of blocks already appended to {@link #currentBlob}. */
        private int blockCount;

        /**
         * Selects the blob with the highest name (or creates {@code <prefix>.001}
         * if there is none) and initialises the block counter from its
         * committed block count.
         *
         * @throws IOException if listing, creating or inspecting the blob fails
         */
        public AzureJournalWriter() throws IOException {
            List<CloudAppendBlob> blobs = getJournalBlobs();
            try {
                if (blobs.isEmpty()) {
                    currentBlob = directory.getAppendBlobReference(getJournalFileName(1));
                    currentBlob.createOrReplace();
                } else {
                    currentBlob = blobs.get(0);
                }
                // Refresh the properties from the service before reading the
                // committed block count: a reference coming from a listing (or
                // one that was just created) is not guaranteed to carry it, in
                // which case the counter would wrongly restart at 0 and the
                // roll-over in writeLine() would be skipped for a blob that is
                // already (nearly) full.
                currentBlob.downloadAttributes();
            } catch (URISyntaxException | StorageException e) {
                throw new IOException(e);
            }
            Integer bc = currentBlob.getProperties().getAppendBlobCommittedBlockCount();
            blockCount = bc == null ? 0 : bc;
        }

        /**
         * Deletes every journal blob and starts over with {@code <prefix>.001}.
         *
         * @throws IOException if listing, deleting or creating a blob fails
         */
        @Override
        public void truncate() throws IOException {
            try {
                for (CloudAppendBlob cloudAppendBlob : getJournalBlobs()) {
                    cloudAppendBlob.delete();
                }
                createNextFile(0);
            } catch (StorageException e) {
                throw new IOException(e);
            }
        }

        /**
         * Appends the line followed by a newline to the current blob, switching
         * to a new blob first if the current one has reached the line limit.
         *
         * @param line the journal line to append
         * @throws IOException if creating the next blob or appending fails
         */
        @Override
        public void writeLine(String line) throws IOException {
            if (blockCount >= lineLimit) {
                int parsedSuffix = parseCurrentSuffix();
                createNextFile(parsedSuffix);
            }
            try {
                currentBlob.appendText(line + "\n");
                blockCount++;
            } catch (StorageException e) {
                throw new IOException(e);
            }
        }

        /**
         * Creates (or replaces) the blob with index {@code suffix + 1}, makes it
         * the current blob and resets the block counter.
         */
        private void createNextFile(int suffix) throws IOException {
            try {
                currentBlob = directory.getAppendBlobReference(getJournalFileName(suffix + 1));
                currentBlob.createOrReplace();
                blockCount = 0;
            } catch (URISyntaxException | StorageException e) {
                throw new IOException(e);
            }
        }

        /**
         * Extracts the numeric suffix from the current blob's name.
         *
         * @return the parsed suffix, or 0 (after logging a warning) if the name
         *         does not contain {@code <prefix>.<digits>} or the digits do
         *         not fit into an {@code int}
         */
        private int parseCurrentSuffix() {
            String name = AzureUtilities.getName(currentBlob);
            Pattern pattern = Pattern.compile(Pattern.quote(journalNamePrefix) + "\\.(\\d+)" );
            Matcher matcher = pattern.matcher(name);
            if (!matcher.find()) {
                log.warn("Can't parse journal file name {}", name);
                return 0;
            }
            try {
                return Integer.parseInt(matcher.group(1));
            } catch (NumberFormatException e) {
                log.warn("Can't parse suffix for journal file {}", name);
                return 0;
            }
        }

        @Override
        public void close() throws IOException {
            // do nothing
        }
    }

    /**
     * Reader that chains the per-blob readers in the order of the supplied
     * list, skipping empty lines and moving on to the next blob whenever the
     * current one is exhausted.
     */
    private static class CombinedReader implements JournalFileReader {

        /** Lazily creates one reader per blob as the iterator is advanced. */
        private final Iterator<AzureJournalReader> readers;

        private JournalFileReader currentReader;

        private CombinedReader(List<CloudAppendBlob> blobs) {
            readers = blobs.stream().map(AzureJournalReader::new).iterator();
        }

        /**
         * @return the next non-empty line, or {@code null} once all blobs have
         *         been read
         * @throws IOException if reading from a blob fails
         */
        @Override
        public String readLine() throws IOException {
            String line;
            do {
                if (currentReader == null) {
                    if (!readers.hasNext()) {
                        return null;
                    }
                    currentReader = readers.next();
                }
                // Skip empty lines within the current blob.
                do {
                    line = currentReader.readLine();
                } while ("".equals(line));
                if (line == null) {
                    // Current blob exhausted: release it and try the next one.
                    currentReader.close();
                    currentReader = null;
                }
            } while (line == null);
            return line;
        }

        /**
         * Closes the reader currently in use as well as every reader that has
         * not been consumed yet.
         */
        @Override
        public void close() throws IOException {
            while (readers.hasNext()) {
                readers.next().close();
            }
            if (currentReader != null) {
                currentReader.close();
                currentReader = null;
            }
        }
    }
}