No Description

Client.pm 3.3KB

    package AvidMQ::Client;

    # AvidMQ::Client - POE based TCP client for an AvidMQ router.
    #
    # The client reads the router address from a Config::Tiny file (section
    # "avidmq", key "router", written as "host" or "host:port"), connects with
    # POE::Component::Client::TCP using POE::Filter::Reference, and dispatches
    # incoming messages to user supplied callbacks.
    #
    # NOTE: Smart::Comments is loaded with the four-hash level enabled, so any
    # comment that starts with four hashes is executable. Ordinary comments in
    # this file must therefore use a single hash.

    use strict;
    use warnings;

    use Carp qw(confess carp);
    use Scalar::Util qw(reftype);
    use Config::Tiny;
    use POE qw(Component::Client::TCP Filter::Reference);
    use Smart::Comments '####';

    # Constructor. Takes the same named arguments as __init and returns the
    # new client object. Nothing is connected until run() is called.
    sub new {
        my $class = shift;

        my $self = bless {}, $class;
        $self->__init(@_);

        return $self;
    }

    # Store configuration and callbacks on the object.
    #
    # Named arguments:
    #   configfile - path of the Config::Tiny file (required, must exist)
    #   Publish    - code ref called for incoming 'publish' messages
    #   Ok         - code ref called for incoming 'ok' messages
    #   Error      - code ref called for incoming 'error' messages
    #   Idle       - optional code ref called repeatedly once connected
    #   subscribe  - queue name, or array ref of queue names, subscribed to
    #                as soon as the connection is established
    #
    # Dies (confess) when the config file is missing or cannot be read.
    sub __init {
        my $self = shift;
        my %args = @_;

        confess "Initing without configfile" unless $args{configfile};
        confess "config file not found: $args{configfile}"
            unless -f $args{configfile};

        # Config::Tiny->read returns undef on failure; report it here rather
        # than failing later with a misleading "Invalid Core address".
        my $config = Config::Tiny->read($args{configfile});
        confess "could not read config file $args{configfile}: "
            . (Config::Tiny->errstr || 'unknown error')
            unless $config;

        $self->{config} = $config;
        $self->{_msgid} = 0;

        $self->{PublishHandler} = $args{Publish} || sub { };
        $self->{OkHandler}      = $args{Ok}      || sub { };
        $self->{ErrorHandler}   = $args{Error}   || sub { };
        $self->{IdleHandler}    = $args{Idle}    || 0;

        # Normalise to an array ref. A missing argument becomes an empty list
        # instead of [undef], which used to subscribe to an undefined queue.
        my $subscribe = $args{subscribe};
        $self->{subscribe}
            = ref $subscribe     ? $subscribe
            : defined $subscribe ? [$subscribe]
            :                      [];

        return;
    }

    # Set up the POE sessions and run the POE kernel. Returns whatever the
    # kernel's run() returns, once the kernel stops.
    sub run {
        my $self = shift;

        $self->__prepare_client();
        $poe_kernel->run();
    }

    # Send $message to $queue. Requires an established connection.
    sub publish {
        my $self    = shift;
        my $queue   = shift;
        my $message = shift;

        return $self->__send(
            type    => 'publish',
            queue   => $queue,
            message => $message,
        );
    }

    # Ask the router to subscribe this client to $queue. Requires an
    # established connection.
    sub subscribe {
        my $self  = shift;
        my $queue = shift;

        return $self->__send(type => 'subscribe', queue => $queue);
    }

    # Stamp the given message fields with the next message id and put the
    # resulting hash on the server wheel. Dies (confess) when called before
    # the connection has been established.
    sub __send {
        my $self   = shift;
        my %fields = @_;

        confess "Not connected to the AvidMQ Router"
            unless $self->{avidmq} and $self->{avidmq}->{server};

        my $msg = { msgid => ++$self->{_msgid}, %fields };

        return $self->{avidmq}->{server}->put($msg);
    }

    # Create the TCP client component for the configured router and, when an
    # Idle handler was given, the helper session that drives it.
    sub __prepare_client {
        my $self = shift;

        my $router = $self->{config}->{avidmq}->{router};
        $router = '' unless defined $router;

        my ($host, $port) = split /\:/, $router;
        $port = 6666 unless $port;    # default router port

        unless ($host) {
            die "Invalid Core address '$router'\n";
        }

        #### Trying for master: $host."->".$port

        POE::Component::Client::TCP->new(
            RemoteAddress => $host,
            RemotePort    => $port,
            Filter        => 'POE::Filter::Reference',
            Connected     => sub { $self->__init_client(@_) },
            ConnectError  => sub { $self->__init_client_fail(@_) },
            ServerInput   => sub { $self->__tcp_router_input(@_) },
            Disconnected  => sub { $self->__tcp_router_disconnect(@_) },
        );

        if ($self->{IdleHandler}) {
            POE::Session->create(
                inline_states => {
                    _start     => sub { $_[KERNEL]->alias_set('IdleSession'); },
                    idleaction => sub { $self->__idle_action(@_) },
                }
            );
        }

        return;
    }

    # Connected callback: remember the connection heap (publish/subscribe use
    # its 'server' entry), start the idle loop and send initial subscriptions.
    sub __init_client {
        my $self = shift;

        $self->{avidmq} = $_[HEAP];

        if ($self->{IdleHandler}) {
            $_[KERNEL]->post(IdleSession => 'idleaction');
        }

        for my $queue (@{ $self->{subscribe} || [] }) {
            $self->subscribe($queue);
        }

        return;
    }

    # ConnectError callback: warn with the POE error details, then die.
    sub __init_client_fail {
        my $self = shift;
        my ($operation, $error_number, $error_string) = @_[ARG0..ARG2];

        warn "$operation error $error_number occurred: $error_string";
        die "Could not connect to $self->{config}->{avidmq}->{router}\n";
    }

    # Maps the 'type' field of an incoming message to the object slot that
    # holds the callback for it.
    my %inputhandlers=(
        publish => 'PublishHandler',
        error   => 'ErrorHandler',
        ok      => 'OkHandler',
    );

    # ServerInput callback: dispatch one incoming message to its handler.
    # Input that is not a hash based reference, has no 'type', or has a type
    # without a registered code ref is silently ignored. The handler is
    # called as $handler->($client, $message_payload, $whole_message).
    sub __tcp_router_input {
        my $self  = shift;
        my $input = $_[ARG0];

        # Anything other than a hash would die on the key lookups below.
        return unless ref $input and (reftype($input) || '') eq 'HASH';
        return unless $input->{type};

        my $slot = $inputhandlers{ $input->{type} };
        return unless $slot;

        my $handler = $self->{$slot};
        return unless ref $handler eq 'CODE';

        return $handler->($self, $input->{message}, $input);
    }

    # Disconnected callback: losing the router connection is fatal.
    sub __tcp_router_disconnect {
        my $self = shift;

        die "Disconnected from the AvidMQ Router";
    }

    # 'idleaction' state of the idle session: call the Idle handler with the
    # client object, then re-arm the event to fire again after 0.001 seconds.
    sub __idle_action {
        my $self   = shift;
        my $kernel = $_[KERNEL];

        my $handler = $self->{IdleHandler};
        $handler->($self) if ref $handler eq 'CODE';

        $kernel->delay('idleaction' => 0.001);
    }

    1;