No Description

Server.pm 13KB

    package AvidMQ::Server;

    use strict;
    use warnings;
    use Carp qw(confess carp);

    use Config::Tiny;
    use XML::Simple;
    use POE qw(Component::Server::TCP Component::Client::TCP Filter::Reference);

    # Only comments introduced by exactly four hashes are active debug output.
    use Smart::Comments '####';

    ###################
    # AvidMQProtVersion
    ###################
    # Protocol version string announced to connecting clients in the
    # 'hello' message.
    sub AvidMQProtVersion { '0.0.1' }

    ###################
    # new
    ###################
    # Constructor. Every argument is handed to init() unchanged, so it takes
    # the same named arguments (configfile => $path). Returns the new object.
    sub new {
        my ($class, @args) = @_;

        my $self = bless {}, $class;
        $self->init(@args);

        return $self;
    }

    ###################
    # init
    ###################
    # Loads the configuration file and resets the in-memory state.
    #
    # Named arguments:
    #   configfile - path of the INI file to load (mandatory).
    #
    # Confesses when the argument is missing, when the file does not exist
    # or when Config::Tiny cannot read it. Returns the object.
    sub init {
        my ($self, %args) = @_;

        confess "Initing without configfile" unless $args{configfile};
        confess "config file not found: $args{configfile}"
            unless -f $args{configfile};

        # Config::Tiny->read returns undef on an unreadable or malformed file;
        # fail here with its error text instead of on the first config lookup.
        my $config = Config::Tiny->read($args{configfile})
            or confess "Can't read config file '$args{configfile}': "
                . (Config::Tiny->errstr || 'unknown error');

        $self->{config}          = $config;
        $self->{_msgid}          = 0;     # last message id handed out
        $self->{_reply_queue}    = {};    # forwarded msgid => where the reply goes
        $self->{_broad_queue}    = [];    # ids of messages waiting for broadcast
        $self->{_broad_messages} = {};    # id => { message, subscribers }
        $self->{_subscribers}    = {};    # queue => { connection id => heap }

        return $self;
    }

    ###################
    # run
    ###################
    # Creates the main session and enters the POE event loop.
    sub run {
        my $self = shift;

        $self->start_session();

        return $poe_kernel->run();
    }

    ###################
    # start_session
    ###################
    # Creates the POE session aliased 'AvidMQ'. Its inline states simply
    # delegate to the methods of the same name on this object.
    sub start_session {
        my $self = shift;

        return POE::Session->create(
            inline_states => {
                _start => sub {
                    $_[KERNEL]->alias_set('AvidMQ');
                    $self->start_avidmqd(@_);
                },
                start_tcp_daemon => sub { $self->start_tcp_daemon(@_); },
                broadcast        => sub { $self->broadcast(@_); },
            },
        );
    }

    ###################
    # start_avidmqd
    ###################
    # Starts the daemon and connects to the avidmq network.
sub start_avidmqd {
    my $self = shift;

    $self->start_store();

    # Remember the configured mode; msg_configok consults $self->{_mode}.
    my $mode = $self->{_mode} = $self->{config}->{avidmq}->{mode};

    # A missing mode is as invalid as an unknown one. Checking it first keeps
    # the string comparisons below from warning about undef.
    confess "Invalid avidmq mode" unless defined $mode;

    if ($mode eq 'core') {
        return $self->start_core_daemon(@_);
    }
    if ($mode eq 'router') {
        # NOTE(review): start_router_daemon is not defined in the code
        # reviewed here - confirm it exists elsewhere (e.g. a subclass).
        return $self->start_router_daemon(@_);
    }

    confess "Invalid avidmq mode";
}

###################
# start_core_daemon
###################
# Reads the comma separated list of core nodes from the [network] section.
# With a list, tries to become a slave of one of them; without one, this
# node becomes the core master.
sub start_core_daemon {
    my $self = shift;

    my $nodes = $self->{config}->{network}->{core};
    unless ($nodes) {
        warn "Looks that I am the only core node for this network...\n";
        return $self->init_master(@_);
    }

    $self->{_core_nodes} = [ split /\s*,\s*/, $nodes ];

    return $self->init_slave(@_);
}

####################################################################
# Core Slave Init and handlers
####################################################################

