blob: 41ce98e7fb8af83eedf4dfc357a2070e9b30184d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package om
import (
"errors"
"github.com/apache/ozone-go/api/common"
"github.com/apache/ozone-go/api/proto/hdds"
ozone_proto "github.com/apache/ozone-go/api/proto/ozone"
"github.com/hortonworks/gohadoop"
hadoop_ipc_client "github.com/hortonworks/gohadoop/hadoop_common/ipc/client"
uuid "github.com/nu7hatch/gouuid"
"net"
"strconv"
"sync"
)
// OM_PROTOCOL is the protocol name handed to
// gohadoop.GetCalleeRPCRequestHeaderProto when the RPC header for an OM call
// is built. It is declared as a var rather than a const because submitRequest
// passes its address.
var OM_PROTOCOL = "org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol"
// OmClient issues Ozone Manager (OM) requests through a Hadoop IPC client.
//
// The struct contains a sync.Mutex by value, so it should not be copied once
// it is in use; all of its methods take a pointer receiver.
type OmClient struct {
// OmHost is the OM host name or address this client was created for.
OmHost string
// client is the Hadoop IPC client every request is sent through.
client *hadoop_ipc_client.Client
// clientId is sent as the ClientId field of every OMRequest built by the
// methods of this type.
clientId string
// mu is held around client.Call in submitRequest so that calls on the
// shared IPC client are issued one at a time.
mu sync.Mutex
}
// CreateOmClient builds an OmClient that talks to the Ozone Manager on
// omhost. The port is fixed to 9862.
//
// A fresh UUID is generated and used both as the IPC-level client id and, in
// its string form, as the ClientId placed into every OMRequest. Previously the
// request-level client id was never populated and was always sent empty.
//
// The signature has no error return, so failures while generating the UUID or
// the simple UGI cannot be reported to the caller. Construction stays
// best-effort: whatever values the helpers returned are passed through, and
// the request-level client id is left empty if no UUID could be generated.
func CreateOmClient(omhost string) OmClient {
	clientId, uuidErr := uuid.NewV4()
	// Error deliberately not propagated (see function comment); ugi is handed
	// to the IPC client unchanged, exactly as before.
	ugi, _ := gohadoop.CreateSimpleUGIProto()

	ipcClient := &hadoop_ipc_client.Client{
		ClientId:      clientId,
		Ugi:           ugi,
		ServerAddress: net.JoinHostPort(omhost, strconv.Itoa(9862)),
	}

	// Only stringify a UUID that was actually produced; String on a nil or
	// failed UUID would be meaningless at best.
	requestClientId := ""
	if uuidErr == nil && clientId != nil {
		requestClientId = clientId.String()
	}

	return OmClient{
		OmHost:   omhost,
		client:   ipcClient,
		clientId: requestClientId,
	}
}
// GetKey sends a LookupKey request for the key identified by volume, bucket
// and key, and returns the KeyInfo carried in the lookup response.
//
// Any error from submitRequest (transport failure or a non-OK OM status) is
// returned unchanged together with a nil KeyInfo. The returned KeyInfo may be
// nil when the response does not carry one, because the generated getters
// return nil for absent messages.
func (om *OmClient) GetKey(volume string, bucket string, key string) (*ozone_proto.KeyInfo, error) {
	cmdType := ozone_proto.Type_LookupKey
	omRequest := &ozone_proto.OMRequest{
		CmdType: &cmdType,
		LookupKeyRequest: &ozone_proto.LookupKeyRequest{
			KeyArgs: &ozone_proto.KeyArgs{
				VolumeName: &volume,
				BucketName: &bucket,
				KeyName:    &key,
			},
		},
		ClientId: &om.clientId,
	}

	resp, err := om.submitRequest(omRequest)
	if err != nil {
		return nil, err
	}
	// The former debug println of the result was removed: it wrote a pointer
	// address to stderr on every lookup.
	return resp.GetLookupKeyResponse().GetKeyInfo(), nil
}
// ListKeys lists the keys of the given bucket without restricting them to a
// prefix. It delegates to ListKeysPrefix with an empty prefix and returns its
// result and error unchanged.
func (om *OmClient) ListKeys(volume string, bucket string) ([]*ozone_proto.KeyInfo, error) {
	const noPrefix = ""
	return om.ListKeysPrefix(volume, bucket, noPrefix)
}
// AllocateBlock sends an AllocateBlock request for the key identified by
// volume, bucket and key, forwarding clientID as the request's ClientID.
//
// It returns the AllocateBlockResponse part of the OM reply, or nil and the
// error reported by submitRequest.
func (om *OmClient) AllocateBlock(volume string, bucket string, key string, clientID *uint64) (*ozone_proto.AllocateBlockResponse, error) {
	cmdType := ozone_proto.Type_AllocateBlock
	omRequest := &ozone_proto.OMRequest{
		CmdType: &cmdType,
		AllocateBlockRequest: &ozone_proto.AllocateBlockRequest{
			KeyArgs: &ozone_proto.KeyArgs{
				VolumeName: &volume,
				BucketName: &bucket,
				KeyName:    &key,
			},
			ClientID: clientID,
		},
		ClientId: &om.clientId,
	}

	omResponse, err := om.submitRequest(omRequest)
	if err != nil {
		return nil, err
	}
	return omResponse.AllocateBlockResponse, nil
}
// CreateKey sends a CreateKey request for the key identified by volume,
// bucket and key.
//
// It returns the CreateKeyResponse part of the OM reply, or nil and the error
// reported by submitRequest.
func (om *OmClient) CreateKey(volume string, bucket string, key string) (*ozone_proto.CreateKeyResponse, error) {
	cmdType := ozone_proto.Type_CreateKey
	omRequest := &ozone_proto.OMRequest{
		CmdType: &cmdType,
		CreateKeyRequest: &ozone_proto.CreateKeyRequest{
			KeyArgs: &ozone_proto.KeyArgs{
				VolumeName: &volume,
				BucketName: &bucket,
				KeyName:    &key,
			},
		},
		ClientId: &om.clientId,
	}

	omResponse, err := om.submitRequest(omRequest)
	if err != nil {
		return nil, err
	}
	return omResponse.CreateKeyResponse, nil
}
// CommitKey sends a CommitKey request for the key identified by volume,
// bucket and key, attaching keyLocations and size as the key's locations and
// data size, and id as the request's ClientID.
//
// The request always carries replication factor ONE and replication type
// STAND_ALONE. The OM reply itself is discarded: the returned common.Key is
// always the zero value, and only the error from submitRequest is meaningful.
func (om *OmClient) CommitKey(volume string, bucket string, key string, id *uint64, keyLocations []*ozone_proto.KeyLocation, size uint64) (common.Key, error) {
	replFactor := hdds.ReplicationFactor_ONE
	replType := hdds.ReplicationType_STAND_ALONE
	cmdType := ozone_proto.Type_CommitKey

	commit := &ozone_proto.CommitKeyRequest{
		KeyArgs: &ozone_proto.KeyArgs{
			VolumeName:   &volume,
			BucketName:   &bucket,
			KeyName:      &key,
			KeyLocations: keyLocations,
			DataSize:     &size,
			Factor:       &replFactor,
			Type:         &replType,
		},
		ClientID: id,
	}
	omRequest := &ozone_proto.OMRequest{
		CmdType:          &cmdType,
		CommitKeyRequest: commit,
		ClientId:         &om.clientId,
	}

	if _, err := om.submitRequest(omRequest); err != nil {
		return common.Key{}, err
	}
	return common.Key{}, nil
}
// ListKeysPrefix sends a ListKeys request for the given volume and bucket,
// with prefix as the request's Prefix and a fixed Count of 1000.
//
// It returns the KeyInfo entries of the list response, or nil and the error
// reported by submitRequest. No follow-up requests are issued here.
func (om *OmClient) ListKeysPrefix(volume string, bucket string, prefix string) ([]*ozone_proto.KeyInfo, error) {
	maxKeys := int32(1000)
	cmdType := ozone_proto.Type_ListKeys

	omRequest := &ozone_proto.OMRequest{
		CmdType: &cmdType,
		ListKeysRequest: &ozone_proto.ListKeysRequest{
			VolumeName: &volume,
			BucketName: &bucket,
			Prefix:     &prefix,
			Count:      &maxKeys,
		},
		ClientId: &om.clientId,
	}

	omResponse, err := om.submitRequest(omRequest)
	if err != nil {
		return nil, err
	}
	listing := omResponse.GetListKeysResponse()
	return listing.GetKeyInfo(), nil
}
// getRpcPort returns the value of the first port in ports whose name is
// "RATIS", or 0 when no such port is present.
// NOTE(review): the function name says "Rpc" while the lookup is for the port
// named "RATIS" — confirm this is the intended port.
func getRpcPort(ports []*hdds.Port) uint32 {
	const wantedName = "RATIS"
	for i := range ports {
		if candidate := ports[i]; candidate.GetName() == wantedName {
			return candidate.GetValue()
		}
	}
	return 0
}
// ptri returns a pointer to a fresh int32 holding i. It exists so that
// literal values can be assigned to pointer-typed request fields.
func ptri(i int32) *int32 {
	boxed := new(int32)
	*boxed = i
	return boxed
}
// ptr returns a pointer to a fresh string holding s. It exists so that
// literal values can be assigned to pointer-typed request fields.
func ptr(s string) *string {
	boxed := new(string)
	*boxed = s
	return boxed
}
// submitRequest sends request to the OM through the shared IPC client and
// returns the decoded OMResponse.
//
// Calls are serialized with om.mu. The mutex is released via defer so that it
// is not left locked if the IPC call panics.
//
// An error is returned when the IPC call itself fails, when the response
// carries no status, or when the status is anything other than Status_OK. In
// the last case the error text contains the status name and, if the OM
// supplied one, the response message. The pointer fields are checked before
// being dereferenced, so an error reply without a message no longer panics.
func (om *OmClient) submitRequest(request *ozone_proto.OMRequest) (*ozone_proto.OMResponse, error) {
	wrapperResponse := ozone_proto.OMResponse{}

	om.mu.Lock()
	defer om.mu.Unlock()

	err := om.client.Call(gohadoop.GetCalleeRPCRequestHeaderProto(&OM_PROTOCOL), request, &wrapperResponse)
	if err != nil {
		return nil, err
	}

	if wrapperResponse.Status == nil {
		return nil, errors.New("Error on calling OM: response carries no status")
	}
	if status := *wrapperResponse.Status; status != ozone_proto.Status_OK {
		text := "Error on calling OM " + status.String()
		// Message is a pointer field and may be absent on error replies.
		if wrapperResponse.Message != nil {
			text += " " + *wrapperResponse.Message
		}
		return nil, errors.New(text)
	}
	return &wrapperResponse, nil
}