Mercurial > hg > nginx-tests
comparison stream_upstream_least_conn.t @ 557:05cbe9e2def8
Tests: basic stream tests for upstream least_conn.
author | Sergey Kandaurov <pluknet@nginx.com> |
---|---|
date | Thu, 23 Apr 2015 14:01:13 +0300 |
parents | |
children | ff49e1c00b35 |
comparison
equal
deleted
inserted
replaced
556:97d89d9ab4ed | 557:05cbe9e2def8 |
---|---|
1 #!/usr/bin/perl | |
2 | |
3 # (C) Sergey Kandaurov | |
4 # (C) Nginx, Inc. | |
5 | |
6 # Stream tests for upstream least_conn balancer module. | |
7 | |
8 ############################################################################### | |
9 | |
10 use warnings; | |
11 use strict; | |
12 | |
13 use Test::More; | |
14 | |
15 use IO::Select; | |
16 | |
17 BEGIN { use FindBin; chdir($FindBin::Bin); } | |
18 | |
19 use lib 'lib'; | |
20 use Test::Nginx; | |
21 | |
22 ############################################################################### | |
23 | |
# Enable autoflush ($| = 1) on STDERR, then on STDOUT; STDOUT is left as the
# currently selected output handle.
24 select STDERR; $| = 1; | |
25 select STDOUT; $| = 1; | |
26 | |
# Build the test object: declare the nginx features this test needs
# (stream, stream_upstream_least_conn), plan 2 tests, and write nginx.conf
# from the heredoc below.  NOTE(review): has() presumably skips the test when
# a feature is missing, and write_file_expand() presumably substitutes
# %%TEST_GLOBALS%% -- confirm against Test::Nginx.
27 my $t = Test::Nginx->new()->has(qw/stream stream_upstream_least_conn/)->plan(2) | |
28 ->write_file_expand('nginx.conf', <<'EOF'); | |
29 | |
30 %%TEST_GLOBALS%% | |
31 | |
32 daemon off; | |
33 | |
34 events { | |
35 } | |
36 | |
37 stream { | |
38 upstream u { | |
39 least_conn; | |
40 server 127.0.0.1:8081; | |
41 server 127.0.0.1:8082; | |
42 } | |
43 | |
44 server { | |
45 listen 127.0.0.1:8080; | |
46 proxy_pass u; | |
47 } | |
48 } | |
49 | |
50 EOF | |
51 | |
# Start one stream_daemon backend per upstream port (8081, 8082), then nginx
# itself, which listens on 8080 and proxies to upstream "u".
52 $t->run_daemon(\&stream_daemon, 8081); | |
53 $t->run_daemon(\&stream_daemon, 8082); | |
54 $t->run(); | |
55 | |
# Both backend addresses are passed to waitforsocket() before any test runs.
# NOTE(review): presumably this blocks until each socket accepts connections
# -- confirm against Test::Nginx.
56 $t->waitforsocket('127.0.0.1:8081'); | |
57 $t->waitforsocket('127.0.0.1:8082'); | |
58 | |
59 ############################################################################### | |
60 | |
# Sequential requests ('.') are answered immediately by either backend, so
# ten of them are expected to split evenly between the two ports.
61 is(many('.', 10), '8081: 5, 8082: 5', 'balanced'); | |
# A 'w' request makes the 8081 backend sleep 2.5s before replying (see
# stream_handle_client), so the connection that lands there stays active;
# the expectation is that least_conn then sends the other nine to 8082.
62 is(parallel('w', 10), '8081: 1, 8082: 9', 'least_conn'); | |
63 | |
64 ############################################################################### | |
65 | |
# Issue $count sequential requests carrying $data and tally which backend
# port answered each of them.
#
# Arguments:
#   $data  - payload sent on every connection
#   $count - number of connections to make, one after another
#   %opts  - peer => 'host:port' overrides the address stream_connect() uses
#
# Returns a string such as "8081: 5, 8082: 5": one "port: hits" entry per
# port seen in a reply, in string-sorted port order.  Replies without any
# digits are not counted.
sub many {
    my ($data, $count, %opts) = @_;
    my $peer = $opts{peer};
    my %hits;

    foreach my $attempt (1 .. $count) {
        my $reply = stream_get($data, $peer);

        # The backend replies with its own port number.
        next unless $reply =~ /(\d+)/;
        $hits{$1}++;
    }

    my @summary = map { "$_: $hits{$_}" } sort keys %hits;

    return join ', ', @summary;
}
81 | |
# Open $count connections that are all kept open at the same time, send
# $data on each, and only then read the replies and tally the backend ports.
#
# Arguments:
#   $data  - payload sent on every connection
#   $count - number of simultaneous connections
#   %opts  - peer => 'host:port' overrides the address stream_connect() uses
#
# Returns a string such as "8081: 1, 8082: 9": one "port: hits" entry per
# port seen in a reply, in string-sorted port order.
sub parallel {
    my ($data, $count, %opts) = @_;
    my $peer = $opts{peer} ? $opts{peer} : undef;
    my (@pending, %hits);

    # Phase 1: connect and send, pausing 0.2s after each write so the
    # connections are established one at a time.
    foreach my $n (1 .. $count) {
        my $sock = stream_connect($peer);
        push @pending, $sock;
        stream_write($sock, $data);
        select undef, undef, undef, 0.2;
    }

    # Phase 2: read the replies, most recently opened connection first.
    while (my $sock = pop @pending) {
        my $reply = stream_read($sock);
        $hits{$1}++ if $reply =~ /(\d+)/;
        close $sock;
    }

    my @summary = map { "$_: $hits{$_}" } sort keys %hits;

    return join ', ', @summary;
}
106 | |
# Perform one complete request/response exchange on a fresh connection.
#
# Arguments:
#   $data - payload to send
#   $peer - optional 'host:port'; stream_connect() picks the default if unset
#
# Returns whatever stream_read() returned for the connection; the socket is
# closed before returning.
sub stream_get {
    my ($data, $peer) = @_;

    my $sock = stream_connect($peer);
    stream_write($sock, $data);

    my $reply = stream_read($sock);
    $sock->close;

    return $reply;
}
117 | |
# Open a TCP connection to $peer, or to 127.0.0.1:8080 when $peer is unset
# or false.
#
# Returns the connected IO::Socket::INET object; dies with
# "Can't connect to nginx: ..." if the connection cannot be established.
sub stream_connect {
    my ($peer) = @_;
    my $addr = $peer || '127.0.0.1:8080';

    my $sock = IO::Socket::INET->new(Proto => 'tcp', PeerAddr => $addr);
    die "Can't connect to nginx: $!\n" unless $sock;

    return $sock;
}
128 | |
# Write $message to socket $s in non-blocking mode, waiting up to 1.5s for
# the socket to become writable before each write attempt.
#
# If the whole message could not be written (timeout, or a write that
# returned nothing), the socket is closed.  SIGPIPE is ignored for the
# duration of the call so a peer that went away does not kill the process.
sub stream_write {
    my ($s, $message) = @_;

    local $SIG{PIPE} = 'IGNORE';

    $s->blocking(0);

    my $writable = IO::Select->new($s);

    SEND:
    while ($writable->can_write(1.5)) {
        my $sent = $s->syswrite($message)
            or last SEND;

        # Drop the bytes that went out; stop once nothing is left.
        substr($message, 0, $sent, '');
        last SEND unless length $message;
    }

    # Anything still unsent means the write failed part-way.
    $s->close() if length $message;
}
146 | |
# Read up to 1024 bytes from socket $s in non-blocking mode, waiting at most
# 3 seconds for data to arrive.
#
# Returns the data read, or an empty string when nothing could be read
# (timeout, end of stream, or read error).  The result is always defined, so
# callers can pattern-match on it without "uninitialized value" warnings.
# The returned data is also passed to log_in().
sub stream_read {
    my ($s) = @_;
    my $buf = '';

    $s->blocking(0);
    if (IO::Select->new($s)->can_read(3)) {
        my $n = $s->sysread($buf, 1024);

        # undef means a read error, 0 means end of stream: report no data.
        $buf = '' unless $n;
    }

    log_in($buf);
    return $buf;
}
159 | |
160 ############################################################################### | |
161 | |
# Backend daemon: listen on 127.0.0.1:$port and serve clients from a single
# select() loop.  Each readable client is handed to stream_handle_client();
# when that returns true the client is removed from the select set and
# closed.
#
# Dies with "Can't create listening socket: ..." if the listening socket
# cannot be created.  Otherwise it loops for as long as can_read() keeps
# returning ready handles.
sub stream_daemon {
    my ($port) = @_;

    my $server = IO::Socket::INET->new(
        Proto => 'tcp',
        LocalAddr => '127.0.0.1',
        LocalPort => $port,
        Listen => 5,
        Reuse => 1
    )
        or die "Can't create listening socket: $!\n";

    my $sel = IO::Select->new($server);

    # A client that disconnects mid-write must not kill the daemon.
    local $SIG{PIPE} = 'IGNORE';

    while (my @ready = $sel->can_read) {
        foreach my $fh (@ready) {
            if ($server == $fh) {
                # accept() returns undef on failure (for example when the
                # client aborted before being accepted); skip it rather
                # than die calling a method on undef.
                my $new = $fh->accept
                    or next;
                $new->autoflush(1);
                $sel->add($new);

            } elsif (stream_handle_client($fh)) {
                $sel->remove($fh);
                $fh->close;
            }
        }
    }
}
192 | |
# Serve one readable backend client: read its request and reply with the
# local port number of the connection.
#
# A request containing "w" that arrives on port 8081 is delayed by 2.5
# seconds before the reply is sent, keeping that connection busy.
#
# Always returns 1, which tells the caller to close the connection; this
# includes the case where nothing could be read.
sub stream_handle_client {
    my ($client) = @_;

    log2c("(new connection $client)");

    my $buffer;
    return 1 unless $client->sysread($buffer, 65536);

    log2i("$client $buffer");

    my $port = $client->sockport();
    my $delayed = ($buffer =~ /w/ && $port == 8081);

    if ($delayed) {
        Test::Nginx::log_core('||', "$port: sleep(2.5)");
        select undef, undef, undef, 2.5;
    }

    # The reply is simply the port this backend is listening on.
    my $reply = $port;

    log2o("$client $reply");

    $client->syswrite($reply);

    return 1;
}
217 | |
# Backend-side logging helpers: each forwards its arguments to
# Test::Nginx::log_core() with a fixed prefix -- '|| <<' for data received
# by the backend, '|| >>' for data it sends, and '||' for connection events.
sub log2i {
    return Test::Nginx::log_core('|| <<', @_);
}

sub log2o {
    return Test::Nginx::log_core('|| >>', @_);
}

sub log2c {
    return Test::Nginx::log_core('||', @_);
}
221 | |
222 ############################################################################### |