###################
# init_slave
###################
# Takes the next candidate ("host:port") from the list of core nodes and
# opens a TCP client connection to it. Malformed entries are reported and
# skipped. When the list is exhausted this node becomes the master.
sub init_slave {
    my $self = shift;

    my $ncore = shift @{ $self->{_core_nodes} };
    return $self->init_master(@_) unless $ncore;

    my ($host, $port) = split /\:/, $ncore;
    unless ($host and $port) {
        warn "Invalid Core address '$ncore'\n";
        return $self->init_slave(@_);
    }

    #### Trying for master: $host."->".$port
    POE::Component::Client::TCP->new(
        RemoteAddress => $host,
        RemotePort    => $port,
        Filter        => 'POE::Filter::Reference',
        Connected     => sub { $self->init_slave_succeed(@_) },
        ConnectError  => sub { $self->init_slave_fail(@_) },
        ServerInput   => sub { $self->tcp_master_input(@_) },
        Disconnected  => sub { $self->tcp_master_disconnect(@_) },
    );
}

###################
# init_slave_succeed
###################
# Connected to a master: record that this node is not the core master and
# flag the connection heap as the one leading to the master.
sub init_slave_succeed {
    my $self = shift;

    $self->{_core_master} = 0;
    $_[HEAP]->{AvidMQ}->{master} = 1;

    # It's the core master that must start the talk.
}

###################
# init_slave_fail
###################
# The connection attempt failed: move on to the next candidate.
sub init_slave_fail {
    my $self = shift;

    #### This master failed. Let's try the next one.
    $self->init_slave(@_);
}

# Dispatch table: message type => handler. Handlers are called as
# $handler->($self, @poe_args). Shared by the master and client inputs.
my %msg_handle = (
    hello     => \&msg_hello,
    config    => \&msg_config,
    configok  => \&msg_configok,
    publish   => \&msg_publish,
    subscribe => \&msg_subscribe,
);

###################
# tcp_master_input
###################
# Handles a message received from the master connection.
#
# An 'ok' or 'error' whose msgid matches an entry of the reply queue is
# the answer to a request forwarded on behalf of a client: the original
# msgid is restored and the reply is passed back to that client. Anything
# else goes through the dispatch table.
sub tcp_master_input {
    my $self = shift;
    my ($kernel, $heap, $input) = @_[KERNEL, HEAP, ARG0];

    # Only hash based messages are understood; any other reference would
    # die on the key lookups below.
    unless (ref($input) eq 'HASH') {
        #### Got input on tcp_master_input: $input
        return;
    }
    unless ($input->{type}) {
        #### got input without type on tcp_master_input\n: $input
        return;
    }

    # Match the type exactly (an unanchored /ok|error/ also matches
    # 'configok') and never look up an undefined msgid.
    my $msgid = $input->{msgid};
    if (defined $msgid and $input->{type} =~ m{\A(?:ok|error)\z}) {
        if (my $omsg = delete $self->{_reply_queue}->{$msgid}) {
            # _send_to records the requester under 'source'; 'from' is
            # accepted as well so either spelling of the entry works.
            my $origin = $omsg->{from} || $omsg->{source};

            $input->{msgid} = $omsg->{original_id};
            $self->_reply(undef, $input, $origin);
            ##### forwarded reply to client: $input
            return;
        }
    }

    ##### At tcp_master_input: $input
    if ($msg_handle{ $input->{type} }) {
        $msg_handle{ $input->{type} }->($self, @_);
    }
    else {
        #### Unknow msgtype: $input->{type}
    }
}

###################
# tcp_master_disconnect
###################
# Called when the connection to the master is closed. Not implemented yet.
sub tcp_master_disconnect {
    #### Missing the tcp_master_disconnect - need to implement it
}

####################################################################
# Core Master Init method
####################################################################

###################
# init_master
###################
# Marks this node as the core master and asks the main session to start
# accepting connections.
sub init_master {
    my $self = shift;

    $self->{_core_master} = 1;
    $_[KERNEL]->post(AvidMQ => 'start_tcp_daemon');
}

####################################################################
# Message Handlers
####################################################################

###################
# msg_hello
###################
# Answers the greeting of the remote side with a (currently empty)
# 'config' message.
sub msg_hello {
    my $self = shift;
    my ($kernel, $heap, $input) = @_[KERNEL, HEAP, ARG0];

    $self->_reply(undef, { type => 'config', config => {} }, $heap);
}

###################
# msg_config
###################
# msg_config handles the initial request from the clients, that will
# present them to their master.
sub msg_config {
    my $self = shift;
    my ($kernel, $heap, $input) = @_[KERNEL, HEAP, ARG0];

    # At this time will not verify much. Later will take care of this
    # If needed. Just say ok, for now.
    $self->_reply($input, { type => 'configok' }, $heap);
}

