| /* |
| Copyright 2014 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package handlers |
| |
| import ( |
| "context" |
| "encoding/hex" |
| "fmt" |
| "io" |
| "io/ioutil" |
| "net/http" |
| "net/url" |
| goruntime "runtime" |
| "time" |
| |
| "k8s.io/apimachinery/pkg/api/errors" |
| "k8s.io/apimachinery/pkg/api/meta" |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1" |
| "k8s.io/apimachinery/pkg/runtime" |
| "k8s.io/apimachinery/pkg/runtime/schema" |
| "k8s.io/apiserver/pkg/admission" |
| "k8s.io/apiserver/pkg/authorization/authorizer" |
| "k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager" |
| "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" |
| "k8s.io/apiserver/pkg/endpoints/metrics" |
| "k8s.io/apiserver/pkg/endpoints/request" |
| "k8s.io/apiserver/pkg/registry/rest" |
| "k8s.io/klog" |
| ) |
| |
| // RequestScope encapsulates common fields across all RESTful handler methods. |
| type RequestScope struct { |
| Namer ScopeNamer |
| |
| Serializer runtime.NegotiatedSerializer |
| runtime.ParameterCodec |
| |
| // StandardSerializers, if set, restricts which serializers can be used when |
| // we aren't transforming the output (into Table or PartialObjectMetadata). |
| // Used only by CRDs which do not yet support Protobuf. |
| StandardSerializers []runtime.SerializerInfo |
| |
| Creater runtime.ObjectCreater |
| Convertor runtime.ObjectConvertor |
| Defaulter runtime.ObjectDefaulter |
| Typer runtime.ObjectTyper |
| UnsafeConvertor runtime.ObjectConvertor |
| Authorizer authorizer.Authorizer |
| |
| EquivalentResourceMapper runtime.EquivalentResourceMapper |
| |
| TableConvertor rest.TableConvertor |
| FieldManager *fieldmanager.FieldManager |
| |
| Resource schema.GroupVersionResource |
| Kind schema.GroupVersionKind |
| Subresource string |
| |
| MetaGroupVersion schema.GroupVersion |
| |
| // HubGroupVersion indicates what version objects read from etcd or incoming requests should be converted to for in-memory handling. |
| HubGroupVersion schema.GroupVersion |
| |
| MaxRequestBodyBytes int64 |
| } |
| |
| func (scope *RequestScope) err(err error, w http.ResponseWriter, req *http.Request) { |
| responsewriters.ErrorNegotiated(err, scope.Serializer, scope.Kind.GroupVersion(), w, req) |
| } |
| |
| func (scope *RequestScope) AllowsMediaTypeTransform(mimeType, mimeSubType string, gvk *schema.GroupVersionKind) bool { |
| // some handlers like CRDs can't serve all the mime types that PartialObjectMetadata or Table can - if |
| // gvk is nil (no conversion) allow StandardSerializers to further restrict the set of mime types. |
| if gvk == nil { |
| if len(scope.StandardSerializers) == 0 { |
| return true |
| } |
| for _, info := range scope.StandardSerializers { |
| if info.MediaTypeType == mimeType && info.MediaTypeSubType == mimeSubType { |
| return true |
| } |
| } |
| return false |
| } |
| |
| // TODO: this is temporary, replace with an abstraction calculated at endpoint installation time |
| if gvk.GroupVersion() == metav1beta1.SchemeGroupVersion || gvk.GroupVersion() == metav1.SchemeGroupVersion { |
| switch gvk.Kind { |
| case "Table": |
| return scope.TableConvertor != nil && |
| mimeType == "application" && |
| (mimeSubType == "json" || mimeSubType == "yaml") |
| case "PartialObjectMetadata", "PartialObjectMetadataList": |
| // TODO: should delineate between lists and non-list endpoints |
| return true |
| default: |
| return false |
| } |
| } |
| return false |
| } |
| |
| func (scope *RequestScope) AllowsServerVersion(version string) bool { |
| return version == scope.MetaGroupVersion.Version |
| } |
| |
| func (scope *RequestScope) AllowsStreamSchema(s string) bool { |
| return s == "watch" |
| } |
| |
| var _ admission.ObjectInterfaces = &RequestScope{} |
| |
| func (r *RequestScope) GetObjectCreater() runtime.ObjectCreater { return r.Creater } |
| func (r *RequestScope) GetObjectTyper() runtime.ObjectTyper { return r.Typer } |
| func (r *RequestScope) GetObjectDefaulter() runtime.ObjectDefaulter { return r.Defaulter } |
| func (r *RequestScope) GetObjectConvertor() runtime.ObjectConvertor { return r.Convertor } |
| func (r *RequestScope) GetEquivalentResourceMapper() runtime.EquivalentResourceMapper { |
| return r.EquivalentResourceMapper |
| } |
| |
| // ConnectResource returns a function that handles a connect request on a rest.Storage object. |
| func ConnectResource(connecter rest.Connecter, scope *RequestScope, admit admission.Interface, restPath string, isSubresource bool) http.HandlerFunc { |
| return func(w http.ResponseWriter, req *http.Request) { |
| if isDryRun(req.URL) { |
| scope.err(errors.NewBadRequest("dryRun is not supported"), w, req) |
| return |
| } |
| |
| namespace, name, err := scope.Namer.Name(req) |
| if err != nil { |
| scope.err(err, w, req) |
| return |
| } |
| ctx := req.Context() |
| ctx = request.WithNamespace(ctx, namespace) |
| ae := request.AuditEventFrom(ctx) |
| admit = admission.WithAudit(admit, ae) |
| |
| opts, subpath, subpathKey := connecter.NewConnectOptions() |
| if err := getRequestOptions(req, scope, opts, subpath, subpathKey, isSubresource); err != nil { |
| err = errors.NewBadRequest(err.Error()) |
| scope.err(err, w, req) |
| return |
| } |
| if admit != nil && admit.Handles(admission.Connect) { |
| userInfo, _ := request.UserFrom(ctx) |
| // TODO: remove the mutating admission here as soon as we have ported all plugin that handle CONNECT |
| if mutatingAdmission, ok := admit.(admission.MutationInterface); ok { |
| err = mutatingAdmission.Admit(admission.NewAttributesRecord(opts, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Connect, nil, false, userInfo), scope) |
| if err != nil { |
| scope.err(err, w, req) |
| return |
| } |
| } |
| if validatingAdmission, ok := admit.(admission.ValidationInterface); ok { |
| err = validatingAdmission.Validate(admission.NewAttributesRecord(opts, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Connect, nil, false, userInfo), scope) |
| if err != nil { |
| scope.err(err, w, req) |
| return |
| } |
| } |
| } |
| requestInfo, _ := request.RequestInfoFrom(ctx) |
| metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() { |
| handler, err := connecter.Connect(ctx, name, opts, &responder{scope: scope, req: req, w: w}) |
| if err != nil { |
| scope.err(err, w, req) |
| return |
| } |
| handler.ServeHTTP(w, req) |
| }) |
| } |
| } |
| |
| // responder implements rest.Responder for assisting a connector in writing objects or errors. |
| type responder struct { |
| scope *RequestScope |
| req *http.Request |
| w http.ResponseWriter |
| } |
| |
| func (r *responder) Object(statusCode int, obj runtime.Object) { |
| responsewriters.WriteObjectNegotiated(r.scope.Serializer, r.scope, r.scope.Kind.GroupVersion(), r.w, r.req, statusCode, obj) |
| } |
| |
| func (r *responder) Error(err error) { |
| r.scope.err(err, r.w, r.req) |
| } |
| |
| // resultFunc is a function that returns a rest result and can be run in a goroutine |
| type resultFunc func() (runtime.Object, error) |
| |
| // finishRequest makes a given resultFunc asynchronous and handles errors returned by the response. |
| // An api.Status object with status != success is considered an "error", which interrupts the normal response flow. |
| func finishRequest(timeout time.Duration, fn resultFunc) (result runtime.Object, err error) { |
| // these channels need to be buffered to prevent the goroutine below from hanging indefinitely |
| // when the select statement reads something other than the one the goroutine sends on. |
| ch := make(chan runtime.Object, 1) |
| errCh := make(chan error, 1) |
| panicCh := make(chan interface{}, 1) |
| go func() { |
| // panics don't cross goroutine boundaries, so we have to handle ourselves |
| defer func() { |
| panicReason := recover() |
| if panicReason != nil { |
| // Same as stdlib http server code. Manually allocate stack |
| // trace buffer size to prevent excessively large logs |
| const size = 64 << 10 |
| buf := make([]byte, size) |
| buf = buf[:goruntime.Stack(buf, false)] |
| panicReason = fmt.Sprintf("%v\n%s", panicReason, buf) |
| // Propagate to parent goroutine |
| panicCh <- panicReason |
| } |
| }() |
| |
| if result, err := fn(); err != nil { |
| errCh <- err |
| } else { |
| ch <- result |
| } |
| }() |
| |
| select { |
| case result = <-ch: |
| if status, ok := result.(*metav1.Status); ok { |
| if status.Status != metav1.StatusSuccess { |
| return nil, errors.FromObject(status) |
| } |
| } |
| return result, nil |
| case err = <-errCh: |
| return nil, err |
| case p := <-panicCh: |
| panic(p) |
| case <-time.After(timeout): |
| return nil, errors.NewTimeoutError(fmt.Sprintf("request did not complete within requested timeout %s", timeout), 0) |
| } |
| } |
| |
| // transformDecodeError adds additional information into a bad-request api error when a decode fails. |
| func transformDecodeError(typer runtime.ObjectTyper, baseErr error, into runtime.Object, gvk *schema.GroupVersionKind, body []byte) error { |
| objGVKs, _, err := typer.ObjectKinds(into) |
| if err != nil { |
| return errors.NewBadRequest(err.Error()) |
| } |
| objGVK := objGVKs[0] |
| if gvk != nil && len(gvk.Kind) > 0 { |
| return errors.NewBadRequest(fmt.Sprintf("%s in version %q cannot be handled as a %s: %v", gvk.Kind, gvk.Version, objGVK.Kind, baseErr)) |
| } |
| summary := summarizeData(body, 30) |
| return errors.NewBadRequest(fmt.Sprintf("the object provided is unrecognized (must be of type %s): %v (%s)", objGVK.Kind, baseErr, summary)) |
| } |
| |
| // setSelfLink sets the self link of an object (or the child items in a list) to the base URL of the request |
| // plus the path and query generated by the provided linkFunc |
| func setSelfLink(obj runtime.Object, requestInfo *request.RequestInfo, namer ScopeNamer) error { |
| // TODO: SelfLink generation should return a full URL? |
| uri, err := namer.GenerateLink(requestInfo, obj) |
| if err != nil { |
| return nil |
| } |
| |
| return namer.SetSelfLink(obj, uri) |
| } |
| |
| func hasUID(obj runtime.Object) (bool, error) { |
| if obj == nil { |
| return false, nil |
| } |
| accessor, err := meta.Accessor(obj) |
| if err != nil { |
| return false, errors.NewInternalError(err) |
| } |
| if len(accessor.GetUID()) == 0 { |
| return false, nil |
| } |
| return true, nil |
| } |
| |
| // checkName checks the provided name against the request |
| func checkName(obj runtime.Object, name, namespace string, namer ScopeNamer) error { |
| objNamespace, objName, err := namer.ObjectName(obj) |
| if err != nil { |
| return errors.NewBadRequest(fmt.Sprintf( |
| "the name of the object (%s based on URL) was undeterminable: %v", name, err)) |
| } |
| if objName != name { |
| return errors.NewBadRequest(fmt.Sprintf( |
| "the name of the object (%s) does not match the name on the URL (%s)", objName, name)) |
| } |
| if len(namespace) > 0 { |
| if len(objNamespace) > 0 && objNamespace != namespace { |
| return errors.NewBadRequest(fmt.Sprintf( |
| "the namespace of the object (%s) does not match the namespace on the request (%s)", objNamespace, namespace)) |
| } |
| } |
| |
| return nil |
| } |
| |
| // setObjectSelfLink sets the self link of an object as needed. |
| // TODO: remove the need for the namer LinkSetters by requiring objects implement either Object or List |
| // interfaces |
| func setObjectSelfLink(ctx context.Context, obj runtime.Object, req *http.Request, namer ScopeNamer) error { |
| // We only generate list links on objects that implement ListInterface - historically we duck typed this |
| // check via reflection, but as we move away from reflection we require that you not only carry Items but |
| // ListMeta into order to be identified as a list. |
| if !meta.IsListType(obj) { |
| requestInfo, ok := request.RequestInfoFrom(ctx) |
| if !ok { |
| return fmt.Errorf("missing requestInfo") |
| } |
| return setSelfLink(obj, requestInfo, namer) |
| } |
| |
| uri, err := namer.GenerateListLink(req) |
| if err != nil { |
| return err |
| } |
| if err := namer.SetSelfLink(obj, uri); err != nil { |
| klog.V(4).Infof("Unable to set self link on object: %v", err) |
| } |
| requestInfo, ok := request.RequestInfoFrom(ctx) |
| if !ok { |
| return fmt.Errorf("missing requestInfo") |
| } |
| |
| count := 0 |
| err = meta.EachListItem(obj, func(obj runtime.Object) error { |
| count++ |
| return setSelfLink(obj, requestInfo, namer) |
| }) |
| |
| if count == 0 { |
| if err := meta.SetList(obj, []runtime.Object{}); err != nil { |
| return err |
| } |
| } |
| |
| return err |
| } |
| |
| func summarizeData(data []byte, maxLength int) string { |
| switch { |
| case len(data) == 0: |
| return "<empty>" |
| case data[0] == '{': |
| if len(data) > maxLength { |
| return string(data[:maxLength]) + " ..." |
| } |
| return string(data) |
| default: |
| if len(data) > maxLength { |
| return hex.EncodeToString(data[:maxLength]) + " ..." |
| } |
| return hex.EncodeToString(data) |
| } |
| } |
| |
| func limitedReadBody(req *http.Request, limit int64) ([]byte, error) { |
| defer req.Body.Close() |
| if limit <= 0 { |
| return ioutil.ReadAll(req.Body) |
| } |
| lr := &io.LimitedReader{ |
| R: req.Body, |
| N: limit + 1, |
| } |
| data, err := ioutil.ReadAll(lr) |
| if err != nil { |
| return nil, err |
| } |
| if lr.N <= 0 { |
| return nil, errors.NewRequestEntityTooLargeError(fmt.Sprintf("limit is %d", limit)) |
| } |
| return data, nil |
| } |
| |
| func parseTimeout(str string) time.Duration { |
| if str != "" { |
| timeout, err := time.ParseDuration(str) |
| if err == nil { |
| return timeout |
| } |
| klog.Errorf("Failed to parse %q: %v", str, err) |
| } |
| return 30 * time.Second |
| } |
| |
| func isDryRun(url *url.URL) bool { |
| return len(url.Query()["dryRun"]) != 0 |
| } |