| #!/usr/bin/env python |
| |
| ## |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| from __future__ import absolute_import, division, print_function |
| |
| import io |
| |
| from zope.interface import implements |
| |
| import avro.io |
| from avro import ipc |
| from twisted.internet.defer import Deferred, maybeDeferred |
| from twisted.internet.protocol import Protocol |
| from twisted.web import resource, server |
| from twisted.web.client import Agent |
| from twisted.web.http_headers import Headers |
| from twisted.web.iweb import IBodyProducer |
| |
| |
| class TwistedRequestor(ipc.BaseRequestor): |
| """A Twisted-compatible requestor. Returns a Deferred that will fire with the |
| returning value, instead of blocking until the request completes.""" |
| |
| def _process_handshake(self, call_response, message_name, request_datum): |
| # process the handshake and call response |
| buffer_decoder = avro.io.BinaryDecoder(io.BytesIO(call_response)) |
| call_response_exists = self.read_handshake_response(buffer_decoder) |
| if call_response_exists: |
| return self.read_call_response(message_name, buffer_decoder) |
| else: |
| return self.request(message_name, request_datum) |
| |
| def issue_request(self, call_request, message_name, request_datum): |
| d = self.transceiver.transceive(call_request) |
| d.addCallback(self._process_handshake, message_name, request_datum) |
| return d |
| |
| |
| class RequestStreamingProducer(object): |
| """A streaming producer for issuing requests with the Twisted.web Agent.""" |
| implements(IBodyProducer) |
| |
| paused = False |
| stopped = False |
| started = False |
| |
| def __init__(self, message): |
| self._message = message |
| self._length = len(message) |
| # We need a buffer length header for every buffer and an additional |
| # zero-length buffer as the message terminator |
| self._length += (self._length / ipc.BUFFER_SIZE + 2) \ |
| * ipc.BUFFER_HEADER_LENGTH |
| self._total_bytes_sent = 0 |
| self._deferred = Deferred() |
| |
| # read-only properties |
| message = property(lambda self: self._message) |
| length = property(lambda self: self._length) |
| consumer = property(lambda self: self._consumer) |
| deferred = property(lambda self: self._deferred) |
| |
| def _get_total_bytes_sent(self): |
| return self._total_bytes_sent |
| |
| def _set_total_bytes_sent(self, bytes_sent): |
| self._total_bytes_sent = bytes_sent |
| |
| total_bytes_sent = property(_get_total_bytes_sent, _set_total_bytes_sent) |
| |
| def startProducing(self, consumer): |
| if self.started: |
| return |
| |
| self.started = True |
| self._consumer = consumer |
| # Keep writing data to the consumer until we're finished, |
| # paused (pauseProducing()) or stopped (stopProducing()) |
| while self.length - self.total_bytes_sent > 0 and \ |
| not self.paused and not self.stopped: |
| self.write() |
| # self.write will fire this deferred once it has written |
| # the entire message to the consumer |
| return self.deferred |
| |
| def resumeProducing(self): |
| self.paused = False |
| self.write(self) |
| |
| def pauseProducing(self): |
| self.paused = True |
| |
| def stopProducing(self): |
| self.stopped = True |
| |
| def write(self): |
| if self.length - self.total_bytes_sent > ipc.BUFFER_SIZE: |
| buffer_length = ipc.BUFFER_SIZE |
| else: |
| buffer_length = self.length - self.total_bytes_sent |
| self.write_buffer(self.message[self.total_bytes_sent: |
| (self.total_bytes_sent + buffer_length)]) |
| self.total_bytes_sent += buffer_length |
| # Make sure we wrote the entire message |
| if self.total_bytes_sent == self.length and not self.stopped: |
| self.stopProducing() |
| # A message is always terminated by a zero-length buffer. |
| self.write_buffer_length(0) |
| self.deferred.callback(None) |
| |
| def write_buffer(self, chunk): |
| buffer_length = len(chunk) |
| self.write_buffer_length(buffer_length) |
| self.consumer.write(chunk) |
| |
| def write_buffer_length(self, n): |
| self.consumer.write(ipc.BIG_ENDIAN_INT_STRUCT.pack(n)) |
| |
| |
| class AvroProtocol(Protocol): |
| |
| recvd = '' |
| done = False |
| |
| def __init__(self, finished): |
| self.finished = finished |
| self.message = [] |
| |
| def dataReceived(self, data): |
| self.recvd = self.recvd + data |
| while len(self.recvd) >= ipc.BUFFER_HEADER_LENGTH: |
| buffer_length, = ipc.BIG_ENDIAN_INT_STRUCT.unpack( |
| self.recvd[:ipc.BUFFER_HEADER_LENGTH]) |
| if buffer_length == 0: |
| response = ''.join(self.message) |
| self.done = True |
| self.finished.callback(response) |
| break |
| if len(self.recvd) < buffer_length + ipc.BUFFER_HEADER_LENGTH: |
| break |
| buffer = self.recvd[ipc.BUFFER_HEADER_LENGTH:buffer_length + ipc.BUFFER_HEADER_LENGTH] |
| self.recvd = self.recvd[buffer_length + ipc.BUFFER_HEADER_LENGTH:] |
| self.message.append(buffer) |
| |
| def connectionLost(self, reason): |
| if not self.done: |
| self.finished.errback(ipc.ConnectionClosedException("Reader read 0 bytes.")) |
| |
| |
| class TwistedHTTPTransceiver(object): |
| """This transceiver uses the Agent class present in Twisted.web >= 9.0 |
| for issuing requests to the remote endpoint.""" |
| |
| def __init__(self, host, port, remote_name=None, reactor=None): |
| self.url = "http://%s:%d/" % (host, port) |
| |
| if remote_name is None: |
| # There's no easy way to get this peer's remote address |
| # in Twisted so I use a random UUID to identify ourselves |
| import uuid |
| self.remote_name = uuid.uuid4() |
| |
| if reactor is None: |
| from twisted.internet import reactor |
| self.agent = Agent(reactor) |
| |
| def read_framed_message(self, response): |
| finished = Deferred() |
| response.deliverBody(AvroProtocol(finished)) |
| return finished |
| |
| def transceive(self, request): |
| req_method = 'POST' |
| req_headers = { |
| 'Content-Type': ['avro/binary'], |
| 'Accept-Encoding': ['identity'], |
| } |
| |
| body_producer = RequestStreamingProducer(request) |
| d = self.agent.request( |
| req_method, |
| self.url, |
| headers=Headers(req_headers), |
| bodyProducer=body_producer) |
| return d.addCallback(self.read_framed_message) |
| |
| |
| class AvroResponderResource(resource.Resource): |
| """This Twisted.web resource can be placed anywhere in a URL hierarchy |
| to provide an Avro endpoint. Different Avro protocols can be served |
| by the same web server as long as they are in different resources in |
| a URL hierarchy.""" |
| isLeaf = True |
| |
| def __init__(self, responder): |
| resource.Resource.__init__(self) |
| self.responder = responder |
| |
| def cb_render_POST(self, resp_body, request): |
| request.setResponseCode(200) |
| request.setHeader('Content-Type', 'avro/binary') |
| resp_writer = ipc.FramedWriter(request) |
| resp_writer.write_framed_message(resp_body) |
| request.finish() |
| |
| def render_POST(self, request): |
| # Unfortunately, Twisted.web doesn't support incoming |
| # streamed input yet, the whole payload must be kept in-memory |
| request.content.seek(0, 0) |
| call_request_reader = ipc.FramedReader(request.content) |
| call_request = call_request_reader.read_framed_message() |
| d = maybeDeferred(self.responder.respond, call_request) |
| d.addCallback(self.cb_render_POST, request) |
| return server.NOT_DONE_YET |