###################
# configok
###################
# msg_configok handles the message returned from the master to the
# config request. When this server mode is 'core' it will subscribe
# to the entire queue tree, so that it can reply to subscription requests
# directly, without requesting another subscription from its master.
sub msg_configok {
    my $self = shift;
    my ($kernel, $heap, $input) = @_[KERNEL, HEAP, ARG0];

    # Keep the heap of the master connection; msg_publish forwards to it.
    $self->{master} = $heap;
    #### Master: $self->{master}
    #### Heap: $heap

    if ($self->{_mode} eq 'core') {
        #### Requesting to subscribe '/' as core
        $self->_send_to({ type => 'subscribe', queue => '/' }, $heap);
    }
    else {
        #### At msg_configok[!core]: $input
        #### Need to find out the last subscriptions and get them again
    }

    # Now, that we connected to a master, let's start the tcp daemon
    $_[KERNEL]->post(AvidMQ => 'start_tcp_daemon');
}

###################
# msg_subscribe
###################
# Registers the sending connection as a subscriber of $input->{queue}.
# Replies with an error when the queue is missing or malformed, and with
# 'ok' otherwise. Every reply carries the msgid of the request.
sub msg_subscribe {
    my $self = shift;
    my ($kernel, $heap, $input) = @_[KERNEL, HEAP, ARG0];

    my $queue = $input->{queue};
    unless ($queue) {
        #### Missing queue: $queue
        return $self->_reply($input, $self->_error_msg('Missing Queue'), $heap);
    }

    # \A and \z anchor the whole string. With ^ and $ under /m a multi-line
    # value passed as soon as one of its lines looked like a queue.
    unless ($queue =~ m{\A/(?:\w+/)*(?:\w+)?\z}) {
        #### Invalid Queue: $queue
        return $self->_reply($input, $self->_error_msg('Invalid Queue'), $heap);
    }

    ##### Eventually need to implement here the permission verification
    #### Subscribing to: $queue
    $self->{_subscribers}->{$queue}->{ $heap->{client}->ID } = $heap;

    # Remembered per connection so the disconnect handler can undo it.
    push @{ $heap->{AvidMQ}->{subscribes} }, $queue;

    # Use _reply, like the error paths above and verify_publish, so the
    # requester can match the 'ok' to its request by msgid.
    $self->_reply($input, $self->_ok_msg(), $heap);
}

###################
# msg_publish
###################
# msg_publish handles the broadcast of publish events.
# When the publish comes from the master it is broadcast to
# every subscriber. If it comes from a client (slave/client) it is
# forwarded to the master. If this is the core_master, verify
# the permissions and broadcast it if the producer is valid.
sub msg_publish {
    my $self = shift;
    my ($kernel, $heap, $input) = @_[KERNEL, HEAP, ARG0];

    # Core master: validate and broadcast locally.
    if ($self->{_core_master}) {
        return $self->verify_publish(@_);
    }

    # Arrived over the connection to our master: hand it to our subscribers.
    if ($heap->{AvidMQ}->{master}) {
        return $self->prepare_broadcast(@_);
    }

    ##### Master: $self->{master}
    # Came from one of our clients: forward it upstream. Without a usable
    # master connection _send_to would confess and stop the daemon, so the
    # publisher gets an error reply instead.
    my $master = $self->{master};
    unless ($master and ($master->{server} or $master->{client})) {
        return $self->_reply($input, $self->_error_msg('No master available'),
            $heap);
    }

    # Passing $heap as origin lets the master's answer find its way back.
    $self->_send_to($input, $master, $heap);
}

####################################################################
# Store methods
####################################################################

###################
# start_store
###################
# Placeholder for loading the storage plugin named by the 'type' key of
# the [database] section. Currently returns immediately.
sub start_store {
    my $self = shift;

    return;    #Will implement this at a later time.

    # Everything below is unreachable until the return above is removed.
    my $storetype = $self->{config}->{database}->{type};
    my $plugin    = "AvidMQ::Store::" . $storetype;

    # NOTE(review): string eval of a name built from the configuration;
    # validate $storetype before this code is enabled.
    my $code = "require $plugin;";
    eval $code;
    die "Can't load the store plugin: $@\n$code\n" if $@;
}

####################################################################
# TCP Daemon
####################################################################

