| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| use strict; |
| use warnings; |
| |
| package LucyX::Remote::ClusterSearcher; |
| BEGIN { our @ISA = qw( Lucy::Search::Searcher ) } |
| our $VERSION = '0.004001'; |
| $VERSION = eval $VERSION; |
| use Carp; |
| use Storable qw( nfreeze thaw ); |
| use Scalar::Util qw( reftype ); |
| |
# Inside-out member variables.  Each hash is keyed by $$self (the value the
# object's scalar reference points at) and holds per-instance state:
#   %shards     - arrayref of shard records ({ addr => ..., sock => ... })
#   %num_shards - number of shards
#   %starts     - I32Array with the first global doc id offset of each shard
#   %doc_max    - sum of the doc_max values reported by all shards
our ( %shards, %num_shards, %starts, %doc_max );
| |
| use IO::Socket::INET; |
| |
# Constructor.
#
# Takes hash-style params.  'shards' must be an arrayref of addresses usable
# as the PeerAddr of IO::Socket::INET; all remaining params are passed
# through to the parent constructor.
#
# Opens one non-blocking TCP connection per shard, performs a 'handshake'
# RPC with every shard, then asks every shard for its doc_max in order to
# derive the cluster-wide doc_max and each shard's starting doc id offset.
#
# Throws (via confess) if 'shards' is not an arrayref, if a socket cannot be
# created, if any RPC fails, or if any shard returns a false handshake
# response.
sub new {
    my ( $either, %args ) = @_;
    my $addrs = delete $args{shards};
    my $self  = $either->SUPER::new(%args);

    # reftype() returns undef for undef and for non-references; fall back to
    # '' so that a missing or bogus 'shards' argument produces the intended
    # error without an "uninitialized value" warning.
    confess("'shards' must be an arrayref")
        unless ( reftype($addrs) || '' ) eq 'ARRAY';
    $num_shards{$$self} = scalar @$addrs;

    # Establish connections.
    my @shards;
    for my $addr (@$addrs) {
        my $sock = IO::Socket::INET->new(
            PeerAddr => $addr,
            Proto    => 'tcp',
            Blocking => 0,
        );
        confess("No socket: $!") unless $sock;
        push @shards, { addr => $addr, sock => $sock };
    }
    $shards{$$self} = \@shards;

    # Handshake with servers.  Responses come back in shard order, so a
    # false response can be attributed to a specific address.
    # NOTE(review): the handshake carries no schema although the POD states
    # that the schema must match each remote node's -- confirm whether
    # SearchServer expects one.
    my %handshake_args = ( _action => 'handshake' );
    my $responses = $self->_multi_rpc( \%handshake_args );
    for my $tick ( 0 .. $#shards ) {
        confess("Handshake failed with shard '$shards[$tick]{addr}'")
            unless $responses->[$tick];
    }

    # Derive doc_max and relative start offsets: each shard starts where the
    # running total of the preceding shards' doc_max values ends.
    my $doc_max_responses = $self->_multi_rpc( { _action => 'doc_max' } );
    my $doc_max           = 0;
    my @starts;
    for my $shard_doc_max (@$doc_max_responses) {
        push @starts, $doc_max;
        $doc_max += $shard_doc_max;
    }
    $starts{$$self}  = Lucy::Object::I32Array->new( ints => \@starts );
    $doc_max{$$self} = $doc_max;

    return $self;
}
| |
# Destructor.
#
# Closes any shard connections that are still open, removes this object's
# entries from the inside-out member hashes and then invokes the parent
# destructor.
#
# close() performs an RPC and may throw.  An exception escaping from here
# would abort the destructor before the member hashes are cleaned up and
# before SUPER::DESTROY runs, so the failure is trapped and reported as a
# warning instead.
sub DESTROY {
    my $self = shift;

    # Do not clobber an exception or status code that may be in flight in
    # the code which triggered this destructor.
    local ( $@, $!, $? );

    if ( defined $shards{$$self} ) {
        eval { $self->close; 1 }
            or carp("Error while closing ClusterSearcher: $@");
    }

    delete $shards{$$self};
    delete $num_shards{$$self};
    delete $starts{$$self};
    delete $doc_max{$$self};
    $self->SUPER::DESTROY;
    return;
}
| |
# Broadcast one remote procedure call to every shard of this searcher.
# Returns whatever _rpc() returns: an arrayref of return values, one per
# shard, in shard order.
sub _multi_rpc {
    my $self = shift;
    my ($args) = @_;
    my $all_shards = $shards{$$self};
    return $self->_rpc( $args, $all_shards );
}
| |
# Issue one remote procedure call to the shard with index $shard_num and
# return that shard's return value.
sub _single_rpc {
    my ( $self, $args, $shard_num ) = @_;
    my @targets = ( $shards{$$self}[$shard_num] );
    my $retvals = $self->_rpc( $args, \@targets );
    return $retvals->[0];
}
| |
# Perform one remote procedure call against each shard in @$shards.
#
# The request is serialized once and shared by all shards.  Every shard
# record is then driven through its callbacks (_cb_send first) by a select()
# loop until no shard has a callback left.  For the 'done' and 'terminate'
# actions the shards are flagged as shutting down, which makes _cb_send
# finish without waiting for a response.
#
# Returns an arrayref holding the 'retval' of each shard's response, in
# shard order.  Throws (via confess) if select() fails, if no socket becomes
# ready within 5 seconds, or if any shard recorded an error.
sub _rpc {
    my ( $self, $args, $shards ) = @_;

    my $payload     = $self->_serialize_request($args);
    my $wait_secs   = 5;
    my $action      = $args->{_action};
    my $is_shutdown = $action eq 'done' || $action eq 'terminate';

    # Bit vectors naming the descriptors select() should watch.
    my ( $read_set, $write_set, $except_set ) = ( '', '', '' );

    # Put every shard into the "send request" state.
    for my $shard (@$shards) {
        vec( $write_set, $shard->{sock}->fileno, 1 ) = 1;
        @{$shard}{qw( response error buf sent callback shutdown )}
            = ( undef, undef, $payload, 0, \&_cb_send, $is_shutdown );
    }

    my $pending = scalar @$shards;

    # Event loop: runs until every shard has dropped its callback.
    while ( $pending > 0 ) {

        # select() overwrites its arguments, so hand it copies.
        my ( $readable, $writable, $excepted )
            = ( $read_set, $write_set, $except_set );
        my $n = select( $readable, $writable, $excepted, $wait_secs );

        confess("select: $!")  if $n == -1;
        confess("I/O timeout") if $n == 0;

    SHARD:
        for my $shard (@$shards) {
            my $handler = $shard->{callback} or next SHARD;
            my $fd = $shard->{sock}->fileno;
            next SHARD
                unless vec( $readable, $fd, 1 ) || vec( $writable, $fd, 1 );

            # Dispatch the event; the handler may update the watch sets and
            # clears the shard's callback once the shard is finished.
            $handler->( $shard, \$read_set, \$write_set, \$except_set );
            $pending-- unless $shard->{callback};
        }
    }

    # Harvest results and reset the per-request state of each shard.
    my ( @retvals, @failures );
    for my $shard (@$shards) {
        if ( defined $shard->{error} ) {
            push @failures, join( ' @ ', $shard->{error}, $shard->{addr} );
        }
        else {
            push @retvals, $shard->{response}{retval};
        }
        @{$shard}{qw( response error buf )} = (undef) x 3;
    }

    confess( 'RPC error: ' . join( ', ', @failures ) ) if @failures;
    return \@retvals;
}
| |
# Turn a hashref of request arguments into the wire format sent to the
# servers: the nfreeze()d arguments preceded by their length as a 32-bit
# big-endian integer.  Returns a reference to the resulting string.
sub _serialize_request {
    my ( $self, $args ) = @_;
    my $frozen = nfreeze($args);

    # 'N/a*' emits the length of the string as 'N' followed by the string.
    my $frame = pack( 'N/a*', $frozen );
    return \$frame;
}
| |
# Event-loop callback: push the not-yet-sent part of the request to a shard.
#
# $shard is the shard record; $rin, $win and $ein are references to the
# select() bit vectors owned by the event loop.  A failed send records the
# error and retires the shard.  Once the whole request is out, the shard
# stops being watched for writing and is either retired (shutdown requests)
# or switched to reading the response length.
sub _cb_send {
    my ( $shard, $rin, $win, $ein ) = @_;

    my $sock    = $shard->{sock};
    my $pending = substr( ${ $shard->{buf} }, $shard->{sent} );
    my $written = $sock->send($pending);

    unless ( defined $written ) {
        $shard->{error}    = $!;
        $shard->{callback} = undef;
        vec( $$win, $sock->fileno, 1 ) = 0;
        return;
    }

    $shard->{sent} += $written;

    # More to send: stay in this state until the socket is writable again.
    return if $written < length($pending);

    # Request fully sent.
    my $fd = $sock->fileno;
    vec( $$win, $fd, 1 ) = 0;

    if ( $shard->{shutdown} ) {

        # Closing the connection or shutting the server down remotely:
        # there is no response to wait for.
        $shard->{callback} = undef;
        return;
    }

    # Prepare the shard to read the response length.
    $shard->{buf}      = '';
    $shard->{callback} = \&_cb_recv_length;
    vec( $$rin, $fd, 1 ) = 1;
    return;
}
| |
# Event-loop callback: read the 4-byte response length from a shard.
#
# Bytes are accumulated in $shard->{buf} across calls.  A failed recv or an
# empty read (reported as 'Remote shutdown') records an error, retires the
# shard and stops watching its socket for reading.  Once all four bytes have
# arrived they are decoded as a 32-bit big-endian integer into
# $shard->{response_size} and the shard moves on to reading the response.
sub _cb_recv_length {
    my ( $shard, $rin, $win, $ein ) = @_;

    my $sock   = $shard->{sock};
    my $needed = 4 - length( $shard->{buf} );
    my $chunk;
    my $status = $sock->recv( $chunk, $needed );

    my $failure
        = !defined $status    ? $!
        : length($chunk) == 0 ? 'Remote shutdown'
        :                       undef;

    if ( defined $failure ) {
        $shard->{error}    = $failure;
        $shard->{callback} = undef;
        vec( $$rin, $sock->fileno, 1 ) = 0;
        return;
    }

    $shard->{buf} .= $chunk;

    # Length still incomplete: wait for more data.
    return if length( $shard->{buf} ) < 4;

    # Length complete: switch the shard over to receiving the response.
    $shard->{response_size} = unpack( 'N', $shard->{buf} );
    $shard->{buf}           = '';
    $shard->{callback}      = \&_cb_recv_response;
    return;
}
| |
# Event-loop callback: read the (possibly partial) response body from a
# shard.
#
# $shard is the shard record; $rin, $win and $ein are references to the
# select() bit vectors owned by the event loop.  Bytes are accumulated in
# $shard->{buf} until $shard->{response_size} bytes have arrived, then the
# buffer is thawed into $shard->{response}.
#
# Every way of finishing -- recv failure, remote shutdown, a complete
# response, an undecodable response -- clears the callback AND stops
# watching the socket for reading.  Leaving the read bit set after
# completion would let a finished socket that turns readable again (EOF,
# stray data) wake select() over and over while other shards are still
# pending, spinning the loop and defeating its timeout.
sub _cb_recv_response {
    my ( $shard, $rin, $win, $ein ) = @_;

    my $sock = $shard->{sock};
    my $data;
    my $remaining = $shard->{response_size} - length( $shard->{buf} );
    my $r = $sock->recv( $data, $remaining );

    if ( !defined($r) || length($data) == 0 ) {
        $shard->{error}    = !defined($r) ? $! : 'Remote shutdown';
        $shard->{callback} = undef;
        vec( $$rin, $sock->fileno, 1 ) = 0;
        return;
    }

    $shard->{buf} .= $data;

    # Response still incomplete: wait for more data.
    return if length( $shard->{buf} ) < $shard->{response_size};

    # Finished: retire the shard and stop selecting on its socket.
    $shard->{callback} = undef;
    vec( $$rin, $sock->fileno, 1 ) = 0;

    # NOTE(security): thaw() deserializes data received from the network.
    # Storable's documentation warns against thawing untrusted input, so
    # shards must be trusted hosts.
    #
    # thaw() may die or return undef on corrupt input.  Report that as an
    # error of this shard rather than letting it escape from the event loop
    # or silently turning into an undefined return value.
    my ( $response, $thaw_error );
    {
        local $@;
        $response   = eval { thaw( $shard->{buf} ) };
        $thaw_error = $@;
    }

    if ( ( reftype($response) || '' ) eq 'HASH' ) {
        $shard->{response} = $response;
    }
    else {
        my $detail = $thaw_error ? ": $thaw_error" : '';
        chomp $detail;
        $shard->{error} = "Malformed response$detail";
    }
    return;
}
| |
# Run a search on every shard and merge the results.
#
# Takes hash-style params, of which 'query', 'num_wanted' and 'sort_spec'
# are inspected here; all params are forwarded to the shards.  Returns a
# Lucy::Search::TopDocs holding the best matches across all shards, with
# doc ids translated into the cluster-wide doc id space, and the sum of the
# shards' total_hits.
sub top_docs {
    my ( $self, %args ) = @_;
    my $starts     = $starts{$$self};
    my $num_shards = $num_shards{$$self};
    my $query      = $args{query};
    my $num_wanted = $args{num_wanted};
    my $sort_spec  = $args{sort_spec};

    # Weight query if necessary.  The compiler is built against this
    # searcher, whose doc_max and doc_freq aggregate all shards.
    my $compiler
        = $query->isa("Lucy::Search::Compiler")
        ? $query
        : $query->make_compiler(
        searcher => $self,
        boost    => $query->get_boost,
        );

    # Create HitQueue.  Schema and sort spec are only supplied for sorted
    # searches.
    my %hit_q_args = ( wanted => $num_wanted );
    if ($sort_spec) {
        $hit_q_args{schema}    = $self->get_schema;
        $hit_q_args{sort_spec} = $sort_spec;
    }
    my $hit_q = Lucy::Search::HitQueue->new(%hit_q_args);

    # Gather remote responses and aggregate.  Send the weighted compiler
    # rather than the raw query: otherwise the compiler built above would go
    # unused and the shards would receive an unweighted query.
    $args{query}   = $compiler;
    $args{_action} = 'top_docs';
    my $responses  = $self->_multi_rpc( \%args );
    my $total_hits = 0;
    for my $tick ( 0 .. $num_shards - 1 ) {
        my $base           = $starts->get($tick);
        my $sub_top_docs   = $responses->[$tick];
        my @sub_match_docs = sort { $a->get_doc_id <=> $b->get_doc_id }
            @{ $sub_top_docs->get_match_docs };
        for my $match_doc (@sub_match_docs) {

            # Shift the shard-relative doc id into the cluster-wide space.
            $match_doc->set_doc_id( $match_doc->get_doc_id + $base );
            $hit_q->insert($match_doc);
        }
        $total_hits += $sub_top_docs->get_total_hits;
    }

    # Return a TopDocs object with the best of the best.
    my $best_match_docs = $hit_q->pop_all;
    return Lucy::Search::TopDocs->new(
        total_hits => $total_hits,
        match_docs => $best_match_docs,
    );
}
| |
# Send the 'terminate' action to every shard.  Returns nothing.
sub terminate {
    my ($self) = @_;
    my %request = ( _action => 'terminate' );
    $self->_multi_rpc( \%request );
    return;
}
| |
# Fetch the document with cluster-wide id $doc_id.
#
# The owning shard is located via the start offsets, the document is
# requested from that shard under its shard-relative id, and the returned
# document has its doc id set back to the cluster-wide $doc_id.
sub fetch_doc {
    my ( $self, $doc_id ) = @_;
    my $offsets  = $starts{$$self};
    my $shard_ix = Lucy::Index::PolyReader::sub_tick( $offsets, $doc_id );
    my $local_id = $doc_id - $offsets->get($shard_ix);
    my %request  = ( doc_id => $local_id, _action => 'fetch_doc' );
    my $hit_doc  = $self->_single_rpc( \%request, $shard_ix );
    $hit_doc->set_doc_id($doc_id);
    return $hit_doc;
}
| |
# Fetch the doc vector for cluster-wide id $doc_id from the shard that owns
# it, addressing the document by its shard-relative id.
sub fetch_doc_vec {
    my ( $self, $doc_id ) = @_;
    my $offsets  = $starts{$$self};
    my $shard_ix = Lucy::Index::PolyReader::sub_tick( $offsets, $doc_id );
    my $local_id = $doc_id - $offsets->get($shard_ix);
    my %request  = ( doc_id => $local_id, _action => 'fetch_doc_vec' );
    return $self->_single_rpc( \%request, $shard_ix );
}
| |
# Return the cluster-wide doc_max computed by the constructor (the sum of
# the doc_max values reported by all shards).
sub doc_max {
    my ($self) = @_;
    my $total = $doc_max{$$self};
    return $total;
}
| |
# Return the sum of the 'doc_freq' values reported by all shards.  The
# hash-style params are forwarded to the shards unchanged.
sub doc_freq {
    my ( $self, @params ) = @_;
    my %request   = ( @params, _action => 'doc_freq' );
    my $per_shard = $self->_multi_rpc( \%request );
    my $total     = 0;
    for my $count (@$per_shard) {
        $total += $count;
    }
    return $total;
}
| |
# Close the connections to all shards.
#
# Sends the 'done' action to every shard, closes every socket and forgets
# the shard records.  Does nothing (and returns nothing) if the connections
# have already been closed; returns true otherwise.
#
# Cleanup always runs to completion: a failing 'done' RPC or a socket that
# fails to close no longer leaves the remaining sockets open or the stale
# shard records in place.  The first failure is rethrown once cleanup is
# finished.
sub close {
    my $self   = shift;
    my $shards = $shards{$$self};
    return unless $shards;

    # Tell the servers we are done, but postpone any failure until the
    # sockets have been dealt with.
    my $rpc_error;
    {
        local $@;
        eval { $self->_multi_rpc( { _action => 'done' } ); 1 }
            or $rpc_error = $@ || 'Unknown error';
    }

    # CORE::close is spelled out because this package defines its own
    # close() method, which makes a bare "close" ambiguous.
    my @close_errors;
    for my $shard (@$shards) {
        CORE::close( $shard->{sock} )
            or push @close_errors, "$! ($shard->{addr})";
    }
    delete $shards{$$self};

    die $rpc_error if defined $rpc_error;
    confess( "Error when closing socket: " . join( ', ', @close_errors ) )
        if @close_errors;
    return 1;
}
| |
| 1; |
| |
| __END__ |
| |
| =head1 NAME |
| |
| LucyX::Remote::ClusterSearcher - Search multiple remote indexes. |
| |
| =head1 SYNOPSIS |
| |
| my $searcher = eval { |
| LucyX::Remote::ClusterSearcher->new( |
| schema => MySchema->new, |
| shards => [ 'search1:7890', 'search2:7890', 'search3:7890' ], |
| ); |
| }; |
| ... |
| my $hits = eval { $searcher->hits( query => $query ) }; |
| |
| =head1 DESCRIPTION |
| |
| ClusterSearcher is a subclass of L<Lucy::Search::Searcher> which can be used |
| to search a composite index made up of multiple shards, where each shard is |
| represented by a host:port pair running L<LucyX::Remote::SearchServer>. |
| |
| =head1 METHODS |
| |
| =head2 new |
| |
| Constructor. Takes hash-style params. |
| |
| =over |
| |
| =item * |
| |
| B<schema> - A Schema, which must match the Schema used by each remote node. |
| |
| =item * |
| |
| B<shards> - An arrayref of host:port pairs, each the address of a running |
| LucyX::Remote::SearchServer instance. Together they identify the shards that |
| make up the composite index. |
| |
| =back |
| |
| =cut |