Browse Source

Added support for stateless REST scanners
Bumped version to 0.030

Bosko Devetak 4 years ago
parent
commit
71047c47f1
4 changed files with 332 additions and 1 deletions
  1. 7 0
      Changes
  2. 1 1
      lib/HBase/JSONRest.pm
  3. 323 0
      lib/HBase/JSONRest/Scanner.pm
  4. 1 0
      t/uri_gen.t

+ 7 - 0
Changes

@ -51,3 +51,10 @@ Revision history for HBase-JSONRest
51 51
        [BUG FIXES]
52 52
        - Bug fix for missing uri_escape of a key in _build_multiget_uri.
53 53
54
0.030   Date/time: 2015-06-15
55
56
        [ENHANCEMENTS]
57
        - Added support for stateless REST scanners
58
59
60

+ 1 - 1
lib/HBase/JSONRest.pm

@ -11,7 +11,7 @@ use JSON::XS qw(decode_json encode_json);
11 11
use Time::HiRes qw(gettimeofday time);
12 12
use Data::Dumper;
13 13
14
our $VERSION = "0.021";
14
our $VERSION = "0.030";
15 15
16 16
my %INFO_ROUTES = (
17 17
    version => '/version',

+ 323 - 0
lib/HBase/JSONRest/Scanner.pm

@ -0,0 +1,323 @@
1
package HBase::JSONRest::Scanner;
2
3
use strict;
4
use warnings;
5
6
use URI::Escape;
7
use Time::HiRes qw(time);
8
9
# new
10
sub new {
11
    my $class  = shift;
12
    my $params = shift;
13
14
    die "HBase handle required!"
15
        unless ($params->{hbase} and (ref $params->{hbase}));
16
17
    my $hbase = $params->{hbase};
18
19
    my $first_key = $params->{start_key};
20
21
    my $limit     = $params->{atatime} || 50;
22
    my $batchsize = $params->{_batchsize} || 50;
23
24
    my $self = {
25
        hbase        => $hbase,
26
27
        table      => $params->{table},
28
        prefix     => $params->{prefix},
29
30
        first_key  => $params->{first_key},
31
        last_key_from_previous_batch => undef,
32
33
        limit     => $limit,
34
        batchsize => $batchsize,
35
    };
36
37
    return bless $self, $class;
38
}
39
40
# get_next_batch
41
sub get_next_batch {
42
43
    my $self = shift;
44
45
    $self->{_last_batch_time_start} = time;
46
47
    my $table  = $self->{table};
48
    my $prefix = $self->{prefix};
49
    my $limit  = $self->{limit};
50
    my $hbase  = $self->{hbase};
51
52
    my $last_key_from_previous_batch;
53
54
    if (!$self->{first_key}) {
55
56
        my $first_row = $self->_get_first_row_of_prefix();
57
58
        return undef if (!$first_row && !$first_row->{row}); # no rows for that prefix
59
60
        $self->{first_key} = $first_row->{row};
61
62
        my $rows = $self->_scan_raw({
63
            table      => $self->{table},
64
            startrow   => $self->{first_key}, # <- inclusive
65
            limit      => $limit,
66
            batchsize  => $self->{batchsize},
67
        });
68
69
        if (!$hbase->{last_error}) {
70
            $self->{last_key_from_previous_batch} = $rows->[-1]->{row};
71
            $self->{last_batch_time} = time - $self->{_last_batch_time_start};
72
            return $rows;
73
        }
74
        else {
75
            die "Error while trying to get the first key of a prefix!" . Dumper($hbase->{last_error});
76
        }
77
    }
78
    else {
79
80
        return undef if !$self->{last_key_from_previous_batch}; # no more records
81
82
        $last_key_from_previous_batch = $self->{last_key_from_previous_batch};
83
        $self->{last_key_from_previous_batch} = undef;
84
85
        my $next_batch = $self->_scan_raw({
86
            table     => $table,
87
88
            # inclusive scan for startrow, so we add x to skip the key that
89
            # was allready returned in previous batch
90
            startrow  => $last_key_from_previous_batch . "x",
91
92
            limit     => $limit,
93
            batchsize => $self->{batchsize},
94
        });
95
96
        if (!$hbase->{last_error}) {
97
            $self->{last_key_from_previous_batch} = $next_batch->[-1]->{row};
98
            $self->{last_batch_time} = time - $self->{_last_batch_time_start};
99
            return $next_batch;
100
        }
101
        else {
102
            die "Scanner error while trying to get next batch!"
103
                . Dumper($hbase->{last_error});
104
        }
105
    }
106
}
107
108
# _get_first_row_of_prefix
109
sub _get_first_row_of_prefix {
110
    my $self = shift;
111
112
    my $prefix = $self->{prefix};
113
    my $hbase  = $self->{hbase};
114
    my $table  = $self->{table};
115
116
    my $rows = $self->_scan_raw({
117
        table     => $table,
118
        rowprefix => $prefix,
119
        limit     => 1,
120
    });
121
122
    die "Should be only one first row!"
123
        if ( scalar @$rows > 1);
124
125
    return undef unless $rows->[0];
126
127
    my $first_row = $rows->[0];
128
129
    return $first_row;
130
}
131
132
# _scan_raw (uses passed paremeters instead of instance parameters)
133
sub _scan_raw {
134
    my $self   = shift;
135
    my $params = shift;
136
137
    my $hbase = $self->{hbase};
138
    $hbase->{last_error} = undef;
139
140
    my $scan_uri = $self->_build_scan_uri($params);
141
142
    my $rows = $hbase->_get_tiny($scan_uri);
143
144
    return $rows;
145
}
146
147
sub _build_scan_uri {
148
    my $self   = shift;
149
    my $params = shift;
150
151
    #
152
    #    request parameters:
153
    #
154
    #    1. startrow - The start row for the scan.
155
    #    2. endrow - The end row for the scan.
156
    #    3. columns - The columns to scan.
157
    #    4. starttime, endtime - To only retrieve columns within a specific range of version timestamps,both start and end time must be specified.
158
    #    5. maxversions - To limit the number of versions of each column to be returned.
159
    #    6. batchsize - To limit the maximum number of values returned for each call to next().
160
    #    7. limit - The number of rows to return in the scan operation.
161
162
    my $table       = $params->{table};
163
    my $batchsize   = $params->{batchsize}   || 10;
164
    my $limit       = $params->{limit}       || 10;
165
166
    # optional
167
    my $startrow    = $params->{startrow}    || "";
168
    my $rowprefix   = $params->{rowprefix}   || "";
169
    my $endrow      = $params->{endrow}      || "";
170
    my $columns     = $params->{columns}     || "";
171
172
    my $starttime   = $params->{starttime}   || "";
173
    my $endtime     = $params->{endtime}     || "";
174
175
    my $maxversions = $params->{maxversions} || "";
176
177
    # simple version: only mandatory parameters used (and rowprefix)
178
    my $uri;
179
180
    if ($rowprefix) {
181
182
        $uri = "/"
183
             . uri_escape($table)
184
             . "/"
185
             . uri_escape($rowprefix)
186
             . '*'
187
             . "?limit="     . $limit
188
             . "&batchsize=" . $batchsize
189
        ;
190
    }
191
    elsif (!$rowprefix && $startrow) {
192
        $uri
193
            = "/"
194
            . uri_escape($table)
195
            . "/"
196
            . '*?'
197
            . "startrow="   . uri_escape($startrow)
198
            . "&limit="     . $limit
199
            . "&batchsize=" . $batchsize
200
        ;
201
    }
202
    else {
203
        die "unsupported option!";
204
    }
205
206
    return $uri;
207
}
208
209
1;
210
211
__END__
212
213
=encoding utf8
214
215
=head1 NAME
216
217
HBase::JSONRest::Scanner - Simple client for HBase stateless REST scanners
218
219
=head1 SYNOPSIS
220
221
A simple scanner:
222
223
    use HBase::JSONRest;
224
225
    my $hbase = HBase::JSONRest->new(host => 'my-rest-host');
226
227
    my $table       = 'name of table to scan';
228
    my $prefix      = 'key prefix to scan';
229
    my $batch_size  = 100; # rows per one batch
230
231
    my $scanner = HBase::JSONRest::Scanner->new({
232
        hbase   => $hbase,
233
        table   => $table,
234
        prefix  => $prefix,
235
        atatime => $batch_size,
236
    });
237
238
    my $rows;
239
    while ($rows = $scanner->get_next_batch()) {
240
        print STDERR "got "
241
            . @$rows . " rows in "
242
            . sprintf("%.3f", $scanner->{last_batch_time}) . " seconds\n\n";
243
        print STDERR "first key in batch ==> " . $rows->[0]->{row} . "\n";
244
        print STDERR "last key in batch  ==> " . $rows->[-1]->{row} . "\n";
245
    }
246
247
=head1 DESCRIPTION
248
249
Simple client for HBase stateless REST scanners.
250
251
=head1 METHODS
252
253
=head2 new
254
255
Constructor. Cretes an HBase stateless REST scanner object.
256
257
    my $scanner = HBase::JSONRest::Scanner->new({
258
        hbase   => $hbase,
259
        table   => $table,
260
        prefix  => $prefix,
261
        atatime => $batch_size,
262
    });
263
264
=head2 get_next_batch
265
266
Gets the next batch of records
267
268
    while ($rows = $scanner->get_next_batch()) {
269
        ...
270
    }
271
272
=head1 VERSION
273
274
Current version: 0.030
275
276
=head1 AUTHOR
277
278
bdevetak - Bosko Devetak (cpan:BDEVETAK) <bosko.devetak@gmail.com>
279
280
=head1 CONTRIBUTORS
281
282
theMage, C<<  <cpan:NEVES> >>, <mailto:themage@magick-source.net>
283
284
Sawyer X, C<< <xsawyerx at cpan.org> >>
285
286
Eric Herman, C<< <eherman at cpan.org> >>
287
288
Robert Nilsson, <rn@orbstation.com>
289
290
=head1 COPYRIGHT
291
292
Copyright (c) 2014 the HBase::JSONRest L</AUTHOR> and L</CONTRIBUTORS>
293
as listed above.
294
295
=head1 LICENSE
296
297
This library is free software and may be distributed under the same terms
298
as perl itself. See L<http://dev.perl.org/licenses/>.
299
300
=head1 DISCLAIMER OF WARRANTY
301
302
BECAUSE THIS SOFTWARE IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY
303
FOR THE SOFTWARE, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN
304
OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES
305
PROVIDE THE SOFTWARE "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER
306
EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
307
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE
308
ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE SOFTWARE IS WITH
309
YOU. SHOULD THE SOFTWARE PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL
310
NECESSARY SERVICING, REPAIR, OR CORRECTION.
311
312
IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
313
WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR
314
REDISTRIBUTE THE SOFTWARE AS PERMITTED BY THE ABOVE LICENCE, BE
315
LIABLE TO YOU FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL,
316
OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE
317
THE SOFTWARE (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING
318
RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A
319
FAILURE OF THE SOFTWARE TO OPERATE WITH ANY OTHER SOFTWARE), EVEN IF
320
SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF
321
SUCH DAMAGES.
322
323
=cut

+ 1 - 0
t/uri_gen.t

@ -5,6 +5,7 @@ use warnings;
5 5
use Test::More tests => 8;
6 6
7 7
use HBase::JSONRest;
8
use HBase::JSONRest::Scanner;
8 9
9 10
# 1. simple get (no column spec, no version spec)
10 11
ok(