###################
# start_tcp_daemon
###################
# Starts the TCP server on the port configured in the [avidmq] section.
# Messages are exchanged through POE::Filter::Reference.
sub start_tcp_daemon {
    my $self = shift;

    my $avidmqcfg = $self->{config}->{avidmq};

    #### Going to start the TCP daemon
    POE::Component::Server::TCP->new(
        Port               => $avidmqcfg->{port},
        ClientConnected    => sub { $self->tcp_client_connect(@_); },
        ClientDisconnected => sub { $self->tcp_client_disconnected(@_) },
        ClientInput        => sub { $self->tcp_client_input(@_); },
        ClientFilter       => 'POE::Filter::Reference',
        Concurrency        => 10,
    );
}

###################
# tcp_client_connect
###################
# Prepares the per-connection state of a new client and greets it with a
# 'hello' message carrying the protocol version.
sub tcp_client_connect {
    my $self = shift;

    #### At tcp_client_connect
    my $heap = $_[HEAP];
    $heap->{AvidMQ}->{subscribes} = [];    # queues this client subscribed to
    $heap->{AvidMQ}->{master}     = 0;     # this connection is not our master

    $self->_send_to(
        { type => 'hello', version => 'AvidMQ v' . AvidMQProtVersion() },
        $heap,
    );
}

###################
# tcp_client_disconnected
###################
# Removes the departing client from every queue it subscribed to.
sub tcp_client_disconnected {
    my $self = shift;
    my ($kernel, $heap) = @_[KERNEL, HEAP];

    #### At tcp_client_disconnect: $heap->{client}->ID
    my $client_id = $heap->{client}->ID;

    for my $queue (@{ $heap->{AvidMQ}->{subscribes} }) {
        #### Unsubscribing: $queue
        my $subscribers = $self->{_subscribers}->{$queue} or next;
        delete $subscribers->{$client_id};

        # Drop the queue entry once nobody is left, so queue names that were
        # only subscribed to once do not accumulate forever.
        delete $self->{_subscribers}->{$queue} unless %{$subscribers};
    }
}

###################
# tcp_client_input
###################
# Dispatches a message received from a client to the handler registered
# for its type. Messages that are not hashes, that have no type or whose
# type is unknown are ignored.
sub tcp_client_input {
    my $self = shift;
    my ($kernel, $heap, $input) = @_[KERNEL, HEAP, ARG0];

    # Client data is untrusted: a reference that is not a hash would die on
    # the key lookups below and take the daemon down.
    unless (ref($input) eq 'HASH') {
        #### Got input on tcp_client_input: $input
        return;
    }
    unless ($input->{type}) {
        #### got input without type on tcp_client_input: $input
        return;
    }

    if ($msg_handle{ $input->{type} }) {
        $msg_handle{ $input->{type} }->($self, @_);
    }
    else {
        #### Unknow msgtype at tcp_client_input: $input->{type}
    }
}

####################################################################
# Publish and BroadCast
####################################################################

###################
# verify_publish
###################
# verify_publish will verify if the source can publish, reply to
# the source and call prepare_broadcast to start the broadcast
# process.
sub verify_publish {
    my $self = shift;
    my ($kernel, $heap, $input) = @_[KERNEL, HEAP, ARG0];

    my $queue = $input->{queue};
    unless ($queue) {
        #### Missing queue: $queue
        return $self->_reply($input, $self->_error_msg('Missing Queue'), $heap);
    }

    # \A and \z anchor the whole string. With ^ and $ under /m a multi-line
    # value passed as soon as one of its lines looked like a queue.
    unless ($queue =~ m{\A/(?:\w+/)*(?:\w+)?\z}) {
        #### Invalid Queue: $queue
        return $self->_reply($input, $self->_error_msg('Invalid Queue'), $heap);
    }

    ##### Eventually need to implement here the permission verification
    $self->_reply($input, $self->_ok_msg, $heap);
    $self->prepare_broadcast(@_);
}

###################
# prepare_broadcast
###################
# prepare_broadcast adds a message to the queue of messages to be
# broadcasted.
sub prepare_broadcast {
    my $self = shift;
    my ($kernel, $input) = @_[KERNEL, ARG0];

    # Collect the subscribers of the queue itself and of every ancestor
    # ('/a/b' -> '/a/' -> '/'). Keys are connection ids, so a connection
    # subscribed at several levels receives the message only once.
    #
    # The subscriber table is only read here. Assigning an empty hash for
    # every queue that is merely published to would make the table grow
    # with each new queue name a publisher comes up with.
    my %subs;
    my $queue = $input->{queue};
    if (defined $queue) {
        %subs = %{ $self->{_subscribers}->{$queue} || {} };

        while ($queue and $queue ne '/') {
            # Strip the last path component to get the parent queue.
            $queue =~ s{[^/]*/?$}{};

            my $parent = $self->{_subscribers}->{$queue} or next;
            %subs = (%{$parent}, %subs);
        }
    }

    unless (%subs) {
        ##### No subscribers to the queue: $input->{queue}
        return;
    }

    my $id = ++$self->{_msgid};
    push @{ $self->{_broad_queue} }, $id;
    $self->{_broad_messages}->{$id} = {
        message     => $input,
        subscribers => [ values %subs ],
    };

    # Start the broadcast loop unless it is already running.
    unless ($self->{_broadcasting}) {
        $self->{_broadcasting} = 1;
        $kernel->post(AvidMQ => 'broadcast');
    }
}

