comparison Cassandra/lib/Thrift/Socket.pm @ 0:a2f0a2c135cf

hg init
author Shoshi TAMAKI <shoshi@cr.ie.u-ryukyu.ac.jp>
date Sun, 06 Jun 2010 22:00:38 +0900
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:a2f0a2c135cf
1 #
2 # Licensed to the Apache Software Foundation (ASF) under one
3 # or more contributor license agreements. See the NOTICE file
4 # distributed with this work for additional information
5 # regarding copyright ownership. The ASF licenses this file
6 # to you under the Apache License, Version 2.0 (the
7 # "License"); you may not use this file except in compliance
8 # with the License. You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing,
13 # software distributed under the License is distributed on an
14 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 # KIND, either express or implied. See the License for the
16 # specific language governing permissions and limitations
17 # under the License.
18 #
19
20 require 5.6.0;
21 use strict;
22 use warnings;
23
24 use Thrift;
25 use Thrift::Transport;
26
27 use IO::Socket::INET;
28 use IO::Select;
29
30 package Thrift::Socket;
31
32 use base('Thrift::Transport');
33
#
# Builds a socket transport object. No connection is attempted here;
# that happens in open().
#
# @param string $host         Peer host; a false value selects "localhost"
# @param int    $port         Peer port; a false value selects 9090
# @param code   $debugHandler Optional callback handed error strings when
#                             debugging is switched on
# @return object blessed into the invoking class
#
sub new
{
    my ($classname, $host, $port, $debugHandler) = @_;

    # Both timeouts start at 10000; they are divided by 1000 where used.
    my %state = (
        host         => $host || "localhost",
        port         => $port || 9090,
        debugHandler => $debugHandler,
        debug        => 0,
        sendTimeout  => 10000,
        recvTimeout  => 10000,
        handle       => undef,
    );

    return bless(\%state, $classname);
}
53
54
#
# Stores the send timeout.
#
# @param int $timeout Value divided by 1000 before use in open() and write()
#
sub setSendTimeout
{
    my ($self, $timeout) = @_;

    $self->{sendTimeout} = $timeout;
}
62
#
# Stores the receive timeout.
#
# @param int $timeout Value divided by 1000 before use in read() and readAll()
#
sub setRecvTimeout
{
    my ($self, $timeout) = @_;

    $self->{recvTimeout} = $timeout;
}
70
71
#
# Switches debugging output on or off. When on, open() passes its
# error text to the configured debug handler before throwing.
#
# @param bool $debug
#
sub setDebug
{
    my ($self, $debug) = @_;

    $self->{debug} = $debug;
}
84
#
# Reports whether the transport currently has a connected socket.
#
# @return bool 0 when no handle has been set; otherwise whatever the
#              first socket held by the handle reports from connected()
#
sub isOpen
{
    my ($self) = @_;

    return 0 unless defined $self->{handle};

    # The handle is a select set; the socket is its first member.
    my ($sock) = $self->{handle}->handles();
    return $sock->connected;
}
100
#
# Connects the socket.
#
# Opens a TCP connection to the configured host and port, using the send
# timeout (divided by 1000) as the connect timeout, and stores the socket
# wrapped in an IO::Select set in $self->{handle}.
#
# Throws a Thrift::TException when the connection cannot be established.
# When debugging is on and a debug handler was supplied, the handler is
# given the error text first.
#
sub open
{
    my $self = shift;

    my $sock = IO::Socket::INET->new(
        PeerAddr => $self->{host},
        PeerPort => $self->{port},
        Proto    => 'tcp',
        Timeout  => $self->{sendTimeout} / 1000,
    );

    unless ($sock) {
        my $error = 'TSocket: Could not connect to '.$self->{host}.':'.$self->{port}.' ('.$!.')';

        # The handler is optional in new(); calling an undefined code ref
        # here would replace the connect error with an unrelated one.
        if ($self->{debug} && defined $self->{debugHandler}) {
            $self->{debugHandler}->($error);
        }

        die Thrift::TException->new($error);
    }

    $self->{handle} = IO::Select->new($sock);
}
126
#
# Closes the socket.
#
# Does nothing when no handle has been set. The handle itself is left
# in place; only the first socket it holds is closed.
#
sub close
{
    my ($self) = @_;

    if (defined $self->{handle}) {
        my ($sock) = $self->{handle}->handles();

        # CORE:: is required because this package defines its own close().
        CORE::close($sock);
    }
}
138
#
# Reads exactly the requested number of bytes, calling recv() repeatedly
# until enough data has arrived.
#
# Throws a Thrift::TException when no socket becomes readable within the
# receive timeout, or when recv() yields no data. The byte count shown in
# those messages is the amount still outstanding at that point.
#
# @param int $len How many bytes
# @return string Binary data; nothing when no handle has been set
#
sub readAll
{
    my ($self, $remaining) = @_;

    return unless defined $self->{handle};

    my $collected = "";
    for (;;) {

        # An empty list from can_read means the wait timed out.
        my @ready = $self->{handle}->can_read( $self->{recvTimeout} / 1000 );

        unless (@ready) {
            die Thrift::TException->new('TSocket: timed out reading '.$remaining.' bytes from '.
                                        $self->{host}.':'.$self->{port});
        }

        my $chunk;
        $ready[0]->recv($chunk, $remaining);

        if (!defined $chunk || $chunk eq '') {
            die Thrift::TException->new('TSocket: Could not read '.$remaining.' bytes from '.
                                        $self->{host}.':'.$self->{port});
        }

        my $got = length($chunk);
        return $collected.$chunk if $got >= $remaining;

        # Short read: keep what arrived and ask only for the rest.
        $collected .= $chunk;
        $remaining -= $got;
    }
}
184
#
# Read from the socket
#
# Performs a single recv() of at most $len bytes, so fewer bytes than
# requested may come back.
#
# Throws a Thrift::TException when no socket becomes readable within the
# receive timeout, or when recv() yields no data.
#
# @param int $len How many bytes
# @return string Binary data; nothing when no handle has been set
#
sub read
{
    my ($self, $len) = @_;

    return unless defined $self->{handle};

    # An empty list from can_read means the wait timed out.
    my @sockets = $self->{handle}->can_read( $self->{recvTimeout} / 1000 );

    unless (@sockets) {
        die Thrift::TException->new('TSocket: timed out reading '.$len.' bytes from '.
                                    $self->{host}.':'.$self->{port});
    }

    my $buf;
    $sockets[0]->recv($buf, $len);

    if (!defined $buf || $buf eq '') {
        # Same exception class as every other failure in this file.
        die Thrift::TException->new('TSocket: Could not read '.$len.' bytes from '.
                                    $self->{host}.':'.$self->{port});
    }

    return $buf;
}
220
221
#
# Write to the socket.
#
# Keeps calling send() until the whole buffer has been handed over,
# dropping from the front whatever each call reports as sent.
#
# Throws a Thrift::TException when no socket becomes writable within the
# send timeout, or when send() reports that nothing was written.
#
# @param string $buf The data to write
#
sub write
{
    my ($self, $buf) = @_;

    return unless defined $self->{handle};

    while (length($buf) > 0) {

        # An empty list from can_write means the wait timed out.
        my @sockets = $self->{handle}->can_write( $self->{sendTimeout} / 1000 );

        unless (@sockets) {
            die Thrift::TException->new('TSocket: timed out writing to '.
                                        $self->{host}.':'.$self->{port});
        }

        my $got = $sockets[0]->send($buf);

        if (!defined $got || $got == 0) {
            # Report the peer as host:port (the port was previously
            # replaced by a second copy of the host).
            die Thrift::TException->new('TSocket: Could not write '.length($buf).' bytes to '.
                                        $self->{host}.':'.$self->{port});
        }

        $buf = substr($buf, $got);
    }
}
258
#
# Flush output to the socket.
#
# Calls flush() on the first socket held by the handle; returns nothing
# when no handle has been set.
#
sub flush
{
    my ($self) = @_;

    return unless defined $self->{handle};

    my ($sock) = $self->{handle}->handles();
    return scalar $sock->flush;
}
270
271
272 #
273 # Build a ServerSocket from the ServerTransport base class
274 #
275 package Thrift::ServerSocket;
276
277 use base qw( Thrift::Socket Thrift::ServerTransport );
278
279 use constant LISTEN_QUEUE_SIZE => 128;
280
#
# Builds a server socket for the given port.
#
# The parent constructor is called with no host and no debug handler;
# only the port is forwarded.
#
# @param int $port Port that listen() will bind to
# @return object blessed into the invoking class
#
sub new
{
    my ($classname, $port) = @_;

    my $server = $classname->SUPER::new(undef, $port, undef);

    return bless($server, $classname);
}
289
#
# Binds a listening TCP socket on the configured port (any local
# address, address reuse enabled, backlog of LISTEN_QUEUE_SIZE) and
# stores it in $self->{handle}.
#
# Throws a Thrift::TException when the socket cannot be created. When
# debugging is on and a debug handler is present, the handler is given
# the error text first.
#
sub listen
{
    my $self = shift;

    # Listen to a new socket
    my $sock = IO::Socket::INET->new(
        LocalAddr => undef,    # any addr
        LocalPort => $self->{port},
        Proto     => 'tcp',
        Listen    => LISTEN_QUEUE_SIZE,
        ReuseAddr => 1,
    );

    unless ($sock) {
        my $error = 'TServerSocket: Could not bind to ' .
                    $self->{host} . ':' . $self->{port} . ' (' . $! . ')';

        # This class's constructor never sets a debug handler, so it must
        # be checked before use or the bind error would be masked.
        if ($self->{debug} && defined $self->{debugHandler}) {
            $self->{debugHandler}->($error);
        }

        die Thrift::TException->new($error);
    }

    $self->{handle} = $sock;
}
303
304 if ($self->{debug}) {
305 $self->{debugHandler}->($error);
306 }
307
308 die new Thrift::TException($error);
309 };
310
311 $self->{handle} = $sock;
312 }
313
314 sub accept
315 {
316 my $self = shift;
317
318 if ( exists $self->{handle} and defined $self->{handle} )
319 {
320 my $client = $self->{handle}->accept();
321 my $result = new Thrift::Socket;
322 $result->{handle} = new IO::Select($client);
323 return $result;
324 }
325
326 return 0;
327 }
328
329
330 1;