Mercurial > hg > Applications > casawiki
comparison Cassandra/lib/Thrift/Socket.pm @ 0:a2f0a2c135cf
hg init
author | Shoshi TAMAKI <shoshi@cr.ie.u-ryukyu.ac.jp> |
---|---|
date | Sun, 06 Jun 2010 22:00:38 +0900 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:a2f0a2c135cf |
---|---|
1 # | |
2 # Licensed to the Apache Software Foundation (ASF) under one | |
3 # or more contributor license agreements. See the NOTICE file | |
4 # distributed with this work for additional information | |
5 # regarding copyright ownership. The ASF licenses this file | |
6 # to you under the Apache License, Version 2.0 (the | |
7 # "License"); you may not use this file except in compliance | |
8 # with the License. You may obtain a copy of the License at | |
9 # | |
10 # http://www.apache.org/licenses/LICENSE-2.0 | |
11 # | |
12 # Unless required by applicable law or agreed to in writing, | |
13 # software distributed under the License is distributed on an | |
14 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 # KIND, either express or implied. See the License for the | |
16 # specific language governing permissions and limitations | |
17 # under the License. | |
18 # | |
19 | |
20 require 5.6.0; | |
21 use strict; | |
22 use warnings; | |
23 | |
24 use Thrift; | |
25 use Thrift::Transport; | |
26 | |
27 use IO::Socket::INET; | |
28 use IO::Select; | |
29 | |
30 package Thrift::Socket; | |
31 | |
32 use base('Thrift::Transport'); | |
33 | |
#
# Builds a client socket transport. Nothing is connected here; the
# handle stays undef until open() is called.
#
# @param string  $host         Peer host name (defaults to "localhost")
# @param int     $port         Peer port (defaults to 9090)
# @param coderef $debugHandler Callback handed the error text by open()
#                              when debugging is switched on
# @return Thrift::Socket (blessed into the invoking class)
#
sub new
{
    my ($classname, $host, $port, $debugHandler) = @_;

    # Timeouts are divided by 1000 wherever they are used as a timeout
    # value, i.e. they are kept in milliseconds.
    my %fields = (
        host         => $host || "localhost",
        port         => $port || 9090,
        debugHandler => $debugHandler,
        debug        => 0,
        sendTimeout  => 10000,
        recvTimeout  => 10000,
        handle       => undef,
    );

    return bless(\%fields, $classname);
}
53 | |
54 | |
#
# Sets the send timeout. The stored value is divided by 1000 before it
# is used as a timeout when connecting and writing (milliseconds).
#
# @param int $timeout New send timeout
#
sub setSendTimeout
{
    my ($self, $timeout) = @_;

    $self->{sendTimeout} = $timeout;
}
62 | |
#
# Sets the receive timeout. The stored value is divided by 1000 before
# it is used as a timeout when reading (milliseconds).
#
# @param int $timeout New receive timeout
#
sub setRecvTimeout
{
    my ($self, $timeout) = @_;

    $self->{recvTimeout} = $timeout;
}
70 | |
71 | |
#
# Switches debugging output on or off.
#
# @param bool $debug True to have errors reported to the debug handler
#
sub setDebug
{
    my ($self, $debug) = @_;

    $self->{debug} = $debug;
}
84 | |
#
# Tests whether this transport is open.
#
# @return 0 when no handle has been set; otherwise whatever connected()
#         reports for the first socket held by the handle
#
sub isOpen
{
    my ($self) = @_;

    # Without a handle nothing has been opened or accepted yet.
    return 0 unless defined $self->{handle};

    # The handle is a select set; the socket is its first member.
    my ($sock) = $self->{handle}->handles();

    return $sock->connected;
}
100 | |
#
# Connects the socket to host:port over TCP and stores an IO::Select
# wrapping the new socket in $self->{handle}.
#
# The connect timeout is sendTimeout / 1000 (IO::Socket takes seconds).
#
# @throws Thrift::TException if the connection cannot be established
#
sub open
{
    my $self = shift;

    my $sock = IO::Socket::INET->new(
        PeerAddr => $self->{host},
        PeerPort => $self->{port},
        Proto    => 'tcp',
        Timeout  => $self->{sendTimeout} / 1000,
    );

    unless ($sock) {
        my $error = 'TSocket: Could not connect to '.$self->{host}.':'.$self->{port}.' ('.$!.')';

        # The handler is optional (the constructor leaves it undef by
        # default), so only call it when one was supplied; otherwise the
        # real connection error would be masked by a "not a CODE
        # reference" failure.
        if ($self->{debug} && defined $self->{debugHandler}) {
            $self->{debugHandler}->($error);
        }

        # Explicit method call: indirect-object "new Class(...)" is
        # parsed by heuristics because this package defines its own new().
        die Thrift::TException->new($error);
    }

    $self->{handle} = IO::Select->new($sock);
}
126 | |
#
# Closes the socket held by the handle, if there is one. The handle
# itself is left in place.
#
sub close
{
    my ($self) = @_;

    if (defined(my $select = $self->{handle})) {
        # The socket is the first member of the select set.
        my ($sock) = $select->handles();
        CORE::close($sock);
    }
}
138 | |
#
# Reads exactly $len bytes, looping over as many recv() calls as it
# takes. Each wait for readability is bounded by recvTimeout / 1000
# seconds.
#
# @param int $len How many bytes
# @return string Binary data (nothing when there is no handle)
# @throws Thrift::TException on timeout or when a recv yields no data
#
sub readAll
{
    my ($self, $len) = @_;

    return unless defined $self->{handle};

    my $data = "";

    while (1) {

        # Wait for the socket to become readable, or time out.
        my @ready = $self->{handle}->can_read( $self->{recvTimeout} / 1000 );

        unless (@ready) {
            die Thrift::TException->new('TSocket: timed out reading '.$len.' bytes from '.
                                        $self->{host}.':'.$self->{port});
        }

        my $chunk;
        $ready[0]->recv($chunk, $len);

        # An undefined or empty chunk is treated as a failed read.
        if (!defined $chunk || $chunk eq '') {
            die Thrift::TException->new('TSocket: Could not read '.$len.' bytes from '.
                                        $self->{host}.':'.$self->{port});
        }

        $data .= $chunk;

        my $got = length($chunk);
        return $data if $got >= $len;

        # Short read: ask only for what is still missing next time round.
        $len -= $got;
    }
}
184 | |
#
# Reads from the socket with a single recv(), returning whatever
# arrived (possibly fewer than $len bytes). The wait for readability is
# bounded by recvTimeout / 1000 seconds.
#
# @param int $len How many bytes at most
# @return string Binary data (nothing when there is no handle)
# @throws Thrift::TException on timeout or when recv yields no data
#
sub read
{
    my ($self, $len) = @_;

    return unless defined $self->{handle};

    # Wait for the socket to become readable, or time out.
    my @sockets = $self->{handle}->can_read( $self->{recvTimeout} / 1000 );

    if (@sockets == 0) {
        die Thrift::TException->new('TSocket: timed out reading '.$len.' bytes from '.
                                    $self->{host}.':'.$self->{port});
    }

    my $buf;
    $sockets[0]->recv($buf, $len);

    if (!defined $buf || $buf eq '') {
        # Must be the fully qualified Thrift::TException: a bare
        # "TException" names no package, so the failure surfaced as an
        # unrelated Perl error instead of a catchable Thrift exception.
        die Thrift::TException->new('TSocket: Could not read '.$len.' bytes from '.
                                    $self->{host}.':'.$self->{port});
    }

    return $buf;
}
220 | |
221 | |
222 # | |
223 # Write to the socket. | |
224 # | |
225 # @param string $buf The data to write | |
226 # | |
227 sub write | |
228 { | |
229 my $self = shift; | |
230 my $buf = shift; | |
231 | |
232 | |
233 return unless defined $self->{handle}; | |
234 | |
235 while (length($buf) > 0) { | |
236 | |
237 | |
238 #check for timeout | |
239 my @sockets = $self->{handle}->can_write( $self->{sendTimeout} / 1000 ); | |
240 | |
241 if(@sockets == 0){ | |
242 die new Thrift::TException('TSocket: timed out writing to bytes from '. | |
243 $self->{host}.':'.$self->{port}); | |
244 } | |
245 | |
246 my $sock = $sockets[0]; | |
247 | |
248 my $got = $sock->send($buf); | |
249 | |
250 if (!defined $got || $got == 0 ) { | |
251 die new Thrift::TException('TSocket: Could not write '.length($buf).' bytes '. | |
252 $self->{host}.':'.$self->{host}); | |
253 } | |
254 | |
255 $buf = substr($buf, $got); | |
256 } | |
257 } | |
258 | |
#
# Flushes output on the socket held by the handle. Does nothing when
# there is no handle.
#
# @return the result of the socket's flush call
#
sub flush
{
    my ($self) = @_;

    return unless defined $self->{handle};

    # The socket is the first member of the select set.
    my ($sock) = $self->{handle}->handles();

    return scalar($sock->flush);
}
270 | |
271 | |
272 # | |
273 # Build a ServerSocket from the ServerTransport base class | |
274 # | |
275 package Thrift::ServerSocket; | |
276 | |
277 use base qw( Thrift::Socket Thrift::ServerTransport ); | |
278 | |
279 use constant LISTEN_QUEUE_SIZE => 128; | |
280 | |
#
# Builds a server socket for the given port. Nothing is bound until
# listen() is called.
#
# @param int $port Port to listen on
#
sub new
{
    my ($classname, $port) = @_;

    # Only the port is passed on; host and debug handler are handed to
    # the base constructor as undef.
    my $self = $classname->SUPER::new(undef, $port, undef);

    return bless($self, $classname);
}
289 | |
#
# Binds a listening TCP socket on $self->{port} (any local address,
# address reuse enabled, backlog LISTEN_QUEUE_SIZE) and stores it in
# $self->{handle}. Unlike the client transport, the handle here is the
# socket itself rather than an IO::Select.
#
# @throws Thrift::TException if the socket cannot be created or bound
#
sub listen
{
    my $self = shift;

    my $sock = IO::Socket::INET->new(
        LocalAddr => undef,                 # any addr
        LocalPort => $self->{port},
        Proto     => 'tcp',
        Listen    => LISTEN_QUEUE_SIZE,
        ReuseAddr => 1,
    );

    unless ($sock) {
        my $error = 'TServerSocket: Could not bind to ' .
                    $self->{host} . ':' . $self->{port} . ' (' . $! . ')';

        # The constructor always passes an undef debug handler, so guard
        # the call; otherwise enabling debug turns every bind failure
        # into a "not a CODE reference" error instead of the exception
        # below.
        if ($self->{debug} && defined $self->{debugHandler}) {
            $self->{debugHandler}->($error);
        }

        die Thrift::TException->new($error);
    }

    $self->{handle} = $sock;
}
313 | |
#
# Waits for an incoming connection on the listening socket and wraps it
# in a new Thrift::Socket (whose handle is an IO::Select over the client
# socket).
#
# @return Thrift::Socket for the accepted client, or 0 when this server
#         is not listening or the underlying accept() fails
#
sub accept
{
    my $self = shift;

    return 0 unless defined $self->{handle};

    my $client = $self->{handle}->accept();

    # A failed accept() yields undef. Wrapping that in an IO::Select
    # would hand back a transport with no socket in it, which blows up
    # later in isOpen/close/flush; report "no socket" instead.
    return 0 unless defined $client;

    my $result = Thrift::Socket->new;
    $result->{handle} = IO::Select->new($client);

    return $result;
}
328 | |
329 | |
330 1; |