Browse Source

First basic version with publish and subscribe

* Publish and subscribing are already working.
* It's a bit slow, specially with multiple subscribers.

themage
Marco Neves 10 years ago
parent
commit
ce84a7c0c7
6 changed files with 663 additions and 79 deletions
  1. 71 0
      bin/avidmq-cons.pl
  2. 73 0
      bin/avidmq-prod.pl
  3. 4 0
      etc/avidmq-cons.conf
  4. 4 0
      etc/avidmq-prod.conf
  5. 172 0
      lib/AvidMQ/Client.pm
  6. 339 79
      lib/AvidMQ/Server.pm

+ 71 - 0
bin/avidmq-cons.pl

@ -0,0 +1,71 @@
1
#!/usr/bin/perl -w
2
3
use strict;
4
use warnings;
5
6
use FindBin;
7
use lib $FindBin::Bin."/../lib";
8
9
use AvidMQ::Client;
10
use Getopt::Long;
11
12
my $config=$FindBin::Bin."/../etc/avidmq-cons.conf";
13
my $queue="/numbers/random/";
14
my $node=qx{hostname};
15
16
GetOptions(
17
		"configfile=s"	=> \$config,
18
	);
19
20
our $statstime=time()+10;
21
our $inittime=time();
22
our %count=();
23
our %avg=();
24
our %tot=();
25
our $msgcnt=0;
26
27
AvidMQ::Client->new(
28
		configfile	=> $config,
29
		Publish		=> \&get_numbers,
30
		subscribe	=> [$queue],
31
	)->run();
32
33
34
sub get_numbers {
35
	my $avidmq=shift;
36
	my $message=shift;
37
	
38
	our ($statstime,$inittime,%count,%avg,%tot);
39
	my $node=$message->{node};
40
	my $number=$message->{number};
41
	
42
	$count{$node}=0 unless $count{$node};
43
	$avg{$node}=0 unless $avg{$node};
44
	$tot{$node}=0 unless $tot{$node};
45
	$msgcnt++;
46
	
47
	$avg{$node}=($avg{$node}*$count{$node}+$number)/++$count{$node};
48
	$tot{$node}+=$number;
49
	
50
	if (time() > $statstime) {
51
		my $tottime=time()-$inittime;
52
		my $avgsec=$msgcnt/$tottime;
53
		
54
		
55
		print STDERR <<EOS;
56
Runned for: $tottime [since $inittime]
57
Messages Recieved: $msgcnt [$avgsec per sec]
58
59
EOS
60
		for my $node (keys %count) {
61
			print STDERR <<EOS;
62
Producer Node: $node
63
	Recieved Numbers: $count{$node}
64
	Avg number: $avg{$node}
65
	Sum of all numbers: $tot{$node}
66
EOS
67
		}
68
		print STDERR "\n\n";
69
		$statstime=time()+10;
70
	}
71
}

+ 73 - 0
bin/avidmq-prod.pl

@ -0,0 +1,73 @@
1
#!/usr/bin/perl -w
2
3
use strict;
4
use warnings;
5
6
use FindBin;
7
use lib $FindBin::Bin."/../lib";
8
9
use AvidMQ::Client;
10
use Getopt::Long;
11
12
my $config=$FindBin::Bin."/../etc/avidmq-prod.conf";
13
my $queue="/numbers/random/";
14
my $node=qx{hostname};
15
chomp $node;
16
17
GetOptions(
18
		"configfile=s"	=> \$config,
19
	);
20
21
our $statstime=time()+10;
22
our $inittime=time();
23
our $count=0;
24
our $avg=0;
25
our $tot=0;
26
27
AvidMQ::Client->new(
28
		configfile	=> $config,
29
		Idle		=> \&generate_random,
30
	)->run();
