blob: c541ac60e50f5e76695613c21ffd90e30ffc0371 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.lucene.queries.function.FunctionValues;
import org.apache.lucene.queries.function.ValueSource;
import org.apache.lucene.util.BitUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrException;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.util.RefCounted;
public class VersionInfo {
public static final String VERSION_FIELD="_version_";
private final UpdateLog ulog;
private final VersionBucket[] buckets;
private SchemaField versionField;
private SchemaField idField;
final ReadWriteLock lock = new ReentrantReadWriteLock(true);
/**
* Gets and returns the {@link #VERSION_FIELD} from the specified
* schema, after verifying that it is indexed, stored, and single-valued.
* If any of these pre-conditions are not met, it throws a SolrException
* with a user suitable message indicating the problem.
*/
public static SchemaField getAndCheckVersionField(IndexSchema schema)
throws SolrException {
final String errPrefix = VERSION_FIELD + " field must exist in schema, using indexed=\"true\" or docValues=\"true\", stored=\"true\" and multiValued=\"false\"";
SchemaField sf = schema.getFieldOrNull(VERSION_FIELD);
if (null == sf) {
throw new SolrException
(SolrException.ErrorCode.SERVER_ERROR,
errPrefix + " (" + VERSION_FIELD + " does not exist)");
}
if ( !sf.indexed() && !sf.hasDocValues()) {
throw new SolrException
(SolrException.ErrorCode.SERVER_ERROR,
errPrefix + " (" + VERSION_FIELD + " must be either indexed or have docValues");
}
if ( !sf.stored() ) {
throw new SolrException
(SolrException.ErrorCode.SERVER_ERROR,
errPrefix + " (" + VERSION_FIELD + " is not stored");
}
if ( sf.multiValued() ) {
throw new SolrException
(SolrException.ErrorCode.SERVER_ERROR,
errPrefix + " (" + VERSION_FIELD + " is multiValued");
}
return sf;
}
public VersionInfo(UpdateLog ulog, int nBuckets) {
this.ulog = ulog;
IndexSchema schema = ulog.uhandler.core.getLatestSchema();
versionField = getAndCheckVersionField(schema);
idField = schema.getUniqueKeyField();
buckets = new VersionBucket[ BitUtil.nextHighestPowerOfTwo(nBuckets) ];
for (int i=0; i<buckets.length; i++) {
buckets[i] = new VersionBucket();
}
}
public void reload() {
}
public SchemaField getVersionField() {
return versionField;
}
public void lockForUpdate() {
lock.readLock().lock();
}
public void unlockForUpdate() {
lock.readLock().unlock();
}
public void blockUpdates() {
lock.writeLock().lock();
}
public void unblockUpdates() {
lock.writeLock().unlock();
}
/***
// todo: initialize... use current time to start?
// a clock that increments by 1 for every operation makes it easier to detect missing
// messages, but raises other issues:
// - need to initialize to largest thing in index or tlog
// - when becoming leader, need to make sure it's greater than
// - using to detect missing messages means we need to keep track per-leader, or make
// sure a new leader starts off with 1 greater than the last leader.
private final AtomicLong clock = new AtomicLong();
public long getNewClock() {
return clock.incrementAndGet();
}
// Named *old* to prevent accidental calling getClock and expecting a new updated clock.
public long getOldClock() {
return clock.get();
}
***/
/** We are currently using this time-based clock to avoid going back in time on a
* server restart (i.e. we don't want version numbers to start at 1 again).
*/
// Time-based lamport clock. Good for introducing some reality into clocks (to the degree
// that times are somewhat synchronized in the cluster).
// Good if we want to relax some constraints to scale down to where only one node may be
// up at a time. Possibly harder to detect missing messages (because versions are not contiguous.
long vclock;
long time;
private final Object clockSync = new Object();
public long getNewClock() {
synchronized (clockSync) {
time = System.currentTimeMillis();
long result = time << 20;
if (result <= vclock) {
result = vclock + 1;
}
vclock = result;
return vclock;
}
}
public long getOldClock() {
synchronized (clockSync) {
return vclock;
}
}
public void updateClock(long clock) {
synchronized (clockSync) {
vclock = Math.max(vclock, clock);
}
}
public VersionBucket bucket(int hash) {
// If this is a user provided hash, it may be poor in the right-hand bits.
// Make sure high bits are moved down, since only the low bits will matter.
// int h = hash + (hash >>> 8) + (hash >>> 16) + (hash >>> 24);
// Assume good hash codes for now.
int slot = hash & (buckets.length-1);
return buckets[slot];
}
public Long lookupVersion(BytesRef idBytes) {
return ulog.lookupVersion(idBytes);
}
public Long getVersionFromIndex(BytesRef idBytes) {
// TODO: we could cache much of this and invalidate during a commit.
// TODO: most DocValues classes are threadsafe - expose which.
RefCounted<SolrIndexSearcher> newestSearcher = ulog.uhandler.core.getRealtimeSearcher();
try {
SolrIndexSearcher searcher = newestSearcher.get();
long lookup = searcher.lookupId(idBytes);
if (lookup < 0) return null;
ValueSource vs = versionField.getType().getValueSource(versionField, null);
Map context = ValueSource.newContext(searcher);
vs.createWeight(context, searcher);
FunctionValues fv = vs.getValues(context, searcher.getTopReaderContext().leaves().get((int)(lookup>>32)));
long ver = fv.longVal((int)lookup);
return ver;
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error reading version from index", e);
} finally {
if (newestSearcher != null) {
newestSearcher.decref();
}
}
}
}