blob: d3d15e0c7b3f9e84a1f0e9dda4c06b430783831d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package ipfs
import (
"context"
"crypto/rand"
"encoding/base64"
"encoding/json"
"io/ioutil"
memoryds "github.com/ipfs/go-datastore"
dsync "github.com/ipfs/go-datastore/sync"
levelds "github.com/ipfs/go-ds-leveldb"
cfg "github.com/ipfs/go-ipfs-config"
files "github.com/ipfs/go-ipfs-files"
core "github.com/ipfs/go-ipfs/core"
coreapi "github.com/ipfs/go-ipfs/core/coreapi"
repo "github.com/ipfs/go-ipfs/repo"
coreiface "github.com/ipfs/interface-go-ipfs-core"
path "github.com/ipfs/interface-go-ipfs-core/path"
ci "github.com/libp2p/go-libp2p-crypto"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/pkg/errors"
)
const (
defaultShardFun = "prefix"
)
// NodeConnectorBuilder for building the IPFS embedded node connector
type NodeConnectorBuilder struct {
ctx context.Context
localAddresses []string
bootstrapPeers []string
dstore repo.Datastore
}
// NodeConnectorOption function
type NodeConnectorOption func(*NodeConnectorBuilder) error
// NodeConnector is IPFS embedded node
type NodeConnector struct {
id string
ctx context.Context
node *core.IpfsNode
api coreiface.CoreAPI
}
// NewNodeConnector inisialises and runs a new IPFS Node
func NewNodeConnector(options ...NodeConnectorOption) (Connector, error) {
cb := &NodeConnectorBuilder{
localAddresses: []string{},
bootstrapPeers: []string{},
ctx: context.Background(),
dstore: nil,
}
for _, option := range options {
if err := option(cb); err != nil {
return nil, err
}
}
if len(cb.localAddresses) == 0 {
cb.localAddresses = []string{"/ip4/0.0.0.0/tcp/4001"}
}
if cb.dstore == nil {
return nil, errors.New("IPFS datastore not initialized")
}
// Generate Node keys
priv, _, err := ci.GenerateKeyPairWithReader(ci.RSA, 2048, rand.Reader)
if err != nil {
return nil, err
}
privkeyb, err := priv.Bytes()
if err != nil {
return nil, err
}
pid, err := peer.IDFromPrivateKey(priv)
if err != nil {
return nil, err
}
conf := cfg.Config{
Bootstrap: cb.bootstrapPeers,
}
conf.Addresses.Swarm = cb.localAddresses
conf.Identity.PeerID = pid.Pretty()
conf.Identity.PrivKey = base64.StdEncoding.EncodeToString(privkeyb)
conf.Routing.Type = "dht"
conf.Swarm.ConnMgr = cfg.ConnMgr{
Type: "basic",
LowWater: 20,
HighWater: 100,
GracePeriod: "30s",
}
appRepo := &repo.Mock{
D: dsync.MutexWrap(cb.dstore),
C: conf,
}
node, err := core.NewNode(cb.ctx, &core.BuildCfg{
Repo: appRepo,
Online: true,
})
if err != nil {
return nil, err
}
api, err := coreapi.NewCoreAPI(node)
if err != nil {
return nil, err
}
return &NodeConnector{
id: pid.Pretty(),
ctx: cb.ctx,
node: node,
api: api,
}, nil
}
// Add adds a data to the IPFS network and returns the ipfs path
func (c *NodeConnector) Add(data []byte) (string, error) {
f := files.NewBytesFile(data)
defer f.Close()
r, err := c.api.Unixfs().Add(c.ctx, f)
if err != nil {
return "", err
}
return r.Cid().String(), nil
}
// Get gets a data from ipfs path
func (c *NodeConnector) Get(ipfsPath string) ([]byte, error) {
p := path.New(ipfsPath)
node, err := c.api.Unixfs().Get(c.ctx, p)
if err != nil {
return nil, err
}
defer node.Close()
f := files.ToFile(node)
if f == nil {
return nil, errors.New("not a file")
}
defer f.Close()
b, err := ioutil.ReadAll(f)
if err != nil {
return nil, err
}
// TODO: Pin the document
return b, nil
}
// AddJSON encodes the data in JSON and adds it to the IPFS
func (c *NodeConnector) AddJSON(data interface{}) (string, error) {
jd, err := json.Marshal(data)
if err != nil {
return "", err
}
return c.Add(jd)
}
// GetJSON gets data from IPFS and decodes it from JSON
func (c *NodeConnector) GetJSON(path string, data interface{}) error {
jd, err := c.Get(path)
if err != nil {
return err
}
if err := json.Unmarshal(jd, data); err != nil {
return errors.Wrap(ErrDocumentNotValid, err.Error())
}
return nil
}
// GetID returns the local id
func (c *NodeConnector) GetID() string {
return c.id
}
// AddLocalAddress adds a new local IPFS Swarm address
func AddLocalAddress(addr string) NodeConnectorOption {
return func(cb *NodeConnectorBuilder) error {
cb.localAddresses = append(cb.localAddresses, addr)
return nil
}
}
// AddBootstrapPeer adds a IPFS peer to the list of IPFS bootatrap addresses
func AddBootstrapPeer(peerAddr ...string) NodeConnectorOption {
return func(cb *NodeConnectorBuilder) error {
cb.bootstrapPeers = append(cb.bootstrapPeers, peerAddr...)
return nil
}
}
// AddDefaultBootstrapPeers adds the default IPFS bootstrap addresses
// The default IPFS addresses are maintained by the IPFS team
func AddDefaultBootstrapPeers() NodeConnectorOption {
return func(cb *NodeConnectorBuilder) error {
cb.bootstrapPeers = append(cb.bootstrapPeers, cfg.DefaultBootstrapAddresses...)
return nil
}
}
// WithLevelDatastore initialize IPFS node with LevelDB datastore
func WithLevelDatastore(path string) NodeConnectorOption {
return func(cb *NodeConnectorBuilder) (err error) {
cb.dstore, err = levelds.NewDatastore(path, &levelds.Options{
Compression: 0,
})
return nil
}
}
// WithMemoryDatastore initialize IPFS node with memory datastore
func WithMemoryDatastore() NodeConnectorOption {
return func(cb *NodeConnectorBuilder) (err error) {
cb.dstore = memoryds.NewMapDatastore()
return nil
}
}
// WithContext sets the context
func WithContext(ctx context.Context) NodeConnectorOption {
return func(cb *NodeConnectorBuilder) error {
cb.ctx = ctx
return nil
}
}