blob: 671fb30a3bbcdb5bbc9057ab10d4f101b6150c29 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package appprotoos
import (
"context"
"fmt"
"os"
"path/filepath"
"sync"
)
import (
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/protobuf/types/pluginpb"
)
import (
"github.com/apache/dubbo-kubernetes/pkg/bufman/pkg/app/appproto"
"github.com/apache/dubbo-kubernetes/pkg/bufman/pkg/normalpath"
"github.com/apache/dubbo-kubernetes/pkg/bufman/pkg/storage"
"github.com/apache/dubbo-kubernetes/pkg/bufman/pkg/storage/storagearchive"
"github.com/apache/dubbo-kubernetes/pkg/bufman/pkg/storage/storagemem"
"github.com/apache/dubbo-kubernetes/pkg/bufman/pkg/storage/storageos"
)
// Constants used to create .jar files.
var (
manifestPath = normalpath.Join("META-INF", "MANIFEST.MF")
manifestContent = []byte(`Manifest-Version: 1.0
Created-By: 1.6.0 (protoc)
`)
)
type responseWriter struct {
logger *zap.Logger
storageosProvider storageos.Provider
responseWriter appproto.ResponseWriter
// If set, create directories if they don't already exist.
createOutDirIfNotExists bool
// Cache the readWriteBuckets by their respective output paths.
// These builders are transformed to storage.ReadBuckets and written
// to disk once the responseWriter is flushed.
//
// Note that output paths are used as-is with respect to the
// caller's configuration. It's possible that a single invocation
// will specify the same filepath in multiple ways, e.g. "." and
// "$(pwd)". However, we intentionally treat these as distinct paths
// to mirror protoc's insertion point behavior.
//
// For example, the following command will fail because protoc treats
// "." and "$(pwd)" as distinct paths:
//
// $ protoc example.proto --insertion-point-receiver_out=. --insertion-point-writer_out=$(pwd)
//
readWriteBuckets map[string]storage.ReadWriteBucket
// Cache the functions used to flush all of the responses to disk.
// This holds all of the buckets in-memory so that we only write
// the results to disk if all of the responses are successful.
closers []func() error
lock sync.RWMutex
}
func newResponseWriter(
logger *zap.Logger,
storageosProvider storageos.Provider,
options ...ResponseWriterOption,
) *responseWriter {
responseWriterOptions := newResponseWriterOptions()
for _, option := range options {
option(responseWriterOptions)
}
return &responseWriter{
logger: logger,
storageosProvider: storageosProvider,
responseWriter: appproto.NewResponseWriter(logger),
createOutDirIfNotExists: responseWriterOptions.createOutDirIfNotExists,
readWriteBuckets: make(map[string]storage.ReadWriteBucket),
}
}
func (w *responseWriter) AddResponse(
ctx context.Context,
response *pluginpb.CodeGeneratorResponse,
pluginOut string,
) error {
// It's important that we get a consistent output path
// so that we use the same in-memory bucket for paths
// set to the same directory.
//
// filepath.Abs calls filepath.Clean.
//
// For example:
//
// --insertion-point-receiver_out=insertion --insertion-point-writer_out=./insertion/ --insertion-point_writer_out=/foo/insertion
absPluginOut, err := filepath.Abs(normalpath.Unnormalize(pluginOut))
if err != nil {
return err
}
w.lock.Lock()
defer w.lock.Unlock()
return w.addResponse(
ctx,
response,
absPluginOut,
w.createOutDirIfNotExists,
)
}
func (w *responseWriter) Close() error {
w.lock.Lock()
defer w.lock.Unlock()
for _, closeFunc := range w.closers {
if err := closeFunc(); err != nil {
// Although unlikely, if an error happens here,
// some generated files could be written to disk,
// whereas others aren't.
//
// Regardless, we stop at the first error so that
// we don't unncessarily write more results.
return err
}
}
// Re-initialize the cached values to be safe.
w.readWriteBuckets = make(map[string]storage.ReadWriteBucket)
w.closers = nil
return nil
}
func (w *responseWriter) addResponse(
ctx context.Context,
response *pluginpb.CodeGeneratorResponse,
pluginOut string,
createOutDirIfNotExists bool,
) error {
switch filepath.Ext(pluginOut) {
case ".jar":
return w.writeZip(
ctx,
response,
pluginOut,
true,
createOutDirIfNotExists,
)
case ".zip":
return w.writeZip(
ctx,
response,
pluginOut,
false,
createOutDirIfNotExists,
)
default:
return w.writeDirectory(
ctx,
response,
pluginOut,
createOutDirIfNotExists,
)
}
}
func (w *responseWriter) writeZip(
ctx context.Context,
response *pluginpb.CodeGeneratorResponse,
outFilePath string,
includeManifest bool,
createOutDirIfNotExists bool,
) (retErr error) {
outDirPath := filepath.Dir(outFilePath)
if readWriteBucket, ok := w.readWriteBuckets[outFilePath]; ok {
// We already have a readWriteBucket for this outFilePath, so
// we can write to the same bucket.
if err := w.responseWriter.WriteResponse(
ctx,
readWriteBucket,
response,
appproto.WriteResponseWithInsertionPointReadBucket(readWriteBucket),
); err != nil {
return err
}
return nil
}
// OK to use os.Stat instead of os.Lstat here.
fileInfo, err := os.Stat(outDirPath)
if err != nil {
if os.IsNotExist(err) {
if createOutDirIfNotExists {
if err := os.MkdirAll(outDirPath, 0o755); err != nil {
return err
}
} else {
return err
}
}
return err
} else if !fileInfo.IsDir() {
return fmt.Errorf("not a directory: %s", outDirPath)
}
readWriteBucket := storagemem.NewReadWriteBucket()
if includeManifest {
if err := storage.PutPath(ctx, readWriteBucket, manifestPath, manifestContent); err != nil {
return err
}
}
if err := w.responseWriter.WriteResponse(
ctx,
readWriteBucket,
response,
appproto.WriteResponseWithInsertionPointReadBucket(readWriteBucket),
); err != nil {
return err
}
// Add this readWriteBucket to the set so that other plugins
// can write to the same files (re: insertion points).
w.readWriteBuckets[outFilePath] = readWriteBucket
w.closers = append(w.closers, func() (retErr error) {
// We're done writing all of the content into this
// readWriteBucket, so we zip it when we flush.
file, err := os.Create(outFilePath)
if err != nil {
return err
}
defer func() {
retErr = multierr.Append(retErr, file.Close())
}()
// protoc does not compress.
return storagearchive.Zip(ctx, readWriteBucket, file, false)
})
return nil
}
func (w *responseWriter) writeDirectory(
ctx context.Context,
response *pluginpb.CodeGeneratorResponse,
outDirPath string,
createOutDirIfNotExists bool,
) error {
if readWriteBucket, ok := w.readWriteBuckets[outDirPath]; ok {
// We already have a readWriteBucket for this outDirPath, so
// we can write to the same bucket.
if err := w.responseWriter.WriteResponse(
ctx,
readWriteBucket,
response,
appproto.WriteResponseWithInsertionPointReadBucket(readWriteBucket),
); err != nil {
return err
}
return nil
}
readWriteBucket := storagemem.NewReadWriteBucket()
if err := w.responseWriter.WriteResponse(
ctx,
readWriteBucket,
response,
appproto.WriteResponseWithInsertionPointReadBucket(readWriteBucket),
); err != nil {
return err
}
// Add this readWriteBucket to the set so that other plugins
// can write to the same files (re: insertion points).
w.readWriteBuckets[outDirPath] = readWriteBucket
w.closers = append(w.closers, func() error {
if createOutDirIfNotExists {
if err := os.MkdirAll(outDirPath, 0o755); err != nil {
return err
}
}
// This checks that the directory exists.
osReadWriteBucket, err := w.storageosProvider.NewReadWriteBucket(
outDirPath,
storageos.ReadWriteBucketWithSymlinksIfSupported(),
)
if err != nil {
return err
}
if _, err := storage.Copy(ctx, readWriteBucket, osReadWriteBucket); err != nil {
return err
}
return nil
})
return nil
}
type responseWriterOptions struct {
createOutDirIfNotExists bool
}
func newResponseWriterOptions() *responseWriterOptions {
return &responseWriterOptions{}
}