31
32
33
sub generate_random {
34
	my $avidmq=shift;
35
	my $number=rand();
36
	
37
	our ($statstime,$inittime,$count,$avg,$tot);
38
	
39
	$avg=($avg*$count+$number)/++$count;
40
	$tot+=$number;
41
	
42
	if (time() > $statstime) {
43
		my $tottime=time()-$inittime;
44
		my $avgsec=$count/$tottime;
45
		
46
		print STDERR <<EOS;
47
Runned for: $tottime [since $inittime]
48
Generated Numbers: $count [$avgsec por sec]
49
Avg number: $avg
50
Sum of all numbers: $tot
51
52
EOS
53
		$statstime=time()+10;
54
	}
55
	
56
	$avidmq->publish($queue,{node=>$node, number=>$number});
57
	
58
	if ($count>=100000) {
59
		my $tottime=time()-$inittime;
60
		my $avgsec=$count/$tottime;
61
		
62
		print STDERR <<EOS;
63
Runned for: $tottime [since $inittime]
64
Generated Numbers: $count [$avgsec por sec]
65
Avg number: $avg
66
Sum of all numbers: $tot
67
68
EOS
69
		$statstime=time()+10;
70
		
71
		exit 0;
72
	}
73
}

+ 4 - 0
etc/avidmq-cons.conf

@ -0,0 +1,4 @@
1
; This will be the configuration for a AvidMQ Client
2
3
[avidmq]
4
router=127.0.0.1:6666

+ 4 - 0
etc/avidmq-prod.conf

@ -0,0 +1,4 @@
1
; This will be the configuration for a AvidMQ Client
2
3
[avidmq]
4
router=127.0.0.1:6666

+ 172 - 0
lib/AvidMQ/Client.pm

@ -0,0 +1,172 @@
1
package AvidMQ::Client;
2
3
use strict;
4
use warnings;
5
6
use Carp qw(confess carp);
7
use Config::Tiny;
8
9
use POE qw(Component::Client::TCP Filter::Reference);
10
11
use Smart::Comments '####';
12
13
sub new {
14
	my $class=shift;
15
	my $self=bless {}, $class;
16
17
	$self->__init(@_);
18
19
	return $self;
20
}
21
22
sub __init {
23
	my $self=shift;
24
	my %args=@_;
25
26
	confess "Initing without configfile" unless $args{configfile};
27
	confess "config file not found: $args{configfile}"
28
		unless -f $args{configfile};
29
30
	my $config=Config::Tiny->read($args{configfile});
31
32
	$self->{config}=$config;
33
		
34
	$self->{_msgid}=0;
35
	
36
	$self->{PublishHandler}=$args{Publish}||sub {};
37
	$self->{OkHandler}=$args{Ok}||sub {};
38
	$self->{ErrorHandler}=$args{Error}||sub {};
39
	$self->{IdleHandler}=$args{Idle}||0;
40
	
41
	$self->{subscribe}=ref $args{subscribe}
42
			? $args{subscribe}
43
			: [$args{subscribe}];
44
}
45
46
sub run {
47
	my $self=shift;
48
49
	$self->__prepare_client();
50
51
	$poe_kernel->run();
52
}
53
54
sub publish {
55
	my $self=shift;
56
	my $queue=shift;
57
	my $message=shift;
58
	
59
	my $msg={
60
		msgid=>++$self->{_msgid},
61
		type=>'publish',
62
		queue=>$queue,
63
		message	=> $message,
64
	};
65
	
66
	$self->{avidmq}->{server}->put($msg);
67
}
68
69
sub subscribe {
70
	my $self=shift;
71
	my $queue=shift;
72
	
73
	my $msg={msgid=>++$self->{_msgid}, type=>'subscribe',queue=>$queue};
74
	$self->{avidmq}->{server}->put($msg);	
75
}
76
77
sub __prepare_client {
78
	my $self=shift;
79
	
80
	my $router=$self->{config}->{avidmq}->{router};
81
82
	my ($host,$port)=split /\:/, $router;
83
	$port=6666 unless $port;
84
	
85
	unless ($host and $port) {
86
		die "Invalid Core address '$router'\n";
87
	}
88
89
	#### Trying for master: $host."->".$port
90
	POE::Component::Client::TCP->new(
91
		RemoteAddress 	=> $host,
92
		RemotePort		=> $port,
93
		Filter			=> 'POE::Filter::Reference',
94
		Connected		=> sub { $self->__init_client(@_) },
95
		ConnectError	=> sub { $self->__init_client_fail(@_) },
96
		ServerInput		=> sub { $self->__tcp_router_input(@_) },
97
		Disconnected	=> sub { $self->__tcp_router_disconnect(@_) },
98
	);
99
	
100
	if ($self->{IdleHandler}) {
101
		POE::Session->create(
102
			inline_states	=> {
103
				_start	=> sub { 
104
					$_[KERNEL]->alias_set('IdleSession');
105
				},
106
				idleaction	=> sub { $self->__idle_action(@_) },
107
			}
108
		);
109
	}
110
}
111
112
sub __init_client {
113
	my $self=shift;
114
	$self->{avidmq}=$_[HEAP];
115
	
116
	if ($self->{IdleHandler}) {
117
		$_[KERNEL]->post(IdleSession=>'idleaction');
118
	}
119
	
120
	if ($self->{subscribe}) {
121
		for my $queue (@{$self->{subscribe}}) {
122
			$self->subscribe($queue);
123
		}
124
	}
125
}
126
127
sub __init_client_fail {
128
	my $self=shift;
129
	my ($operation, $error_number, $error_string) = @_[ARG0..ARG2];
130
	warn "$operation error $error_number occurred: $error_string";
131
132
	die "Could not connect to $self->{config}->{avidmq}->{router}\n";
133
}
134
135
my %inputhandlers=(
136
		publish	=> 'PublishHandler',
137
		error	=> 'ErrorHandler',
138
		ok		=> 'OkHandler',
139
	);
