/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.maven.index.updater;
import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UTFDataFormatException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.GZIPInputStream;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.FieldType;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.maven.index.ArtifactInfo;
import org.apache.maven.index.context.DocumentFilter;
import org.apache.maven.index.context.IndexUtils;
import org.apache.maven.index.context.IndexingContext;
import org.apache.maven.index.context.NexusAnalyzer;
import org.apache.maven.index.context.NexusIndexWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An index data reader used to parse the transfer index format.
*
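* <p>Typical usage (a minimal sketch; the {@code indexWriter} and {@code indexingContext} instances
* are assumed to already exist, and the input must support {@code mark()}, hence the buffering):</p>
*
* <pre>{@code
* try (InputStream is = new BufferedInputStream(
*         Files.newInputStream(Path.of("nexus-maven-repository-index.gz")))) {
*     // indexWriter and indexingContext are assumed to be created elsewhere
*     IndexDataReader reader = new IndexDataReader(is, 4); // 4 reader threads
*     IndexDataReader.IndexDataReadResult result = reader.readIndex(indexWriter, indexingContext);
*     System.out.println("Read " + result.getDocumentCount() + " documents");
* }
* }</pre>
*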
* @author Eugene Kuleshov
*/
public class IndexDataReader {
private static final Logger LOGGER = LoggerFactory.getLogger(IndexDataReader.class);
private final DataInputStream dis;
private final Path tempStorage;
private final DocumentFilter filter;
private final FSDirectoryFactory factory;
private final int threads;
public IndexDataReader(final InputStream is) throws IOException {
this(is, 1);
}
public IndexDataReader(final InputStream is, final int threads) throws IOException {
this(is, null, null, null, threads);
}
public IndexDataReader(final InputStream is, final IndexUpdateRequest request) throws IOException {
this(
is,
request.getIndexTempDir() != null ? request.getIndexTempDir().toPath() : null,
request.getExtractionFilter(),
request.getFSDirectoryFactory(),
request.getThreads());
}
public IndexDataReader(
final InputStream is,
final Path tempStorage,
final DocumentFilter filter,
final FSDirectoryFactory factory,
final int threads)
throws IOException {
if (threads < 1) {
throw new IllegalArgumentException("Reader threads must be greater than zero: " + threads);
}
this.tempStorage = Objects.requireNonNullElse(tempStorage, Path.of(System.getProperty("java.io.tmpdir")));
this.factory = Objects.requireNonNullElse(factory, FSDirectoryFactory.DEFAULT);
this.filter = filter;
this.threads = threads;
// MINDEXER-13
// The transport (for example LightweightHttpWagon) may have already decompressed the stream;
// sniff the GZIP magic bytes and handle both cases transparently.
is.mark(2);
InputStream data;
if (is.read() == 0x1f && is.read() == 0x8b) // GZIPInputStream.GZIP_MAGIC
{
is.reset();
data = new BufferedInputStream(new GZIPInputStream(is, 1024 * 8), 1024 * 8);
} else {
is.reset();
data = new BufferedInputStream(is, 1024 * 8);
}
this.dis = new DataInputStream(data);
}
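/**
 * Reads the transfer index from the underlying stream into the given writer: delegates to the
 * single-threaded path when {@code threads} is 1, otherwise to the multi-threaded "silo" path.
 */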
public IndexDataReadResult readIndex(IndexWriter w, IndexingContext context) throws IOException {
if (threads == 1) {
return readIndexST(w, context);
} else {
return readIndexMT(w, context);
}
}
private IndexDataReadResult readIndexST(IndexWriter w, IndexingContext context) throws IOException {
LOGGER.debug("Reading ST index...");
Instant start = Instant.now();
long timestamp = readHeader();
Date date = null;
if (timestamp != -1) {
date = new Date(timestamp);
IndexUtils.updateTimestamp(w.getDirectory(), date);
}
int n = 0;
Document doc;
Set<String> rootGroups = new HashSet<>();
Set<String> allGroups = new HashSet<>();
while ((doc = readDocument()) != null) {
addToIndex(doc, context, w, rootGroups, allGroups);
n++;
}
w.commit();
IndexDataReadResult result = new IndexDataReadResult();
result.setDocumentCount(n);
result.setTimestamp(date);
result.setRootGroups(rootGroups);
result.setAllGroups(allGroups);
LOGGER.debug(
"Reading ST index done in {} sec",
Duration.between(start, Instant.now()).getSeconds());
return result;
}
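/**
 * Multi-threaded read: this (producer) thread parses documents off the stream into a bounded queue,
 * while {@code threads} consumer threads drain it into per-thread temporary "silo" indexes. Once the
 * stream is exhausted a sentinel document is enqueued per consumer; the silos are then committed,
 * merged into the target writer via {@link IndexWriter#addIndexes(Directory...)}, and deleted.
 */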
private IndexDataReadResult readIndexMT(IndexWriter w, IndexingContext context) throws IOException {
LOGGER.debug("Reading MT index...");
Instant start = Instant.now();
long timestamp = readHeader();
int n = 0;
final Document theEnd = new Document();
Set<String> rootGroups = ConcurrentHashMap.newKeySet();
Set<String> allGroups = ConcurrentHashMap.newKeySet();
ArrayBlockingQueue<Document> queue = new ArrayBlockingQueue<>(10000);
ExecutorService executorService = Executors.newFixedThreadPool(threads);
List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());
List<FSDirectory> siloDirectories = new ArrayList<>(threads);
List<IndexWriter> siloWriters = new ArrayList<>(threads);
AtomicBoolean stopEarly = new AtomicBoolean(false);
LOGGER.debug("Creating {} silo writer threads...", threads);
for (int i = 0; i < threads; i++) {
final int silo = i;
FSDirectory siloDirectory = tempDirectory("silo" + i);
siloDirectories.add(siloDirectory);
siloWriters.add(tempWriter(siloDirectory));
executorService.execute(() -> {
LOGGER.debug("Starting thread {}", Thread.currentThread().getName());
try {
while (true) {
try {
Document doc = queue.take();
if (doc == theEnd) {
break;
}
addToIndex(doc, context, siloWriters.get(silo), rootGroups, allGroups);
} catch (Throwable e) {
errors.add(e);
if (stopEarly.compareAndSet(false, true)) {
queue.clear(); // unblock producer
executorService.shutdownNow(); // unblock consumers
}
break;
}
}
} finally {
LOGGER.debug("Done thread {}", Thread.currentThread().getName());
}
});
}
LOGGER.debug("Loading up documents into silos");
try {
Document doc;
while (!stopEarly.get() && (doc = readDocument()) != null) {
queue.put(doc);
n++;
}
LOGGER.debug("Signalling END");
for (int i = 0; i < threads; i++) {
queue.put(theEnd);
}
LOGGER.debug("Shutting down threads");
executorService.shutdown();
executorService.awaitTermination(5L, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new IOException("Interrupted", e);
}
if (!errors.isEmpty()) {
if (errors.stream().allMatch(ex -> ex instanceof IOException || ex instanceof InterruptedException)) {
IOException exception = new IOException("Error during load of index");
errors.forEach(exception::addSuppressed);
throw exception;
} else {
RuntimeException exception = new RuntimeException("Error during load of index");
errors.forEach(exception::addSuppressed);
throw exception;
}
}
LOGGER.debug("Silos loaded...");
Date date = null;
if (timestamp != -1) {
date = new Date(timestamp);
IndexUtils.updateTimestamp(w.getDirectory(), date);
}
LOGGER.debug("Closing silo writers...");
for (IndexWriter siloWriter : siloWriters) {
siloWriter.commit();
siloWriter.close();
}
LOGGER.debug("Merging silo directories...");
w.addIndexes(siloDirectories.toArray(new Directory[0]));
LOGGER.debug("Cleanup of silo directories...");
for (FSDirectory siloDirectory : siloDirectories) {
File dir = siloDirectory.getDirectory().toFile();
siloDirectory.close();
IndexUtils.delete(dir);
}
LOGGER.debug("Finalizing...");
w.commit();
IndexDataReadResult result = new IndexDataReadResult();
result.setDocumentCount(n);
result.setTimestamp(date);
result.setRootGroups(rootGroups);
result.setAllGroups(allGroups);
LOGGER.debug(
"Reading MT index done in {} sec",
Duration.between(start, Instant.now()).getSeconds());
return result;
}
private FSDirectory tempDirectory(final String name) throws IOException {
return factory.open(
Files.createTempDirectory(tempStorage, name + ".dir").toFile());
}
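/** Opens a short-lived silo writer; compound files are disabled as each silo is merged once and deleted. */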
private IndexWriter tempWriter(final FSDirectory directory) throws IOException {
IndexWriterConfig config = new IndexWriterConfig(new NexusAnalyzer());
config.setUseCompoundFile(false);
return new NexusIndexWriter(directory, config);
}
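/**
 * Adds one document to the given writer: artifact documents pass through the optional
 * {@link DocumentFilter} and contribute their root group and groupId to the collected sets;
 * other documents (such as the descriptor) are added verbatim, except legacy group records,
 * which are skipped because the group sets are rebuilt from what is collected here.
 */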
private void addToIndex(
final Document doc,
final IndexingContext context,
final IndexWriter indexWriter,
final Set<String> rootGroups,
final Set<String> allGroups)
throws IOException {
ArtifactInfo ai = IndexUtils.constructArtifactInfo(doc, context);
if (ai != null) {
if (filter == null || filter.accept(doc)) {
indexWriter.addDocument(IndexUtils.updateDocument(doc, context, false, ai));
rootGroups.add(ai.getRootGroup());
allGroups.add(ai.getGroupId());
}
} else {
// skip legacy group records (allGroups/rootGroups): those documents are rebuilt from the sets collected above
if (doc.getField(ArtifactInfo.ALL_GROUPS) == null && doc.getField(ArtifactInfo.ROOT_GROUPS) == null) {
indexWriter.addDocument(doc);
}
}
}
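/**
 * Reads and validates the index header: one format version byte (must match the low byte of
 * {@link IndexDataWriter#VERSION}) followed by an 8-byte timestamp in milliseconds.
 *
 * @return the timestamp from the header; {@code -1} is treated by callers as "no timestamp"
 */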
public long readHeader() throws IOException {
final byte hdrbyte = (byte) IndexDataWriter.VERSION; // low byte of the expected format version
if (hdrbyte != dis.readByte()) {
// data format version mismatch
throw new IOException("Provided input contains unexpected data (0x01 expected as 1st byte)!");
}
return dis.readLong();
}
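/**
 * Reads the next document record: an {@code int} field count followed by that many serialized fields.
 *
 * @return the next document, or {@code null} once the end of the stream is reached
 */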
public Document readDocument() throws IOException {
int fieldCount;
try {
fieldCount = dis.readInt();
} catch (EOFException ex) {
return null; // no more documents
}
Document doc = new Document();
for (int i = 0; i < fieldCount; i++) {
doc.add(readField());
}
// Fix up the UINFO field wrt MINDEXER-41: append the extension when the classifier slot is "NA"
final Field uinfoField = (Field) doc.getField(ArtifactInfo.UINFO);
final String info = doc.get(ArtifactInfo.INFO);
if (uinfoField != null && info != null && !info.isEmpty()) {
final String[] splitInfo = ArtifactInfo.FS_PATTERN.split(info);
if (splitInfo.length > 6) {
final String extension = splitInfo[6];
final String uinfoString = uinfoField.stringValue();
if (uinfoString.endsWith(ArtifactInfo.FS + ArtifactInfo.NA)) {
uinfoField.setStringValue(uinfoString + ArtifactInfo.FS + ArtifactInfo.nvl(extension));
}
}
}
return doc;
}
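/**
 * Reads one field: a flags byte (the indexed/tokenized/stored bits written by {@code IndexDataWriter}),
 * a standard modified-UTF-8 field name, then an int-length-prefixed value.
 */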
private Field readField() throws IOException {
int flags = dis.read();
FieldType fieldType = new FieldType();
if ((flags & IndexDataWriter.F_INDEXED) > 0) {
boolean tokenized = (flags & IndexDataWriter.F_TOKENIZED) > 0;
fieldType.setTokenized(tokenized);
}
fieldType.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS);
fieldType.setStored((flags & IndexDataWriter.F_STORED) > 0);
String name = dis.readUTF();
String value = readUTF(dis);
return new Field(name, value, fieldType);
}
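/**
 * Variant of {@link java.io.DataInputStream#readUTF()} that reads an {@code int} length prefix
 * instead of an unsigned short, so values are not limited to 65535 bytes; the decoding loop is
 * otherwise the standard modified-UTF-8 algorithm.
 */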
private static String readUTF(DataInput in) throws IOException {
int utflen = in.readInt();
byte[] bytearr;
char[] chararr;
try {
bytearr = new byte[utflen];
chararr = new char[utflen];
} catch (OutOfMemoryError e) {
throw new IOException(
"Index data content is inappropriate (is junk?), leads to OutOfMemoryError!"
+ " See MINDEXER-28 for more information!",
e);
}
int c, char2, char3;
int count = 0;
int chararrCount = 0;
in.readFully(bytearr, 0, utflen);
while (count < utflen) {
c = bytearr[count] & 0xff;
if (c > 127) {
break;
}
count++;
chararr[chararrCount++] = (char) c;
}
while (count < utflen) {
c = bytearr[count] & 0xff;
switch (c >> 4) {
case 0:
case 1:
case 2:
case 3:
case 4:
case 5:
case 6:
case 7:
/* 0xxxxxxx */
count++;
chararr[chararrCount++] = (char) c;
break;
case 12:
case 13:
/* 110x xxxx 10xx xxxx */
count += 2;
if (count > utflen) {
throw new UTFDataFormatException("malformed input: partial character at end");
}
char2 = bytearr[count - 1];
if ((char2 & 0xC0) != 0x80) {
throw new UTFDataFormatException("malformed input around byte " + count);
}
chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
break;
case 14:
/* 1110 xxxx 10xx xxxx 10xx xxxx */
count += 3;
if (count > utflen) {
throw new UTFDataFormatException("malformed input: partial character at end");
}
char2 = bytearr[count - 2];
char3 = bytearr[count - 1];
if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
throw new UTFDataFormatException("malformed input around byte " + (count - 1));
}
chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F)));
break;
default:
/* 10xx xxxx, 1111 xxxx */
throw new UTFDataFormatException("malformed input around byte " + count);
}
}
// The number of chars produced may be less than utflen
return new String(chararr, 0, chararrCount);
}
/**
* An index data read result holder
*/
public static class IndexDataReadResult {
private Date timestamp;
private int documentCount;
private Set<String> rootGroups;
private Set<String> allGroups;
public void setDocumentCount(int documentCount) {
this.documentCount = documentCount;
}
public int getDocumentCount() {
return documentCount;
}
public void setTimestamp(Date timestamp) {
this.timestamp = timestamp;
}
public Date getTimestamp() {
return timestamp;
}
public void setRootGroups(Set<String> rootGroups) {
this.rootGroups = rootGroups;
}
public Set<String> getRootGroups() {
return rootGroups;
}
public void setAllGroups(Set<String> allGroups) {
this.allGroups = allGroups;
}
public Set<String> getAllGroups() {
return allGroups;
}
}
/**
* Reads index content by using a visitor. <br>
* The visitor is called for each document read, after it has been populated with Lucene fields.
*
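* <p>Example (a minimal sketch; {@code reader} and {@code context} are assumed to exist):</p>
* <pre>{@code
* // reader and context are assumed to be created elsewhere
* IndexDataReader.IndexDataReadResult stats =
*         reader.readIndex(doc -> System.out.println(doc.get(ArtifactInfo.UINFO)), context);
* }</pre>
*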
* @param visitor an index data visitor
* @param context indexing context
* @return statistics about read data
* @throws IOException in case of an IO exception during index file access
*/
public IndexDataReadResult readIndex(final IndexDataReadVisitor visitor, final IndexingContext context)
throws IOException {
dis.readByte(); // data format version (not validated on this path)
long timestamp = dis.readLong();
Date date = null;
if (timestamp != -1) {
date = new Date(timestamp);
}
int n = 0;
Document doc;
while ((doc = readDocument()) != null) {
visitor.visitDocument(IndexUtils.updateDocument(doc, context, false));
n++;
}
IndexDataReadResult result = new IndexDataReadResult();
result.setDocumentCount(n);
result.setTimestamp(date);
return result;
}
/**
* Visitor of indexed Lucene documents.
*/
public interface IndexDataReadVisitor {
/**
* Called on each read document. The document is already populated with fields.
*
* @param document read document
*/
void visitDocument(Document document);
}
}