Mercurial > hg > Members > e085722 > Cerium
comparison Renderer/Engine/lindaapi.cc @ 0:04e28d8d3c6f
first commit
author | Daiki KINJYO <e085722@ie.u-ryukyu.ac.jp> |
---|---|
date | Mon, 08 Nov 2010 01:23:25 +0900 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:04e28d8d3c6f |
---|---|
1 // $Id$ | |
2 // | |
3 | |
4 /*---------------------------------------------------------------------- | |
5 インクルードファイル読み込み | |
6 ----------------------------------------------------------------------*/ | |
7 #include <stdio.h> | |
8 #include <string.h> | |
9 #include <stdlib.h> | |
10 #include <sys/time.h> | |
11 #include <unistd.h> | |
12 #include <netinet/in.h> | |
13 #include <sys/select.h> | |
14 #include <netdb.h> | |
15 #include <netinet/tcp.h> | |
16 #include <sys/un.h> | |
17 #include <errno.h> | |
18 | |
19 #include <sys/types.h> | |
20 #include <sys/socket.h> | |
21 #include <arpa/inet.h> | |
22 | |
23 #include "lindaapi.h" | |
24 | |
25 | |
26 #if 0 | |
27 #define PSX_Debug(deb) (putchar(PS_DEB)),\ | |
28 (printf deb ),\ | |
29 (putchar(PS_DEB)) | |
30 #define DEB(a) | |
31 #else | |
32 #define PSX_Debug(deb) | |
33 #define DEB(a) /* a */ | |
34 #endif | |
35 | |
36 /* Global Variables */ | |
37 static COMMAND *q_top, *q_end; /* コマンドキュー */ | |
38 static REPLY *reply, *r_end; /* 受け取り用キュー */ | |
39 static int qsize; /* コマンドキューのサイズ */ | |
40 static fd_set g_fds; /* 接続しているタプルスペース群のFD(FileDiscripter)を保持 */ | |
41 static int g_max_fds; /* 監視するFDの最大値 */ | |
42 | |
43 /* Static Functions */ | |
44 static void unix_chkserv(int ps); | |
45 void psx_free(void *); | |
46 static int psx_queue(unsigned int tspace_id, unsigned int id, | |
47 unsigned int size, unsigned char *data, char mode, | |
48 void(*callback)(unsigned char *,void *),void * obj); | |
49 | |
50 #ifdef COUNT_PACKET | |
51 // print packet count message per PRINT_INTERVAL sec | |
52 #define PRINT_INTERVAL 4 | |
53 static void count_packet(char type); | |
54 | |
55 /*-------------------------------------------------------------------/ | |
56 static void | |
57 count_packet (char type): | |
58 パケットの送受信カウントする | |
59 | |
60 引き数: | |
61 type - 送信、受信 (char型: s,r) | |
62 /-------------------------------------------------------------------*/ | |
63 static void | |
64 count_packet(char type) | |
65 { | |
66 static int send_packet=-1,receive_packet=0; | |
67 static struct timeval start,now,previous; | |
68 | |
69 if (out_packet == -1) { | |
70 gettimeofday(&start,NULL); | |
71 gettimeofday(&previous,NULL); | |
72 send_packet = 0; | |
73 printf("packet\tout\tread\t\ttime\n"); | |
74 } | |
75 | |
76 if (type == 's') { | |
77 send_packet++; | |
78 } else if (type == 'r') { | |
79 receive_packet++; | |
80 } else { | |
81 fprintf(stderr,"No type in count_packet function\n"); | |
82 return; | |
83 } | |
84 | |
85 gettimeofday(&now,NULL); | |
86 if ((now.tv_sec-previous.tv_sec) > PRINT_INTERVAL) { | |
87 printf("log\t%d\t%d\t%ld\n", | |
88 send_packet,receive_packet,now.tv_sec-start.tv_sec); | |
89 fflush(stdout); | |
90 | |
91 previous.tv_sec = now.tv_sec; | |
92 send_packet = receive_packet = 0; | |
93 } | |
94 } | |
95 #endif | |
96 | |
97 | |
98 #define unix_read_w read | |
99 | |
100 #if 0 | |
101 /*-------------------------------------------------------------------/ | |
102 static int | |
103 unix_write (int fd, unsigned char *buf, unsigned int size): | |
104 サーバへTUPLEを送る。 | |
105 | |
106 引き数: | |
107 fd - サーバのファイルディスクリプタ | |
108 buf - サーバへ送るデータ(TUPLEヘッダ含む) | |
109 size - bufのbyte数 | |
110 返り値: | |
111 送った(書きこんだ)データのbyte数 | |
112 /-------------------------------------------------------------------*/ | |
113 static int | |
114 unix_write_bak(int fd,unsigned char *buf,unsigned int size) { | |
115 int i,nsize; | |
116 nsize = htonl(size); | |
117 i = write(fd,&nsize,INT_SIZE); | |
118 i += write(fd,buf,size); // size == datasize + LINDA_HEADER_SIZE | |
119 #ifdef COUNT_PACKET | |
120 count_packet('s'); | |
121 #endif | |
122 return(i); | |
123 } | |
124 #endif | |
125 | |
126 static int | |
127 unix_write(int fd,unsigned char *buf,unsigned int size) { | |
128 unsigned int count=0; | |
129 uint32_t nsize; | |
130 | |
131 /* これから送信するデータのサイズをまず送信 */ | |
132 nsize = htonl(size); | |
133 write(fd, &nsize, INT_SIZE); | |
134 | |
135 /* 目的のデータを送信 */ | |
136 while (count < size) { | |
137 count += write(fd, buf+count, size-count); | |
138 } | |
139 #ifdef COUNT_PACKET | |
140 count_packet('s'); | |
141 #endif | |
142 return count+INT_SIZE; | |
143 } | |
144 | |
145 #define unix_write_w unix_write | |
146 | |
147 #define SERV_NAME unix_port | |
148 #define PROTO_NAME "tcp" | |
149 #define SERVER_NAME hostname | |
150 #define MAX_REQ 16 | |
151 | |
152 | |
153 | |
154 /*-------------------------------------------------------------------/ | |
155 void | |
156 init_linda(): | |
157 大域変数の初期化等を行なう | |
158 /-------------------------------------------------------------------*/ | |
159 void | |
160 init_linda() { | |
161 FD_ZERO(&g_fds); | |
162 /* 大域変数はゼロクリアされる | |
163 g_max_fds = 0; | |
164 q_end = q_top = NULL; | |
165 r_end = reply = NULL; | |
166 qsize = 0; | |
167 */ | |
168 } | |
169 | |
170 struct addrinfo *gethostaddrinfo(const char *host, int port) { | |
171 struct addrinfo hints; | |
172 hints.ai_protocol = 0; | |
173 hints.ai_addrlen = 0; | |
174 hints.ai_addr = NULL; | |
175 hints.ai_canonname = NULL; | |
176 hints.ai_next = NULL; | |
177 hints.ai_family = PF_UNSPEC; | |
178 hints.ai_socktype = SOCK_STREAM; | |
179 hints.ai_flags = AI_PASSIVE; | |
180 struct addrinfo *res0; | |
181 char portbuf[6]; | |
182 sprintf(portbuf, "%d", port); | |
183 int error = getaddrinfo(host, portbuf, &hints, &res0); | |
184 if (error) { | |
185 fprintf(stderr, "error: getaddrinfo (%d)\n", error); | |
186 } | |
187 return res0; | |
188 } | |
189 | |
190 | |
191 | |
192 /*-------------------------------------------------------------------/ | |
193 int | |
194 open_linda (char * hostname, int port): | |
195 Lindaサーバとのコネクションを確立し、タプルスペースのIDを返す。 | |
196 現在はファイルディスクリプタを返している。 | |
197 | |
198 引き数: | |
199 hostname - サーバのホスト名 | |
200 port - サーバのポート番号 | |
201 返り値: | |
202 コネクション確立が成功するとそのファイルディスクリプタを返す。 | |
203 失敗すると -1 を返す。 | |
204 /-------------------------------------------------------------------*/ | |
205 int | |
206 open_linda(const char *hostname, int port){ | |
207 int fd; | |
208 struct hostent *hoste; | |
209 struct sockaddr_in serv_addr; | |
210 struct sockaddr_un serv_addr_un; | |
211 | |
212 if (hostname[0]=='/') { | |
213 /* Unix domain */ | |
214 if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) == FAIL){ | |
215 perror("socket"); | |
216 return(-1); | |
217 } | |
218 serv_addr_un.sun_family = AF_UNIX; | |
219 strcpy(serv_addr_un.sun_path, hostname); | |
220 fprintf(stdout,"connecting ... %d\n", serv_addr.sin_port); | |
221 if (connect(fd, (struct sockaddr *)&serv_addr_un,sizeof(serv_addr_un)) == FAIL){ | |
222 perror("connect"); | |
223 close(fd); | |
224 return(-1); | |
225 } | |
226 | |
227 } else { | |
228 /* INET domain */ | |
229 if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == FAIL){ | |
230 perror("socket"); | |
231 return(-1); | |
232 } | |
233 if ((hoste = gethostbyname(SERVER_NAME)) == NULL){ | |
234 fprintf(stderr,"hostname error\n"); | |
235 close(fd); | |
236 return(-1); | |
237 } | |
238 serv_addr.sin_family = AF_INET; | |
239 serv_addr.sin_port = port; | |
240 serv_addr.sin_addr.s_addr = ((struct in_addr *)(hoste->h_addr))->s_addr; | |
241 if (serv_addr.sin_family == AF_INET) { | |
242 int tmp = 1; | |
243 setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, | |
244 (char *) &tmp, sizeof (int)); | |
245 } | |
246 fprintf(stdout,"connecting ... %d\n", serv_addr.sin_port); | |
247 if (connect(fd, (struct sockaddr *)&serv_addr,sizeof(serv_addr)) == FAIL){ | |
248 fprintf(stderr,"connection error! errno :%d %s\n", errno, | |
249 strerror(errno)); | |
250 close(fd); | |
251 return(-1); | |
252 } | |
253 } | |
254 | |
255 FD_SET(fd, &g_fds); | |
256 if (g_max_fds < fd) g_max_fds = fd; | |
257 | |
258 fprintf(stdout," connect middle server %d\n", fd); | |
259 return fd; | |
260 } | |
261 | |
262 int | |
263 open_linda_java(const char *hostname, int port){ | |
264 int fd; | |
265 struct hostent *hoste; | |
266 struct sockaddr_in serv_addr; | |
267 struct sockaddr_un serv_addr_un; | |
268 | |
269 if (hostname[0]=='/') { | |
270 /* Unix domain */ | |
271 if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) == FAIL){ | |
272 perror("socket"); | |
273 return(-1); | |
274 } | |
275 serv_addr_un.sun_family = AF_UNIX; | |
276 strcpy(serv_addr_un.sun_path, hostname); | |
277 DEB(fprintf(stdout,"connecting ... %d\n", serv_addr.sin_port)); | |
278 if (connect(fd, (struct sockaddr *)&serv_addr_un,sizeof(serv_addr_un)) == FAIL){ | |
279 perror("connect"); | |
280 close(fd); | |
281 return(-1); | |
282 } | |
283 | |
284 } else { | |
285 /* INET domain */ | |
286 if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == FAIL){ | |
287 perror("socket"); | |
288 return(-2); | |
289 } | |
290 serv_addr.sin_family = AF_INET; | |
291 serv_addr.sin_port = htons(port); | |
292 | |
293 serv_addr.sin_addr.s_addr = inet_addr(hostname); | |
294 if (serv_addr.sin_addr.s_addr == 0xffffffff) { | |
295 if ((hoste = gethostbyname(hostname)) == NULL){ | |
296 fprintf(stdout, "hostname error\n"); | |
297 close(fd); | |
298 return(-1); | |
299 } | |
300 serv_addr.sin_addr.s_addr = *(unsigned int *)hoste->h_addr_list[0]; | |
301 } | |
302 | |
303 if (serv_addr.sin_family == AF_INET) { | |
304 int tmp = 1; | |
305 setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, | |
306 (char *) &tmp, sizeof (int)); | |
307 } | |
308 DEB(fprintf(stdout,"connecting ... %d \n", ntohs(serv_addr.sin_port))); | |
309 DEB(fprintf(stdout," serv_addr.sin_port ... %d \n", ntohs(serv_addr.sin_port))); | |
310 //fprintf(stdout," serv_addr.sin_addr.s_addr... %s\n", serv_addr.sin_addr.s_addr); | |
311 if (connect(fd, (struct sockaddr *)&serv_addr,sizeof(serv_addr)) == FAIL){ | |
312 perror("connect"); | |
313 close(fd); | |
314 return(-4); | |
315 } | |
316 } | |
317 | |
318 FD_SET(fd, &g_fds); | |
319 if (g_max_fds < fd) g_max_fds = fd; | |
320 | |
321 DEB(fprintf(stdout," connect middle server %d\n", fd)); | |
322 return fd; | |
323 } | |
324 | |
325 | |
326 /*-------------------------------------------------------------------/ | |
327 int | |
328 close_linda(int tspace_id): | |
329 接続しているタプルスペースへの接続を切る。 | |
330 ソケットを閉じ、g_fds から外す。 | |
331 引数: | |
332 tspace_id - 閉じるタプルスペースのID | |
333 返り値: | |
334 close の値 | |
335 /-------------------------------------------------------------------*/ | |
336 int | |
337 close_linda(int tspace_id){ | |
338 int retval; | |
339 int i; | |
340 if ((retval = close(tspace_id)) == 0) { | |
341 FD_CLR(tspace_id, &g_fds); | |
342 if (g_max_fds == tspace_id) { | |
343 for (i = g_max_fds-1; FD_ISSET(i, &g_fds) && i; i--); | |
344 g_max_fds = i; | |
345 } | |
346 } | |
347 return retval; | |
348 } | |
349 | |
350 /*-------------------------------------------------------------------/ | |
351 int | |
352 psx_out (unsigned int tspace_id, unsigned int id, | |
353 unsigned char *data, unsigned int size): | |
354 outコマンドをCOMMANDキューへ溜める。 | |
355 | |
356 引き数: | |
357 tspace_id - タプルスペースのID | |
358 id - タプルのID | |
359 data - 送信するデータ | |
360 size - dataのサイズ | |
361 返り値: | |
362 シーケンス番号 | |
363 /-------------------------------------------------------------------*/ | |
364 int | |
365 psx_out(unsigned int tspace_id, unsigned int id, | |
366 unsigned char *data, unsigned int size){ | |
367 int r; | |
368 if ((r = psx_queue(tspace_id, id, size, data, 'o', NULL, NULL)) == FAIL) { | |
369 return(FAIL); | |
370 } | |
371 DEB( fprintf(stdout, "psx_out: size = %d, command = %s\n", | |
372 q_end->size, q_end->command+LINDA_HEADER_SIZE)); | |
373 return(r); | |
374 } | |
375 | |
376 extern int psx_callback_update(unsigned int tspace_id, unsigned int id, | |
377 unsigned char *data, unsigned int size, | |
378 void(*callback)(unsigned char *,void *),void * obj) { | |
379 int r; | |
380 if ((r = psx_queue(tspace_id, id, size, data, 'u', callback, obj)) == FAIL) { | |
381 return(FAIL); | |
382 } | |
383 DEB( fprintf(stdout, "psx_update: size = %d, command = %s\n", | |
384 q_end->size, q_end->command+LINDA_HEADER_SIZE)); | |
385 return(r); | |
386 } | |
387 | |
388 /*-------------------------------------------------------------------/ | |
389 int | |
390 psx_ld (unsigned tspace_id, unsigned int id, | |
391 char mode, void(*callback)(char*,void*), void * obj): | |
392 in,read,waitなどの受信コマンドをCOMMANDキューへ溜める。 | |
393 psx_in,psx_rd,psx_wait_rdなどに置き換えられている。 | |
394 | |
395 引き数: | |
396 tspace_id- タプルスペースのID | |
397 id - タプルのID | |
398 mode - i,r,w の文字を取り、各々in,read,waitを表している。 | |
399 callback - コールバックを使用する場合の関数へのポインタ。 | |
400 使用しない場合はNULLをいれる。 | |
401 obj - コールバックで用いる関数の引き数。 | |
402 返り値: | |
403 psx_queue内でmallocされたREPLY構造体へのポインタ | |
404 /-------------------------------------------------------------------*/ | |
405 int | |
406 psx_ld(unsigned int tspace_id, unsigned int id, | |
407 char mode, void(*callback)(unsigned char *,void *), void * obj){ | |
408 int r; | |
409 if ((r = psx_queue(tspace_id, id, 0, NULL, mode, callback, obj)) == FAIL) { | |
410 return(FAIL); | |
411 } | |
412 return(r); | |
413 } | |
414 | |
415 /*-------------------------------------------------------------------/ | |
416 unsigned char * | |
417 psx_reply (int seq): | |
418 サーバから答えが来たデータを返す。 | |
419 | |
420 引き数: | |
421 seq - psx_ld()が返した値。 | |
422 返り値: | |
423 seqに対応したデータを返す。データをまだ受信していない場合は | |
424 NULLを返す。 | |
425 /-------------------------------------------------------------------*/ | |
426 unsigned char * | |
427 psx_reply(unsigned int seq){ | |
428 REPLY *p, *q; | |
429 unsigned char *ans; | |
430 | |
431 DEB(fprintf(stdout, "psx_reply: search of seq = %d\n", seq)); | |
432 PSX_Debug(("psx_reply: seq %x\n", seq)); | |
433 for(q = NULL,p = reply; p; q = p,p = p->next){ | |
434 if (p->seq == seq){ | |
435 DEB(fprintf(stdout, "psx_reply: match of seq = %d\n", seq)); | |
436 if (p->mode == '!'){ | |
437 ans = p->answer; | |
438 if (q == NULL){ | |
439 reply = p->next; | |
440 if(p==r_end) { | |
441 r_end = p->next; | |
442 } | |
443 } else { | |
444 q->next = p->next; | |
445 if(p==r_end) { | |
446 r_end = q; | |
447 } | |
448 } | |
449 PSX_Debug(("psx_reply: reply %p r_end %p p %p q %p\n",reply,r_end,p,q)); | |
450 psx_free(p); | |
451 DEB( for(p=reply;p;p=p->next) { PSX_Debug(("psx_queue dump: seq %d mode %c %x %x\n",p->seq,p->mode,p,p->next))}); | |
452 DEB( fprintf(stdout, "psx_reply: returned answer = %s\n", ans)); | |
453 PSX_Debug(("psx_reply: answer %s\n",ans)); | |
454 return(ans); | |
455 } else { | |
456 if (p->mode == '?'){ | |
457 DEB(fprintf(stdout, "psx_reply: don't accept anser\n")); | |
458 return(NULL); | |
459 } | |
460 } | |
461 } | |
462 | |
463 } | |
464 PSX_Debug(("psx_reply: no match seq %d\n",seq)); | |
465 DEB(fprintf(stdout, "psx_reply: no match of seq\n")); | |
466 return(NULL); | |
467 } | |
468 | |
469 /*-------------------------------------------------------------------/ | |
470 void | |
471 psx_sync_n (): | |
472 サーバとデータの送受信をする。COMMANDキューに溜まったデータを | |
473 送信し、サーバから送られて来たデータを対応するREPLYへいれる。 | |
474 /-------------------------------------------------------------------*/ | |
475 #define TIMEDELTA 10 | |
476 | |
477 void | |
478 psx_sync_n() { | |
479 psx_sync_n_timeout(TIMEDELTA); | |
480 } | |
481 | |
482 void | |
483 psx_sync_n_timeout(int time) { | |
484 int acount; | |
485 COMMAND *c, *t; | |
486 | |
487 fd_set tmp; | |
488 struct timeval timeout; | |
489 timeout.tv_sec=0; | |
490 timeout.tv_usec=time * 1000; | |
491 | |
492 acount = 0; | |
493 while (q_top != NULL){ | |
494 c = q_top; | |
495 unix_write_w(c->tspace_id, c->command, c->size); | |
496 psx_free(c->command); | |
497 t = c->next; | |
498 psx_free(c); | |
499 q_top = c = t; | |
500 qsize--; | |
501 } | |
502 | |
503 tmp = g_fds; | |
504 while(select((unsigned int)g_max_fds+1, &tmp, NULL, NULL, &timeout) > 0) { | |
505 unsigned int i; | |
506 for (i = 0; i < (unsigned int)g_max_fds+1; i++) { | |
507 if (FD_ISSET(i, &tmp)) { | |
508 unix_chkserv(i); | |
509 } | |
510 } | |
511 } | |
512 } | |
513 | |
514 /*-------------------------------------------------------------------/ | |
515 static int | |
516 psx_queue (unsigned int tspace_id, unsigned int id, | |
517 unsigned int size, unsigned char *data, char mode, | |
518 void(*callback)(char*,void*), void * obj): | |
519 out,in,read,waitなどのコマンドをCOMMANDキューに溜める。データを | |
520 受信するコマンド(in,read,wait)のときは受け取ったときにデータを | |
521 格納するREPLY構造体を作る。 | |
522 | |
523 引き数: | |
524 tspace_id- 送信先タプルスペースのID | |
525 id - アクセスするTUPLE SpaceのID | |
526 size - dataのサイズ | |
527 data - 送信するデータ。受信時はNULL。 | |
528 mode - コマンドのモード(out,in,read,wait は各々char型: o,i,r,w) | |
529 callback - コールバックを使用する場合の関数へのポインタ。 | |
530 使用しない場合はNULL。 | |
531 obj - コールバックで用いる関数に引き渡すデータ。 | |
532 返り値: | |
533 成功した場合 - mallocしたREPLY構造体へのポインタ。outの場合は | |
534 0が返る。 | |
535 失敗した場合 - FAIL(-1)が返る。 | |
536 /-------------------------------------------------------------------*/ | |
537 static int | |
538 psx_queue(unsigned int tspace_id, unsigned int id, | |
539 unsigned int size, unsigned char *data, char mode, | |
540 void(*callback)(unsigned char *,void *), void * obj){ | |
541 REPLY *p; | |
542 COMMAND *c; | |
543 | |
544 if (qsize >= MAX_QUEUE) { | |
545 // PSX_Debug(("max queue: qsize=%d",qsize)); | |
546 psx_sync_n(); | |
547 } | |
548 | |
549 if (q_top == NULL) { | |
550 if ((q_top = (COMMAND *) malloc (sizeof(COMMAND))) == NULL){ | |
551 return(FAIL); | |
552 } | |
553 c = q_end = q_top; | |
554 } else { | |
555 if ((q_end->next = (COMMAND *) malloc (sizeof(COMMAND))) == NULL){ | |
556 return(FAIL); | |
557 } | |
558 c = q_end; | |
559 q_end = q_end->next; | |
560 } | |
561 | |
562 /* size は DATASIZE */ | |
563 if ((q_end->command = (unsigned char *) malloc(size+LINDA_HEADER_SIZE)) == NULL) { | |
564 psx_free(q_end); | |
565 c->next = NULL; | |
566 return(FAIL); | |
567 } | |
568 | |
569 /* データ受け取り要求(in,rd,wait)なら受け取り用の箱を用意 */ | |
570 if (mode != 'o') { | |
571 if (reply == NULL){ | |
572 if ((reply = (REPLY *) malloc (sizeof(REPLY))) == NULL){ | |
573 return(FAIL); | |
574 } | |
575 p = r_end = reply; p->next = NULL; | |
576 } else { | |
577 if ((r_end->next = (REPLY *) malloc (sizeof(REPLY))) == NULL){ | |
578 return(FAIL); | |
579 } | |
580 p = r_end->next; r_end = p; p->next = NULL; | |
581 } | |
582 p->mode = '?'; | |
583 p->seq = (unsigned long)p; // 構造体のアドレスで識別 | |
584 p->callback = callback; | |
585 p->obj = obj; | |
586 PSX_Debug(("psx_queue: seq %d reply %p p %p r_end %p\n",p->seq,reply,p,r_end)); | |
587 } else { | |
588 p = 0; | |
589 } | |
590 q_end->command[LINDA_MODE_OFFSET] = mode; | |
591 | |
592 q_end->command[LINDA_ID_OFFSET] = id >> 8; | |
593 q_end->command[LINDA_ID_OFFSET+1] = id & 0xff; | |
594 | |
595 q_end->command[LINDA_SEQ_OFFSET] = ((unsigned long)p>>24) & 0xff; | |
596 q_end->command[LINDA_SEQ_OFFSET+1] = ((unsigned long)p>>16) & 0xff; | |
597 q_end->command[LINDA_SEQ_OFFSET+2] = ((unsigned long)p>>8) & 0xff; | |
598 q_end->command[LINDA_SEQ_OFFSET+3] = ((unsigned long)p) & 0xff; | |
599 | |
600 q_end->command[LINDA_DATA_LENGTH_OFFSET] = (size>>24) & 0xff; | |
601 q_end->command[LINDA_DATA_LENGTH_OFFSET+1] = (size>>16) & 0xff; | |
602 q_end->command[LINDA_DATA_LENGTH_OFFSET+2] = (size>>8) & 0xff; | |
603 q_end->command[LINDA_DATA_LENGTH_OFFSET+3] = (size) & 0xff; | |
604 | |
605 q_end->size = size+LINDA_HEADER_SIZE; /* command size */ | |
606 q_end->tspace_id = tspace_id; /* destination id */ | |
607 q_end->next = NULL; | |
608 qsize++; | |
609 if (data && size>0) | |
610 memcpy(q_end->command+LINDA_HEADER_SIZE, data, size); | |
611 return((unsigned long)p); | |
612 } | |
613 | |
614 /*-------------------------------------------------------------------/ | |
615 static void | |
616 unix_chkserv (int ps): | |
617 サーバからデータ(TUPLE)を受け取る。REPLY構造体にコールバック関数 | |
618 が指定されていればその関数を実行し、REPLY構造体をキューから取り | |
619 除く。コールバック関数が指定されていなければREPLY構造体にデータ | |
620 を引き渡す。 | |
621 引数: | |
622 ps - 接続しているタプルスペースのソケット | |
623 /-------------------------------------------------------------------*/ | |
624 static void | |
625 unix_chkserv(int ps){ | |
626 int i,pkt,npkt,mode; | |
627 REPLY *r,*prev; | |
628 int a; | |
629 unsigned char * tuple = 0; | |
630 | |
631 if((i=read(ps,&npkt,INT_SIZE))<0) { | |
632 perror("read"); | |
633 exit(1); | |
634 } | |
635 pkt = ntohl(npkt); | |
636 DEB(printf("pkt: %d\n",pkt)); | |
637 DEB(fprintf(stdout, "psx_chkserv: queue number: %d , size = %d\n", i, pkt)); | |
638 if((tuple = (unsigned char *)malloc(pkt))==NULL){ | |
639 perror("malloc"); | |
640 exit(1); | |
641 } | |
642 for(a=0;a<pkt;a+=i) { | |
643 if((i=unix_read_w(ps,tuple+a,pkt-a))<0) { | |
644 fprintf(stderr, "psx_chkserv: read error! on i=%d pkt=%d %s\n", | |
645 i, pkt, strerror(errno)); | |
646 exit(1);//close(ps); | |
647 } | |
648 } | |
649 | |
650 #ifdef COUNT_PACKET | |
651 count_packet('r'); | |
652 #endif | |
653 mode = psx_get_mode(tuple); | |
654 i = psx_get_id(tuple); | |
655 unsigned int k = psx_get_seq(tuple); | |
656 PSX_Debug(("psx_chkserv: anser packet size = %d id %d seq %d\n", pkt,i,k)); | |
657 DEB(fprintf(stdout, "psx_chkserv: data from server: %s id=%d seq = %d\n", tuple, i, k)); | |
658 DEB ( | |
659 for(p=reply;p;p=p->next) { | |
660 PSX_Debug(printf("psx_queue dump: seq %d mode %c %x %x\n",p->seq,p->mode,p,p->next)); | |
661 }) | |
662 | |
663 for(prev = NULL,r = reply; r; prev = r,r = r->next){ | |
664 DEB(fprintf(stdout,"seq: %d\n",r->seq);) | |
665 if (r->seq == k){ | |
666 if(r->callback){ // call callback function | |
667 (*r->callback)(tuple,r->obj); | |
668 if (prev == NULL){ | |
669 reply = r->next; | |
670 if(r == r_end) { | |
671 r_end = r->next; | |
672 } | |
673 } else { | |
674 prev->next = r->next; | |
675 if(r == r_end) { | |
676 r_end = prev; | |
677 } | |
678 } | |
679 psx_free(r); | |
680 }else{ // normal reply | |
681 PSX_Debug(("psx_chkserv: copy answer r %p seq %d\n",r,k)); | |
682 if(mode == 'a'){ | |
683 r->answer = tuple; | |
684 }else{ | |
685 r->answer = NULL; | |
686 } | |
687 r->mode = '!'; | |
688 } | |
689 break; | |
690 } | |
691 } | |
692 tuple = 0; | |
693 if (!r){ | |
694 DEB(fprintf(stdout, "unix_chkserv: accepted seq %d does not match. \n",k)); | |
695 } | |
696 } | |
697 | |
698 void psx_free(void *tuple) | |
699 { | |
700 free(tuple); | |
701 } | |
702 | |
703 /*-------------------------------------------------------------------/ | |
704 static unsigned int | |
705 get_int(unsigned char * tuple, int offset): | |
706 TUPLEのヘッダに格納された int型 のデータを得るための関数 | |
707 psx_get_datalength() と psx_get_seq() から呼ばれる。 | |
708 | |
709 引き数: | |
710 tuple - ヘッダ情報も含んだTUPLE。psx_reply()で得たものでもいい。 | |
711 offset - 取りだすデータのオフセット。LINDA_DATA_LENGTH_OFFSET | |
712 か LINDA_SEQ_OFFSET。 | |
713 | |
714 返り値: | |
715 指定したオフセットに格納されていた数値(int型) | |
716 /-------------------------------------------------------------------*/ | |
717 static unsigned int | |
718 get_int(unsigned char * tuple, int offset){ | |
719 unsigned int i; | |
720 i = (tuple[offset] <<24) + | |
721 (tuple[offset+1]<<16) + | |
722 (tuple[offset+2]<<8) + | |
723 (tuple[offset+3]); | |
724 return i; | |
725 } | |
726 | |
727 unsigned int | |
728 psx_get_datalength(unsigned char * tuple){ | |
729 return get_int(tuple,LINDA_DATA_LENGTH_OFFSET); | |
730 } | |
731 | |
732 unsigned char * | |
733 psx_get_data(unsigned char * tuple) { | |
734 return tuple + LINDA_HEADER_SIZE; | |
735 } | |
736 | |
737 unsigned int | |
738 psx_get_seq(unsigned char * tuple){ | |
739 return get_int(tuple,LINDA_SEQ_OFFSET); | |
740 } | |
741 | |
742 unsigned short | |
743 psx_get_id(unsigned char * tuple){ | |
744 return (tuple[LINDA_ID_OFFSET] * 256 + tuple[LINDA_ID_OFFSET+1]); | |
745 } | |
746 | |
747 unsigned char | |
748 psx_get_mode(unsigned char * tuple){ | |
749 return tuple[LINDA_MODE_OFFSET]; | |
750 } | |
751 | |
752 static | |
753 void | |
754 set_int_to_char(unsigned char * tuple, int i, int offset){ | |
755 tuple[offset] = (i>>24) & 0xff; | |
756 tuple[offset+1] = (i>>16) & 0xff; | |
757 tuple[offset+2] = (i>>8) & 0xff; | |
758 tuple[offset+3] = (i) & 0xff; | |
759 } | |
760 | |
761 void | |
762 psx_set_datalength(unsigned char * tuple, int length){ | |
763 set_int_to_char(tuple,length,LINDA_DATA_LENGTH_OFFSET); | |
764 } | |
765 | |
766 | |
767 void | |
768 psx_set_seq(unsigned char * tuple, int seq){ | |
769 set_int_to_char(tuple,seq,LINDA_SEQ_OFFSET); | |
770 } | |
771 | |
772 void | |
773 psx_set_id(unsigned char * tuple, short id){ | |
774 tuple[LINDA_ID_OFFSET] = id >> 8; | |
775 tuple[LINDA_ID_OFFSET+1] = id & 0xff; | |
776 } | |
777 | |
778 void | |
779 psx_set_mode(unsigned char * tuple, char mode){ | |
780 tuple[LINDA_MODE_OFFSET] = mode; | |
781 } | |
782 | |
783 | |
784 | |
785 /* end */ |