| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| use strict; |
| use warnings; |
| |
| package LucyX::Remote::SearchServer; |
| BEGIN { our @ISA = qw( Clownfish::Obj ) } |
| our $VERSION = '0.004001'; |
| $VERSION = eval $VERSION; |
| use Carp; |
| use Storable qw( nfreeze thaw ); |
| use Scalar::Util qw( reftype ); |
| |
| # Inside-out member vars. |
| our %searcher; |
| |
| use IO::Socket::INET; |
| use IO::Select; |
| |
| sub new { |
| my ( $either, %args ) = @_; |
| my $searcher = delete $args{searcher}; |
| my $self = $either->SUPER::new(%args); |
| $searcher{$$self} = $searcher; |
| |
| return $self; |
| } |
| |
| sub DESTROY { |
| my $self = shift; |
| delete $searcher{$$self}; |
| $self->SUPER::DESTROY; |
| } |
| |
| my %dispatch = ( |
| handshake => \&do_handshake, |
| terminate => \&do_terminate, |
| doc_max => \&do_doc_max, |
| doc_freq => \&do_doc_freq, |
| top_docs => \&do_top_docs, |
| fetch_doc => \&do_fetch_doc, |
| fetch_doc_vec => \&do_fetch_doc_vec, |
| ); |
| |
| sub serve { |
| my ( $self, %args ) = @_; |
| # Establish a listening socket. |
| my $port = delete $args{port}; |
| confess("Invalid port: $port") unless $port =~ /^\d+$/; |
| my $main_sock = IO::Socket::INET->new( |
| LocalPort => $port, |
| Proto => 'tcp', |
| Listen => SOMAXCONN, |
| Reuse => 1, |
| ); |
| confess("No socket: $!") unless $main_sock; |
| my $read_set = IO::Select->new($main_sock); |
| |
| while ( my @ready = $read_set->can_read ) { |
| for my $readhandle (@ready) { |
| # If this is the main handle, we have a new client, so accept. |
| if ( $readhandle == $main_sock ) { |
| my $client_sock = $main_sock->accept; |
| $read_set->add($client_sock); |
| } |
| # Otherwise it's a client sock, so process the request. |
| else { |
| my $client_sock = $readhandle; |
| my $status = $self->serve_rpc($client_sock); |
| |
| # If "done", the client's closing. |
| if ( $status eq 'done' ) { |
| $read_set->remove($client_sock); |
| $client_sock->close; |
| next; |
| } |
| # Remote signal to close the server. |
| elsif ( $status eq 'terminate' ) { |
| my @all_handles = $read_set->handles; |
| $read_set->remove( \@all_handles ); |
| $client_sock->close; |
| $main_sock->close; |
| return; |
| } |
| } |
| } |
| } |
| } |
| |
| sub serve_rpc { |
| my ( $self, $client_sock ) = @_; |
| my ( $check_val, $buf, $len ); |
| $check_val = $client_sock->read( $buf, 4 ); |
| # If read returns 0, socket has been closed cleanly at |
| # the other end. |
| return 'done' if $check_val == 0; |
| confess $! unless $check_val == 4; |
| $len = unpack( 'N', $buf ); |
| $check_val = $client_sock->read( $buf, $len ); |
| confess $! unless $check_val == $len; |
| my $args = eval { thaw($buf) }; |
| confess $@ if $@; |
| confess "Not a hashref" unless reftype($args) eq 'HASH'; |
| my $method = delete $args->{_action}; |
| |
| # If "done", the client's closing. |
| return $method if $method eq 'done'; |
| |
| # Process the method call. |
| $dispatch{$method} |
| or confess "ERROR: Bad method name: $method\n"; |
| my $response = $dispatch{$method}->( $self, $args ); |
| my $frozen = nfreeze($response); |
| my $packed_len = pack( 'N', length($frozen) ); |
| print $client_sock "$packed_len$frozen" |
| or confess $!; |
| |
| # Remote signal to close the server. |
| return $method if $method eq 'terminate'; |
| |
| return 'continue'; |
| } |
| |
| sub do_handshake { |
| my ( $self, $args ) = @_; |
| my $retval = 1; |
| return { retval => $retval }; |
| } |
| |
| sub do_terminate { |
| return { retval => 1 }; |
| } |
| |
| sub do_doc_freq { |
| my ( $self, $args ) = @_; |
| return { retval => $searcher{$$self}->doc_freq(%$args) }; |
| } |
| |
| sub do_top_docs { |
| my ( $self, $args ) = @_; |
| my $top_docs = $searcher{$$self}->top_docs(%$args); |
| return { retval => $top_docs }; |
| } |
| |
| sub do_doc_max { |
| my ( $self, $args ) = @_; |
| my $doc_max = $searcher{$$self}->doc_max; |
| return { retval => $doc_max }; |
| } |
| |
| sub do_fetch_doc { |
| my ( $self, $args ) = @_; |
| my $doc = $searcher{$$self}->fetch_doc( $args->{doc_id} ); |
| return { retval => $doc }; |
| } |
| |
| sub do_fetch_doc_vec { |
| my ( $self, $args ) = @_; |
| my $doc_vec = $searcher{$$self}->fetch_doc_vec( $args->{doc_id} ); |
| return { retval => $doc_vec }; |
| } |
| |
| 1; |
| |
| __END__ |
| |
| =head1 NAME |
| |
| LucyX::Remote::SearchServer - Make a Searcher remotely accessible. |
| |
| =head1 SYNOPSIS |
| |
| my $searcher = Lucy::Search::IndexSearcher->new( |
| index => '/path/to/index' |
| ); |
| my $search_server = LucyX::Remote::SearchServer->new( |
| searcher => $searcher |
| ); |
| $search_server->serve( |
| port => 7890 |
| ); |
| |
| =head1 DESCRIPTION |
| |
| The SearchServer class, in conjunction with either |
| L<SearchClient|LucyX::Remote::SearchClient> or |
| L<ClusterSearcher|LucyX::Remote::ClusterSearcher>, makes it possible to run a |
| search on one machine and report results on another. |
| |
| By aggregating several SearchClients under a ClusterSearcher, the cost of |
| searching what might have been a prohibitively large monolithic index can be |
| distributed across multiple nodes, each with its own, smaller index. |
| |
| =head1 METHODS |
| |
| =head2 new |
| |
| my $search_server = LucyX::Remote::SearchServer->new( |
| searcher => $searcher, # required |
| ); |
| |
| Constructor. Takes hash-style parameters. |
| |
| =over |
| |
| =item * |
| |
| B<searcher> - the L<Searcher|Lucy::Search::IndexSearcher> that the SearchServer |
| will wrap. |
| |
| =back |
| |
| =head2 serve |
| |
| $search_server->serve( |
| port => 7890, # required |
| ); |
| |
| Open a listening socket on localhost and wait for SearchClients to connect. |
| |
| =over |
| |
| =item * |
| |
| B<port> - the port on localhost that the server should open and listen on. |
| |
| =back |
| |
| =head2 serve_rpc |
| |
| my $status = $search_server->serve_rpc($sock); |
| |
| Handle a single RPC from socket $sock. Returns 'done' if the connection should |
| be closed. Returns 'terminate' if the server should shut down. Returns |
| 'continue' if the server should continue to handle requests from this client. |
| |
| =cut |