changeset 56:ee9945561f80

Add a parallel I/O grep (one line per thread) using pthreads, but it is very slow. Really slow.
author Ryoma SHINYA <shinya@firefly.cr.ie.u-ryukyu.ac.jp>
date Wed, 27 Oct 2010 20:46:41 +0900
parents 4ae288b37591
children 81b44ae1cd73
files pyrect/jitgrep.py pyrect/translator/grep_translator.py pyrect/translator/template/grep.c
diffstat 3 files changed, 61 insertions(+), 3 deletions(-) [+]
line wrap: on
line diff
--- a/pyrect/jitgrep.py	Tue Oct 26 16:37:43 2010 +0900
+++ b/pyrect/jitgrep.py	Wed Oct 27 20:46:41 2010 +0900
@@ -11,7 +11,8 @@
 def main(argv):
     myusage = """%prog [--buf-size=size] [--dump]
                   [--time] [--debug] [--cc=compiler] [-c]
-                  [-Olevel] regexp [file..] [--out=file]"""
+                  [-Olevel] regexp [file..] [--out=file]
+                  [--thread=thread_num]"""
     psr = OptionParser(usage=myusage)
 
     redirect = ""
@@ -24,6 +25,7 @@
     psr.add_option("--buf-size=size", action="store", type="string", dest="bufsize", default="1M" , help="Set read-buffer size (e.x. 1024, 1024K, 2M)")
     psr.add_option("--CFLAGS", action="store", type="string", dest="cflags", default="-O3", help="Print compile/matching time.")
     psr.add_option("--time", action="store_true", dest="time", default=False, help="Print compile/matching time.")
+    psr.add_option("--thread", action="store", type="string", dest="thread", default="0", metavar="FILE", help="number of thread.")
     psr.add_option("--debug", action="store_true", dest="debug", default=False, help="Dump commands, not evalute matching (except interactive mode).")
     psr.add_option("--label", action="store_true", dest="label", default=False, help="label implimentation in C.")
     psr.add_option("--dump", action="store_true", dest="dump", default=False, help="Dump generated grep-source.")
@@ -67,6 +69,7 @@
     else:
         grept = GREPTranslator(reg)
     grept.bufsize = bufsize
+    grept.thread_num = int(opts.thread)
 
     if opts.dump:
         grept.translate()
--- a/pyrect/translator/grep_translator.py	Tue Oct 26 16:37:43 2010 +0900
+++ b/pyrect/translator/grep_translator.py	Wed Oct 27 20:46:41 2010 +0900
@@ -23,6 +23,8 @@
     def __init__(self, regexp):
         CTranslator.__init__(self, regexp, fa="DFA")
         self.__bufsize = 1024 * 1024
+        self.parallel_match = False  # NOTE(review): not read anywhere in this changeset -- confirm it is needed
+        self.thread_num = 0  # emitted as THREAD_NUM; a value > 1 makes GREP expand to paragrep
 
     def getbufsize(self,):
         return self.__bufsize
@@ -35,6 +37,13 @@
         CTranslator.emit_initialization(self)
         self.emit("#define LINEBUFSIZE %d" % self.bufsize)
         self.emit("#define READBUFSIZE %d" % self.bufsize)
+        self.emit('#define THREAD_NUM %d' % self.thread_num)
+        self.emit('#define THREAD_BUF %d' % 3)
+        self.emit('#include <pthread.h>')
+        if self.thread_num > 1:
+            self.emit("#define GREP paragrep")
+        else:
+            self.emit("#define GREP grep")
         self.emit("#include <stdlib.h>")
         self.emit("#include <string.h>")
         self.emit("char readbuf[%d];" % (self.bufsize))
--- a/pyrect/translator/template/grep.c	Tue Oct 26 16:37:43 2010 +0900
+++ b/pyrect/translator/template/grep.c	Wed Oct 27 20:46:41 2010 +0900
@@ -1,3 +1,49 @@
+typedef struct _thread_arg {  /* work item handed to one thread_dfa() call */
+  unsigned char *buf;  /* input: text passed to DFA() */
+  int match;           /* output: value returned by DFA(buf) */
+} thread_arg_t;
+
+void* thread_dfa(void *arg) {  /* pthread start routine: run DFA() on one work item */
+  thread_arg_t *job = arg;     /* arg points at the caller's thread_arg_t */
+  job->match = DFA(job->buf);  /* result is read by the caller after pthread_join */
+  return NULL;
+}
+
+/* Parallel grep: read up to THREAD_NUM lines of f per batch, match each in its own thread, */
+/* print every matching line (prefixed with "name:" if name != NULL); return the match count. */
+int paragrep(char *regexp, FILE *f, char *name) {
+  int nmatch = 0;  /* must start at 0: it is incremented and returned below */
+  char lbuf[THREAD_NUM][LINEBUFSIZE];
+  pthread_t hundle[THREAD_NUM];
+  thread_arg_t targ[THREAD_NUM];
+  int started[THREAD_NUM];  /* nonzero when hundle[j] holds a joinable thread */
+  int i, j, n;
+  do {
+    for (i = 0; i < THREAD_NUM; i++) {  /* i ends as the number of lines read */
+      if (fgets(lbuf[i], sizeof lbuf[i], f) == NULL)
+        break;
+      n = strlen(lbuf[i]);
+      if (n > 0 && lbuf[i][n-1] == '\n')
+        lbuf[i][n-1] = '\0';
+    }
+    for (j = 0; j < i; j++) {
+      targ[j].buf = (unsigned char *)lbuf[j];
+      started[j] = pthread_create(&hundle[j], NULL, thread_dfa, &targ[j]) == 0;
+      if (!started[j]) thread_dfa(&targ[j]);  /* no thread available: match inline */
+    }
+    for (j = 0; j < i; j++) {  /* join in order so output keeps the input line order */
+      if (started[j]) pthread_join(hundle[j], NULL);
+      if (targ[j].match) {
+        nmatch++;
+        if (name != NULL)
+          printf("%s:", name);
+        printf("%s\n", lbuf[j]);
+      }
+    }
+  } while (i != 0);  /* an empty batch means end of input (or a read error) */
+  return nmatch;
+}
+
 int grep(char *regexp, FILE *f, char *name) {
   int n, nmatch;
   char lbuf[LINEBUFSIZE];
@@ -27,7 +73,7 @@
   }
   nmatch = 0;
   if (argc == 2) {
-    if (grep(argv[1], stdin, NULL))
+    if (GREP(argv[1], stdin, NULL))
       nmatch++;
   } else {
     for (i = 2; i < argc; i++) {
@@ -38,7 +84,7 @@
       }
       if (READBUFSIZE > 0)
         setvbuf(f, readbuf, _IOFBF, READBUFSIZE);
-      if (grep(argv[1], f, argc > 3 ? argv[i] : NULL) > 0)
+      if (GREP(argv[1], f, argc > 3 ? argv[i] : NULL) > 0)
         nmatch++;
       fclose(f);
     }