blob: 5a32db54a8e7239271cfb32feb68eb93120dc5b4 [file] [log] [blame]
# <@LICENSE>
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to you under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# </@LICENSE>
package Mail::SpamAssassin::Util::TinyRedis;
# Implements the unified request protocol introduced in Redis 1.2.
use strict;
use re 'taint';
use warnings;
use Errno qw(EINTR EAGAIN EPIPE ENOTCONN ECONNRESET ECONNABORTED);
use IO::Socket::UNIX;
use Time::HiRes ();
# Name of the socket class used for TCP connections. It is chosen once, at
# compile time, as the first of IO::Socket::IP, IO::Socket::INET6 and
# IO::Socket::INET that can be loaded; it stays undefined if none of them load.
our $io_socket_module_name;
BEGIN {
  $io_socket_module_name =
      eval { require IO::Socket::IP }    ? 'IO::Socket::IP'
    : eval { require IO::Socket::INET6 } ? 'IO::Socket::INET6'
    : eval { require IO::Socket::INET }  ? 'IO::Socket::INET'
    :                                      undef;
}
# Constructor. Recognised named arguments:
#   server     - '/path/to/unix.sock', 'host:port' or '[ipv6]:port'
#   sock       - fallback for 'server' when that one is not given
#   on_connect - optional code ref, invoked with the object after each connect
# Defaults to '127.0.0.1:6379' when neither server nor sock is supplied.
# A copy of all arguments is kept under the 'args' key.
# Returns the new object, or nothing if the initial connect fails.
sub new {
  my($class, %args) = @_;
  my $outbuf = '';  # accumulates batched commands, see b_call/b_results
  my $self = bless {
    args       => { %args },
    outbuf     => \$outbuf,
    batch_size => 0,
    server     => $args{server} || $args{sock} || '127.0.0.1:6379',
    on_connect => $args{on_connect},
  }, $class;
  $self->connect or return;
  return $self;
}
# Destructor: drops the socket reference held by the object.
# $@, $! and $_ are localized so that the teardown cannot disturb the
# caller's view of these globals.
sub DESTROY {
  my($self) = @_;
  local $@;
  local $!;
  local $_;
  undef $self->{sock};
}
# Drops the socket reference held by the object; the 'sock' slot becomes
# undef. $@ and $! are localized so that callers can still report the error
# that made them disconnect.
sub disconnect {
  my($self) = @_;
  local $@;
  local $!;
  undef $self->{sock};
}
# (Re)connects to the configured server, dropping any existing connection
# first. The server specification is one of:
#   /path/to/socket   - Unix domain socket (anything starting with a slash)
#   host:port         - TCP
#   [ipv6-addr]:port  - TCP, address in brackets
# On success the socket, its file descriptor and a select() bit mask for it
# are stored in the object, and the on_connect callback (if any) is invoked
# with the object as its only argument.
# Returns the socket object, or a false value if the connection could not
# be established. Dies on a malformed server specification, when no TCP
# socket class is available, or when the on_connect callback dies.
sub connect {
  my $self = $_[0];
  $self->disconnect;
  my $sock;
  my $server = $self->{server};
  if ($server =~ m{^/}) {
    $sock = IO::Socket::UNIX->new(
      Peer => $server, Type => SOCK_STREAM);
  } elsif ($server =~ /^(?: \[ ([^\]]+) \] | ([^:]+) ) : ([^:]+) \z/xs) {
    $server = defined $1 ? $1 : $2;  my $port = $3;
    # fail with a clear message instead of a method call on undef
    defined $io_socket_module_name
      or die "No usable IO::Socket module (IP, INET6 or INET) is available";
    $sock = $io_socket_module_name->new(
      PeerAddr => $server, PeerPort => $port, Proto => 'tcp');
  } else {
    die "Invalid 'server:port' specification: $server";
  }
  if ($sock) {
    $self->{sock} = $sock;
    $self->{sock_fd} = $sock->fileno;  $self->{fd_mask} = '';
    vec($self->{fd_mask}, $self->{sock_fd}, 1) = 1;
    # an on_connect() callback must not use batched calls!
    if ($self->{on_connect}) {
      if (!eval { $self->{on_connect}->($self); 1 }) {
        # The callback did not complete, so the connection is not fully
        # set up; do not leave it installed for later commands to use.
        my $err = $@ || "on_connect callback failed";
        $self->disconnect;
        die $err;
      }
    }
  }
  $sock;
}
# Reads $cnt consecutive replies from the server and returns them as an
# array ref, one element per reply:
#   status reply (+)     -> the status text
#   integer reply (:)    -> a number
#   bulk reply ($)       -> the payload string, or undef for a null bulk
#   multi-bulk reply (*) -> a nested array ref, or undef for a negative count
#   error reply (-)      -> dies with the error text followed by a newline
# Connects first if there is no socket. The caller is expected to have set
# $/ to CRLF: both the line reads and the chomps below depend on it.
# Dies (after disconnecting) on a read error or when the server closes the
# connection, and dies on an unrecognized reply type.
#
sub _response {
  my($self, $cnt) = @_;
  my $sock = $self->{sock};
  unless ($sock) {
    $self->connect  or die "Connect failed: $!";
    $sock = $self->{sock};
  }
  my @replies;
  foreach my $reply_no (1 .. $cnt) {
    my $line = <$sock>;
    unless (defined $line) {
      $self->disconnect;
      die "Error reading from Redis server: $!";
    }
    chomp $line;
    # strip the one-character type marker, leaving the reply's argument
    my $marker = substr($line, 0, 1, '');
    if ($marker eq '+') {  # status reply
      push @replies, $line;
    } elsif ($marker eq ':') {  # integer reply
      push @replies, 0+$line;
    } elsif ($marker eq '-') {  # error reply
      die "$line\n";
    } elsif ($marker eq '*') {  # multi-bulk reply
      push @replies, $line < 0 ? undef : $self->_response(0+$line);
    } elsif ($marker eq '$') {  # bulk reply
      if ($line < 0) {  # null bulk reply
        push @replies, undef;
        next;
      }
      my $payload = '';
      my $received = 0;
      my $remaining = $line + 2;  # payload plus its trailing CRLF
      while ($remaining > 0) {
        my $nbytes = read($sock, $payload, $remaining, $received);
        if (!$nbytes) {
          $self->disconnect;
          die "Error reading from Redis server: $!"  if !defined $nbytes;
          die "Redis server closed connection";
        }
        $received += $nbytes;
        $remaining -= $nbytes;
      }
      chomp $payload;
      push @replies, $payload;
    } else {
      die "Unknown Redis reply: $marker ($line)";
    }
  }
  \@replies;
}
# Writes the entire contents of the buffer referenced by $bufref to the
# server socket, connecting first if there is no socket.
#
# Before each write the socket is polled with select(): a socket that is
# readable but yields no data on sysread is treated as closed by the peer
# and a reconnect is attempted. A write failing with ENOTCONN, EPIPE,
# ECONNRESET or ECONNABORTED also triggers a reconnect; EINTR and EAGAIN
# are retried after a short sleep. SIGPIPE is ignored while writing.
#
# After any reconnect the transfer restarts from the beginning of the
# buffer: bytes already written went to the old connection, and continuing
# mid-buffer would hand the new connection a truncated request.
#
# Returns 1. Dies if select fails (other than by being interrupted), if a
# reconnect fails, or on any other write error.
sub _write_buff {
  my($self, $bufref) = @_;
  if (!$self->{sock}) { $self->connect  or die "Connect failed: $!" };
  my $ofs = 0;
  while ($ofs < length($$bufref)) {
    # to reliably detect a disconnect we need to check for an input event
    # using a select; checking status of syswrite is not sufficient
    my($rout, $wout, $inbuff);  my $fd_mask = $self->{fd_mask};
    my $nfound = select($rout=$fd_mask, $wout=$fd_mask, undef, undef);
    if (!defined $nfound || $nfound < 0) {
      next  if $! == EINTR;  # interrupted by a signal, just poll again
      die "Select failed: $!";
    }
    if (vec($rout, $self->{sock_fd}, 1) &&
        !sysread($self->{sock}, $inbuff, 1024)) {
      # eof, try reconnecting
      $self->connect  or die "Connect failed: $!";
      $ofs = 0;  # fresh connection, resend the whole buffer
    }
    local $SIG{PIPE} = 'IGNORE';  # don't signal on a write to a widowed pipe
    my $nwrite =
      syswrite($self->{sock}, $$bufref, length($$bufref)-$ofs, $ofs);
    if (defined $nwrite) {
      $ofs += $nwrite;
      next;
    }
    if ($! == EINTR || $! == EAGAIN) {  # no big deal, try again
      Time::HiRes::sleep(0.1);  # slow down, just in case
    } else {
      $self->disconnect;  # localizes $!, so the tests below still see it
      if ($! == ENOTCONN   || $! == EPIPE ||
          $! == ECONNRESET || $! == ECONNABORTED) {
        $self->connect  or die "Connect failed: $!";
        $ofs = 0;  # fresh connection, resend the whole buffer
      } else {
        die "Error writing to redis socket: $!";
      }
    }
  }
  1;
}
# Sends a single redis command (command name followed by its arguments)
# and returns the decoded reply to it. The request is framed as a
# multi-bulk message: an argument count line, then each argument preceded
# by its length. Exceptions from writing or from reading the reply
# (including redis error replies) propagate to the caller.
#
sub call {
  my($self, @words) = @_;
  my $request = join('',
    '*', scalar(@words), "\015\012",
    map { ('$', length($_), "\015\012", $_, "\015\012") } @words);
  $self->_write_buff(\$request);
  local $/ = "\015\012";  # replies are CRLF-terminated lines
  my $replies = $self->_response(1);
  return $replies && $replies->[0];
}
# Queues a redis command (command name followed by its arguments) in the
# object's output buffer without sending it; b_results sends the batch.
# Returns the number of commands now queued.
#
sub b_call {
  my($self, @words) = @_;
  my $frame = '*' . scalar(@words) . "\015\012";
  foreach my $word (@words) {
    $frame .= '$' . length($word) . "\015\012" . $word . "\015\012";
  }
  ${ $self->{outbuf} } .= $frame;
  return ++$self->{batch_size};
}
# Sends the commands queued by b_call and returns an array ref of replies,
# each element corresponding to one command of the batch. Returns nothing
# when no commands are queued. The queue is emptied once the batch has
# been written.
#
# If reading the replies fails (for instance one of the commands yields a
# redis error reply) the exception is rethrown, but the connection is
# dropped first: replies to the rest of the batch may still be unread, and
# keeping the socket would let the next command read those stale replies.
# The next command reconnects automatically.
#
sub b_results {
  my $self = $_[0];
  my $batch_size = $self->{batch_size};
  return  if !$batch_size;
  my $bufref = $self->{outbuf};
  $self->_write_buff($bufref);
  $$bufref = '';  $self->{batch_size} = 0;
  local($/) = "\015\012";
  # _response returns an array ref on success, so undef means it died
  my $replies = eval { $self->_response($batch_size) };
  if (!defined $replies) {
    my $err = $@;
    $self->disconnect;
    die $err;
  }
  $replies;
}
1;