blob: cacbf6bef3efee397d0d9db5d7ce1c939c9a0c71 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package thrift
import (
"context"
"fmt"
"strings"
)
/*
TMultiplexedProtocol is a protocol-independent concrete decorator that allows
a Thrift client to communicate with a multiplexing Thrift server, by
prepending the service name to the function name during function calls.

NOTE: THIS IS NOT USED BY SERVERS. On the server, use TMultiplexedProcessor
to handle requests from a multiplexing client.

This example uses a single socket transport to invoke two services:

	socket := thrift.NewTSocketFromAddrTimeout(addr, TIMEOUT)
	transport := thrift.NewTFramedTransport(socket)
	protocol := thrift.NewTBinaryProtocolTransport(transport)

	mp := thrift.NewTMultiplexedProtocol(protocol, "Calculator")
	service := Calculator.NewCalculatorClient(mp)

	mp2 := thrift.NewTMultiplexedProtocol(protocol, "WeatherReport")
	service2 := WeatherReport.NewWeatherReportClient(mp2)

	err := transport.Open()
	if err != nil {
		t.Fatal("Unable to open client socket", err)
	}

	fmt.Println(service.Add(2,2))
	fmt.Println(service2.GetTemperature())
*/
type TMultiplexedProtocol struct {
// TProtocol is the wrapped protocol; every method other than
// WriteMessageBegin is promoted from it unchanged.
TProtocol
// serviceName is the prefix added to outgoing CALL and ONEWAY message names.
serviceName string
}
// MULTIPLEXED_SEPARATOR is the delimiter placed between the service name and
// the function name in the name of a multiplexed message.
const MULTIPLEXED_SEPARATOR = ":"
// NewTMultiplexedProtocol wraps protocol in a TMultiplexedProtocol that tags
// outgoing calls with serviceName.
func NewTMultiplexedProtocol(protocol TProtocol, serviceName string) *TMultiplexedProtocol {
	mp := new(TMultiplexedProtocol)
	mp.TProtocol = protocol
	mp.serviceName = serviceName
	return mp
}
// WriteMessageBegin forwards the message header to the wrapped protocol.
//
// For CALL and ONEWAY messages the name written is
// "{serviceName}{MULTIPLEXED_SEPARATOR}{name}"; for every other message type
// the name is passed through untouched.
func (t *TMultiplexedProtocol) WriteMessageBegin(ctx context.Context, name string, typeId TMessageType, seqid int32) error {
	wireName := name
	if typeId == CALL || typeId == ONEWAY {
		// Only outgoing requests carry the service prefix.
		wireName = t.serviceName + MULTIPLEXED_SEPARATOR + name
	}
	return t.TProtocol.WriteMessageBegin(ctx, wireName, typeId, seqid)
}
/*
TMultiplexedProcessor is a TProcessor allowing a single TServer to provide
multiple services.

To do so, you instantiate the processor and then register additional
processors with it, as shown in the following example:

	var processor = thrift.NewTMultiplexedProcessor()

	processor.RegisterProcessor(
		"Calculator",
		Calculator.NewCalculatorProcessor(&CalculatorHandler{}),
	)

	processor.RegisterProcessor(
		"WeatherReport",
		WeatherReport.NewWeatherReportProcessor(&WeatherReportHandler{}),
	)

	serverTransport, err := thrift.NewTServerSocketTimeout(addr, TIMEOUT)
	if err != nil {
		t.Fatal("Unable to create server socket", err)
	}
	server := thrift.NewTSimpleServer2(processor, serverTransport)
	server.Serve();
*/
type TMultiplexedProcessor struct {
// serviceProcessorMap holds the registered processors, keyed by service name.
serviceProcessorMap map[string]TProcessor
// DefaultProcessor, when non-nil, handles messages whose name carries no
// MULTIPLEXED_SEPARATOR.
DefaultProcessor TProcessor
}
// NewTMultiplexedProcessor returns a TMultiplexedProcessor with an empty
// service registry and no DefaultProcessor.
func NewTMultiplexedProcessor() *TMultiplexedProcessor {
	registry := map[string]TProcessor{}
	return &TMultiplexedProcessor{serviceProcessorMap: registry}
}
// ProcessorMap builds a flat view of every TProcessorFunction reachable
// through this TMultiplexedProcessor.
//
// Functions of registered processors are keyed as
// "{ProcessorName}{MULTIPLEXED_SEPARATOR}{FunctionName}". Functions of the
// DefaultProcessor, if one is set, are keyed as plain "{FunctionName}". When
// both kinds of processor are present the result mixes both key formats.
//
// Unlike most TProcessors, which hand out their internal mapping directly,
// this implementation returns a freshly built map on every call. Editing the
// returned map therefore does not change the TMultiplexedProcessor itself.
func (t *TMultiplexedProcessor) ProcessorMap() map[string]TProcessorFunction {
	merged := map[string]TProcessorFunction{}

	for serviceName, svc := range t.serviceProcessorMap {
		prefix := serviceName + MULTIPLEXED_SEPARATOR
		for fnName, fn := range svc.ProcessorMap() {
			merged[prefix+fnName] = fn
		}
	}

	// Default-processor entries are written last, under their bare names.
	if def := t.DefaultProcessor; def != nil {
		for fnName, fn := range def.ProcessorMap() {
			merged[fnName] = fn
		}
	}

	return merged
}
// AddToProcessorMap installs processorFunc on one of the underlying
// TProcessors, chosen by the shape of name.
//
// A name of the form "{ProcessorName}{MULTIPLEXED_SEPARATOR}{FunctionName}"
// targets the processor registered as ProcessorName, and the function is
// stored there under FunctionName.
//
// A name with no MULTIPLEXED_SEPARATOR is treated as a bare "{FunctionName}"
// and is stored on the DefaultProcessor, if one is configured.
//
// The call is a no-op when no suitable TProcessor exists: either nothing is
// registered under ProcessorName, or a bare FunctionName was given and no
// DefaultProcessor is set.
func (t *TMultiplexedProcessor) AddToProcessorMap(name string, processorFunc TProcessorFunction) {
	parts := strings.SplitN(name, MULTIPLEXED_SEPARATOR, 2)
	switch len(parts) {
	case 2:
		// parts[0] is the processor name, parts[1] the function name.
		if target, found := t.serviceProcessorMap[parts[0]]; found {
			target.AddToProcessorMap(parts[1], processorFunc)
		}
	case 1:
		if t.DefaultProcessor != nil {
			t.DefaultProcessor.AddToProcessorMap(parts[0], processorFunc)
		}
	}
}
// Compile-time assertion that *TMultiplexedProcessor implements TProcessor.
var _ TProcessor = (*TMultiplexedProcessor)(nil)
// RegisterDefault sets processor as the DefaultProcessor, replacing any
// previously configured one.
func (t *TMultiplexedProcessor) RegisterDefault(processor TProcessor) {
	t.DefaultProcessor = processor
}
// RegisterProcessor stores processor under the service name, overwriting any
// processor previously registered with the same name. The registry is created
// on first use, so a zero-value TMultiplexedProcessor can be used.
func (t *TMultiplexedProcessor) RegisterProcessor(name string, processor TProcessor) {
	if t.serviceProcessorMap != nil {
		t.serviceProcessorMap[name] = processor
		return
	}
	// No registry yet: create it with this processor as its first entry.
	t.serviceProcessorMap = map[string]TProcessor{name: processor}
}
// Process reads the next message header from in and hands the message to the
// processor responsible for it.
//
// The message name is expected to have the form
// "{ServiceName}{MULTIPLEXED_SEPARATOR}{FunctionName}". The message is
// dispatched to the processor registered under ServiceName, which sees only
// "{FunctionName}" as the message name. A name without a separator is passed
// unchanged to DefaultProcessor when one is set.
//
// Only CALL and ONEWAY messages are accepted. Process returns false together
// with an exception built by NewTProtocolException when the header cannot be
// read, the message type is unexpected, the name has no service prefix and
// there is no DefaultProcessor, or no processor is registered under the
// service name. Otherwise it returns whatever the selected processor returns.
func (t *TMultiplexedProcessor) Process(ctx context.Context, in, out TProtocol) (bool, TException) {
	name, typeId, seqid, err := in.ReadMessageBegin(ctx)
	if err != nil {
		return false, NewTProtocolException(err)
	}
	if typeId != CALL && typeId != ONEWAY {
		return false, NewTProtocolException(fmt.Errorf("Unexpected message type %v", typeId))
	}

	// Split at the first separator only, so the function-name part keeps any
	// further separators it may contain.
	v := strings.SplitN(name, MULTIPLEXED_SEPARATOR, 2)
	if len(v) != 2 {
		if t.DefaultProcessor != nil {
			// The header has already been consumed from in, so replay it
			// through a storedMessageProtocol for the downstream processor.
			smb := NewStoredMessageProtocol(in, name, typeId, seqid)
			return t.DefaultProcessor.Process(ctx, smb, out)
		}
		return false, NewTProtocolException(fmt.Errorf(
			"Service name not found in message name: %s. Did you forget to use a TMultiplexedProtocol in your client?",
			name,
		))
	}

	actualProcessor, ok := t.serviceProcessorMap[v[0]]
	if !ok {
		return false, NewTProtocolException(fmt.Errorf(
			"Service name not found: %s. Did you forget to call RegisterProcessor()?",
			v[0],
		))
	}

	// Replay the header with the service prefix stripped.
	smb := NewStoredMessageProtocol(in, v[1], typeId, seqid)
	return actualProcessor.Process(ctx, smb, out)
}
// storedMessageProtocol is a TProtocol wrapper whose ReadMessageBegin returns
// a stored message header instead of reading one from the wrapped protocol.
type storedMessageProtocol struct {
// TProtocol is the wrapped protocol; all other methods are promoted from it.
TProtocol
// name, typeId and seqid are the header values returned by ReadMessageBegin.
name string
typeId TMessageType
seqid int32
}
// NewStoredMessageProtocol wraps protocol so that its ReadMessageBegin yields
// the supplied name, typeId and seqid.
func NewStoredMessageProtocol(protocol TProtocol, name string, typeId TMessageType, seqid int32) *storedMessageProtocol {
	return &storedMessageProtocol{
		TProtocol: protocol,
		name:      name,
		typeId:    typeId,
		seqid:     seqid,
	}
}
// ReadMessageBegin returns the stored message header. It does not read from
// the wrapped protocol and always returns a nil error.
func (s *storedMessageProtocol) ReadMessageBegin(ctx context.Context) (name string, typeId TMessageType, seqid int32, err error) {
	name, typeId, seqid = s.name, s.typeId, s.seqid
	return name, typeId, seqid, nil
}