blob: 6c12907a2d200cf38a0972dc66d172804ed46d34 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// gcemd is a metadata-configured provisioning server for GCE.
package main
import (
"flag"
"log"
"net"
"cloud.google.com/go/compute/metadata"
pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
"github.com/apache/beam/sdks/go/pkg/beam/provision"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
// endpoint is the address the provisioning server listens on, supplied via
// the --endpoint flag. It defaults to the empty string, which main rejects.
var endpoint = flag.String("endpoint", "", "Server endpoint to expose.")
// main parses the command-line flags, reads the job description from the GCE
// instance metadata attributes and serves it over gRPC on the configured
// endpoint.
//
// The process exits via log.Fatal/log.Fatalf when no endpoint is given, when
// it is not running on GCE, when any of the "job_id", "job_name" or
// "sdk_pipeline_options" attributes cannot be read, when the pipeline options
// cannot be converted from JSON, when the listener cannot be created, or
// once Serve returns.
func main() {
	flag.Parse()

	addr := *endpoint
	if len(addr) == 0 {
		log.Fatal("No endpoint provided. Use --endpoint=localhost:12345")
	}
	if onGCE := metadata.OnGCE(); !onGCE {
		log.Fatal("Not running on GCE")
	}

	log.Printf("Starting provisioning server on %v", addr)

	// Gather the job description from the instance metadata attributes.
	id, err := metadata.InstanceAttributeValue("job_id")
	if err != nil {
		log.Fatalf("Failed to find job ID: %v", err)
	}
	name, err := metadata.InstanceAttributeValue("job_name")
	if err != nil {
		log.Fatalf("Failed to find job name: %v", err)
	}
	rawOptions, err := metadata.InstanceAttributeValue("sdk_pipeline_options")
	if err != nil {
		log.Fatalf("Failed to find SDK pipeline options: %v", err)
	}

	// The pipeline options attribute is JSON text; convert it to its proto form.
	parsedOptions, err := provision.JSONToProto(rawOptions)
	if err != nil {
		log.Fatalf("Failed to parse SDK pipeline options: %v", err)
	}

	provisionInfo := new(pb.ProvisionInfo)
	provisionInfo.JobId = id
	provisionInfo.JobName = name
	provisionInfo.PipelineOptions = parsedOptions

	// Register the provision service before opening the listener.
	grpcServer := grpc.NewServer()
	pb.RegisterProvisionServiceServer(grpcServer, &server{info: provisionInfo})

	lis, err := net.Listen("tcp", addr)
	if err != nil {
		log.Fatalf("Failed to listen to %v: %v", addr, err)
	}

	// Whatever Serve returns is reported as fatal.
	serveErr := grpcServer.Serve(lis)
	log.Fatalf("Server failed: %v", serveErr)
}
// server is the handler type that main registers with the gRPC server via
// pb.RegisterProvisionServiceServer.
type server struct {
// info is the provisioning information assembled once in main and handed
// back by GetProvisionInfo.
info *pb.ProvisionInfo
}
// GetProvisionInfo answers a provision info request with the info stored on
// the server. Neither ctx nor req is consulted, and the returned error is
// always nil.
func (s *server) GetProvisionInfo(ctx context.Context, req *pb.GetProvisionInfoRequest) (*pb.GetProvisionInfoResponse, error) {
	resp := new(pb.GetProvisionInfoResponse)
	resp.Info = s.info
	return resp, nil
}