140
141
sub __tcp_router_input {
142
	my $self=shift;
143
	my $input=$_[ARG0];
144
	
145
	return unless $input and ref $input and $input->{type};
146
	
147
	if ($inputhandlers{$input->{type}}
148
			and ref $self->{$inputhandlers{$input->{type}}} eq 'CODE') {
149
		my $handler=$self->{$inputhandlers{$input->{type}}};
150
		$handler->($self, $input->{message}, $input);
151
	}
152
}
153
154
sub __tcp_router_disconnect {
155
	my $self=shift;
156
	
157
	die "Disconnected from the AvidMQ Router";
158
}
159
160
sub __idle_action {
161
	my $self=shift;
162
	my $kernel=$_[KERNEL];
163
	
164
	if (ref $self->{IdleHandler} eq 'CODE') {
165
		my $handler=$self->{IdleHandler};
166
		$handler->($self);
167
	}
168
	
169
	$kernel->delay('idleaction'=>0.001);
170
}
171
172
1;

+ 339 - 79
lib/AvidMQ/Server.pm

@ -27,12 +27,18 @@ sub init {
27 27
	my %args=@_;
28 28
29 29
	confess "Initing without configfile" unless $args{configfile};
30
	confess "onfig file not found: $args{configfile}"
30
	confess "config file not found: $args{configfile}"
31 31
		unless -f $args{configfile};
32 32
33 33
	my $config=Config::Tiny->read($args{configfile});
34 34
35
	$self->{config}=$config;	
35
	$self->{config}=$config;
36
		
37
	$self->{_msgid}=0;
38
	$self->{_reply_queue}={};
39
	$self->{_broad_queue}=[];
40
	$self->{_broad_messages}={};
41
	$self->{_subscribers}={};
36 42
}
37 43
38 44
sub run {
@ -53,17 +59,21 @@ sub start_session {
53 59
				$self->start_avidmqd(@_);
54 60
			},
55 61
			start_tcp_daemon	=> sub { $self->start_tcp_daemon(@_); },
56
62
			broadcast			=> sub { $self->broadcast(@_); },
57 63
		},
58 64
	);
