Browse Source

Some bases for the core funcionalities.

themage
Marco Neves 10 years ago
parent
commit
bf9edb1c81
7 changed files with 422 additions and 7 deletions
  1. 10 3
      bin/avidmq-server.pl
  2. 52 3
      database/avidmq.sql
  3. 24 0
      etc/avidmq-core.conf
  4. 26 0
      etc/avidmq-core2.conf
  5. 1 0
      etc/avidmq.conf
  6. 287 1
      lib/AvidMQ/Server.pm
  7. 22 0
      lib/AvidMQ/Store/mysql.pm

+ 10 - 3
bin/avidmq-server.pl

@ -3,11 +3,18 @@
3 3
use strict;
4 4
use warnings;
5 5
6
use FindBind;
7
use lib $FindBin::Bin."../lib";
6
use FindBin;
7
use lib $FindBin::Bin."/../lib";
8 8
9 9
use AvidMQ::Server;
10
use Getopt::Long;
10 11
11
AvidMQ::Server->new()->run();
12
my $config=$FindBin::Bin."/../etc/avidmq.conf";
13
14
GetOptions(
15
		"configfile=s"	=> \$config,
16
	);
17
18
AvidMQ::Server->new(configfile=>$config)->run();
12 19
13 20

+ 52 - 3
database/avidmq.sql

