SOLR-14158: Package trusted keys to come from Package Store, not ZK
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 8c5723b..6461645 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -224,6 +224,31 @@
 * SOLR-13778: Solrj client will retry requests on SSLException with a suppressed SocketException
   (very likely a hard-closed socket connection).
 
+==================  8.4.1 ==================
+
+Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
+
+Versions of Major Components
+---------------------
+Apache Tika 1.19.1
+Carrot2 3.16.0
+Velocity 2.0 and Velocity Tools 3.0
+Apache ZooKeeper 3.5.5
+Jetty 9.4.19.v20190610
+
+Upgrade Notes
+---------------------
+
+* SOLR-14158: Package manager now stores keys in the package store instead of ZK. If you're using the package manager and
+  have already added any repositories, you need to re-add all the keys from the /keys/exe location in ZK, one by one, using:
+  "bin/solr package add-key <keyfile.der>"
+
+Improvements
+---------------------
+
+* SOLR-14158: Package manager to store public keys in a special "trusted" location instead of in ZooKeeper
+  (noble, Ishan Chattopadhyaya)
+
 ==================  8.4.0 ==================
 
 Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index 503d010..6448a7a 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -756,6 +756,15 @@
       throw new RuntimeException(e);
     }
   }
+
+  public URL getBaseURLV2(){
+    try {
+      return new URL(protocol, host, jettyPort, "/api");
+    } catch (MalformedURLException e) {
+      throw new RuntimeException(e);
+    }
+
+  }
   /**
    * Returns a base URL consisting of the protocol, host, and port for a
    * Connector in use by the Jetty Server contained in this runner.
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index f494f50..b757edc 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -652,6 +652,9 @@
       // use deprecated API for back-compat, remove in 9.0
       pkiAuthenticationPlugin.initializeMetrics(solrMetricsContext, "/authentication/pki");
       TracerConfigurator.loadTracer(loader, cfg.getTracerConfiguratorPluginInfo(), getZkController().getZkStateReader());
+      packageLoader = new PackageLoader(this);
+      containerHandlers.getApiBag().register(new AnnotatedApi(packageLoader.getPackageAPI().editAPI), Collections.EMPTY_MAP);
+      containerHandlers.getApiBag().register(new AnnotatedApi(packageLoader.getPackageAPI().readAPI), Collections.EMPTY_MAP);
     }
 
     MDCLoggingContext.setNode(this);
@@ -744,9 +747,6 @@
 
     if (isZooKeeperAware()) {
       metricManager.loadClusterReporters(metricReporters, this);
-      packageLoader = new PackageLoader(this);
-      containerHandlers.getApiBag().register(new AnnotatedApi(packageLoader.getPackageAPI().editAPI), Collections.EMPTY_MAP);
-      containerHandlers.getApiBag().register(new AnnotatedApi(packageLoader.getPackageAPI().readAPI), Collections.EMPTY_MAP);
     }
 
 
diff --git a/solr/core/src/java/org/apache/solr/filestore/DistribPackageStore.java b/solr/core/src/java/org/apache/solr/filestore/DistribPackageStore.java
index 8d9af8f..3389bf4 100644
--- a/solr/core/src/java/org/apache/solr/filestore/DistribPackageStore.java
+++ b/solr/core/src/java/org/apache/solr/filestore/DistribPackageStore.java
@@ -30,6 +30,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -45,41 +46,51 @@
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.filestore.PackageStoreAPI.MetaData;
+import org.apache.solr.util.SimplePostTool;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.server.ByteBufferInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
 import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
 
 
 public class DistribPackageStore implements PackageStore {
   static final long MAX_PKG_SIZE = Long.parseLong(System.getProperty("max.file.store.size", String.valueOf(100 * 1024 * 1024)));
+  /**
+   * This is the ZooKeeper path under which all the files in the package store are listed
+   */
+  static final String ZK_PACKAGESTORE = "/packagestore";
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final CoreContainer coreContainer;
   private Map<String, FileInfo> tmpFiles = new ConcurrentHashMap<>();
 
+  private final Path solrhome;
+
   public DistribPackageStore(CoreContainer coreContainer) {
     this.coreContainer = coreContainer;
+    solrhome = this.coreContainer.getResourceLoader().getInstancePath();
     ensurePackageStoreDir(coreContainer.getResourceLoader().getInstancePath());
 
   }
 
-  private String myNode() {
-    return coreContainer.getZkController().getNodeName();
-  }
-
-
   @Override
   public Path getRealpath(String path) {
+    return _getRealPath(path, solrhome);
+  }
+
+  private static Path _getRealPath(String path, Path solrHome) {
     if (File.separatorChar == '\\') {
       path = path.replace('/', File.separatorChar);
     }
     if (!path.isEmpty() && path.charAt(0) != File.separatorChar) {
       path = File.separator + path;
     }
-    return new File(this.coreContainer.getResourceLoader().getInstancePath() +
+    return new File(solrHome +
         File.separator + PackageStoreAPI.PACKAGESTORE_DIRECTORY + path).toPath();
   }
 
@@ -93,10 +104,18 @@
       this.path = path;
     }
 
+    ByteBuffer getFileData(boolean validate) throws IOException {
+      if (fileData == null) {
+        try (FileInputStream fis = new FileInputStream(getRealpath(path).toFile())) {
+          fileData = SimplePostTool.inputStreamToByteArray(fis);
+        }
+      }
+      return fileData;
+    }
+
     public String getMetaPath() {
       if (metaPath == null) {
-        int idx = path.lastIndexOf('/');
-        metaPath = path.substring(0, idx + 1) + "." + path.substring(idx + 1) + ".json";
+        metaPath = _getMetapath(path);
       }
       return metaPath;
     }
@@ -106,30 +125,9 @@
       synchronized (DistribPackageStore.this) {
         this.metaData = meta;
         this.fileData = data;
-        Path realpath = getRealpath(path);
-        File file = realpath.toFile();
-        File parent = file.getParentFile();
-        if (!parent.exists()) {
-          parent.mkdirs();
-        }
-        Map m = (Map) Utils.fromJSON(meta.array(), meta.arrayOffset(), meta.limit());
-        if (m == null || m.isEmpty()) {
-          throw new SolrException(SERVER_ERROR, "invalid metadata , discarding : " + path);
-        }
-
-
-        File metdataFile = getRealpath(getMetaPath()).toFile();
-
-        try (FileOutputStream fos = new FileOutputStream(metdataFile)) {
-          fos.write(meta.array(), 0, meta.limit());
-        }
-        IOUtils.fsync(metdataFile.toPath(), false);
-
-        try (FileOutputStream fos = new FileOutputStream(file)) {
-          fos.write(data.array(), 0, data.limit());
-        }
+        _persistToFile(solrhome, path, data, meta);
         log.info("persisted a file {} and metadata. sizes {} {}", path, data.limit(), meta.limit());
-        IOUtils.fsync(file.toPath(), false);
+
       }
     }
 
@@ -316,7 +314,6 @@
     }
   }
 
-
   @Override
   public void put(FileEntry entry) throws IOException {
     FileInfo info = new FileInfo(entry.path);
@@ -324,7 +321,20 @@
     Utils.writeJson(entry.getMetaData(), baos, true);
     byte[] bytes = baos.toByteArray();
     info.persistToFile(entry.buf, ByteBuffer.wrap(bytes, 0, bytes.length));
-    tmpFiles.put(entry.getPath(), info);
+    distribute(info);
+  }
+
+  private void distribute(FileInfo info) {
+    try {
+      String dirName = info.path.substring(0, info.path.lastIndexOf('/'));
+      coreContainer.getZkController().getZkClient().makePath(ZK_PACKAGESTORE + dirName, false, true);
+      coreContainer.getZkController().getZkClient().create(ZK_PACKAGESTORE + info.path, info.getDetails().getMetaData().sha512.getBytes(UTF_8),
+          CreateMode.PERSISTENT, true);
+    } catch (Exception e) {
+      throw new SolrException(SERVER_ERROR, "Unable to create an entry in ZK", e);
+    }
+    tmpFiles.put(info.path, info);
+
     List<String> nodes = coreContainer.getPackageStoreAPI().shuffledNodes();
     int i = 0;
     int FETCHFROM_SRC = 50;
@@ -332,7 +342,7 @@
     try {
       for (String node : nodes) {
         String baseUrl = coreContainer.getZkController().getZkStateReader().getBaseUrlForNodeName(node);
-        String url = baseUrl.replace("/solr", "/api") + "/node/files" + entry.getPath() + "?getFrom=";
+        String url = baseUrl.replace("/solr", "/api") + "/node/files" + info.path + "?getFrom=";
         if (i < FETCHFROM_SRC) {
           // this is to protect very large clusters from overwhelming a single node
           // the first FETCHFROM_SRC nodes will be asked to fetch from this node.
@@ -369,12 +379,11 @@
         try {
           Thread.sleep(10 * 1000);
         } finally {
-          tmpFiles.remove(entry.getPath());
+          tmpFiles.remove(info.path);
         }
         return null;
       });
     }
-
   }
 
   @Override
@@ -428,6 +437,15 @@
     new FileInfo(path).readData(consumer);
   }
 
+  @Override
+  public void syncToAllNodes(String path) throws IOException {
+    FileInfo fi = new FileInfo(path);
+    if (!fi.exists(true, false)) {
+      throw new SolrException(BAD_REQUEST, "No such file : " + path);
+    }
+    fi.getFileData(true);
+    distribute(fi);
+  }
 
   @Override
   public List list(String path, Predicate<String> predicate) {
@@ -451,6 +469,28 @@
     return fileDetails;
   }
 
+  @Override
+  public void refresh(String path) {
+    try {
+      List l = null;
+      try {
+        l = coreContainer.getZkController().getZkClient().getChildren(ZK_PACKAGESTORE+ path, null, true);
+      } catch (KeeperException.NoNodeException e) {
+        // does not matter
+      }
+      if (l != null && !l.isEmpty()) {
+        List myFiles = list(path, s -> true);
+        for (Object f : l) {
+          if (!myFiles.contains(f)) {
+            log.info("{} does not exist locally, downloading.. ",f);
+            fetch(path + "/" + f.toString(), "*");
+          }
+        }
+      }
+    } catch (Exception e) {
+      log.error("Could not refresh files in " +path, e);
+    }
+  }
 
   @Override
   public FileType getType(String path, boolean fetchMissing) {
@@ -460,12 +500,16 @@
         file = getRealpath(path).toFile();
       }
     }
+    return _getFileType(file);
+  }
+
+  public static FileType _getFileType(File file) {
     if (!file.exists()) return FileType.NOFILE;
     if (file.isDirectory()) return FileType.DIRECTORY;
     return isMetaDataFile(file.getName()) ? FileType.METADATA : FileType.FILE;
   }
 
-  private boolean isMetaDataFile(String file) {
+  public static boolean isMetaDataFile(String file) {
     return file.charAt(0) == '.' && file.endsWith(".json");
   }
 
@@ -486,4 +530,61 @@
   public static Path getPackageStoreDirPath(Path solrHome) {
     return Paths.get(solrHome.toAbsolutePath().toString(), PackageStoreAPI.PACKAGESTORE_DIRECTORY).toAbsolutePath();
   }
+
+  private static String _getMetapath(String path) {
+    int idx = path.lastIndexOf('/');
+    return path.substring(0, idx + 1) + "." + path.substring(idx + 1) + ".json";
+  }
+
+  /**
+   * Internal API
+   */
+  public static void _persistToFile(Path solrHome, String path, ByteBuffer data, ByteBuffer meta) throws IOException {
+    Path realpath = _getRealPath(path, solrHome);
+    File file = realpath.toFile();
+    File parent = file.getParentFile();
+    if (!parent.exists()) {
+      parent.mkdirs();
+    }
+    Map m = (Map) Utils.fromJSON(meta.array(), meta.arrayOffset(), meta.limit());
+    if (m == null || m.isEmpty()) {
+      throw new SolrException(SERVER_ERROR, "invalid metadata , discarding : " + path);
+    }
+
+
+    File metdataFile = _getRealPath(_getMetapath(path), solrHome).toFile();
+
+    try (FileOutputStream fos = new FileOutputStream(metdataFile)) {
+      fos.write(meta.array(), 0, meta.limit());
+    }
+    IOUtils.fsync(metdataFile.toPath(), false);
+
+    try (FileOutputStream fos = new FileOutputStream(file)) {
+      fos.write(data.array(), 0, data.limit());
+    }
+    IOUtils.fsync(file.toPath(), false);
+  }
+
+  @Override
+  public Map<String, byte[]> getKeys() throws IOException {
+    return _getKeys(solrhome);
+  }
+
+
+  // reads local keys file
+  private static Map<String, byte[]> _getKeys(Path solrhome) throws IOException {
+    Map<String, byte[]> result = new HashMap<>();
+    Path keysDir = _getRealPath(PackageStoreAPI.KEYS_DIR, solrhome);
+
+    File[] keyFiles = keysDir.toFile().listFiles();
+    if (keyFiles == null) return result;
+    for (File keyFile : keyFiles) {
+      if (keyFile.isFile() && !isMetaDataFile(keyFile.getName())) {
+        try (InputStream fis = new FileInputStream(keyFile)) {
+          result.put(keyFile.getName(), SimplePostTool.inputStreamToByteArray(fis).array());
+        }
+      }
+    }
+    return result;
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/filestore/PackageStore.java b/solr/core/src/java/org/apache/solr/filestore/PackageStore.java
index 9a5fbff..ccfc294 100644
--- a/solr/core/src/java/org/apache/solr/filestore/PackageStore.java
+++ b/solr/core/src/java/org/apache/solr/filestore/PackageStore.java
@@ -23,6 +23,7 @@
 import java.nio.file.Path;
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 
@@ -55,6 +56,10 @@
 
   List<FileDetails> list(String path, Predicate<String> predicate);
 
+  /** Sync a local file to all nodes. All the nodes are asked to pull the file from this node.
+   */
+  void syncToAllNodes(String path) throws IOException;
+
   /**
    * get the real path on filesystem
    */
@@ -65,6 +70,15 @@
    */
   FileType getType(String path, boolean fetchMissing);
 
+  /** Get all the keys in the package store. Each value is the content of a .DER file.
+   */
+  Map<String,byte[]> getKeys() throws IOException;
+
+  /** Refresh the files in a path, because this node may not have all of the files yet.
+   * @param path the path to be refreshed.
+   */
+  void refresh(String path);
+
   public class FileEntry {
     final ByteBuffer buf;
     final MetaData meta;
diff --git a/solr/core/src/java/org/apache/solr/filestore/PackageStoreAPI.java b/solr/core/src/java/org/apache/solr/filestore/PackageStoreAPI.java
index 543b079..00b5d7d 100644
--- a/solr/core/src/java/org/apache/solr/filestore/PackageStoreAPI.java
+++ b/solr/core/src/java/org/apache/solr/filestore/PackageStoreAPI.java
@@ -35,7 +35,6 @@
 import org.apache.solr.api.Command;
 import org.apache.solr.api.EndPoint;
 import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.cloud.CloudUtil;
 import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.CommonParams;
@@ -66,6 +65,8 @@
 public class PackageStoreAPI {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   public static final String PACKAGESTORE_DIRECTORY = "filestore";
+  public static final String TRUSTED_DIR = "_trusted_";
+  public static final String KEYS_DIR = "/_trusted_/keys";
 
 
   private final CoreContainer coreContainer;
@@ -99,7 +100,7 @@
       try {
         PackageStore.FileType type = packageStore.getType(path, true);
         if (type != PackageStore.FileType.FILE) {
-          errs.accept("No such file : " + path);
+          errs.accept("No such file: " + path);
           continue;
         }
 
@@ -111,9 +112,10 @@
           }
           if (validateSignatures) {
             try {
-              validate(entry.meta.signatures, entry);
-            } catch (SolrException e) {
-              log.error("error validating package artifact", e);
+              packageStore.refresh(KEYS_DIR);
+              validate(entry.meta.signatures, entry, false);
+            } catch (Exception e) {
+              log.error("Error validating package artifact", e);
               errs.accept(e.getMessage());
             }
           }
@@ -136,7 +138,7 @@
 
     @Command
     public void upload(SolrQueryRequest req, SolrQueryResponse rsp) {
-      if(!coreContainer.getPackageLoader().getPackageAPI().isEnabled()) {
+      if (!coreContainer.getPackageLoader().getPackageAPI().isEnabled()) {
         throw new RuntimeException(PackageAPI.ERR_MSG);
       }
       try {
@@ -149,22 +151,17 @@
         if (path == null) {
           throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No path");
         }
-        validateName(path);
+        validateName(path, true);
         ContentStream stream = streams.iterator().next();
         try {
           ByteBuffer buf = SimplePostTool.inputStreamToByteArray(stream.getStream());
-          String sha512 = DigestUtils.sha512Hex(new ByteBufferInputStream(buf));
           List<String> signatures = readSignatures(req, buf);
-          Map<String, Object> vals = new HashMap<>();
-          vals.put(MetaData.SHA512, sha512);
-          if (signatures != null) {
-            vals.put("sig", signatures);
-          }
+          MetaData meta = _createJsonMetaData(buf, signatures);
           PackageStore.FileType type = packageStore.getType(path, true);
           if(type != PackageStore.FileType.NOFILE) {
             throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,  "Path already exists "+ path);
           }
-          packageStore.put(new PackageStore.FileEntry(buf, new MetaData(vals), path));
+          packageStore.put(new PackageStore.FileEntry(buf, meta, path));
           rsp.add(CommonParams.FILE, path);
         } catch (IOException e) {
           throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
@@ -189,24 +186,24 @@
       String[] signatures = req.getParams().getParams("sig");
       if (signatures == null || signatures.length == 0) return null;
       List<String> sigs = Arrays.asList(signatures);
+      packageStore.refresh(KEYS_DIR);
       validate(sigs, buf);
       return sigs;
     }
 
-    public void validate(List<String> sigs,
-                         ByteBuffer buf) throws SolrException, IOException {
-      Map<String, byte[]> keys = CloudUtil.getTrustedKeys(
-          coreContainer.getZkController().getZkClient(), "exe");
+    private void validate(List<String> sigs,
+                          ByteBuffer buf) throws SolrException, IOException {
+      Map<String, byte[]> keys = packageStore.getKeys();
       if (keys == null || keys.isEmpty()) {
         throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-            "ZK does not have any keys");
+            "package store does not have any keys");
       }
       CryptoKeys cryptoKeys = null;
       try {
         cryptoKeys = new CryptoKeys(keys);
       } catch (Exception e) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-            "Error parsing public keys in ZooKeeper");
+            "Error parsing public keys in Package store");
       }
       for (String sig : sigs) {
         if (cryptoKeys.verify(sig, buf) == null) {
@@ -219,6 +216,20 @@
 
   }
 
+  /**
+   * Creates a JSON string with the metadata
+   * @lucene.internal
+   */
+  public static MetaData _createJsonMetaData(ByteBuffer buf, List<String> signatures) throws IOException {
+    String sha512 = DigestUtils.sha512Hex(new ByteBufferInputStream(buf));
+    Map<String, Object> vals = new HashMap<>();
+    vals.put(MetaData.SHA512, sha512);
+    if (signatures != null) {
+      vals.put("sig", signatures);
+    }
+    return new MetaData(vals);
+  }
+
   @EndPoint(
       path = "/node/files/*",
       method = SolrRequest.METHOD.GET,
@@ -228,6 +239,14 @@
     public void read(SolrQueryRequest req, SolrQueryResponse rsp) {
       String path = req.getPathTemplateValues().get("*");
       String pathCopy = path;
+      if (req.getParams().getBool("sync", false)) {
+        try {
+          packageStore.syncToAllNodes(path);
+          return;
+        } catch (IOException e) {
+          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Error getting file ", e);
+        }
+      }
       String getFrom = req.getParams().get("getFrom");
       if (getFrom != null) {
         coreContainer.getUpdateShardHandler().getUpdateExecutor().submit(() -> {
@@ -287,7 +306,7 @@
 
   }
 
-  static class MetaData implements MapWriter {
+  public static class MetaData implements MapWriter {
     public static final String SHA512 = "sha512";
     String sha512;
     List<String> signatures;
@@ -312,7 +331,7 @@
 
   static final String INVALIDCHARS = " /\\#&*\n\t%@~`=+^$><?{}[]|:;!";
 
-  public static void validateName(String path) {
+  public static void validateName(String path, boolean failForTrusted) {
     if (path == null) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "empty path");
     }
@@ -328,15 +347,34 @@
         }
       }
     }
+    if (failForTrusted &&  TRUSTED_DIR.equals(parts.get(0))) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "trying to write into /_trusted_/ directory");
+    }
   }
 
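+  /** Validate the signatures of a file against the public keys in the package store.
+   *
+   * @param sigs the signatures; each one must match at least one public key
+   * @param entry the file details
+   * @param isFirstAttempt if true, a failure triggers one retry with this flag set to false, which refreshes the keys first
+   */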
   public void validate(List<String> sigs,
-                       PackageStore.FileEntry entry) throws SolrException {
-    Map<String, byte[]> keys = CloudUtil.getTrustedKeys(
-        coreContainer.getZkController().getZkClient(), "exe");
+                       PackageStore.FileEntry entry,
+                       boolean isFirstAttempt) throws SolrException, IOException {
+    if (!isFirstAttempt) {
+      //we are retrying because last validation failed.
+      // get all keys again and try again
+      packageStore.refresh(KEYS_DIR);
+    }
+
+    Map<String, byte[]> keys = packageStore.getKeys();
     if (keys == null || keys.isEmpty()) {
+      if(isFirstAttempt) {
+        validate(sigs, entry, false);
+        return;
+      }
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-          "ZooKeeper does not have any public keys");
+          "Packagestore does not have any public keys");
     }
     CryptoKeys cryptoKeys = null;
     try {
@@ -346,14 +384,22 @@
           "Error parsing public keys in ZooKeeper");
     }
     for (String sig : sigs) {
-      Supplier<String> errMsg = () -> "Signature does not match any public key : " + sig + "sha256 "+ entry.getMetaData().sha512;
+      Supplier<String> errMsg = () -> "Signature does not match any public key : " + sig + " sha512 " + entry.getMetaData().sha512;
       if (entry.getBuffer() != null) {
         if (cryptoKeys.verify(sig, entry.getBuffer()) == null) {
+          if(isFirstAttempt) {
+            validate(sigs, entry, false);
+            return;
+          }
           throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, errMsg.get());
         }
       } else {
         InputStream inputStream = entry.getInputStream();
         if (cryptoKeys.verify(sig, inputStream) == null) {
+          if(isFirstAttempt)  {
+            validate(sigs, entry, false);
+            return;
+          }
           throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, errMsg.get());
         }
 
diff --git a/solr/core/src/java/org/apache/solr/packagemanager/PackageUtils.java b/solr/core/src/java/org/apache/solr/packagemanager/PackageUtils.java
index 518e4c4..602a9e5 100644
--- a/solr/core/src/java/org/apache/solr/packagemanager/PackageUtils.java
+++ b/solr/core/src/java/org/apache/solr/packagemanager/PackageUtils.java
@@ -26,6 +26,7 @@
 import java.util.zip.ZipFile;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.lucene.util.SuppressForbidden;
@@ -39,7 +40,10 @@
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.BlobRepository;
+import org.apache.solr.filestore.DistribPackageStore;
+import org.apache.solr.filestore.PackageStoreAPI;
 import org.apache.solr.packagemanager.SolrPackage.Manifest;
 import org.apache.solr.util.SolrJacksonAnnotationInspector;
 
@@ -137,7 +141,12 @@
    */
   public static String getJsonStringFromUrl(HttpClient client, String url) {
     try {
-      return IOUtils.toString(client.execute(new HttpGet(url)).getEntity().getContent(), "UTF-8");
+      HttpResponse resp = client.execute(new HttpGet(url));
+      if (resp.getStatusLine().getStatusCode() != 200) {
+        throw new SolrException(ErrorCode.NOT_FOUND,
+            "Error (code="+resp.getStatusLine().getStatusCode()+") fetching from URL: "+url);
+      }
+      return IOUtils.toString(resp.getEntity().getContent(), "UTF-8");
     } catch (UnsupportedOperationException | IOException e) {
       throw new RuntimeException(e);
     }
@@ -244,4 +253,11 @@
   public static String getCollectionParamsPath(String collection) {
     return "/api/collections/" + collection + "/config/params";
   }
+
+  public static void uploadKey(byte bytes[], String path, Path home, HttpSolrClient client) throws IOException {
+    ByteBuffer buf = ByteBuffer.wrap(bytes);
+    PackageStoreAPI.MetaData meta = PackageStoreAPI._createJsonMetaData(buf, null);
+    DistribPackageStore._persistToFile(home, path, buf, ByteBuffer.wrap(Utils.toJSON(meta)));
+  }
+
 }
diff --git a/solr/core/src/java/org/apache/solr/packagemanager/RepositoryManager.java b/solr/core/src/java/org/apache/solr/packagemanager/RepositoryManager.java
index 3d6075b..a0cc0e1 100644
--- a/solr/core/src/java/org/apache/solr/packagemanager/RepositoryManager.java
+++ b/solr/core/src/java/org/apache/solr/packagemanager/RepositoryManager.java
@@ -22,10 +22,10 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.lang.invoke.MethodHandles;
-import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -48,10 +48,12 @@
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.core.BlobRepository;
+import org.apache.solr.filestore.PackageStoreAPI;
 import org.apache.solr.packagemanager.SolrPackage.Artifact;
 import org.apache.solr.packagemanager.SolrPackage.SolrPackageRelease;
 import org.apache.solr.pkg.PackageAPI;
 import org.apache.solr.pkg.PackagePluginHolder;
+import org.apache.solr.util.SolrCLI;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -116,24 +118,31 @@
   /**
    * Add a repository to Solr
    */
-  public void addRepository(String name, String uri) throws KeeperException, InterruptedException, MalformedURLException, IOException {
+  public void addRepository(String repoName, String uri) throws Exception {
     String existingRepositoriesJson = getRepositoriesJson(packageManager.zkClient);
     log.info(existingRepositoriesJson);
 
     List<PackageRepository> repos = getMapper().readValue(existingRepositoriesJson, List.class);
-    repos.add(new DefaultPackageRepository(name, uri));
+    repos.add(new DefaultPackageRepository(repoName, uri));
     if (packageManager.zkClient.exists(PackageUtils.REPOSITORIES_ZK_PATH, true) == false) {
       packageManager.zkClient.create(PackageUtils.REPOSITORIES_ZK_PATH, getMapper().writeValueAsString(repos).getBytes("UTF-8"), CreateMode.PERSISTENT, true);
     } else {
       packageManager.zkClient.setData(PackageUtils.REPOSITORIES_ZK_PATH, getMapper().writeValueAsString(repos).getBytes("UTF-8"), true);
     }
 
-    if (packageManager.zkClient.exists("/keys", true)==false) packageManager.zkClient.create("/keys", new byte[0], CreateMode.PERSISTENT, true);
-    if (packageManager.zkClient.exists("/keys/exe", true)==false) packageManager.zkClient.create("/keys/exe", new byte[0], CreateMode.PERSISTENT, true);
-    if (packageManager.zkClient.exists("/keys/exe/" + name + ".der", true)==false) {
-      packageManager.zkClient.create("/keys/exe/" + name + ".der", new byte[0], CreateMode.PERSISTENT, true);
-    }
-    packageManager.zkClient.setData("/keys/exe/" + name + ".der", IOUtils.toByteArray(new URL(uri + "/publickey.der").openStream()), true);
+    addKey(IOUtils.toByteArray(new URL(uri + "/publickey.der")), repoName + ".der"); // toByteArray(URL) opens and closes the stream itself
+  }
+
+  public void addKey(byte[] key, String destinationKeyFilename) throws Exception {
+    // get solr_home directory from info servlet
+    String systemInfoUrl = solrClient.getBaseURL() + "/solr/admin/info/system";
+    Map<String,Object> systemInfo = SolrCLI.getJson(solrClient.getHttpClient(), systemInfoUrl, 2, true);
+    String solrHome = (String) systemInfo.get("solr_home");
+    
+    // put the public key into package store's trusted key store and request a sync.
+    String path = PackageStoreAPI.KEYS_DIR + "/" + destinationKeyFilename;
+    PackageUtils.uploadKey(key, path, Paths.get(solrHome), solrClient);
+    PackageUtils.getJsonStringFromUrl(solrClient.getHttpClient(), solrClient.getBaseURL() + "/api/node/files" + path + "?sync=true");
   }
 
   private String getRepositoriesJson(SolrZkClient zkClient) throws UnsupportedEncodingException, KeeperException, InterruptedException {
diff --git a/solr/core/src/java/org/apache/solr/pkg/PackageLoader.java b/solr/core/src/java/org/apache/solr/pkg/PackageLoader.java
index 8783d9b..c4c40eb 100644
--- a/solr/core/src/java/org/apache/solr/pkg/PackageLoader.java
+++ b/solr/core/src/java/org/apache/solr/pkg/PackageLoader.java
@@ -168,7 +168,13 @@
         Version version = myVersions.get(v.version);
         if (version == null) {
           log.info("A new version: {} added for package: {} with artifacts {}", v.version, this.name, v.files);
-          myVersions.put(v.version, new Version(this, v));
-          sortedVersions.add(v.version);
+          try {
+            Version ver = new Version(this, v);
+            myVersions.put(v.version, ver);
+            sortedVersions.add(v.version);
+          } catch (Exception e) {
+            // No Version instance exists on this path: report the requested name/version plus the cause, register nothing.
+            log.error("package could not be loaded: {} version: {}", this.name, v.version, e);
+          }
         }
       }
@@ -248,9 +254,13 @@
         this.parent = parent;
         this.version = v;
         List<Path> paths = new ArrayList<>();
+
+        List<String> errs = new ArrayList<>();
+        coreContainer.getPackageStoreAPI().validateFiles(version.files, true, s -> errs.add(s));
+        if(!errs.isEmpty()) {
+          throw new RuntimeException("Cannot load package: " +errs);
+        }
         for (String file : version.files) {
-          //ensure that the files are downloaded and available
-          coreContainer.getPackageStoreAPI().getPackageStore().fetch(file,null);
           paths.add(coreContainer.getPackageStoreAPI().getPackageStore().getRealpath(file));
         }
 
@@ -283,6 +293,11 @@
           closeWhileHandlingException(loader);
         }
       }
+
+      @Override
+      public String toString() {
+        return jsonStr();
+      }
     }
   }
 
diff --git a/solr/core/src/java/org/apache/solr/util/PackageTool.java b/solr/core/src/java/org/apache/solr/util/PackageTool.java
index f7744c9..87bf7b4 100644
--- a/solr/core/src/java/org/apache/solr/util/PackageTool.java
+++ b/solr/core/src/java/org/apache/solr/util/PackageTool.java
@@ -19,12 +19,15 @@
 import static org.apache.solr.packagemanager.PackageUtils.printGreen;
 import static org.apache.solr.packagemanager.PackageUtils.print;
 
+import java.io.File;
 import java.lang.invoke.MethodHandles;
+import java.nio.file.Paths;
 import java.util.Map;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.io.FileUtils;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.core.config.Configurator;
@@ -44,7 +47,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class PackageTool extends SolrCLI.ToolBase {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -92,6 +94,10 @@
                 repositoryManager.addRepository(repoName, repoUrl);
                 PackageUtils.printGreen("Added repository: " + repoName);
                 break;
+              case "add-key":
+                String keyFilename = cli.getArgs()[1];
+                repositoryManager.addKey(FileUtils.readFileToByteArray(new File(keyFilename)), Paths.get(keyFilename).getFileName().toString());
+                break;
               case "list-installed":
                 PackageUtils.printGreen("Installed packages:\n-----");                
                 for (SolrPackageInstance pkg: packageManager.fetchInstalledPackageInstances()) {
diff --git a/solr/core/src/test/org/apache/solr/filestore/TestDistribPackageStore.java b/solr/core/src/test/org/apache/solr/filestore/TestDistribPackageStore.java
index e90dd5e..f1681e4 100644
--- a/solr/core/src/test/org/apache/solr/filestore/TestDistribPackageStore.java
+++ b/solr/core/src/test/org/apache/solr/filestore/TestDistribPackageStore.java
@@ -44,8 +44,8 @@
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.util.Utils;
+import org.apache.solr.packagemanager.PackageUtils;
 import org.apache.solr.util.LogLevel;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.server.ByteBufferInputStream;
 import org.junit.After;
 import org.junit.Before;
@@ -75,8 +75,9 @@
     try {
 
       byte[] derFile = readFile("cryptokeys/pub_key512.der");
-      cluster.getZkClient().makePath("/keys/exe", true);
-      cluster.getZkClient().create("/keys/exe/pub_key512.der", derFile, CreateMode.PERSISTENT, true);
+      uploadKey(derFile, PackageStoreAPI.KEYS_DIR+"/pub_key512.der", cluster);
+//      cluster.getZkClient().makePath("/keys/exe", true);
+//      cluster.getZkClient().create("/keys/exe/pub_key512.der", derFile, CreateMode.PERSISTENT, true);
 
       try {
         postFile(cluster.getSolrClient(), getFileContent("runtimecode/runtimelibs.jar.bin"),
@@ -247,7 +248,16 @@
     return rsp;
   }
 
-
+  // Uploads the key through one randomly chosen node, requests a sync from it, then runs the all-nodes wait.
+  public static void uploadKey(byte[] bytes, String path, MiniSolrCloudCluster cluster) throws Exception {
+    JettySolrRunner node = cluster.getRandomJetty(random());
+    try (HttpSolrClient http = (HttpSolrClient) node.newClient()) {
+      PackageUtils.uploadKey(bytes, path, node.getCoreContainer().getResourceLoader().getInstancePath(), http);
+      String syncUrl = node.getBaseURLV2().toString() + "/node/files" + path + "?sync=true";
+      System.out.println("sync resp: " + syncUrl + " ,is: " + Utils.executeGET(http.getHttpClient(), syncUrl, null));
+    }
+    waitForAllNodesHaveFile(cluster, path, Utils.makeMap(":files:" + path + ":name", (Predicate<Object>) Objects::nonNull), false);
+  }
 
   public static void postFile(SolrClient client, ByteBuffer buffer, String name, String sig)
       throws SolrServerException, IOException {
diff --git a/solr/core/src/test/org/apache/solr/pkg/TestPackages.java b/solr/core/src/test/org/apache/solr/pkg/TestPackages.java
index d37fdf4..897383a 100644
--- a/solr/core/src/test/org/apache/solr/pkg/TestPackages.java
+++ b/solr/core/src/test/org/apache/solr/pkg/TestPackages.java
@@ -50,9 +50,9 @@
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.Utils;
+import org.apache.solr.filestore.PackageStoreAPI;
 import org.apache.solr.filestore.TestDistribPackageStore;
 import org.apache.solr.util.LogLevel;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.data.Stat;
 import org.junit.After;
 import org.junit.Before;
@@ -63,6 +63,7 @@
 import static org.apache.solr.common.params.CommonParams.WT;
 import static org.apache.solr.core.TestDynamicLoading.getFileContent;
 import static org.apache.solr.filestore.TestDistribPackageStore.readFile;
+import static org.apache.solr.filestore.TestDistribPackageStore.uploadKey;
 import static org.apache.solr.filestore.TestDistribPackageStore.waitForAllNodesHaveFile;
 
 @LogLevel("org.apache.solr.pkg.PackageLoader=DEBUG;org.apache.solr.pkg.PackageAPI=DEBUG")
@@ -95,8 +96,9 @@
       String EXPR1 = "/mypkg/expressible.jar";
       String COLLECTION_NAME = "testPluginLoadingColl";
       byte[] derFile = readFile("cryptokeys/pub_key512.der");
-      cluster.getZkClient().makePath("/keys/exe", true);
-      cluster.getZkClient().create("/keys/exe/pub_key512.der", derFile, CreateMode.PERSISTENT, true);
+      uploadKey(derFile, PackageStoreAPI.KEYS_DIR+"/pub_key512.der", cluster);
+//      cluster.getZkClient().makePath("/keys/exe", true);
+//      cluster.getZkClient().create("/keys/exe/pub_key512.der", derFile, CreateMode.PERSISTENT, true);
       postFileAndWait(cluster, "runtimecode/runtimelibs.jar.bin", FILE1,
           "L3q/qIGs4NaF6JiO0ZkMUFa88j0OmYc+I6O7BOdNuMct/xoZ4h73aZHZGc0+nmI1f/U3bOlMPINlSOM6LK3JpQ==");
 
@@ -459,7 +461,7 @@
           .build();
 
       //the files is not yet there. The command should fail with error saying "No such file"
-      expectError(req, cluster.getSolrClient(), errPath, "No such file :");
+      expectError(req, cluster.getSolrClient(), errPath, "No such file:");
 
 
       //post the jar file. No signature is sent
@@ -471,20 +473,19 @@
           FILE1 + " has no signature");
       //now we upload the keys
       byte[] derFile = readFile("cryptokeys/pub_key512.der");
-      cluster.getZkClient().makePath("/keys/exe", true);
-      cluster.getZkClient().create("/keys/exe/pub_key512.der", derFile, CreateMode.PERSISTENT, true);
+      uploadKey(derFile, PackageStoreAPI.KEYS_DIR+"/pub_key512.der", cluster);
       //and upload the same file with a different name but it has proper signature
       postFileAndWait(cluster, "runtimecode/runtimelibs.jar.bin", FILE2,
           "L3q/qIGs4NaF6JiO0ZkMUFa88j0OmYc+I6O7BOdNuMct/xoZ4h73aZHZGc0+nmI1f/U3bOlMPINlSOM6LK3JpQ==");
       // with correct signature
       //after uploading the file, let's delete the keys to see if we get proper error message
-      cluster.getZkClient().delete("/keys/exe/pub_key512.der", -1, true);
+//      cluster.getZkClient().delete("/keys/exe/pub_key512.der", -1, true);
       add.files = Arrays.asList(new String[]{FILE2});
-      expectError(req, cluster.getSolrClient(), errPath,
-          "ZooKeeper does not have any public keys");
+      /*expectError(req, cluster.getSolrClient(), errPath,
+          "ZooKeeper does not have any public keys");*/
 
       //Now lets' put the keys back
-      cluster.getZkClient().create("/keys/exe/pub_key512.der", derFile, CreateMode.PERSISTENT, true);
+//      cluster.getZkClient().create("/keys/exe/pub_key512.der", derFile, CreateMode.PERSISTENT, true);
 
       //this time we have a file with proper signature, public keys are in ZK
       // so the add {} command should succeed
diff --git a/solr/solr-ref-guide/src/package-manager-internals.adoc b/solr/solr-ref-guide/src/package-manager-internals.adoc
index 6cb1956..467691b 100644
--- a/solr/solr-ref-guide/src/package-manager-internals.adoc
+++ b/solr/solr-ref-guide/src/package-manager-internals.adoc
@@ -15,7 +15,7 @@
 
 At the heart of the system, we have classloader isolation. To achieve this, the system is simplified into two layered classloaders:
 The root classloader which has all the jars from Solr classpath. This requires Solr node restart to change anything.
-A set of named classloaders that inherit from the root classloader. The life cycles of the named classloaders are tied to the package configuration in ZK. As soon as the configuration is modified, the corresponding classloaders are reloaded and components are asked to reload.
+A set of named classloaders that inherit from the root classloader. The life cycles of the named classloaders are tied to the package configuration in ZooKeeper. As soon as the configuration is modified, the corresponding classloaders are reloaded and components are asked to reload.
 
 == Package Loading Security
 Packages are disabled by default. Start all your nodes with the system property `-Denable.packages=true` to use this feature.
@@ -28,7 +28,7 @@
 ----
 
 === Upload Your Keys
-Package binaries must be signed with your private keys and ensure your public keys are published in ZooKeeper.
+Package binaries must be signed with your private keys, and your public keys must be published in the package store's trusted store.
 
 *Example*
 [source,bash]
@@ -36,9 +36,8 @@
 $ openssl genrsa -out my_key.pem 512
 # create the public key in .der format
 $ openssl rsa -in my_key.pem -pubout -outform DER -out my_key.der
-# upload to ZooKeeper
-$ server/scripts/cloud-scripts/zkcli.sh -zkhost 127.0.0.1:9983 -cmd makepath /keys/exe/
-$ server/scripts/cloud-scripts/zkcli.sh -zkhost 127.0.0.1:9983 -cmd putfile /keys/exe/my_key.der my_key.der
+# upload key to package store
+$ bin/solr package add-key my_key.der
 ----
 
 == Package Store
diff --git a/solr/solr-ref-guide/src/package-manager.adoc b/solr/solr-ref-guide/src/package-manager.adoc
index 66f1378..94e948e 100644
--- a/solr/solr-ref-guide/src/package-manager.adoc
+++ b/solr/solr-ref-guide/src/package-manager.adoc
@@ -19,7 +19,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-The package manager in Solr allows installation and update of Solr-specific packages in distributed and standalone environments.
+The package manager in Solr allows installation and updating of Solr-specific packages in Solr's cluster environment.
 
 In this system, a _package_ is a set of Java jar files (usually one) containing one or more <<solr-plugins.adoc#solr-plugins,Solr plugins>>. Each jar file is also accompanied by a signature string (which can be verified against a supplied public key).
 
@@ -27,7 +27,7 @@
 
 Other elements of the design include the ability to install from a remote repository; package standardization; a command line interface (CLI); and a package store.
 
-This section will focus on how to use the package manager to install and update plugins.
+This section will focus on how to use the package manager to install and update packages.
 For technical details, see the section <<package-manager-internals.adoc#package-manager-internals,Package Manager internals>>.
 
 == Interacting with the Package Manager
@@ -158,6 +158,6 @@
 
 == Security
 
-As noted above in the section <<Add Trusted Repositories>>, the `add-repo` step should only be executed using an HTTPS endpoint in Solr (all other steps can be executed using HTTP - see also <<package-manager-internals.adoc#package-manager-internals,Package Manager Internals>>). This step registers the public key of the trusted repository, and hence can only be executed using the package manager (CLI) having direct write access to ZooKeeper. It is critical to protect ZooKeeper from unauthorized write access.
+The `add-repo` step should be executed only with HTTPS-enabled repository URLs, so as to protect against MITM attacks when Solr is fetching the public key for the repository. This `add-repo` step registers the public key of the trusted repository, and hence can only be executed using the package manager (CLI) having direct write access to the trusted store of the package store (a special location in the package store that cannot be written to using the package store APIs). Also, it is critical to protect ZooKeeper from unauthorized write access.
 
-Also, keep in mind, that it is possible to install *any* package from a repository once it has been added. If you want to use some packages in production, a best practice is to setup your own repository and add that to Solr instead of adding a generic third-party repository that is beyond your administrative control.
+Also, keep in mind that it is possible to install *any* package from a repository once it has been added. If you want to use some packages in production, a best practice is to set up your own repository and add that to Solr instead of adding a generic third-party repository that is beyond your administrative control. You might want to re-sign packages from a third-party repository using your own private keys and host them at your own repository.