59 65
}
60 66
67
###################
68
# start_avidmqd
69
###################
70
# Starts the daemon and connects to the avidmq network.
61 71
sub start_avidmqd {
62 72
	my $self=shift;
63 73
64 74
	$self->start_store();
65 75
66
	my $mode=$self->{config}->{avidmq}->{mode};
76
	my $mode=$self->{_mode}=$self->{config}->{avidmq}->{mode};
67 77
68 78
	if ($mode eq 'core') {
69 79
		$self->start_core_daemon(@_);
@ -110,6 +120,7 @@ sub init_slave {
110 120
	POE::Component::Client::TCP->new(
111 121
		RemoteAddress 	=> $host,
112 122
		RemotePort		=> $port,
123
		Filter			=> 'POE::Filter::Reference',
113 124
		Connected		=> sub { $self->init_slave_succeed(@_) },
114 125
		ConnectError	=> sub { $self->init_slave_fail(@_) },
115 126
		ServerInput		=> sub { $self->tcp_master_input(@_) },
@ -121,38 +132,54 @@ sub init_slave_succeed {
121 132
	my $self=shift;
122 133
123 134
	$self->{_core_master} = 0;
124
	$_[HEAP]->{AvidMQ}->{state}='new';
125
126
	# This will be moved to post core_sync
127
	$_[KERNEL]->post(AvidMQ => 'start_tcp_daemon');
135
	$_[HEAP]->{AvidMQ}->{master}=1;
136
	# It's the core master that must start the talk.
128 137
}
129 138
130 139
sub init_slave_fail {
131 140
	my $self=shift;
132 141
133
	#### This slave failed. Let's try the next one.
142
	#### This master failed. Let's try the next one.
134 143
	$self->init_slave(@_);
135 144
}
136 145
146
my %msg_handle=(
147
		hello		=> \&msg_hello,
148
		config		=> \&msg_config,
149
		configok	=> \&msg_configok,
150
		publish		=> \&msg_publish,
151
		subscribe	=> \&msg_subscribe,
152
	);
153
137 154
sub tcp_master_input {
138 155
	my $self=shift;
139 156
	my ($kernel,$heap,$input)=@_[KERNEL,HEAP,ARG0];
140 157
141
	if ($heap->{AvidMQ}->{state} eq 'new') {
142
		if ($input=~/HELLO AvidMQ/) {
143
			my $config=$self->_config_xml();
144
			$heap->{server}->put($config);
145
			$heap->{server}->set_filter(POE::Filter::Reference->new());
146
			$heap->{AvidMQ}->{state} = 'set';
147
		} else {
148
			# The other side is not a AvidMQ server
149
			$kernel->yield("shutdown");
150
			$self->init_slave(@_);
151
		}
152
	} elsif (ref $input and $input->{type}) {
153
		#### At tcp_master_input: $input
158
	unless (ref $input) {
159
		#### Got input on tcp_master_input: $input
160
		return;
161
	}
162
	unless ($input->{type}) {
163
		#### got input without type on tcp_master_input\n: $input	
164
		return;
165
	}
166
	
167
	if ($input->{type}=~/ok|error/ and 
168
			my $omsg=$self->{_reply_queue}->{$input->{msgid}}) {
169
			
170
		delete $self->{_reply_queue}->{$input->{msgid}};
171
		
172
		$input->{msgid}=$omsg->{original_id};
173
		$self->_reply(undef, $input, $omsg->{from});		
174
		##### forwarded reply to client: $input
175
		return;
176
	}
177
	
178
	##### At tcp_master_input: $input
179
	if ($msg_handle{$input->{type}}) {
180
		$msg_handle{$input->{type}}->($self, @_);
154 181
	} else {
155
		#### Unexpected input: $input
182
		#### Unknow msgtype: $input->{type}
156 183
	}
157 184
}
158 185
@ -174,6 +201,109 @@ sub init_master {
174 201
}
175 202
176 203
####################################################################
204
# Message Handlers
205
####################################################################
206
207
###################
208
# msg_hello
209
###################
210
sub msg_hello {
211
	my $self=shift;
212
	my ($kernel,$heap,$input)=@_[KERNEL,HEAP,ARG0];
213
	
214
	$self->_reply(undef, {type=>'config', config=>{}}, $heap);
215
}
216
217
###################
218
# msg_config
219
###################
220
# msg_config handles the initial request from the clients, that will
221
# present them to their master.
222
sub msg_config {
223
	my $self=shift;
224
	my ($kernel,$heap,$input)=@_[KERNEL,HEAP,ARG0];
225
	
226
	# At this time will not verify much. Later will take care of this
227
	# If needed. Just say ok, for now.
228
	$self->_reply($input,{type=>'configok'},$heap);
229
	
230
}
231
232
###################
233
# configok
234
###################
235
# msg_configok handles the message returned from their master to the
236
# config request. When this server mode is 'core' it will subscribe
237
# to the entire queue tree, so that it can reply to subscription requests
238
# directly, without request another subscription to his master.
239
sub msg_configok {
240
	my $self=shift;
241
	my ($kernel,$heap,$input)=@_[KERNEL,HEAP,ARG0];
242
	
243
	$self->{master}=$heap;
244
	#### Master: $self->{master}
245
	#### Heap: $heap
246
247
	if ($self->{_mode} eq 'core') {
248
		#### Requesting to subscribe '/' as core
249
		$self->_send_to({type=>'subscribe', queue=>'/'}, $heap);
250
	} else {
251
		#### At msg_configok[!core]: $input
252
		#### Need to find out the last subscriptions and get them again
253
	}
254
	
255
	# Now, that we connected to a master, let's start the tcp daemon
256
	$_[KERNEL]->post(AvidMQ => 'start_tcp_daemon');
257
}
258
259
sub msg_subscribe {
260
	my $self=shift;
261
	my ($kernel,$heap,$input)=@_[KERNEL,HEAP,ARG0];
262
263
	my $queue=$input->{queue};
264
	
265
	unless ($queue) {
266
		#### Missing queue: $queue
267
		return $self->_reply($input,
268
			$self->_error_msg('Missing Queue'),$heap);
269
	}
270
	unless ($queue=~m{^/(\w+/)*(\w+)?$}smx) {
271
		#### Invalid Queue: $queue
272
		return $self->_reply($input,
273
			$self->_error_msg('Invalid Queue'),$heap);
274
	}
275
	##### Eventually need to implement here the permission verification
276
277
	#### Subscribing to: $queue 
278
	$self->{_subscribers}->{$queue}->{$heap->{client}->ID}=$heap;
279
	push @{$heap->{AvidMQ}->{subscribes}}, $queue;
280
281
	$self->_send_to($self->_ok_msg(),$heap);
282
}
283
284
###################
285
# msg_publish
286
###################
287
# msg_publish handles the broadcast of publish events
288
# when the publish come from the master it broadcast them to
289
# every subscriber. If they come from a client (slave/client)
290
# forward them to the master. If this is the core_master, verify
291
# the permissions and broadcast it if the producer is valid.
292
sub msg_publish {
293
	my $self=shift;
294
	my ($kernel,$heap,$input)=@_[KERNEL,HEAP,ARG0];
295
	
296
	if ($self->{_core_master}) {
297
		$self->verify_publish(@_);
298
	} elsif ($heap->{AvidMQ}->{master}) {
299
		$self->prepare_broadcast(@_);
300
	} else {
301
		##### Master: $self->{master}
302
		$self->_send_to($input,$self->{master},$heap)
303
	}
304
}
305
306
####################################################################
177 307
# Store methods
178 308
####################################################################
179 309
@ -203,15 +333,23 @@ sub start_tcp_daemon {
203 333
			Port	=> $avidmqcfg->{port},
204 334
			ClientConnected => sub { $self->tcp_client_connect(@_); },
205 335
			ClientDisconnected	=> sub { $self->tcp_client_disconnected(@_) },
206
			ClientInput		=> sub { $self->tcp_client_input(@_); },
336
			ClientInput			=> sub { $self->tcp_client_input(@_); },
337
			ClientFilter		=> 'POE::Filter::Reference',
338
			Concurrency			=> 10,
207 339
		);
208 340
}
209 341
210 342
sub tcp_client_connect {
211 343
	my $self=shift;
212 344
	#### At tcp_client_connect
213
	$_[HEAP]{client}->put("HELLO AvidMQ v".AvidMQProtVersion());
214
	$_[HEAP]{AvidMQ}->{state} = 'new';
345
	$_[HEAP]{AvidMQ}->{subscribes} = [];
346
	
347
	$_[HEAP]->{AvidMQ}->{master}=0;
348
349
	$self->_send_to(
350
			{type=>'hello',version=>'AvidMQ v'.AvidMQProtVersion()},
351
			$_[HEAP],
352
		);
215 353
}
216 354
217 355
sub tcp_client_disconnected {
@ -229,79 +367,201 @@ sub tcp_client_disconnected {
229 367
sub tcp_client_input {
230 368
	my $self=shift;
231 369
	my ($kernel,$heap,$input)=@_[KERNEL,HEAP,ARG0];
370
	
371
	unless (ref $input) {
372
		#### Got input on tcp_client_input: $input
373
		return;
374
	}
375
	unless ($input->{type}) {
376
		#### got input without type on tcp_client_input: $input	
377
		return;
378
	}
379
		
380
	if ($msg_handle{$input->{type}}) {
381
		$msg_handle{$input->{type}}->($self, @_);
382
	} else {
383
		#### Unknow msgtype at tcp_client_input: $input->{type}
384
	}
385
}
386
387
####################################################################
388
# Publish and BroadCast
389
####################################################################
232 390
233
	if ($heap->{AvidMQ}->{state} eq 'new') {
234
		return unless $input;
235
		$heap->{AvidMQ}->{cfg}.=$input;
236
		if ($heap->{AvidMQ}->{cfg}=~m{<config>.*</config>}msx) {
237
			if ($self->_config_client($heap)) {
238
				$heap->{client}->put({type => 'configok'});
239
				$heap->{AvidMQ}->{state} = 'set';
240
			} else {
241
				$heap->{client}->put('ERROR CREATING THE FILTER');
242
				$kernel->yield('shutdown');
243
			}
391
###################
392
# verify_publish
393
###################
394
# verify_publish will verify if the source can publish, reply to
395
# the source and call prepare_broadcast to start the broadcast
396
# process.
397
sub verify_publish {
398
	my $self=shift;
399
	my ($kernel,$heap,$input)=@_[KERNEL,HEAP,ARG0];
400
	
401
	my $queue=$input->{queue};
402
	
403
	unless ($queue) {
404
		#### Missing queue: $queue
405
		return $self->_reply($input,
406
			$self->_error_msg('Missing Queue'),$heap);
407
	}
408
	unless ($queue=~m{^/(\w+/)*(\w+)?$}smx) {
409
		#### Invalid Queue: $queue
410
		return $self->_reply($input,
411
			$self->_error_msg('Invalid Queue'),$heap);
412
	}
413
	##### Eventually need to implement here the permission verification
414
		
415
	$self->_reply($input, $self->_ok_msg, $heap);
416
	
417
	$self->prepare_broadcast(@_);
418
}
419
420
###################
421
# prepare_broadcast
422
###################
423
# prepare_broadcast add a message to the queue of messages to be
424
# broadcasted.
425
sub prepare_broadcast {
426
	my $self=shift;
427
	my ($kernel,$input)=@_[KERNEL,ARG0];
428
	
429
	my $queue=$input->{queue};
430
	$self->{_subscribers}->{$queue}={}
431
		unless $self->{_subscribers}->{$queue};
432
	my %subs=%{$self->{_subscribers}->{$queue}};
433
	
434
	while ($queue and $queue ne '/') {
435
		$queue=~s{[^/]*/?$}{};
436
		$self->{_subscribers}->{$queue}={}
437
			unless $self->{_subscribers}->{$queue};
438
		%subs=(%{$self->{_subscribers}->{$queue}}, %subs);
439
	}
440
	
441
	if (%subs) {
442
		my @subs=values %subs;
443
		my $id=++$self->{_msgid};
444
		
445
		push @{$self->{_broad_queue}}, $id;
446
		$self->{_broad_messages}->{$id}={
447
				message=>$input,
448
				subscribers=>\@subs
449
			};
450
		unless ($self->{_broadcasting}) {
451
			$self->{_broadcasting}=1;
452
			$kernel->post(AvidMQ => 'broadcast');
244 453
		}
245 454
	} else {
246
		#### At tcp_input: $input
455
		##### No subscribers to the queue: $input->{queue}
247 456
	}
248 457
}
249 458
459
###################
460
# broadcast
461
###################
462
# broadcast is the method than sends each message to all the subscribers
463
# that must get it.
464
sub broadcast {
465
	my $self=shift;
466
	my ($kernel,$heap)=@_[KERNEL,HEAP];
467
	
468
	my $broadid=$self->{_broad_queue}->[0];
469
	
470
	if ($broadid) {
471
		my $bmsg=$self->{_broad_messages}->{$broadid};
472
		if ($bmsg->{subscribers} and @{$bmsg->{subscribers}}) {
473
			my $sub=shift @{$bmsg->{subscribers}};
474
			$self->_send_to($bmsg->{message}, $sub);
475
		}
476
		unless ($bmsg->{subscribers} and @{$bmsg->{subscribers}}) {
477
			shift @{$self->{_broad_queue}}; #trash it
478
			delete $self->{_broad_messages}->{$broadid};
479
		}
480
	}
481
	if (@{$self->{_broad_queue}}) {
482
		$kernel->yield('broadcast');
483
	} else {
484
		$self->{_broadcasting}=0;
485
	}
486
}
250 487
251 488
####################################################################
252 489
# Helper Methods
253 490
####################################################################
254 491
255 492
###################
256
# _config_xml
493
# _send_to
257 494
###################
258
# _config_xml creates the Config XML to send to the upserver
259
# At this time, the filter to use is always Reference.
260
sub _config_xml {
495
# _send_to recieves a message and an $heap and send the message to
496
# the connected wheel (that can be a TCP::Client or a TCP::Server).
497
sub _send_to {
261 498
	my $self=shift;
262
	my $mode=$self->{config}->{avidmq}->{mode};
263
	my $ts=time;
264
	return <<EOX
265
<config>
266
	<mode>$mode</mode>
267
	<time>$ts</time>
268
	<filter>Reference</filter>
269
</config>
270
EOX
499
	my $msg=shift;
500
	my $remote=shift;
501
	my $from=shift;
502
	
503
	my $id=++$self->{_msgid};
504
	
505
	if ($msg->{msgid} and $from) {
506
		$self->{_reply_queue}->{$id}={
507
			original_id => $msg->{msgid},
508
			source		=> $from
509
		};
510
	}
511
	$msg->{msgid}=$id;
512
	
513
	if ($remote->{server}) {
514
		$remote->{server}->put($msg);
515
	} elsif ($remote->{client}) {
516
		$remote->{client}->put($msg);
517
	} else {
518
		#### Don't know how to send to: ref $remote
519
		confess "Don't know how to send to $remote";
520
		
521
	}
271 522
}
272 523
273 524
###################
274
# _config_client
525
# _reply
275 526
###################
276
# _config_client sets the configuration for the client based on the
277
# config.xml on the heap.
278
sub _config_client {
527
# _reply is similar to _send_to, but use the msgid from the original
528
# message. If needs an adicional argument, the original message
529
sub _reply {
279 530
	my $self=shift;
280
	my $heap=shift;
281
282
	my $cfg=XMLin($heap->{AvidMQ}->{cfg});
283
	$heap->{AvidMQ}->{config}=$cfg;
284
	my $filter=$cfg->{filter};
285
	$filter=~s/\W//g;
286
	my $code="use POE::Filter::$filter; return POE::Filter::$filter->new();";
287
288
	$filter = eval $code;
289
	if ($@) {
290
		warn "error creating filter: $@";
291
		return 0;
292
	} elsif ($filter) {
293
		$heap->{client}->set_filter($filter);
294
295
		if ($cfg->{mode} eq 'core') {
296
			$self->{_subscribers}->{'/'}->{$heap->{client}->ID}=$heap;
297
			$heap->{AvidMQ}->{subscribes}=['/'];
298
299
			#### subscribers: $self->{_subscribers}
300
		}
531
	my $omsg=shift;
532
	my $msg=shift;
533
	my $remote=shift;
534
	
535
	$msg->{msgid}=$omsg->{msgid} if $omsg;
536
	
537
	if ($remote->{server}) {
538
		$remote->{server}->put($msg);
539
	} elsif ($remote->{client}) {
540
		$remote->{client}->put($msg);
301 541
	} else {
302
		warn "no error, but no filter either";
303
		return 0;
542
		#### Don't know how to reply to: ref $remote
304 543
	}
305 544
}
306 545
307
1;
546
547
###################
548
# _error_msg
549
###################
550
# _error_msg create the error message to send back as response
551
sub _error_msg {
552
	my $self=shift;
553
	my $msg=shift;
554
555
	return { type => 'error', message=> $msg };
556
}
557
558
###################
559
# _ok_msg
560
###################
561
# _ok_msg create the default ok message to send back as response
562
# to be used on requests with no data to be returned
563
sub _ok_msg {
564
	return { type => 'ok' };
565
}
566
567
1;