@ -1,6 +1,6 @@
1 1
-- MySQL dump 10.11
2 2
--
3
-- Host: localhost    Database: avidmq
3 4
-- ------------------------------------------------------
4 5
-- Server version	5.0.81-1
5 6
@ -49,11 +49,12 @@ DROP TABLE IF EXISTS `publish`;
49 49
/*!40101 SET character_set_client = utf8 */;
50 50
CREATE TABLE `publish` (
51 51
  `id` bigint(20) unsigned NOT NULL default '0',
52
  `queue` varchar(500) default NULL,
52
  `origid` varchar(100) default NULL,
53 53
  `published` datetime default NULL,
54 54
  `expire` datetime default NULL,
55 55
  `message` text,
56
  PRIMARY KEY  (`id`)
56
  PRIMARY KEY  (`id`),
57
  UNIQUE KEY `origid` (`origid`)
57 58
) ENGINE=MyISAM DEFAULT CHARSET=latin1;
58 59
/*!40101 SET character_set_client = @saved_cs_client */;
59 60
@ -67,6 +68,52 @@ LOCK TABLES `publish` WRITE;
67 68
UNLOCK TABLES;
68 69
69 70
--
71
-- Table structure for table `publish_queue`
72
--
73
74
DROP TABLE IF EXISTS `publish_queue`;
75
/*!40101 SET @saved_cs_client     = @@character_set_client */;
76
/*!40101 SET character_set_client = utf8 */;
77
CREATE TABLE `publish_queue` (
78
  `publish__id` bigint(20) default NULL,
79
  `queue__id` bigint(20) default NULL
80
) ENGINE=MyISAM DEFAULT CHARSET=latin1;
81
/*!40101 SET character_set_client = @saved_cs_client */;
82
83
--
84
-- Dumping data for table `publish_queue`
85
--
86
87
LOCK TABLES `publish_queue` WRITE;
88
/*!40000 ALTER TABLE `publish_queue` DISABLE KEYS */;
89
/*!40000 ALTER TABLE `publish_queue` ENABLE KEYS */;
90
UNLOCK TABLES;
91
92
--
93
-- Table structure for table `queue`
94
--
95
96
DROP TABLE IF EXISTS `queue`;
97
/*!40101 SET @saved_cs_client     = @@character_set_client */;
98
/*!40101 SET character_set_client = utf8 */;
99
CREATE TABLE `queue` (
100
  `id` bigint(20) NOT NULL default '0',
101
  `queue` varchar(500) default NULL,
102
  PRIMARY KEY  (`id`),
103
  UNIQUE KEY `queue` (`queue`)
104
) ENGINE=MyISAM DEFAULT CHARSET=latin1;
105
/*!40101 SET character_set_client = @saved_cs_client */;
106
107
--
108
-- Dumping data for table `queue`
109
--
110
111
LOCK TABLES `queue` WRITE;
112
/*!40000 ALTER TABLE `queue` DISABLE KEYS */;
113
/*!40000 ALTER TABLE `queue` ENABLE KEYS */;
114
UNLOCK TABLES;
115
116
--
70 117
-- Table structure for table `task`
71 118
--
72 119
@ -77,7 +124,7 @@ CREATE TABLE `task` (
77 124
  `id` bigint(20) unsigned NOT NULL default '0',
78 125
  `origid` varchar(60) default NULL,
79 126
  `name` varchar(60) default NULL,
80
  `queue` varchar(500) default NULL,
127
  `queue__id` bigint(20) default NULL,
81 128
  `retry` tinyint(3) unsigned default NULL,
82 129
  `nextrun` datetime default NULL,
83 130
  `lastrun` datetime default NULL,
@ -135,4 +182,4 @@ UNLOCK TABLES;
135 182
/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
136 183
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
137 184
185
-- Dump completed on 2009-08-09 22:47:40

+ 24 - 0
etc/avidmq-core.conf

@ -1,2 +1,26 @@
1 1
; This will be the configuration for a Core AvidMQ
2 2
3
[avidmq]
4
; The paramenters to AvidMQ.
5
; Mode can be:
6
; * core (one of the master, who will store everything)
7
; * remote (one router, that will store only what is needed for their clientes)
8
mode		= core
9
port		= 6666
10
host		= 0
11
12
13
[network]
14
; This section list the core nodes, that we will try to connect to on
15
; startup to see if any of them did take over the network (and will work as
16
; his slave if one of them did. Will start as master otherwise)
17
core	= 127.0.0.1:6667
18
19
[database]
20
; This section have the database configuration
21
type		= mysql
22
dbname		= avidmq
23
username	= avidmq
24
password	= doandshare
25
26

+ 26 - 0
etc/avidmq-core2.conf

@ -0,0 +1,26 @@
1
; This will be the configuration for a Core AvidMQ
2
3
[avidmq]
4
; The paramenters to AvidMQ.
5
; Mode can be:
6
; * core (one of the master, who will store everything)
7
; * remote (one router, that will store only what is needed for their clientes)
8
mode		= core
9
port		= 6667
10
host		= 0
11
12
13
[network]
14
; This section list the core nodes, that we will try to connect to on
15
; startup to see if any of them did take over the network (and will work as
16
; his slave if one of them did. Will start as master otherwise)
17
core	= 127.0.0.1:6666
18
19
[database]
20
; This section have the database configuration
21
type		= mysql
22
dbname		= avidmq2
23
username	= avidmq
24
password	= doandshare
25
26

+ 1 - 0
etc/avidmq.conf

@ -0,0 +1 @@
1
avidmq-core.conf

+ 287 - 1
lib/AvidMQ/Server.pm

@ -1,5 +1,18 @@
1 1
package AvidMQ::Server;
2 2
3
use strict;
4
use warnings;
5
6
use Carp qw(confess carp);
7
use Config::Tiny;
8
use XML::Simple;
9
10
use POE qw(Component::Server::TCP Component::Client::TCP Filter::Reference);
11
12
use Smart::Comments '####';
13
14
sub AvidMQProtVersion { '0.0.1' }
15
3 16
sub new {
4 17
	my $class=shift;
5 18
	my $self=bless {}, $class;
@ -11,11 +24,284 @@ sub new {
11 24
12 25
sub init {
13 26
	my $self=shift;
27
	my %args=@_;
28
29
	confess "Initing without configfile" unless $args{configfile};
30
	confess "onfig file not found: $args{configfile}"
31
		unless -f $args{configfile};
14 32
33
	my $config=Config::Tiny->read($args{configfile});
34
35
	$self->{config}=$config;	
15 36
}
16 37
17 38
sub run {
18
#	this will be the run for the server.
39
	my $self=shift;
40
41
	$self->start_session();
42
43
	$poe_kernel->run();
44
}
45
46
sub start_session {
47
	my $self=shift;
48
49
	POE::Session->create(
50
		inline_states	=> {
51
			_start	=> sub { 
52
				$_[KERNEL]->alias_set('AvidMQ');
53
				$self->start_avidmqd(@_);
54
			},
55
			start_tcp_daemon	=> sub { $self->start_tcp_daemon(@_); },
56
57
		},
58
	);
59
}
60
61
sub start_avidmqd {
62
	my $self=shift;
63
64
	$self->start_store();
65
66
	my $mode=$self->{config}->{avidmq}->{mode};
67
68
	if ($mode eq 'core') {
69
		$self->start_core_daemon(@_);
70
	} elsif ($mode eq 'router') {
71
		$self->start_router_daemon(@_);
72
	} else {
73
		confess "Invalid avidmq mode";
74
	}
75
}
76
77
sub start_core_daemon {
78
	my $self=shift;
79
	my $nodes=$self->{config}->{network}->{core};
80
	if ($nodes) {
81
		my @nodes=split /\s*,\s*/, $nodes;
82
		$self->{_core_nodes}=\@nodes;
83
		$self->init_slave(@_);
84
	} else {
85
		warn "Looks that I am the only core node for this network...\n";
86
		$self->init_master(@_);
87
	}
88
}
89
90
####################################################################
91
# Core Slave Init and handlers
92
####################################################################
93
94
sub init_slave {
95
	my $self=shift;
96
	
97
	my $ncore=shift @{$self->{_core_nodes}};
98
99
	unless ($ncore) {
100
		return $self->init_master(@_);
101
	}
102
103
	my ($host,$port)=split /\:/, $ncore;
104
	unless ($host and $port) {
105
		warn "Invalid Core address '$ncore'\n";
106
		return $self->init_slave(@_);
107
	}
108
109
	#### Trying for master: $host."->".$port
110
	POE::Component::Client::TCP->new(
111
		RemoteAddress 	=> $host,
112
		RemotePort		=> $port,
113
		Connected		=> sub { $self->init_slave_succeed(@_) },
114
		ConnectError	=> sub { $self->init_slave_fail(@_) },
115
		ServerInput		=> sub { $self->tcp_master_input(@_) },
116
		Disconnected	=> sub { $self->tcp_master_disconnect(@_) },
117
	);
118
}
119
120
sub init_slave_succeed {
121
	my $self=shift;
122
123
	$self->{_core_master} = 0;
124
	$_[HEAP]->{AvidMQ}->{state}='new';
125
126
	# This will be moved to post core_sync
127
	$_[KERNEL]->post(AvidMQ => 'start_tcp_daemon');
128
}
129
130
sub init_slave_fail {
131
	my $self=shift;
132
133
	#### This slave failed. Let's try the next one.
134
	$self->init_slave(@_);
135
}
136
137
sub tcp_master_input {
138
	my $self=shift;
139
	my ($kernel,$heap,$input)=@_[KERNEL,HEAP,ARG0];
140
141
	if ($heap->{AvidMQ}->{state} eq 'new') {
142
		if ($input=~/HELLO AvidMQ/) {
143
			my $config=$self->_config_xml();
144
			$heap->{server}->put($config);
145
			$heap->{server}->set_filter(POE::Filter::Reference->new());
146
			$heap->{AvidMQ}->{state} = 'set';
147
		} else {
148
			# The other side is not a AvidMQ server
149
			$kernel->yield("shutdown");
150
			$self->init_slave(@_);
151
		}
152
	} elsif (ref $input and $input->{type}) {
153
		#### At tcp_master_input: $input
154
	} else {
155
		#### Unexpected input: $input
156
	}
157
}
158
159
sub tcp_master_disconnect {
160
	#### Missing the tcp_master_disconnect - need to implement it
161
}
162
163
####################################################################
164
# Core Master Init method 
165
####################################################################
166
167
sub init_master {
168
	my $self=shift;
169
	
170
	$self->{_core_master} = 1;
171
172
	$_[KERNEL]->post(AvidMQ => 'start_tcp_daemon');
173
174
}
175
176
####################################################################
177
# Store methods
178
####################################################################
179
180
sub start_store {
181
	my $self=shift;
182
183
	return; #Will implement this at a later time.
184
185
	my $storetype=$self->{config}->{database}->{type};
186
	my $plugin="AvidMQ::Store::".$storetype;
187
	my $code="require $plugin;";
188
	eval $code;
189
	die "Can't load the store plugin: $@\n$code\n" if $@;
190
}
191
192
####################################################################
193
# TCP Daemon
194
####################################################################
195
196
sub start_tcp_daemon {
197
	my $self=shift;
198
	my $avidmqcfg=$self->{config}->{avidmq};
199
200
	#### Going to start the TCP daemon
201
202
	POE::Component::Server::TCP->new(
203
			Port	=> $avidmqcfg->{port},
204
			ClientConnected => sub { $self->tcp_client_connect(@_); },
205
			ClientDisconnected	=> sub { $self->tcp_client_disconnected(@_) },
206
			ClientInput		=> sub { $self->tcp_client_input(@_); },
207
		);
208
}
209
210
sub tcp_client_connect {
211
	my $self=shift;
212
	#### At tcp_client_connect
213
	$_[HEAP]{client}->put("HELLO AvidMQ v".AvidMQProtVersion());
214
	$_[HEAP]{AvidMQ}->{state} = 'new';
215
}
216
217
sub tcp_client_disconnected {
218
	my $self=shift;
219
	my ($kernel,$heap)=@_[KERNEL,HEAP];
220
	#### At tcp_client_disconnect: $heap->{client}->ID
221
222
	for my $queue (@{$heap->{AvidMQ}->{subscribes}}) {
223
		#### Unsubscribing: $queue
224
		delete $self->{_subscribers}->{$queue}->{$heap->{client}->ID};
225
	}
226
227
}
228
229
sub tcp_client_input {
230
	my $self=shift;
231
	my ($kernel,$heap,$input)=@_[KERNEL,HEAP,ARG0];
232
233
	if ($heap->{AvidMQ}->{state} eq 'new') {
234
		return unless $input;
235
		$heap->{AvidMQ}->{cfg}.=$input;
236
		if ($heap->{AvidMQ}->{cfg}=~m{<config>.*</config>}msx) {
237
			if ($self->_config_client($heap)) {
238
				$heap->{client}->put({type => 'configok'});
239
				$heap->{AvidMQ}->{state} = 'set';
240
			} else {
241
				$heap->{client}->put('ERROR CREATING THE FILTER');
242
				$kernel->yield('shutdown');
243
			}
244
		}
245
	} else {
246
		#### At tcp_input: $input
247
	}
248
}
249
250
251
####################################################################
252
# Helper Methods
253
####################################################################
254
255
###################
256
# _config_xml
257
###################
258
# _config_xml creates the Config XML to send to the upserver
259
# At this time, the filter to use is always Reference.
260
sub _config_xml {
261
	my $self=shift;
262
	my $mode=$self->{config}->{avidmq}->{mode};
263
	my $ts=time;
264
	return <<EOX
265
<config>
266
	<mode>$mode</mode>
267
	<time>$ts</time>
268
	<filter>Reference</filter>
269
</config>
270
EOX
271
}
272
273
###################
274
# _config_client
275
###################
276
# _config_client sets the configuration for the client based on the
277
# config.xml on the heap.
278
sub _config_client {
279
	my $self=shift;
280
	my $heap=shift;
281
282
	my $cfg=XMLin($heap->{AvidMQ}->{cfg});
283
	$heap->{AvidMQ}->{config}=$cfg;
284
	my $filter=$cfg->{filter};
285
	$filter=~s/\W//g;
286
	my $code="use POE::Filter::$filter; return POE::Filter::$filter->new();";
287
288
	$filter = eval $code;
289
	if ($@) {
290
		warn "error creating filter: $@";
291
		return 0;
292
	} elsif ($filter) {
293
		$heap->{client}->set_filter($filter);
294
295
		if ($cfg->{mode} eq 'core') {
296
			$self->{_subscribers}->{'/'}->{$heap->{client}->ID}=$heap;
297
			$heap->{AvidMQ}->{subscribes}=['/'];
298
299
			#### subscribers: $self->{_subscribers}
300
		}
301
	} else {
302
		warn "no error, but no filter either";
303
		return 0;
304
	}
19 305
}
20 306
21 307
1;

+ 22 - 0
lib/AvidMQ/Store/mysql.pm

@ -0,0 +1,22 @@
1
package AvidMQ::Store::mysql;
2
3
use strict;
4
use warnings;
5
6
use POE qw(Component::DBIAgent); 
7
8
sub new {
9
	my $class=shift;
10
	my $self=bless {}, $class;
11
12
	$self->init(@_);
13
14
	return $self;
15
}
16
17
sub init {
18
	my $self=shift;
19
20
}
21
22
1;