###################
# broadcast
###################
# broadcast is the method that sends each message to all the subscribers
# that must get it. Each invocation delivers the message at the head of
# the queue to one subscriber, then re-schedules itself while work
# remains, so other events are served in between.
sub broadcast {
    my $self   = shift;
    my $kernel = $_[KERNEL];

    my $broadid = $self->{_broad_queue}->[0];
    if ($broadid) {
        my $bmsg    = $self->{_broad_messages}->{$broadid};
        my $pending = ($bmsg && $bmsg->{subscribers}) || [];

        if (@{$pending}) {
            my $sub = shift @{$pending};

            # The subscriber may have gone away since the message was
            # queued. _send_to confesses on a connection it cannot write
            # to, which would stop the daemon, so such entries are skipped.
            if ($sub->{server} or $sub->{client}) {
                $self->_send_to($bmsg->{message}, $sub);
            }
        }

        unless (@{$pending}) {
            shift @{ $self->{_broad_queue} };    #trash it
            delete $self->{_broad_messages}->{$broadid};
        }
    }

    if (@{ $self->{_broad_queue} }) {
        $kernel->yield('broadcast');
    }
    else {
        $self->{_broadcasting} = 0;
    }
}

####################################################################
# Helper Methods
####################################################################

###################
# _send_to
###################
# _send_to receives a message and a $heap and sends the message to
# the connected wheel (that can be a TCP::Client or a TCP::Server).
sub _send_to {
    my ($self, $msg, $remote, $from) = @_;

    # Arguments:
    #   $msg    - message hash; its msgid is replaced by a fresh one.
    #   $remote - heap of the destination connection.
    #   $from   - optional heap of the connection the message came from.
    #             When given and $msg already has a msgid, the reply to
    #             the new msgid can be routed back to that connection.
    # Confesses when $remote holds neither a 'server' nor a 'client' wheel.

    # Find the wheel first, so a failed send neither consumes a message id
    # nor leaves an entry behind in the reply queue.
    my $wheel = ref $remote ? ($remote->{server} || $remote->{client}) : undef;
    unless ($wheel) {
        #### Don't know how to send to: ref $remote
        confess "Don't know how to send to "
            . (defined $remote ? $remote : '(undef)');
    }

    my $id = ++$self->{_msgid};
    if ($msg->{msgid} and $from) {
        # tcp_master_input looks the origin up as 'from'; 'source' is the
        # key this method used to write. Both are stored so forwarded
        # replies are delivered whichever key the reader uses.
        $self->{_reply_queue}->{$id} = {
            original_id => $msg->{msgid},
            from        => $from,
            source      => $from,
        };
    }
    $msg->{msgid} = $id;

    return $wheel->put($msg);
}

###################
# _reply
###################
# _reply is similar to _send_to, but uses the msgid from the original
# message. It takes an additional argument, the original message, which
# comes first: _reply($original, $msg, $remote). $original may be undef,
# in which case the msgid of $msg is left alone. A $remote without a
# usable wheel is silently ignored.
sub _reply {
    my ($self, $omsg, $msg, $remote) = @_;

    $msg->{msgid} = $omsg->{msgid} if $omsg;

    my $wheel = ref $remote ? ($remote->{server} || $remote->{client}) : undef;
    unless ($wheel) {
        #### Don't know how to reply to: ref $remote
        return;
    }

    return $wheel->put($msg);
}

###################
# _error_msg
###################
# _error_msg creates the error message to send back as response
sub _error_msg {
    my ($self, $msg) = @_;

    return { type => 'error', message => $msg };
}

###################
# _ok_msg
###################
# _ok_msg creates the default ok message to send back as response
# to be used on requests with no data to be returned
sub _ok_msg {
    return { type => 'ok' };
}

1;