# HG changeset patch # User Brendan Rius # Date 1463393662 -3600 # Node ID 758e2e0670c504fe57faa10e49042cffb55fdda1 # Parent fae3b8f7cbfb83176cd582b1b72f7be7d96ed8ec# Parent fecdf8733f3b409e4cd57db1b21d6fa82ed6f98a Merge pull request #11 from brendan-rius/issue-4 Issue 4 diff -r fae3b8f7cbfb -r 758e2e0670c5 jupyter_c_kernel/kernel.py --- a/jupyter_c_kernel/kernel.py Sat Apr 30 21:44:41 2016 +0100 +++ b/jupyter_c_kernel/kernel.py Mon May 16 11:14:22 2016 +0100 @@ -1,7 +1,68 @@ +from queue import Queue +from threading import Thread + from ipykernel.kernelbase import Kernel import subprocess import tempfile import os +import os.path as path + + +class RealTimeSubprocess(subprocess.Popen): + """ + A subprocess that allows to read its stdout and stderr in real time + """ + + def __init__(self, cmd, write_to_stdout, write_to_stderr): + """ + :param cmd: the command to execute + :param write_to_stdout: a callable that will be called with chunks of data from stdout + :param write_to_stderr: a callable that will be called with chunks of data from stderr + """ + self._write_to_stdout = write_to_stdout + self._write_to_stderr = write_to_stderr + + super().__init__(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0) + + self._stdout_queue = Queue() + self._stdout_thread = Thread(target=RealTimeSubprocess._enqueue_output, args=(self.stdout, self._stdout_queue)) + self._stdout_thread.daemon = True + self._stdout_thread.start() + + self._stderr_queue = Queue() + self._stderr_thread = Thread(target=RealTimeSubprocess._enqueue_output, args=(self.stderr, self._stderr_queue)) + self._stderr_thread.daemon = True + self._stderr_thread.start() + + @staticmethod + def _enqueue_output(stream, queue): + """ + Add chunks of data from a stream to a queue until the stream is empty. + """ + for line in iter(lambda: stream.read(4096), b''): + queue.put(line) + stream.close() + + def write_contents(self): + """ + Write the available content from stdin and stderr where specified when the instance was created + :return: + """ + + def read_all_from_queue(queue): + res = b'' + size = queue.qsize() + while size != 0: + res += queue.get_nowait() + size -= 1 + return res + + stdout_contents = read_all_from_queue(self._stdout_queue) + if stdout_contents: + self._write_to_stdout(stdout_contents) + stderr_contents = read_all_from_queue(self._stderr_queue) + if stderr_contents: + self._write_to_stderr(stderr_contents) class CKernel(Kernel): @@ -18,11 +79,17 @@ def __init__(self, *args, **kwargs): super(CKernel, self).__init__(*args, **kwargs) self.files = [] + mastertemp = tempfile.mkstemp(suffix='.out') + os.close(mastertemp[0]) + self.master_path = mastertemp[1] + filepath = path.join(path.dirname(path.realpath(__file__)), '..', 'resources', 'master.c') + subprocess.call(['gcc', filepath, '-std=c11', '-rdynamic', '-ldl', '-o', self.master_path]) def cleanup_files(self): """Remove all the temporary files created by the kernel""" for file in self.files: os.remove(file) + os.remove(self.master_path) def new_temp_file(self, **kwargs): """Create a new temp file to be deleted when the kernel shuts down""" @@ -33,50 +100,45 @@ self.files.append(file.name) return file - @staticmethod - def execute_command(cmd): - """Execute a command and returns the return code, stdout and stderr""" - p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = p.communicate() - return p.returncode, stdout.decode('utf-8'), stderr.decode('utf-8') + def _write_to_stdout(self, contents): + self.send_response(self.iopub_socket, 'stream', {'name': 'stdout', 'text': contents}) + + def _write_to_stderr(self, contents): + self.send_response(self.iopub_socket, 'stream', {'name': 'stderr', 'text': contents}) - @staticmethod - def compile_with_gcc(source_filename, binary_filename): - args = ['gcc', source_filename, '-std=c11', '-o', binary_filename] - return CKernel.execute_command(args) + def create_jupyter_subprocess(self, cmd): + return RealTimeSubprocess(cmd, + lambda contents: self._write_to_stdout(contents.decode()), + lambda contents: self._write_to_stderr(contents.decode())) + + def compile_with_gcc(self, source_filename, binary_filename): + args = ['gcc', source_filename, '-std=c11', '-fPIC', '-shared', '-rdynamic', '-o', binary_filename] + return self.create_jupyter_subprocess(args) def do_execute(self, code, silent, store_history=True, user_expressions=None, allow_stdin=False): - - retcode, stdout, stderr = None, '', '' with self.new_temp_file(suffix='.c') as source_file: source_file.write(code) source_file.flush() with self.new_temp_file(suffix='.out') as binary_file: - retcode, stdout, stderr = self.compile_with_gcc(source_file.name, binary_file.name) - if retcode != 0: - stderr += "[C kernel] GCC exited with code {}, the executable will not be executed".format(retcode) - self.log.info("GCC return code: {}".format(retcode)) - self.log.info("GCC stdout: {}".format(stdout)) - self.log.info("GCC stderr: {}".format(stderr)) + p = self.compile_with_gcc(source_file.name, binary_file.name) + while p.poll() is None: + p.write_contents() + p.write_contents() + if p.returncode != 0: # Compilation failed + self._write_to_stderr( + "[C kernel] GCC exited with code {}, the executable will not be executed".format( + p.returncode)) + return {'status': 'ok', 'execution_count': self.execution_count, 'payload': [], + 'user_expressions': {}} - if retcode == 0: # If the compilation succeeded - retcode, out, err = CKernel.execute_command([binary_file.name]) - if retcode != 0: - stderr += "[C kernel] Executable exited with code {}".format(retcode) - self.log.info("Executable retcode: {}".format(retcode)) - self.log.info("Executable stdout: {}".format(out)) - self.log.info("Executable stderr: {}".format(err)) - stdout += out - stderr += err - else: - self.log.info('Compilation failed, the program will not be executed') + p = self.create_jupyter_subprocess([self.master_path, binary_file.name]) + while p.poll() is None: + p.write_contents() + p.write_contents() - if not silent: - stream_content = {'name': 'stderr', 'text': stderr} - self.send_response(self.iopub_socket, 'stream', stream_content) - stream_content = {'name': 'stdout', 'text': stdout} - self.send_response(self.iopub_socket, 'stream', stream_content) + if p.returncode != 0: + self._write_to_stderr("[C kernel] Executable exited with code {}".format(p.returncode)) return {'status': 'ok', 'execution_count': self.execution_count, 'payload': [], 'user_expressions': {}} def do_shutdown(self, restart): diff -r fae3b8f7cbfb -r 758e2e0670c5 resources/master.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/resources/master.c Mon May 16 11:14:22 2016 +0100 @@ -0,0 +1,29 @@ +#include +#include +#include + +typedef int (*main_t)(int, char **, char **); + +int main(int argc, char **argv, char **envp) +{ + char *error = NULL; + + setbuf(stdout, NULL); + setbuf(stderr, NULL); + if (argc < 2) { + fprintf(stderr, "USAGE: %s PROGRAM\nWhere PROGRAM is the user's program to supervise\n", argv[0]); + return EXIT_FAILURE; + } + void *userhandle = dlopen(argv[1], RTLD_LAZY); + if (userhandle == NULL) { + fprintf(stderr, "%s: %s\n", argv[0], dlerror()); + return EXIT_FAILURE; + } + dlerror(); + main_t usermain = dlsym(userhandle, "main"); + if ((error = dlerror()) != NULL) { + fprintf(stderr, "%s: %s\n", argv[0], error); + return EXIT_FAILURE; + } + return usermain(argc, argv, envp); +} \ No newline at end of file