blob: 25f56be8a3901350254d1352af56045a8eea737b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package pixiu
import (
"context"
"log"
"net/http"
"strconv"
"sync"
"time"
)
import (
"github.com/pkg/errors"
fc "github.com/dubbogo/dubbo-go-pixiu-filter/pkg/api/config"
"github.com/dubbogo/dubbo-go-pixiu-filter/pkg/router"
)
import (
"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/common/extension"
"github.com/apache/dubbo-go-pixiu/pkg/config"
ctx "github.com/apache/dubbo-go-pixiu/pkg/context"
h "github.com/apache/dubbo-go-pixiu/pkg/context/http"
"github.com/apache/dubbo-go-pixiu/pkg/filter/header"
"github.com/apache/dubbo-go-pixiu/pkg/filter/host"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pkg/model"
)
// ListenerService is the facade of a listener. It embeds *model.Listener, so the
// listener's fields (Address, Config, FilterChains, ...) are reachable directly on it.
type ListenerService struct {
*model.Listener
}
// Start starts the listener according to the protocol of its socket address.
// Only model.HTTP is handled; any other protocol causes a panic. The call does
// not return until the underlying HTTP server stops serving.
func (l *ListenerService) Start() {
	if l.Address.SocketAddress.Protocol == model.HTTP {
		l.httpListener()
		return
	}
	panic("unsupported protocol start: " + l.Address.SocketAddress.ProtocolStr)
}
// httpListener builds an http.Server for this listener and serves on it.
// The server's timeouts and header limit come from the listener's
// *model.HttpConfig when one is configured, otherwise from built-in defaults
// (20s timeouts, 1 MiB of headers). ListenAndServe blocks; the error it
// eventually returns is printed with the standard logger.
func (l *ListenerService) httpListener() {
	listener := NewDefaultHttpListener()
	// Contexts handed out by the pool are created lazily by this listener.
	listener.pool.New = func() interface{} { return l.allocateContext() }

	// User-customized http config; stays zero-valued when Config is nil or is
	// not a *model.HttpConfig (the comma-ok assertion covers both cases).
	var userCfg model.HttpConfig
	if custom, ok := l.Config.(*model.HttpConfig); ok {
		userCfg = *custom
	}

	serveMux := http.NewServeMux()
	serveMux.HandleFunc("/", listener.ServeHTTP)

	// NOTE(review): the joined "host:port" string always contains ":", so the
	// empty-address fallback inside resolveAddress cannot trigger from here.
	sock := l.Address.SocketAddress
	bindAddr := resolveAddress(sock.Address + ":" + strconv.Itoa(sock.Port))

	const fallbackTimeout = 20 * time.Second
	server := http.Server{
		Addr:           bindAddr,
		Handler:        serveMux,
		ReadTimeout:    resolveStr2Time(userCfg.ReadTimeoutStr, fallbackTimeout),
		WriteTimeout:   resolveStr2Time(userCfg.WriteTimeoutStr, fallbackTimeout),
		IdleTimeout:    resolveStr2Time(userCfg.IdleTimeoutStr, fallbackTimeout),
		MaxHeaderBytes: resolveInt2IntProp(userCfg.MaxHeaderBytes, 1<<20),
	}

	logger.Infof("[dubbo-go-pixiu] httpListener start at : %s", server.Addr)
	log.Println(server.ListenAndServe())
}
// allocateContext creates a fresh HttpContext bound to this listener, its
// filter chains, the resolved HTTP connection manager and a new base context.
// It is used as the New function of the request-context pool.
func (l *ListenerService) allocateContext() *h.HttpContext {
	hc := new(h.HttpContext)
	hc.Listener = l.Listener
	hc.FilterChains = l.FilterChains
	hc.HttpConnectionManager = l.findHttpManager()
	hc.BaseContext = ctx.NewBaseContext()
	return hc
}
// findHttpManager returns the config of the first filter named
// constant.HTTPConnectManagerFilter found in the listener's filter chains.
// The config is type-asserted to model.HttpConnectionManager without a check,
// so a config of any other type panics. When no such filter exists, the
// default connection manager is returned.
func (l *ListenerService) findHttpManager() model.HttpConnectionManager {
	for _, chain := range l.FilterChains {
		for _, filter := range chain.Filters {
			if filter.Name != constant.HTTPConnectManagerFilter {
				continue
			}
			return filter.Config.(model.HttpConnectionManager)
		}
	}
	return *DefaultHttpConnectionManager()
}
// DefaultHttpListener is the HTTP request entry point (see ServeHTTP). It reuses
// *h.HttpContext values across requests through a sync.Pool.
type DefaultHttpListener struct {
// pool holds reusable *h.HttpContext values; its New func is installed by
// ListenerService.httpListener.
pool sync.Pool
}
// NewDefaultHttpListener creates an HTTP listener whose context pool is
// zero-valued; the caller is expected to set the pool's New function before
// the listener serves requests.
func NewDefaultHttpListener() *DefaultHttpListener {
	return new(DefaultHttpListener)
}
// ServeHTTP is the entry point for every HTTP request. It takes an
// HttpContext from the pool, attaches the request and response writer,
// routes the request to an API, runs the filter chain for that API and
// finally returns the context to the pool. When routing fails the context is
// returned to the pool without running any filter (routeRequest has already
// written the error response).
func (s *DefaultHttpListener) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	reqCtx := s.pool.Get().(*h.HttpContext)
	reqCtx.Request = r
	reqCtx.ResetWritermen(w)
	reqCtx.Reset()

	if api, err := s.routeRequest(reqCtx, r); err == nil {
		reqCtx.Ctx = context.Background()
		addFilter(reqCtx, api)
		s.handleHTTPRequest(reqCtx)
	}

	s.pool.Put(reqCtx)
}
// addFilter assembles the filter chain for one request, in this order:
// metric, recovery, timeout, access log (only when enabled in the bootstrap
// config), rate limit, protocol-specific filters, header, remote call. It
// then calls BuildFilters and appends the response filter afterwards.
func addFilter(ctx *h.HttpContext, api router.API) {
	metric := extension.GetMustFilterFunc(constant.MetricFilter)
	recovery := extension.GetMustFilterFunc(constant.RecoveryFilter)
	timeout := extension.GetMustFilterFunc(constant.TimeoutFilter)
	ctx.AppendFilterFunc(metric, recovery, timeout)

	if config.GetBootstrap().StaticResources.AccessLogConfig.Enable {
		ctx.AppendFilterFunc(extension.GetMustFilterFunc(constant.AccessLogFilter))
	}

	ctx.AppendFilterFunc(extension.GetMustFilterFunc(constant.RateLimitFilter))

	// TODO add some basic filter for diff protocol
	// Only HTTP integration requests get extra filters today; Dubbo requests
	// (and any other type) add nothing here.
	integration := api.Method.IntegrationRequest
	if integration.RequestType == fc.HTTPRequest {
		httpFilter(ctx, integration)
	}

	headerFilter := header.New().Do()
	remoteCall := extension.GetMustFilterFunc(constant.RemoteCallFilter)
	ctx.AppendFilterFunc(headerFilter, remoteCall)

	ctx.BuildFilters()

	ctx.AppendFilterFunc(extension.GetMustFilterFunc(constant.ResponseFilter))
}
// httpFilter appends the filters derived from an HTTP integration request's
// config. Currently that is only the host filter, added when the request
// declares a non-empty Host.
func httpFilter(ctx *h.HttpContext, request fc.IntegrationRequest) {
	if len(request.Host) == 0 {
		return
	}
	ctx.AppendFilterFunc(host.New(request.Host).Do())
}
// routeRequest looks up the API matching the request's URL path and HTTP
// method in the local-memory API discovery service.
//
// On success the API is stored on ctx (via ctx.API) and returned with a nil
// error. On failure a plain-text error response is written to ctx and a zero
// router.API plus a non-nil error are returned:
//   - no matching API: 404 with constant.Default404Body;
//   - API found but not on air (api.Method.OnAir is false): 406 with
//     constant.Default406Body.
//
// The error is also logged at debug level.
func (s *DefaultHttpListener) routeRequest(ctx *h.HttpContext, req *http.Request) (router.API, error) {
	apiDiscSrv := extension.GetMustAPIDiscoveryService(constant.LocalMemoryApiDiscoveryService)
	api, err := apiDiscSrv.GetAPI(req.URL.Path, fc.HTTPVerb(req.Method))
	if err != nil {
		// The header must be set before the status and body are written:
		// net/http does not send header changes made after WriteHeader/Write.
		// (Assumes WriteWithStatus writes through to the http.ResponseWriter;
		// if it buffers instead, the order is harmless.)
		ctx.AddHeader(constant.HeaderKeyContextType, constant.HeaderValueTextPlain)
		ctx.WriteWithStatus(http.StatusNotFound, constant.Default404Body)
		e := errors.Errorf("Requested URL %s not found", req.URL.Path)
		logger.Debug(e.Error())
		return router.API{}, e
	}
	if !api.Method.OnAir {
		// Same ordering requirement as above: header first, then status/body.
		ctx.AddHeader(constant.HeaderKeyContextType, constant.HeaderValueTextPlain)
		ctx.WriteWithStatus(http.StatusNotAcceptable, constant.Default406Body)
		e := errors.Errorf("Requested API %s %s does not online", req.Method, req.URL.Path)
		logger.Debug(e.Error())
		return router.API{}, e
	}
	ctx.API(api)
	return api, nil
}
// handleHTTPRequest runs the context's filter chain (c.Next) and then calls
// c.WriteHeaderNow. It does nothing when the context has no filters.
func (s *DefaultHttpListener) handleHTTPRequest(c *h.HttpContext) {
	if len(c.BaseContext.Filters) == 0 {
		// TODO redirect
		return
	}
	c.Next()
	c.WriteHeaderNow()
}
// resolveInt2IntProp returns currentV unless it is zero (treated as "unset"),
// in which case defaultV is returned. Negative values are passed through.
func resolveInt2IntProp(currentV, defaultV int) int {
	if currentV != 0 {
		return currentV
	}
	return defaultV
}
// resolveStr2Time converts currentV, written in time.ParseDuration syntax
// (for example "30s" or "1m"), into a time.Duration.
//
// It returns defaultV when currentV is empty or cannot be parsed, so the
// caller's default is honored in both cases.
func resolveStr2Time(currentV string, defaultV time.Duration) time.Duration {
	if currentV == "" {
		return defaultV
	}
	duration, err := time.ParseDuration(currentV)
	if err != nil {
		// Fall back to the caller-supplied default rather than a fixed value.
		return defaultV
	}
	return duration
}
func resolveAddress(addr string) string {
if addr == "" {
logger.Debug("Addr is undefined. Using port :8080 by default")
return ":8080"
}
return addr
}