| package handlers |
| |
| import ( |
| "context" |
| cryptorand "crypto/rand" |
| "expvar" |
| "fmt" |
| "math/rand" |
| "net" |
| "net/http" |
| "net/url" |
| "os" |
| "regexp" |
| "runtime" |
| "strings" |
| "time" |
| |
| "github.com/docker/distribution" |
| "github.com/docker/distribution/configuration" |
| dcontext "github.com/docker/distribution/context" |
| "github.com/docker/distribution/health" |
| "github.com/docker/distribution/health/checks" |
| prometheus "github.com/docker/distribution/metrics" |
| "github.com/docker/distribution/notifications" |
| "github.com/docker/distribution/reference" |
| "github.com/docker/distribution/registry/api/errcode" |
| "github.com/docker/distribution/registry/api/v2" |
| "github.com/docker/distribution/registry/auth" |
| registrymiddleware "github.com/docker/distribution/registry/middleware/registry" |
| repositorymiddleware "github.com/docker/distribution/registry/middleware/repository" |
| "github.com/docker/distribution/registry/proxy" |
| "github.com/docker/distribution/registry/storage" |
| memorycache "github.com/docker/distribution/registry/storage/cache/memory" |
| rediscache "github.com/docker/distribution/registry/storage/cache/redis" |
| storagedriver "github.com/docker/distribution/registry/storage/driver" |
| "github.com/docker/distribution/registry/storage/driver/factory" |
| storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware" |
| "github.com/docker/distribution/version" |
| "github.com/docker/go-metrics" |
| "github.com/docker/libtrust" |
| "github.com/garyburd/redigo/redis" |
| "github.com/gorilla/mux" |
| "github.com/sirupsen/logrus" |
| ) |
| |
const (
	// randomSecretSize is how many random bytes are generated for the HTTP
	// secret when the configuration does not supply one.
	randomSecretSize = 32

	// defaultCheckInterval is the period between health check runs that is
	// used when a checker leaves its own interval unset.
	defaultCheckInterval = 10 * time.Second
)
| |
| // App is a global registry application object. Shared resources can be placed |
| // on this object that will be accessible from all requests. Any writable |
| // fields should be protected. |
| type App struct { |
| context.Context |
| |
| Config *configuration.Configuration |
| |
| router *mux.Router // main application router, configured with dispatchers |
| driver storagedriver.StorageDriver // driver maintains the app global storage driver instance. |
| registry distribution.Namespace // registry is the primary registry backend for the app instance. |
| repoRemover distribution.RepositoryRemover // repoRemover provides ability to delete repos |
| accessController auth.AccessController // main access controller for application |
| |
| // httpHost is a parsed representation of the http.host parameter from |
| // the configuration. Only the Scheme and Host fields are used. |
| httpHost url.URL |
| |
| // events contains notification related configuration. |
| events struct { |
| sink notifications.Sink |
| source notifications.SourceRecord |
| } |
| |
| redis *redis.Pool |
| |
| // trustKey is a deprecated key used to sign manifests converted to |
| // schema1 for backward compatibility. It should not be used for any |
| // other purposes. |
| trustKey libtrust.PrivateKey |
| |
| // isCache is true if this registry is configured as a pull through cache |
| isCache bool |
| |
| // readOnly is true if the registry is in a read-only maintenance mode |
| readOnly bool |
| } |
| |
| // NewApp takes a configuration and returns a configured app, ready to serve |
| // requests. The app only implements ServeHTTP and can be wrapped in other |
| // handlers accordingly. |
| func NewApp(ctx context.Context, config *configuration.Configuration) *App { |
| app := &App{ |
| Config: config, |
| Context: ctx, |
| router: v2.RouterWithPrefix(config.HTTP.Prefix), |
| isCache: config.Proxy.RemoteURL != "", |
| } |
| |
| // Register the handler dispatchers. |
| app.register(v2.RouteNameBase, func(ctx *Context, r *http.Request) http.Handler { |
| return http.HandlerFunc(apiBase) |
| }) |
| app.register(v2.RouteNameManifest, manifestDispatcher) |
| app.register(v2.RouteNameCatalog, catalogDispatcher) |
| app.register(v2.RouteNameTags, tagsDispatcher) |
| app.register(v2.RouteNameBlob, blobDispatcher) |
| app.register(v2.RouteNameBlobUpload, blobUploadDispatcher) |
| app.register(v2.RouteNameBlobUploadChunk, blobUploadDispatcher) |
| |
| // override the storage driver's UA string for registry outbound HTTP requests |
| storageParams := config.Storage.Parameters() |
| if storageParams == nil { |
| storageParams = make(configuration.Parameters) |
| } |
| storageParams["useragent"] = fmt.Sprintf("docker-distribution/%s %s", version.Version, runtime.Version()) |
| |
| var err error |
| app.driver, err = factory.Create(config.Storage.Type(), storageParams) |
| if err != nil { |
| // TODO(stevvooe): Move the creation of a service into a protected |
| // method, where this is created lazily. Its status can be queried via |
| // a health check. |
| panic(err) |
| } |
| |
| purgeConfig := uploadPurgeDefaultConfig() |
| if mc, ok := config.Storage["maintenance"]; ok { |
| if v, ok := mc["uploadpurging"]; ok { |
| purgeConfig, ok = v.(map[interface{}]interface{}) |
| if !ok { |
| panic("uploadpurging config key must contain additional keys") |
| } |
| } |
| if v, ok := mc["readonly"]; ok { |
| readOnly, ok := v.(map[interface{}]interface{}) |
| if !ok { |
| panic("readonly config key must contain additional keys") |
| } |
| if readOnlyEnabled, ok := readOnly["enabled"]; ok { |
| app.readOnly, ok = readOnlyEnabled.(bool) |
| if !ok { |
| panic("readonly's enabled config key must have a boolean value") |
| } |
| } |
| } |
| } |
| |
| startUploadPurger(app, app.driver, dcontext.GetLogger(app), purgeConfig) |
| |
| app.driver, err = applyStorageMiddleware(app.driver, config.Middleware["storage"]) |
| if err != nil { |
| panic(err) |
| } |
| |
| app.configureSecret(config) |
| app.configureEvents(config) |
| app.configureRedis(config) |
| app.configureLogHook(config) |
| |
| options := registrymiddleware.GetRegistryOptions() |
| if config.Compatibility.Schema1.TrustKey != "" { |
| app.trustKey, err = libtrust.LoadKeyFile(config.Compatibility.Schema1.TrustKey) |
| if err != nil { |
| panic(fmt.Sprintf(`could not load schema1 "signingkey" parameter: %v`, err)) |
| } |
| } else { |
| // Generate an ephemeral key to be used for signing converted manifests |
| // for clients that don't support schema2. |
| app.trustKey, err = libtrust.GenerateECP256PrivateKey() |
| if err != nil { |
| panic(err) |
| } |
| } |
| |
| options = append(options, storage.Schema1SigningKey(app.trustKey)) |
| |
| if config.Compatibility.Schema1.Enabled { |
| options = append(options, storage.EnableSchema1) |
| } |
| |
| if config.HTTP.Host != "" { |
| u, err := url.Parse(config.HTTP.Host) |
| if err != nil { |
| panic(fmt.Sprintf(`could not parse http "host" parameter: %v`, err)) |
| } |
| app.httpHost = *u |
| } |
| |
| if app.isCache { |
| options = append(options, storage.DisableDigestResumption) |
| } |
| |
| // configure deletion |
| if d, ok := config.Storage["delete"]; ok { |
| e, ok := d["enabled"] |
| if ok { |
| if deleteEnabled, ok := e.(bool); ok && deleteEnabled { |
| options = append(options, storage.EnableDelete) |
| } |
| } |
| } |
| |
| // configure redirects |
| var redirectDisabled bool |
| if redirectConfig, ok := config.Storage["redirect"]; ok { |
| v := redirectConfig["disable"] |
| switch v := v.(type) { |
| case bool: |
| redirectDisabled = v |
| default: |
| panic(fmt.Sprintf("invalid type for redirect config: %#v", redirectConfig)) |
| } |
| } |
| if redirectDisabled { |
| dcontext.GetLogger(app).Infof("backend redirection disabled") |
| } else { |
| options = append(options, storage.EnableRedirect) |
| } |
| |
| if !config.Validation.Enabled { |
| config.Validation.Enabled = !config.Validation.Disabled |
| } |
| |
| // configure validation |
| if config.Validation.Enabled { |
| if len(config.Validation.Manifests.URLs.Allow) == 0 && len(config.Validation.Manifests.URLs.Deny) == 0 { |
| // If Allow and Deny are empty, allow nothing. |
| options = append(options, storage.ManifestURLsAllowRegexp(regexp.MustCompile("^$"))) |
| } else { |
| if len(config.Validation.Manifests.URLs.Allow) > 0 { |
| for i, s := range config.Validation.Manifests.URLs.Allow { |
| // Validate via compilation. |
| if _, err := regexp.Compile(s); err != nil { |
| panic(fmt.Sprintf("validation.manifests.urls.allow: %s", err)) |
| } |
| // Wrap with non-capturing group. |
| config.Validation.Manifests.URLs.Allow[i] = fmt.Sprintf("(?:%s)", s) |
| } |
| re := regexp.MustCompile(strings.Join(config.Validation.Manifests.URLs.Allow, "|")) |
| options = append(options, storage.ManifestURLsAllowRegexp(re)) |
| } |
| if len(config.Validation.Manifests.URLs.Deny) > 0 { |
| for i, s := range config.Validation.Manifests.URLs.Deny { |
| // Validate via compilation. |
| if _, err := regexp.Compile(s); err != nil { |
| panic(fmt.Sprintf("validation.manifests.urls.deny: %s", err)) |
| } |
| // Wrap with non-capturing group. |
| config.Validation.Manifests.URLs.Deny[i] = fmt.Sprintf("(?:%s)", s) |
| } |
| re := regexp.MustCompile(strings.Join(config.Validation.Manifests.URLs.Deny, "|")) |
| options = append(options, storage.ManifestURLsDenyRegexp(re)) |
| } |
| } |
| } |
| |
| // configure storage caches |
| if cc, ok := config.Storage["cache"]; ok { |
| v, ok := cc["blobdescriptor"] |
| if !ok { |
| // Backwards compatible: "layerinfo" == "blobdescriptor" |
| v = cc["layerinfo"] |
| } |
| |
| switch v { |
| case "redis": |
| if app.redis == nil { |
| panic("redis configuration required to use for layerinfo cache") |
| } |
| cacheProvider := rediscache.NewRedisBlobDescriptorCacheProvider(app.redis) |
| localOptions := append(options, storage.BlobDescriptorCacheProvider(cacheProvider)) |
| app.registry, err = storage.NewRegistry(app, app.driver, localOptions...) |
| if err != nil { |
| panic("could not create registry: " + err.Error()) |
| } |
| dcontext.GetLogger(app).Infof("using redis blob descriptor cache") |
| case "inmemory": |
| cacheProvider := memorycache.NewInMemoryBlobDescriptorCacheProvider() |
| localOptions := append(options, storage.BlobDescriptorCacheProvider(cacheProvider)) |
| app.registry, err = storage.NewRegistry(app, app.driver, localOptions...) |
| if err != nil { |
| panic("could not create registry: " + err.Error()) |
| } |
| dcontext.GetLogger(app).Infof("using inmemory blob descriptor cache") |
| default: |
| if v != "" { |
| dcontext.GetLogger(app).Warnf("unknown cache type %q, caching disabled", config.Storage["cache"]) |
| } |
| } |
| } |
| |
| if app.registry == nil { |
| // configure the registry if no cache section is available. |
| app.registry, err = storage.NewRegistry(app.Context, app.driver, options...) |
| if err != nil { |
| panic("could not create registry: " + err.Error()) |
| } |
| } |
| |
| app.registry, err = applyRegistryMiddleware(app, app.registry, config.Middleware["registry"]) |
| if err != nil { |
| panic(err) |
| } |
| |
| authType := config.Auth.Type() |
| |
| if authType != "" && !strings.EqualFold(authType, "none") { |
| accessController, err := auth.GetAccessController(config.Auth.Type(), config.Auth.Parameters()) |
| if err != nil { |
| panic(fmt.Sprintf("unable to configure authorization (%s): %v", authType, err)) |
| } |
| app.accessController = accessController |
| dcontext.GetLogger(app).Debugf("configured %q access controller", authType) |
| } |
| |
| // configure as a pull through cache |
| if config.Proxy.RemoteURL != "" { |
| app.registry, err = proxy.NewRegistryPullThroughCache(ctx, app.registry, app.driver, config.Proxy) |
| if err != nil { |
| panic(err.Error()) |
| } |
| app.isCache = true |
| dcontext.GetLogger(app).Info("Registry configured as a proxy cache to ", config.Proxy.RemoteURL) |
| } |
| var ok bool |
| app.repoRemover, ok = app.registry.(distribution.RepositoryRemover) |
| if !ok { |
| dcontext.GetLogger(app).Warnf("Registry does not implement RempositoryRemover. Will not be able to delete repos and tags") |
| } |
| |
| return app |
| } |
| |
| // RegisterHealthChecks is an awful hack to defer health check registration |
| // control to callers. This should only ever be called once per registry |
| // process, typically in a main function. The correct way would be register |
| // health checks outside of app, since multiple apps may exist in the same |
| // process. Because the configuration and app are tightly coupled, |
| // implementing this properly will require a refactor. This method may panic |
| // if called twice in the same process. |
| func (app *App) RegisterHealthChecks(healthRegistries ...*health.Registry) { |
| if len(healthRegistries) > 1 { |
| panic("RegisterHealthChecks called with more than one registry") |
| } |
| healthRegistry := health.DefaultRegistry |
| if len(healthRegistries) == 1 { |
| healthRegistry = healthRegistries[0] |
| } |
| |
| if app.Config.Health.StorageDriver.Enabled { |
| interval := app.Config.Health.StorageDriver.Interval |
| if interval == 0 { |
| interval = defaultCheckInterval |
| } |
| |
| storageDriverCheck := func() error { |
| _, err := app.driver.Stat(app, "/") // "/" should always exist |
| if _, ok := err.(storagedriver.PathNotFoundError); ok { |
| err = nil // pass this through, backend is responding, but this path doesn't exist. |
| } |
| return err |
| } |
| |
| if app.Config.Health.StorageDriver.Threshold != 0 { |
| healthRegistry.RegisterPeriodicThresholdFunc("storagedriver_"+app.Config.Storage.Type(), interval, app.Config.Health.StorageDriver.Threshold, storageDriverCheck) |
| } else { |
| healthRegistry.RegisterPeriodicFunc("storagedriver_"+app.Config.Storage.Type(), interval, storageDriverCheck) |
| } |
| } |
| |
| for _, fileChecker := range app.Config.Health.FileCheckers { |
| interval := fileChecker.Interval |
| if interval == 0 { |
| interval = defaultCheckInterval |
| } |
| dcontext.GetLogger(app).Infof("configuring file health check path=%s, interval=%d", fileChecker.File, interval/time.Second) |
| healthRegistry.Register(fileChecker.File, health.PeriodicChecker(checks.FileChecker(fileChecker.File), interval)) |
| } |
| |
| for _, httpChecker := range app.Config.Health.HTTPCheckers { |
| interval := httpChecker.Interval |
| if interval == 0 { |
| interval = defaultCheckInterval |
| } |
| |
| statusCode := httpChecker.StatusCode |
| if statusCode == 0 { |
| statusCode = 200 |
| } |
| |
| checker := checks.HTTPChecker(httpChecker.URI, statusCode, httpChecker.Timeout, httpChecker.Headers) |
| |
| if httpChecker.Threshold != 0 { |
| dcontext.GetLogger(app).Infof("configuring HTTP health check uri=%s, interval=%d, threshold=%d", httpChecker.URI, interval/time.Second, httpChecker.Threshold) |
| healthRegistry.Register(httpChecker.URI, health.PeriodicThresholdChecker(checker, interval, httpChecker.Threshold)) |
| } else { |
| dcontext.GetLogger(app).Infof("configuring HTTP health check uri=%s, interval=%d", httpChecker.URI, interval/time.Second) |
| healthRegistry.Register(httpChecker.URI, health.PeriodicChecker(checker, interval)) |
| } |
| } |
| |
| for _, tcpChecker := range app.Config.Health.TCPCheckers { |
| interval := tcpChecker.Interval |
| if interval == 0 { |
| interval = defaultCheckInterval |
| } |
| |
| checker := checks.TCPChecker(tcpChecker.Addr, tcpChecker.Timeout) |
| |
| if tcpChecker.Threshold != 0 { |
| dcontext.GetLogger(app).Infof("configuring TCP health check addr=%s, interval=%d, threshold=%d", tcpChecker.Addr, interval/time.Second, tcpChecker.Threshold) |
| healthRegistry.Register(tcpChecker.Addr, health.PeriodicThresholdChecker(checker, interval, tcpChecker.Threshold)) |
| } else { |
| dcontext.GetLogger(app).Infof("configuring TCP health check addr=%s, interval=%d", tcpChecker.Addr, interval/time.Second) |
| healthRegistry.Register(tcpChecker.Addr, health.PeriodicChecker(checker, interval)) |
| } |
| } |
| } |
| |
| // register a handler with the application, by route name. The handler will be |
| // passed through the application filters and context will be constructed at |
| // request time. |
| func (app *App) register(routeName string, dispatch dispatchFunc) { |
| handler := app.dispatcher(dispatch) |
| |
| // Chain the handler with prometheus instrumented handler |
| if app.Config.HTTP.Debug.Prometheus.Enabled { |
| namespace := metrics.NewNamespace(prometheus.NamespacePrefix, "http", nil) |
| httpMetrics := namespace.NewDefaultHttpMetrics(strings.Replace(routeName, "-", "_", -1)) |
| metrics.Register(namespace) |
| handler = metrics.InstrumentHandler(httpMetrics, handler) |
| } |
| |
| // TODO(stevvooe): This odd dispatcher/route registration is by-product of |
| // some limitations in the gorilla/mux router. We are using it to keep |
| // routing consistent between the client and server, but we may want to |
| // replace it with manual routing and structure-based dispatch for better |
| // control over the request execution. |
| |
| app.router.GetRoute(routeName).Handler(handler) |
| } |
| |
| // configureEvents prepares the event sink for action. |
| func (app *App) configureEvents(configuration *configuration.Configuration) { |
| // Configure all of the endpoint sinks. |
| var sinks []notifications.Sink |
| for _, endpoint := range configuration.Notifications.Endpoints { |
| if endpoint.Disabled { |
| dcontext.GetLogger(app).Infof("endpoint %s disabled, skipping", endpoint.Name) |
| continue |
| } |
| |
| dcontext.GetLogger(app).Infof("configuring endpoint %v (%v), timeout=%s, headers=%v", endpoint.Name, endpoint.URL, endpoint.Timeout, endpoint.Headers) |
| endpoint := notifications.NewEndpoint(endpoint.Name, endpoint.URL, notifications.EndpointConfig{ |
| Timeout: endpoint.Timeout, |
| Threshold: endpoint.Threshold, |
| Backoff: endpoint.Backoff, |
| Headers: endpoint.Headers, |
| IgnoredMediaTypes: endpoint.IgnoredMediaTypes, |
| Ignore: endpoint.Ignore, |
| }) |
| |
| sinks = append(sinks, endpoint) |
| } |
| |
| // NOTE(stevvooe): Moving to a new queuing implementation is as easy as |
| // replacing broadcaster with a rabbitmq implementation. It's recommended |
| // that the registry instances also act as the workers to keep deployment |
| // simple. |
| app.events.sink = notifications.NewBroadcaster(sinks...) |
| |
| // Populate registry event source |
| hostname, err := os.Hostname() |
| if err != nil { |
| hostname = configuration.HTTP.Addr |
| } else { |
| // try to pick the port off the config |
| _, port, err := net.SplitHostPort(configuration.HTTP.Addr) |
| if err == nil { |
| hostname = net.JoinHostPort(hostname, port) |
| } |
| } |
| |
| app.events.source = notifications.SourceRecord{ |
| Addr: hostname, |
| InstanceID: dcontext.GetStringValue(app, "instance.id"), |
| } |
| } |
| |
| type redisStartAtKey struct{} |
| |
| func (app *App) configureRedis(configuration *configuration.Configuration) { |
| if configuration.Redis.Addr == "" { |
| dcontext.GetLogger(app).Infof("redis not configured") |
| return |
| } |
| |
| pool := &redis.Pool{ |
| Dial: func() (redis.Conn, error) { |
| // TODO(stevvooe): Yet another use case for contextual timing. |
| ctx := context.WithValue(app, redisStartAtKey{}, time.Now()) |
| |
| done := func(err error) { |
| logger := dcontext.GetLoggerWithField(ctx, "redis.connect.duration", |
| dcontext.Since(ctx, redisStartAtKey{})) |
| if err != nil { |
| logger.Errorf("redis: error connecting: %v", err) |
| } else { |
| logger.Infof("redis: connect %v", configuration.Redis.Addr) |
| } |
| } |
| |
| conn, err := redis.DialTimeout("tcp", |
| configuration.Redis.Addr, |
| configuration.Redis.DialTimeout, |
| configuration.Redis.ReadTimeout, |
| configuration.Redis.WriteTimeout) |
| if err != nil { |
| dcontext.GetLogger(app).Errorf("error connecting to redis instance %s: %v", |
| configuration.Redis.Addr, err) |
| done(err) |
| return nil, err |
| } |
| |
| // authorize the connection |
| if configuration.Redis.Password != "" { |
| if _, err = conn.Do("AUTH", configuration.Redis.Password); err != nil { |
| defer conn.Close() |
| done(err) |
| return nil, err |
| } |
| } |
| |
| // select the database to use |
| if configuration.Redis.DB != 0 { |
| if _, err = conn.Do("SELECT", configuration.Redis.DB); err != nil { |
| defer conn.Close() |
| done(err) |
| return nil, err |
| } |
| } |
| |
| done(nil) |
| return conn, nil |
| }, |
| MaxIdle: configuration.Redis.Pool.MaxIdle, |
| MaxActive: configuration.Redis.Pool.MaxActive, |
| IdleTimeout: configuration.Redis.Pool.IdleTimeout, |
| TestOnBorrow: func(c redis.Conn, t time.Time) error { |
| // TODO(stevvooe): We can probably do something more interesting |
| // here with the health package. |
| _, err := c.Do("PING") |
| return err |
| }, |
| Wait: false, // if a connection is not available, proceed without cache. |
| } |
| |
| app.redis = pool |
| |
| // setup expvar |
| registry := expvar.Get("registry") |
| if registry == nil { |
| registry = expvar.NewMap("registry") |
| } |
| |
| registry.(*expvar.Map).Set("redis", expvar.Func(func() interface{} { |
| return map[string]interface{}{ |
| "Config": configuration.Redis, |
| "Active": app.redis.ActiveCount(), |
| } |
| })) |
| } |
| |
| // configureLogHook prepares logging hook parameters. |
| func (app *App) configureLogHook(configuration *configuration.Configuration) { |
| entry, ok := dcontext.GetLogger(app).(*logrus.Entry) |
| if !ok { |
| // somehow, we are not using logrus |
| return |
| } |
| |
| logger := entry.Logger |
| |
| for _, configHook := range configuration.Log.Hooks { |
| if !configHook.Disabled { |
| switch configHook.Type { |
| case "mail": |
| hook := &logHook{} |
| hook.LevelsParam = configHook.Levels |
| hook.Mail = &mailer{ |
| Addr: configHook.MailOptions.SMTP.Addr, |
| Username: configHook.MailOptions.SMTP.Username, |
| Password: configHook.MailOptions.SMTP.Password, |
| Insecure: configHook.MailOptions.SMTP.Insecure, |
| From: configHook.MailOptions.From, |
| To: configHook.MailOptions.To, |
| } |
| logger.Hooks.Add(hook) |
| default: |
| } |
| } |
| } |
| } |
| |
| // configureSecret creates a random secret if a secret wasn't included in the |
| // configuration. |
| func (app *App) configureSecret(configuration *configuration.Configuration) { |
| if configuration.HTTP.Secret == "" { |
| var secretBytes [randomSecretSize]byte |
| if _, err := cryptorand.Read(secretBytes[:]); err != nil { |
| panic(fmt.Sprintf("could not generate random bytes for HTTP secret: %v", err)) |
| } |
| configuration.HTTP.Secret = string(secretBytes[:]) |
| dcontext.GetLogger(app).Warn("No HTTP secret provided - generated random secret. This may cause problems with uploads if multiple registries are behind a load-balancer. To provide a shared secret, fill in http.secret in the configuration file or set the REGISTRY_HTTP_SECRET environment variable.") |
| } |
| } |
| |
| func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
| defer r.Body.Close() // ensure that request body is always closed. |
| |
| // Prepare the context with our own little decorations. |
| ctx := r.Context() |
| ctx = dcontext.WithRequest(ctx, r) |
| ctx, w = dcontext.WithResponseWriter(ctx, w) |
| ctx = dcontext.WithLogger(ctx, dcontext.GetRequestLogger(ctx)) |
| r = r.WithContext(ctx) |
| |
| defer func() { |
| status, ok := ctx.Value("http.response.status").(int) |
| if ok && status >= 200 && status <= 399 { |
| dcontext.GetResponseLogger(r.Context()).Infof("response completed") |
| } |
| }() |
| |
| // Set a header with the Docker Distribution API Version for all responses. |
| w.Header().Add("Docker-Distribution-API-Version", "registry/2.0") |
| app.router.ServeHTTP(w, r) |
| } |
| |
| // dispatchFunc takes a context and request and returns a constructed handler |
| // for the route. The dispatcher will use this to dynamically create request |
| // specific handlers for each endpoint without creating a new router for each |
| // request. |
| type dispatchFunc func(ctx *Context, r *http.Request) http.Handler |
| |
| // TODO(stevvooe): dispatchers should probably have some validation error |
| // chain with proper error reporting. |
| |
| // dispatcher returns a handler that constructs a request specific context and |
| // handler, using the dispatch factory function. |
| func (app *App) dispatcher(dispatch dispatchFunc) http.Handler { |
| return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { |
| for headerName, headerValues := range app.Config.HTTP.Headers { |
| for _, value := range headerValues { |
| w.Header().Add(headerName, value) |
| } |
| } |
| |
| context := app.context(w, r) |
| |
| if err := app.authorized(w, r, context); err != nil { |
| dcontext.GetLogger(context).Warnf("error authorizing context: %v", err) |
| return |
| } |
| |
| // Add username to request logging |
| context.Context = dcontext.WithLogger(context.Context, dcontext.GetLogger(context.Context, auth.UserNameKey)) |
| |
| // sync up context on the request. |
| r = r.WithContext(context) |
| |
| if app.nameRequired(r) { |
| nameRef, err := reference.WithName(getName(context)) |
| if err != nil { |
| dcontext.GetLogger(context).Errorf("error parsing reference from context: %v", err) |
| context.Errors = append(context.Errors, distribution.ErrRepositoryNameInvalid{ |
| Name: getName(context), |
| Reason: err, |
| }) |
| if err := errcode.ServeJSON(w, context.Errors); err != nil { |
| dcontext.GetLogger(context).Errorf("error serving error json: %v (from %v)", err, context.Errors) |
| } |
| return |
| } |
| repository, err := app.registry.Repository(context, nameRef) |
| |
| if err != nil { |
| dcontext.GetLogger(context).Errorf("error resolving repository: %v", err) |
| |
| switch err := err.(type) { |
| case distribution.ErrRepositoryUnknown: |
| context.Errors = append(context.Errors, v2.ErrorCodeNameUnknown.WithDetail(err)) |
| case distribution.ErrRepositoryNameInvalid: |
| context.Errors = append(context.Errors, v2.ErrorCodeNameInvalid.WithDetail(err)) |
| case errcode.Error: |
| context.Errors = append(context.Errors, err) |
| } |
| |
| if err := errcode.ServeJSON(w, context.Errors); err != nil { |
| dcontext.GetLogger(context).Errorf("error serving error json: %v (from %v)", err, context.Errors) |
| } |
| return |
| } |
| |
| // assign and decorate the authorized repository with an event bridge. |
| context.Repository, context.RepositoryRemover = notifications.Listen( |
| repository, |
| context.App.repoRemover, |
| app.eventBridge(context, r)) |
| |
| context.Repository, err = applyRepoMiddleware(app, context.Repository, app.Config.Middleware["repository"]) |
| if err != nil { |
| dcontext.GetLogger(context).Errorf("error initializing repository middleware: %v", err) |
| context.Errors = append(context.Errors, errcode.ErrorCodeUnknown.WithDetail(err)) |
| |
| if err := errcode.ServeJSON(w, context.Errors); err != nil { |
| dcontext.GetLogger(context).Errorf("error serving error json: %v (from %v)", err, context.Errors) |
| } |
| return |
| } |
| } |
| |
| dispatch(context, r).ServeHTTP(w, r) |
| // Automated error response handling here. Handlers may return their |
| // own errors if they need different behavior (such as range errors |
| // for layer upload). |
| if context.Errors.Len() > 0 { |
| if err := errcode.ServeJSON(w, context.Errors); err != nil { |
| dcontext.GetLogger(context).Errorf("error serving error json: %v (from %v)", err, context.Errors) |
| } |
| |
| app.logError(context, context.Errors) |
| } |
| }) |
| } |
| |
// Context keys under which logError stores the parts of an error. Each key's
// String method returns the name the value is logged under.
type (
	errCodeKey    struct{}
	errMessageKey struct{}
	errDetailKey  struct{}
)

