| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.solr.schema; |
| import java.io.ByteArrayInputStream; |
| import java.lang.invoke.MethodHandles; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.solr.cloud.ZkSolrResourceLoader; |
| import org.apache.solr.common.SolrException.ErrorCode; |
| import org.apache.solr.common.cloud.OnReconnect; |
| import org.apache.solr.common.cloud.SolrZkClient; |
| import org.apache.solr.common.cloud.ZooKeeperException; |
| import org.apache.solr.core.CloseHook; |
| import org.apache.solr.core.ConfigSetService; |
| import org.apache.solr.core.CoreContainer; |
| import org.apache.solr.core.SolrCore; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.apache.zookeeper.data.Stat; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** Keeps a ManagedIndexSchema up-to-date when changes are made to the serialized managed schema in ZooKeeper */ |
| public class ZkIndexSchemaReader implements OnReconnect { |
| private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| private final ManagedIndexSchemaFactory managedIndexSchemaFactory; |
| private SolrZkClient zkClient; |
| private String managedSchemaPath; |
| private final String uniqueCoreId; // used in equals impl to uniquely identify the core that we're dependent on |
| private SchemaWatcher schemaWatcher; |
| private ZkSolrResourceLoader zkLoader; |
| |
| public ZkIndexSchemaReader(ManagedIndexSchemaFactory managedIndexSchemaFactory, SolrCore solrCore) { |
| this.managedIndexSchemaFactory = managedIndexSchemaFactory; |
| zkLoader = (ZkSolrResourceLoader)managedIndexSchemaFactory.getResourceLoader(); |
| this.zkClient = zkLoader.getZkController().getZkClient(); |
| this.managedSchemaPath = zkLoader.getConfigSetZkPath() + "/" + managedIndexSchemaFactory.getManagedSchemaResourceName(); |
| this.uniqueCoreId = solrCore.getName()+":"+solrCore.getStartNanoTime(); |
| |
| // register a CloseHook for the core this reader is linked to, so that we can de-register the listener |
| solrCore.addCloseHook(new CloseHook() { |
| @Override |
| public void preClose(SolrCore core) { |
| CoreContainer cc = core.getCoreContainer(); |
| if (cc.isZooKeeperAware()) { |
| if (log.isDebugEnabled()) { |
| log.debug("Removing ZkIndexSchemaReader OnReconnect listener as core {} is shutting down.", core.getName()); |
| } |
| cc.getZkController().removeOnReconnectListener(ZkIndexSchemaReader.this); |
| } |
| } |
| |
| @Override |
| public void postClose(SolrCore core) { |
| // The watcher is still registered with Zookeeper, and holds a |
| // reference to the schema reader, which indirectly references the |
| // SolrCore and would prevent it from being garbage collected. |
| schemaWatcher.discardReaderReference(); |
| } |
| }); |
| |
| this.schemaWatcher = createSchemaWatcher(); |
| |
| zkLoader.getZkController().addOnReconnectListener(this); |
| } |
| |
| public Object getSchemaUpdateLock() { |
| return managedIndexSchemaFactory.getSchemaUpdateLock(); |
| } |
| |
| /** |
| * Creates a schema watcher and returns it for controlling purposes. |
| * |
| * @return the registered {@linkplain SchemaWatcher}. |
| */ |
| public SchemaWatcher createSchemaWatcher() { |
| log.info("Creating ZooKeeper watch for the managed schema at {}", managedSchemaPath); |
| |
| SchemaWatcher watcher = new SchemaWatcher(this); |
| try { |
| zkClient.exists(managedSchemaPath, watcher, true); |
| } catch (KeeperException e) { |
| final String msg = "Error creating ZooKeeper watch for the managed schema"; |
| log.error(msg, e); |
| throw new ZooKeeperException(ErrorCode.SERVER_ERROR, msg, e); |
| } catch (InterruptedException e) { |
| // Restore the interrupted status |
| Thread.currentThread().interrupt(); |
| log.warn("", e); |
| } |
| |
| return watcher; |
| } |
| |
| /** |
| * Watches for schema changes and triggers updates in the {@linkplain ZkIndexSchemaReader}. |
| */ |
| public static class SchemaWatcher implements Watcher { |
| |
| private ZkIndexSchemaReader schemaReader; |
| |
| public SchemaWatcher(ZkIndexSchemaReader reader) { |
| this.schemaReader = reader; |
| } |
| |
| @Override |
| public void process(WatchedEvent event) { |
| ZkIndexSchemaReader indexSchemaReader = schemaReader; |
| |
| if (indexSchemaReader == null) { |
| return; // the core for this reader has already been removed, don't process this event |
| } |
| |
| // session events are not change events, and do not remove the watcher |
| if (Event.EventType.None.equals(event.getType())) { |
| return; |
| } |
| log.info("A schema change: {}, has occurred - updating schema from ZooKeeper ...", event); |
| try { |
| indexSchemaReader.updateSchema(this, -1); |
| } catch (KeeperException e) { |
| if (e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) { |
| log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK"); |
| return; |
| } |
| log.error("", e); |
| throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e); |
| } catch (InterruptedException e) { |
| // Restore the interrupted status |
| Thread.currentThread().interrupt(); |
| log.warn("", e); |
| } |
| } |
| |
| /** |
| * Discard the reference to the {@code ZkIndexSchemaReader}. |
| */ |
| public void discardReaderReference() { |
| schemaReader = null; |
| } |
| } |
| |
| public ManagedIndexSchema refreshSchemaFromZk(int expectedZkVersion) throws KeeperException, InterruptedException { |
| updateSchema(null, expectedZkVersion); |
| return managedIndexSchemaFactory.getSchema(); |
| } |
| |
| // package visibility for test purposes |
| void updateSchema(Watcher watcher, int expectedZkVersion) throws KeeperException, InterruptedException { |
| Stat stat = new Stat(); |
| synchronized (getSchemaUpdateLock()) { |
| final ManagedIndexSchema oldSchema = managedIndexSchemaFactory.getSchema(); |
| if (expectedZkVersion == -1 || oldSchema.schemaZkVersion < expectedZkVersion) { |
| byte[] data = zkClient.getData(managedSchemaPath, watcher, stat, true); |
| if (stat.getVersion() != oldSchema.schemaZkVersion) { |
| if (log.isInfoEnabled()) { |
| log.info("Retrieved schema version {} from Zookeeper", stat.getVersion()); |
| } |
| long start = System.nanoTime(); |
| String resourceName = managedIndexSchemaFactory.getManagedSchemaResourceName(); |
| ManagedIndexSchema newSchema = new ManagedIndexSchema |
| (managedIndexSchemaFactory.getConfig(), resourceName, |
| () -> ConfigSetService.getParsedSchema(new ByteArrayInputStream(data),zkLoader , resourceName), managedIndexSchemaFactory.isMutable(), |
| resourceName, stat.getVersion(), oldSchema.getSchemaUpdateLock()); |
| managedIndexSchemaFactory.setSchema(newSchema); |
| long stop = System.nanoTime(); |
| log.info("Finished refreshing schema in {} ms", TimeUnit.MILLISECONDS.convert(stop - start, TimeUnit.NANOSECONDS)); |
| } else { |
| log.info("Current schema version {} is already the latest", oldSchema.schemaZkVersion); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Called after a ZooKeeper session expiration occurs; need to re-create the watcher and update the current |
| * schema from ZooKeeper. |
| */ |
| @Override |
| public void command() { |
| try { |
| // setup a new watcher to get notified when the managed schema changes |
| schemaWatcher = createSchemaWatcher(); |
| // force update now as the schema may have changed while our zk session was expired |
| updateSchema(null, -1); |
| } catch (Exception exc) { |
| log.error("Failed to update managed-schema watcher after session expiration due to: {}", exc); |
| } |
| } |
| |
| public String getUniqueCoreId() { |
| return uniqueCoreId; |
| } |
| |
| public String toString() { |
| return "ZkIndexSchemaReader: "+managedSchemaPath+", uniqueCoreId: "+uniqueCoreId; |
| } |
| |
| public int hashCode() { |
| return managedSchemaPath.hashCode()+uniqueCoreId.hashCode(); |
| } |
| |
| // We need the uniqueCoreId which is core name + start time nanos to be the tie breaker |
| // as there can be multiple ZkIndexSchemaReader instances active for the same core after |
| // a reload (one is initializing and the other is being shutdown) |
| public boolean equals(Object other) { |
| if (other == null) return false; |
| if (other == this) return true; |
| if (!(other instanceof ZkIndexSchemaReader)) return false; |
| ZkIndexSchemaReader that = (ZkIndexSchemaReader)other; |
| return this.managedSchemaPath.equals(that.managedSchemaPath) && this.uniqueCoreId.equals(that.uniqueCoreId); |
| } |
| } |