blob: a6c73990ccac8595ef67fc42a05a81c6ffeb99e5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import (
"errors"
"github.com/apache/ozone-go/api/common"
"github.com/apache/ozone-go/api/datanode"
dnproto "github.com/apache/ozone-go/api/proto/datanode"
"github.com/apache/ozone-go/api/proto/hdds"
omproto "github.com/apache/ozone-go/api/proto/ozone"
"io"
)
func (ozoneClient *OzoneClient) ListKeys(volume string, bucket string) ([]common.Key, error) {
keys, err := ozoneClient.OmClient.ListKeys(volume, bucket)
if err != nil {
return make([]common.Key, 0), err
}
ret := make([]common.Key, 0)
for _, r := range keys {
ret = append(ret, KeyFromProto(r))
}
return ret, nil
}
func (ozoneClient *OzoneClient) ListKeysPrefix(volume string, bucket string, prefix string) ([]common.Key, error) {
keys, err := ozoneClient.OmClient.ListKeysPrefix(volume, bucket, prefix)
if err != nil {
return make([]common.Key, 0), err
}
ret := make([]common.Key, 0)
for _, r := range keys {
ret = append(ret, KeyFromProto(r))
}
return ret, nil
}
func (ozoneClient *OzoneClient) InfoKey(volume string, bucket string, key string) (common.Key, error) {
k, err := ozoneClient.OmClient.GetKey(volume, bucket, key)
return KeyFromProto(k), err
}
func (ozoneClient *OzoneClient) GetKey(volume string, bucket string, key string, destination io.Writer) (common.Key, error) {
keyInfo, err := ozoneClient.OmClient.GetKey(volume, bucket, key)
if err != nil {
return common.Key{}, err
}
if len(keyInfo.KeyLocationList) == 0 {
return common.Key{}, errors.New("Get key returned with zero key location version " + volume + "/" + bucket + "/" + key)
}
if len(keyInfo.KeyLocationList[0].KeyLocations) == 0 {
return common.Key{}, errors.New("Key location doesn't have any datanode for key " + volume + "/" + bucket + "/" + key)
}
for _, location := range keyInfo.KeyLocationList[0].KeyLocations {
pipeline := location.Pipeline
dnBlockId := ConvertBlockId(location.BlockID)
dnClient, err := datanode.CreateDatanodeClient(pipeline)
chunks, err := dnClient.GetBlock(dnBlockId)
if err != nil {
return common.Key{}, err
}
for _, chunk := range chunks {
data, err := dnClient.ReadChunk(dnBlockId, chunk)
if err != nil {
return common.Key{}, err
}
destination.Write(data)
}
dnClient.Close()
}
return common.Key{}, nil
}
func ConvertBlockId(bid *hdds.BlockID) *dnproto.DatanodeBlockID {
id := dnproto.DatanodeBlockID{
ContainerID: bid.ContainerBlockID.ContainerID,
LocalID: bid.ContainerBlockID.LocalID,
}
return &id
}
func (ozoneClient *OzoneClient) PutKey(volume string, bucket string, key string, source io.Reader) (common.Key, error) {
createKey, err := ozoneClient.OmClient.CreateKey(volume, bucket, key)
if err != nil {
return common.Key{}, err
}
keyInfo := createKey.KeyInfo
location := keyInfo.KeyLocationList[0].KeyLocations[0]
pipeline := location.Pipeline
dnClient, err := datanode.CreateDatanodeClient(pipeline)
if err != nil {
return common.Key{}, err
}
chunkSize := 4096
buffer := make([]byte, chunkSize)
chunks := make([]*dnproto.ChunkInfo, 0)
keySize := uint64(0)
locations := make([]*omproto.KeyLocation, 0)
blockId := ConvertBlockId(location.BlockID)
eof := false
for ; ; {
blockOffset := uint64(0)
for i := 0; i < 64; i++ {
count, err := source.Read(buffer)
if err == io.EOF {
eof = true
} else if err != nil {
return common.Key{}, err
}
if count > 0 {
chunk, err := dnClient.CreateAndWriteChunk(blockId, blockOffset, buffer[0:count], uint64(count))
if err != nil {
return common.Key{}, err
}
blockOffset += uint64(count)
keySize += uint64(count)
chunks = append(chunks, &chunk)
}
if eof {
break
}
}
err = dnClient.PutBlock(blockId, chunks)
if err != nil {
return common.Key{}, err
}
if eof {
break
}
//get new block and reset counters
nextBlockResponse, err := ozoneClient.OmClient.AllocateBlock(volume, bucket, key, createKey.ID)
if err != nil {
return common.Key{}, err
}
dnClient.Close()
location = nextBlockResponse.KeyLocation
pipeline = location.Pipeline
dnClient, err = datanode.CreateDatanodeClient(pipeline)
if err != nil {
return common.Key{}, err
}
blockId = ConvertBlockId(location.BlockID)
blockOffset = 0
chunks = make([]*dnproto.ChunkInfo, 0)
}
zero := uint64(0)
locations = append(locations, &omproto.KeyLocation{
BlockID: location.BlockID,
Pipeline: location.Pipeline,
Length: &keySize,
Offset: &zero,
})
ozoneClient.OmClient.CommitKey(volume, bucket, key, createKey.ID, locations, keySize)
return common.Key{}, nil
}
func KeyFromProto(keyProto *omproto.KeyInfo) common.Key {
replicationType := common.ReplicationType(*keyProto.Type)
result := common.Key{
Name: *keyProto.KeyName,
Replication: replicationType,
VolumeName: *keyProto.VolumeName,
BucketName: *keyProto.BucketName,
Size: *keyProto.DataSize,
}
return result
}