// Field names returned by the error context keys.
const (
	errCodeField    = "err.code"
	errMessageField = "err.message"
	errDetailField  = "err.detail"
)

// String returns the field name for the error code.
func (k errCodeKey) String() string { return errCodeField }

// String returns the field name for the error message.
func (k errMessageKey) String() string { return errMessageField }

// String returns the field name for the error detail.
func (k errDetailKey) String() string { return errDetailField }
| |
| func (app *App) logError(ctx context.Context, errors errcode.Errors) { |
| for _, e1 := range errors { |
| var c context.Context |
| |
| switch e1.(type) { |
| case errcode.Error: |
| e, _ := e1.(errcode.Error) |
| c = context.WithValue(ctx, errCodeKey{}, e.Code) |
| c = context.WithValue(c, errMessageKey{}, e.Message) |
| c = context.WithValue(c, errDetailKey{}, e.Detail) |
| case errcode.ErrorCode: |
| e, _ := e1.(errcode.ErrorCode) |
| c = context.WithValue(ctx, errCodeKey{}, e) |
| c = context.WithValue(c, errMessageKey{}, e.Message()) |
| default: |
| // just normal go 'error' |
| c = context.WithValue(ctx, errCodeKey{}, errcode.ErrorCodeUnknown) |
| c = context.WithValue(c, errMessageKey{}, e1.Error()) |
| } |
| |
| c = dcontext.WithLogger(c, dcontext.GetLogger(c, |
| errCodeKey{}, |
| errMessageKey{}, |
| errDetailKey{})) |
| dcontext.GetResponseLogger(c).Errorf("response completed with error") |
| } |
| } |
| |
| // context constructs the context object for the application. This only be |
| // called once per request. |
| func (app *App) context(w http.ResponseWriter, r *http.Request) *Context { |
| ctx := r.Context() |
| ctx = dcontext.WithVars(ctx, r) |
| ctx = dcontext.WithLogger(ctx, dcontext.GetLogger(ctx, |
| "vars.name", |
| "vars.reference", |
| "vars.digest", |
| "vars.uuid")) |
| |
| context := &Context{ |
| App: app, |
| Context: ctx, |
| } |
| |
| if app.httpHost.Scheme != "" && app.httpHost.Host != "" { |
| // A "host" item in the configuration takes precedence over |
| // X-Forwarded-Proto and X-Forwarded-Host headers, and the |
| // hostname in the request. |
| context.urlBuilder = v2.NewURLBuilder(&app.httpHost, false) |
| } else { |
| context.urlBuilder = v2.NewURLBuilderFromRequest(r, app.Config.HTTP.RelativeURLs) |
| } |
| |
| return context |
| } |
| |
| // authorized checks if the request can proceed with access to the requested |
| // repository. If it succeeds, the context may access the requested |
| // repository. An error will be returned if access is not available. |
| func (app *App) authorized(w http.ResponseWriter, r *http.Request, context *Context) error { |
| dcontext.GetLogger(context).Debug("authorizing request") |
| repo := getName(context) |
| |
| if app.accessController == nil { |
| return nil // access controller is not enabled. |
| } |
| |
| var accessRecords []auth.Access |
| |
| if repo != "" { |
| accessRecords = appendAccessRecords(accessRecords, r.Method, repo) |
| if fromRepo := r.FormValue("from"); fromRepo != "" { |
| // mounting a blob from one repository to another requires pull (GET) |
| // access to the source repository. |
| accessRecords = appendAccessRecords(accessRecords, "GET", fromRepo) |
| } |
| } else { |
| // Only allow the name not to be set on the base route. |
| if app.nameRequired(r) { |
| // For this to be properly secured, repo must always be set for a |
| // resource that may make a modification. The only condition under |
| // which name is not set and we still allow access is when the |
| // base route is accessed. This section prevents us from making |
| // that mistake elsewhere in the code, allowing any operation to |
| // proceed. |
| if err := errcode.ServeJSON(w, errcode.ErrorCodeUnauthorized); err != nil { |
| dcontext.GetLogger(context).Errorf("error serving error json: %v (from %v)", err, context.Errors) |
| } |
| return fmt.Errorf("forbidden: no repository name") |
| } |
| accessRecords = appendCatalogAccessRecord(accessRecords, r) |
| } |
| |
| ctx, err := app.accessController.Authorized(context.Context, accessRecords...) |
| if err != nil { |
| switch err := err.(type) { |
| case auth.Challenge: |
| // Add the appropriate WWW-Auth header |
| err.SetHeaders(w) |
| |
| if err := errcode.ServeJSON(w, errcode.ErrorCodeUnauthorized.WithDetail(accessRecords)); err != nil { |
| dcontext.GetLogger(context).Errorf("error serving error json: %v (from %v)", err, context.Errors) |
| } |
| default: |
| // This condition is a potential security problem either in |
| // the configuration or whatever is backing the access |
| // controller. Just return a bad request with no information |
| // to avoid exposure. The request should not proceed. |
| dcontext.GetLogger(context).Errorf("error checking authorization: %v", err) |
| w.WriteHeader(http.StatusBadRequest) |
| } |
| |
| return err |
| } |
| |
| dcontext.GetLogger(ctx).Info("authorized request") |
| // TODO(stevvooe): This pattern needs to be cleaned up a bit. One context |
| // should be replaced by another, rather than replacing the context on a |
| // mutable object. |
| context.Context = ctx |
| return nil |
| } |
| |
| // eventBridge returns a bridge for the current request, configured with the |
| // correct actor and source. |
| func (app *App) eventBridge(ctx *Context, r *http.Request) notifications.Listener { |
| actor := notifications.ActorRecord{ |
| Name: getUserName(ctx, r), |
| } |
| request := notifications.NewRequestRecord(dcontext.GetRequestID(ctx), r) |
| |
| return notifications.NewBridge(ctx.urlBuilder, app.events.source, actor, request, app.events.sink, app.Config.Notifications.EventConfig.IncludeReferences) |
| } |
| |
| // nameRequired returns true if the route requires a name. |
| func (app *App) nameRequired(r *http.Request) bool { |
| route := mux.CurrentRoute(r) |
| if route == nil { |
| return true |
| } |
| routeName := route.GetName() |
| return routeName != v2.RouteNameBase && routeName != v2.RouteNameCatalog |
| } |
| |
// apiBase is the handler for the API base endpoint. It always answers with
// an empty JSON object and the default 200 OK status, which lets clients run
// overall checks against the API and supports the auth round trip used by
// docker login.
func apiBase(w http.ResponseWriter, r *http.Request) {
	const body = "{}"

	// Headers have to be in place before the first byte of the body is written.
	headers := w.Header()
	headers.Set("Content-Type", "application/json; charset=utf-8")
	contentLength := fmt.Sprint(len(body))
	headers.Set("Content-Length", contentLength)

	fmt.Fprint(w, body)
}
| |
| // appendAccessRecords checks the method and adds the appropriate Access records to the records list. |
| func appendAccessRecords(records []auth.Access, method string, repo string) []auth.Access { |
| resource := auth.Resource{ |
| Type: "repository", |
| Name: repo, |
| } |
| |
| switch method { |
| case "GET", "HEAD": |
| records = append(records, |
| auth.Access{ |
| Resource: resource, |
| Action: "pull", |
| }) |
| case "POST", "PUT", "PATCH": |
| records = append(records, |
| auth.Access{ |
| Resource: resource, |
| Action: "pull", |
| }, |
| auth.Access{ |
| Resource: resource, |
| Action: "push", |
| }) |
| case "DELETE": |
| records = append(records, |
| auth.Access{ |
| Resource: resource, |
| Action: "delete", |
| }) |
| } |
| return records |
| } |
| |
| // Add the access record for the catalog if it's our current route |
| func appendCatalogAccessRecord(accessRecords []auth.Access, r *http.Request) []auth.Access { |
| route := mux.CurrentRoute(r) |
| routeName := route.GetName() |
| |
| if routeName == v2.RouteNameCatalog { |
| resource := auth.Resource{ |
| Type: "registry", |
| Name: "catalog", |
| } |
| |
| accessRecords = append(accessRecords, |
| auth.Access{ |
| Resource: resource, |
| Action: "*", |
| }) |
| } |
| return accessRecords |
| } |
| |
| // applyRegistryMiddleware wraps a registry instance with the configured middlewares |
| func applyRegistryMiddleware(ctx context.Context, registry distribution.Namespace, middlewares []configuration.Middleware) (distribution.Namespace, error) { |
| for _, mw := range middlewares { |
| rmw, err := registrymiddleware.Get(ctx, mw.Name, mw.Options, registry) |
| if err != nil { |
| return nil, fmt.Errorf("unable to configure registry middleware (%s): %s", mw.Name, err) |
| } |
| registry = rmw |
| } |
| return registry, nil |
| |
| } |
| |
| // applyRepoMiddleware wraps a repository with the configured middlewares |
| func applyRepoMiddleware(ctx context.Context, repository distribution.Repository, middlewares []configuration.Middleware) (distribution.Repository, error) { |
| for _, mw := range middlewares { |
| rmw, err := repositorymiddleware.Get(ctx, mw.Name, mw.Options, repository) |
| if err != nil { |
| return nil, err |
| } |
| repository = rmw |
| } |
| return repository, nil |
| } |
| |
| // applyStorageMiddleware wraps a storage driver with the configured middlewares |
| func applyStorageMiddleware(driver storagedriver.StorageDriver, middlewares []configuration.Middleware) (storagedriver.StorageDriver, error) { |
| for _, mw := range middlewares { |
| smw, err := storagemiddleware.Get(mw.Name, mw.Options, driver) |
| if err != nil { |
| return nil, fmt.Errorf("unable to configure storage middleware (%s): %v", mw.Name, err) |
| } |
| driver = smw |
| } |
| return driver, nil |
| } |
| |
// uploadPurgeDefaultConfig returns the upload purge settings used when the
// configuration file does not provide any: purging enabled, uploads older
// than 168h removed, a 24h interval between runs, and dry-run turned off.
// Every call returns a fresh map.
func uploadPurgeDefaultConfig() map[interface{}]interface{} {
	return map[interface{}]interface{}{
		"enabled":  true,
		"age":      "168h",
		"interval": "24h",
		"dryrun":   false,
	}
}
| |
// badPurgeUploadConfig aborts by panicking with a message that explains why
// the upload purge configuration could not be used. It never returns.
func badPurgeUploadConfig(reason string) {
	msg := fmt.Sprintf("Unable to parse upload purge configuration: %s", reason)
	panic(msg)
}
| |
| // startUploadPurger schedules a goroutine which will periodically |
| // check upload directories for old files and delete them |
| func startUploadPurger(ctx context.Context, storageDriver storagedriver.StorageDriver, log dcontext.Logger, config map[interface{}]interface{}) { |
| if config["enabled"] == false { |
| return |
| } |
| |
| var purgeAgeDuration time.Duration |
| var err error |
| purgeAge, ok := config["age"] |
| if ok { |
| ageStr, ok := purgeAge.(string) |
| if !ok { |
| badPurgeUploadConfig("age is not a string") |
| } |
| purgeAgeDuration, err = time.ParseDuration(ageStr) |
| if err != nil { |
| badPurgeUploadConfig(fmt.Sprintf("Cannot parse duration: %s", err.Error())) |
| } |
| } else { |
| badPurgeUploadConfig("age missing") |
| } |
| |
| var intervalDuration time.Duration |
| interval, ok := config["interval"] |
| if ok { |
| intervalStr, ok := interval.(string) |
| if !ok { |
| badPurgeUploadConfig("interval is not a string") |
| } |
| |
| intervalDuration, err = time.ParseDuration(intervalStr) |
| if err != nil { |
| badPurgeUploadConfig(fmt.Sprintf("Cannot parse interval: %s", err.Error())) |
| } |
| } else { |
| badPurgeUploadConfig("interval missing") |
| } |
| |
| var dryRunBool bool |
| dryRun, ok := config["dryrun"] |
| if ok { |
| dryRunBool, ok = dryRun.(bool) |
| if !ok { |
| badPurgeUploadConfig("cannot parse dryrun") |
| } |
| } else { |
| badPurgeUploadConfig("dryrun missing") |
| } |
| |
| go func() { |
| rand.Seed(time.Now().Unix()) |
| jitter := time.Duration(rand.Int()%60) * time.Minute |
| log.Infof("Starting upload purge in %s", jitter) |
| time.Sleep(jitter) |
| |
| for { |
| storage.PurgeUploads(ctx, storageDriver, time.Now().Add(-purgeAgeDuration), !dryRunBool) |
| log.Infof("Starting upload purge in %s", intervalDuration) |
| time.Sleep(intervalDuration) |
| } |
| }() |
| } |