| /************************************************************ |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| *************************************************************/ |
| |
| #ifndef SINGA_WORKER_H_ |
| #define SINGA_WORKER_H_ |
| |
| #include <string> |
| #include <vector> |
| #include "comm/socket.h" |
| #include "neuralnet/neuralnet.h" |
| #include "proto/job.pb.h" |
| |
| namespace singa { |
| |
/// Time to sleep, in milliseconds, when a Param has not yet been updated to
/// the expected version.
constexpr int kCollectSleepTime = 5;
| /** |
| * The Worker class which runs the training algorithm. |
| * The first worker group will initialize parameters of the Net, |
| * and put them into the distributed memory/table. |
| * The virtual function TrainOneBatch and TestOneBatch implement the |
| * training and test algorithm for one mini-batch data. |
| * |
| * Child workers override the two functions to implement their training |
| * algorithms, e.g., the BPWorker/CDWorker/BPTTWorker implements the BP/CD/BPTT |
| * algorithm respectively. |
| */ |
| class Worker { |
| public: |
| /** |
| * Create an instance of the subclass of Worker. |
| * |
| * @param[in] conf configuration of the TrainOneBatch algorithm. Different |
| * Worker subclasses implement different algorithms. Hence the creation is |
| * based on the TrainOneBatch algorithm type. Currently SINGA |
| * provides two algorithms: |
| * -# Back-propagation for the feed-forward models, e.g., CNN and MLP, and the |
| * recurrent neural networks. |
| * -# Contrastive divergence for the energy models, e.g., RBM. |
| * |
| * @return a pointer to the instance of the Worker subclass. |
| */ |
| static Worker* Create(const AlgProto& conf); |
| virtual ~Worker(); |
| /** |
| * @param[in] grp_id global worker group ID |
| * @param[in] id worker ID within the group |
| * @param[in] conf job configuration |
| * @param[in] train_net pointer to the training neural net, which could be |
| * shared with other workers from the same group. Different workers run over |
| * differnt subset of layers. |
| * @param[in] val_net pointer to the validation neural net. Currently only the |
| * first worker from the first group would have validation neural net. All |
| * other workers receive nullptr for this argument. |
| * @param[in] test_net pointer to the test neural net. Currently only the |
| * first worker from the first group would have test neural net. All other |
| * workers receive nullptr for this argument. |
| */ |
| virtual void Setup(int grp_id, int id, const JobProto& conf, |
| NeuralNet* train_net, NeuralNet* val_net, NeuralNet* test_net); |
| |
| /** |
| * Main function of Worker. |
| * |
| * Train the neuralnet step by step, test/validation is done periodically. |
| */ |
| void Run(); |
| |
| /** |
| * Init values of Param instances assocaited with local layers (i.e., layers |
| * dispatched to this worker). |
| * |
| * If one Param is owned by the worker, then it should be initialized and put |
| * to servers. Otherwise Get() should be called to get the Param. The Get() |
| * may not send get requests if the Param owner is in the same procs, for |
| * which case the memory space of the Param objects are shared. But if this |
| * worker and the Param owner worker run on different devices (e.g., GPUs), |
| * then the get request would be sent. |
| * |
| * If the training starts from scrath, every Param object is initialzed using |
| * ParamGenerator. After that, the worker may |
| * train for a couple of steps to warmup the params before put |
| * them to servers (warmup of JobProto controls this). |
| * |
| * If one Param object's name matches that of one Param object from the |
| * checkpoint files, its Param values would be loaded from checkpoint files. |
| * |
| * @param[in] job_conf job configuration which provides settings for |
| * checkpoint file paths, warmup steps and Param versions. |
| * @param[out] net pointer to a neural net whose Param values will be |
| * initialized. |
| */ |
| void InitNetParams(const JobProto& job_conf, NeuralNet* net); |
| |
| /** |
| * Checkpoint all Param objects owned by the worker onto disk. |
| * The serialization is done using BlobProtos which includes the name, version |
| * and values of each Param object. |
| * Different workers would generate different checkpoint files. The file path |
| * is <workspace>/checkpoint-<jobname>-step<step>-worker<worker_id>.bin |
| * @param[in] step training step |
| * @param[in] folder directory to put the checkpoint file |
| * @param net the training net whose Param objects will be dumped. |
| */ |
| void Checkpoint(int step, const std::string& folder, NeuralNet* net); |
| |
| /** |
| * Train one mini-batch. |
| * Test/Validation is done before training. |
| * |
| * @param[in] step training step. |
| * @param[in] net neural net to be trained. |
| */ |
| virtual void TrainOneBatch(int step, NeuralNet* net) = 0; |
| |
| /** |
| * Test/validate one mini-batch data. |
| * |
| * @param[in] step test step. |
| * @param[in] phase test could be done for validation or test phase. |
| * @param[in] net neural net for test |
| */ |
| virtual void TestOneBatch(int step, Phase phase, NeuralNet* net) = 0; |
| |
| /** |
| * Display infomation from layers. |
| * |
| * @param flag could be a combination of multiple phases, e.g, kTest|kForward, |
| * it is passed to the Layer::ToString() function for each layer to decide |
| * what to display . |
| * @param prefix display prefix, e.g., 'Train step 100', 'Test step 90'. |
| * @param net display layers from this neural net. |
| */ |
| void Display(int flag, const std::string& prefix, NeuralNet* net); |
| |
| /** |
| * Put Param values to server. |
| * |
| * @param param |
| * @param step used as current param version for the put request |
| */ |
| int Put(int step, Param* param); |
| |
| /** |
| * Get Param with specific version from server |
| * If the current version >= the requested version, then return. |
| * Otherwise send a get request to stub who would forwards it to servers. |
| * @param param |
| * @param step requested param version |
| */ |
| int Get(int step, Param* param); |
| |
| /** |
| * Update Param. |
| * |
| * @param param |
| * @param step training step used for updating (e.g., deciding learning rate). |
| */ |
| int Update(int step, Param* param); |
| |
| /** |
| * Wait for the response of the update/get requests. |
| * |
| * @param param |
| * @param step not used now. |
| */ |
| int Collect(int step, Param* param); |
| |
| /** |
| * Call Collect() for every param of net |
| */ |
| int CollectAll(int step, NeuralNet* net); |
| |
| /** |
| * Receive blobs from other workers due to model partitions. |
| */ |
| void ReceiveBlobs(bool data, bool grad, BridgeLayer* layer, NeuralNet* net); |
| |
| /** |
| * Send blobs to other workers due to model partitions. |
| */ |
| void SendBlobs(bool data, bool grad, BridgeLayer* layer, NeuralNet* net); |
| |
| |
| /** |
| * @param[in] step |
| * @return true if it is time to display training info, e.g., loss; otherwise |
| * false. |
| */ |
| inline bool DisplayNow(int step) const { |
| return job_conf_.disp_freq() > 0 |
| && step >= job_conf_.disp_after() |
| && ((step - job_conf_.disp_after()) % job_conf_.disp_freq() == 0); |
| } |
| /** |
| * @param[in] step |
| * @return true if it is time to finish the training; otherwise false. |
| */ |
| inline bool StopNow(int step) const { |
| return step >= job_conf_.train_steps(); |
| } |
| /** |
| * @param[in] step |
| * @return true if it is time to do checkpoint Param objects; otherwise false. |
| */ |
| inline bool CheckpointNow(int step) const { |
| return job_conf_.checkpoint_freq() > 0 |
| && step >= job_conf_.checkpoint_after() |
| && ((step - job_conf_.checkpoint_after()) |
| % job_conf_.checkpoint_freq() == 0); |
| } |
| /** |
| * @param[in] step |
| * @return true if it is time to do test over the test dataset. |
| */ |
| inline bool TestNow(int step) const { |
| return job_conf_.test_freq() > 0 |
| && job_conf_.test_steps() > 0 |
| && step >= job_conf_.test_after() |
| && ((step - job_conf_.test_after()) % job_conf_.test_freq() == 0); |
| } |
| /** |
| * @param[in] step |
| * @return true if it is time to do test over the validation dataset. |
| */ |
| inline bool ValidateNow(int step) const { |
| return job_conf_.validate_freq() > 0 |
| && job_conf_.validate_steps() > 0 |
| && step >= job_conf_.validate_after() |
| && ((step - job_conf_.validate_after()) % job_conf_.validate_freq() == 0); |
| } |
| /** |
| * @return a vector with pointers to all neural nets. |
| */ |
| const std::vector<NeuralNet*> GetNets() const { |
| return std::vector<NeuralNet*> {train_net_, val_net_, test_net_}; |
| } |
| /** |
| * @return training net. |
| */ |
| inline NeuralNet* train_net() const { |
| return train_net_; |
| } |
| /** |
| * @return group ID |
| */ |
| inline int grp_id() const { return grp_id_; } |
| /** |
| * @reutrn worker ID within the worker group. |
| */ |
| inline int id() const { return id_; } |
| |
| protected: |
| int grp_id_ = -1, id_ = -1; |
| int step_ = 0; |
| JobProto job_conf_; |
| NeuralNet* train_net_ = nullptr; |
| NeuralNet* test_net_ = nullptr; |
| NeuralNet* val_net_ = nullptr; |
| Dealer* layer_dealer_ = nullptr; |
| Dealer* dealer_ = nullptr; |
| }; |
| |
| class BPWorker: public Worker { |
| public: |
| void TrainOneBatch(int step, NeuralNet* net) override; |
| void TestOneBatch(int step, Phase phase, NeuralNet* net) override; |
| void Forward(int step, Phase phase, NeuralNet* net); |
| void Backward(int step, NeuralNet* net); |
| }; |
| |
| class CDWorker: public Worker { |
| public: |
| void TrainOneBatch(int step, NeuralNet* net) override; |
| void TestOneBatch(int step, Phase phase, NeuralNet* net) override; |
| }; |
| |
/**
 * Pack a group ID and a layer ID into a single int.
 *
 * The group ID is shifted into the bits above bit 15 and OR-ed with the
 * layer ID, so the two can be recovered with BlobGrp() / BlobLayer() as long
 * as `layer` fits in the low 16 bits.
 *
 * @param[in] grp group ID, placed in the upper bits.
 * @param[in] layer layer ID, placed in the lower bits.
 * @return the combined value.
 */
inline int BlobTrgt(int grp, int layer) {
  const int kGroupShift = 16;
  const int group_bits = grp << kGroupShift;
  return group_bits | layer;
}
| |
/**
 * Extract the group part of a packed value by shifting it right by 16 bits
 * (the inverse of the shift applied in BlobTrgt()).
 *
 * @param[in] blob_trgt packed value.
 * @return the bits above bit 15 of `blob_trgt`.
 */
inline int BlobGrp(int blob_trgt) {
  const int kGroupShift = 16;
  return blob_trgt >> kGroupShift;
}
| |
/**
 * Extract the layer part of a packed value, i.e., its low 16 bits (the part
 * that BlobTrgt() fills from its `layer` argument).
 *
 * @param[in] blob_trgt packed value.
 * @return `blob_trgt` masked to its low 16 bits.
 */
inline int BlobLayer(int blob_trgt) {
  // Compile-time constant; a mutable function-local static is unnecessary for
  // a value that never changes.
  constexpr int kLayerMask = (1 << 16) - 1;
  return blob_trgt & kLayerMask;
}
| |
| } // namespace singa |
| |
| #endif // SINGA_WORKER_H_ |