Source code for ase2sprkkr.common.process_output_reader

""" Readers for processes outputs.

They employ asyncio to make it possible to read stdout and stderr concurrently.
"""

import asyncio
import functools
import subprocess
import os
import numpy as np
from threading import Thread
from .decorators import maybeclassmethod


class ProcessOutputReader:
    """ Class that runs a process, optionally saves all the output of the
    process to a file, and passes the stdout and stderr of the process to its
    two async routines, read_output and read_error.

    The descendants can redefine the routines to parse the output (or its parts).
    """

    # Seconds between the polite termination request and the forced kill.
    _kill_grace_period = 0.1

    def __init__(self, print_output=False, read_callback=None):
        """
        Parameters
        ----------
        print_output: bool
          If true, every chunk received from the process is printed.

        read_callback: callable or None
          If given, it is called as ``read_callback(text, kind)`` for every
          received chunk, where ``text`` is the chunk decoded as UTF-8 and
          ``kind`` is ``'out'`` or ``'err'``.
        """
        self.set_print_output(print_output)
        self.read_callback = read_callback
        self._stopped = False
        # Set here so that stop_the_process() is safe before any run.
        self.proc = None
        self._loop = None

    def stop_the_process(self):
        """ Mark the reader as stopped and end the process, if it is running. """
        self._stopped = True
        self._kill()

    def _kill(self):
        """ Ask the running process to terminate and kill it if it does not
        exit within the grace period.

        The method never blocks: the forced kill is scheduled on the event
        loop that runs the process, so it can be called both from that loop
        (e.g. from a read callback) and from another thread. If no such loop
        is running, the process is killed immediately.
        """
        # getattr: descendants may override __init__ without calling it
        proc = getattr(self, 'proc', None)
        if proc is None or proc.returncode is not None:
            return

        def force_kill():
            if proc.returncode is None:
                try:
                    proc.kill()       # SIGKILL on POSIX, TerminateProcess on Windows
                except ProcessLookupError:
                    pass              # the process has ended in the meantime

        try:
            proc.terminate()          # SIGTERM on POSIX, TerminateProcess on Windows
        except ProcessLookupError:
            return

        loop = getattr(self, '_loop', None)
        if loop is not None and loop.is_running():
            try:
                loop.call_soon_threadsafe(
                    loop.call_later, self._kill_grace_period, force_kill
                )
                return
            except RuntimeError:
                pass                  # the loop has been closed in the meantime
        force_kill()

    async def run_subprocess(self, read_args=()):
        """ Start the process described by the attributes set in run_async,
        feed its stdout and stderr to read_output and read_error and return
        the value of result().

        Parameters
        ----------
        read_args: iterable
          Additional positional arguments passed to read_output and read_error.

        Raises
        ------
        asyncio.CancelledError
          If stop_the_process() has been called before the process was started.
        """
        self._loop = asyncio.get_running_loop()

        previous_dir = None
        if self.directory:
            previous_dir = os.getcwd()
            os.chdir(self.directory)
        try:
            if self._stopped:
                raise asyncio.CancelledError()
            self.proc = await asyncio.create_subprocess_exec(
                *self.cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                **self.kwargs
            )
        finally:
            # restore the working directory even if the start failed
            if previous_dir is not None:
                os.chdir(previous_dir)

        if self._stopped:
            self._kill()

        failure = None

        def process(data, kind):
            """ Print / save / report one raw chunk of the process output. """
            text = None
            if self.print_output:
                text = data.decode('utf8')
                print(text, end='')
            if self.outfile:
                self.outfile.write(data)
            if self.read_callback:
                if text is None:
                    text = data.decode('utf8')
                self.read_callback(text, kind)

        def replace_feed_data(stream_reader, kind):
            """ Wrap the feed_data of the stream so each chunk goes through
            process() before it is passed to the stream itself. """
            original_feed_data = stream_reader.feed_data
            # data received before the stream has been patched
            if stream_reader._buffer:
                process(bytes(stream_reader._buffer), kind)

            def feed_data(data):
                nonlocal failure
                try:
                    process(data, kind)
                except Exception as e:
                    # remember the first failure, it is re-raised after the run
                    if failure is None:
                        failure = e
                    raise
                original_feed_data(data)

            stream_reader.feed_data = feed_data

        if self.outfile or self.print_output or self.read_callback:
            replace_feed_data(self.proc.stdout, 'out')
            replace_feed_data(self.proc.stderr, 'err')

        try:
            out = await asyncio.gather(
                self.read_output(self.proc.stdout, *read_args),
                self.read_error(self.proc.stderr, *read_args),
                self.proc.wait(),
            )
        finally:
            if self.outfile:
                self.outfile.close()
        if failure is not None:
            raise failure
        return self.result(*out)

    def result(self, output, error, wait):
        """ This function is for postprocessing the results.

        It is intended to be redefined in the descendants.

        Parameters
        ----------
        output: mixed
          Result of the self.read_output

        error: mixed
          Result of the self.read_error

        wait: int
          The process return value

        Return
        ------
        out: mixed
          Currently, the tuple (output, error, return_code) is returned, however,
          subclasses can return anything they want.

        Raises
        ------
        ValueError
          If the return value is not zero. The exception carries the ``output``
          and ``error`` attributes.
        """
        if wait != 0:
            e = ValueError(f"The process ended with return value {wait}")
            e.output = output
            e.error = error
            raise e
        return output, error, wait

    def run_async(self, cmd, outfile, directory=None, read_args=(), **kwargs):
        """ Store the description of the process to run and return the
        coroutine (run_subprocess) that runs it.

        Parameters
        ----------
        cmd: list
          The command and its arguments.

        outfile: binary file or None
          If given, the raw output is written to it; it is closed after the run.

        directory: str or None
          If given, the working directory is changed to it while the process
          is being started.

        read_args: iterable
          Additional arguments for read_output and read_error.

        **kwargs:
          Passed to asyncio.create_subprocess_exec.
        """
        self.cmd = cmd
        self.outfile = outfile
        self.directory = directory
        self.kwargs = kwargs
        return self.run_subprocess(read_args)

    def _run(self, cmd, outfile, directory=None, read_args=(), **kwargs):
        """ Synchronous counterpart of run_async: run the process and return its result. """
        coro = self.run_async(cmd, outfile, directory, read_args, **kwargs)
        return run_coro_sync(coro)

    async def read_error(self, stderr, *args):
        """ Default handler of the error stream: print it line by line until its end. """
        while True:
            line = await stderr.readline()
            if not line:
                return
            print(line.decode('utf8'))

    async def read_output(self, stdout, *args):
        """ Handler of the standard output; it has to be redefined in the descendants. """
        raise NotImplementedError('Please, redefine BaseProcess.read_output coroutine')

    def set_print_output(self, print_output):
        """ Set whether the received output should be printed. """
        self.print_output = print_output

    @maybeclassmethod
    def read_from_file(self, cls, output, error=None, read_args=(), return_code=0):
        """ Synchronous wrapper for reading output/error from files.

        If no instance is supplied (``self`` is falsy), a new one is created
        by ``cls()``.
        NOTE(review): presumably maybeclassmethod supplies a falsy ``self``
        when the method is called on the class - confirm in .decorators.
        """
        if not self:
            self = cls()
        return run_coro_sync(
            self.read_output_file(output, error, read_args, return_code)
        )

    async def read_output_file(self, output, error=None, read_args=(), return_code=0):
        """ Async version: reads output and error from files using AsyncioFileReader.

        Parameters
        ----------
        output: str or file
          The file with the saved standard output.

        error: str or file or None
          The file with the saved error output; if empty, read_error is not
          called and None is used as its result.

        read_args: iterable
          Additional arguments for read_output and read_error.

        return_code: int
          The value passed to result() as the process return value.
        """

        async def parse_output():
            with AsyncioFileReader(output) as air:
                return await self.read_output(air, *read_args)

        async def parse_error():
            if not error:
                return None
            with AsyncioFileReader(error) as air:
                return await self.read_error(air, *read_args)

        out_result, err_result = await asyncio.gather(
            parse_output(),
            parse_error()
        )
        return self.result(out_result, err_result, return_code)
class AsyncioFileReader:
    """ File reader that mimics asyncio StreamReader.

    In fact, it is synchronous, but it offers the same interface
    as the asyncio StreamReader (readline and readuntil coroutines).
    """

    def __init__(self, filename, buffersize=8192):
        """
        Parameters
        ----------
        filename: str, bytes, os.PathLike or binary file
          A path is opened for binary reading; anything else is used as
          an already opened binary file. In both cases, the file is closed
          by close().

        buffersize: int
          The size of the internal buffer used by readuntil.
        """
        self.file = None   # just to set it in case the open fails
        if isinstance(filename, (str, bytes, os.PathLike)):
            self.file = open(filename, 'rb')
        else:
            self.file = filename
        self.buffersize = buffersize

    def close(self):
        """ Close the underlying file; calling it repeatedly is harmless. """
        if self.file:
            self.file.close()
            self.file = None

    def __del__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    async def readline(self):
        """ Return the next line of the file (empty bytes at the end of the file). """
        return self.file.readline()

    @staticmethod
    @functools.lru_cache(maxsize=128)
    def separator_shifts(sep):
        """ Return the Knuth-Morris-Pratt shift table for the separator.

        The result has ``len(sep) + 1`` items; item ``i`` says how far
        the match has to be shifted after a mismatch at position ``i``.
        The result is cached, so it must not be modified.
        """
        shifts = np.ones(len(sep) + 1, dtype=int)
        shift = 1
        for pos in range(len(sep)):
            while shift <= pos and sep[pos] != sep[pos - shift]:
                shift += shifts[pos - shift]
            shifts[pos + 1] = shift
        return shifts

    async def readuntil(self, sep=b'\n'):
        """ Read the file up to the first occurrence of the separator.

        Parameters
        ----------
        sep: bytes
          The separator. For an empty one, empty bytes are returned
          and nothing is read.

        Return
        ------
        data: bytes
          The data read, including the separator. The file is never read
          beyond the end of the separator.

        Raises
        ------
        asyncio.IncompleteReadError
          If the end of the file is reached before the separator is found.
          Its ``partial`` attribute holds all the data read by this call.
        """
        lsep = len(sep)
        if lsep == 0:
            return b''

        # https://stackoverflow.com/questions/14128763/how-to-find-the-overlap-between-2-sequences-and-return-it
        shifts = self.separator_shifts(sep)

        # The data are gathered in a preallocated buffer to avoid growing
        # the result byte by byte; a full buffer is moved to ``flushed``.
        flushed = b''
        buffer = bytearray(self.buffersize + lsep)
        buf_end = 0    # the end of the valid data in the buffer
        buf_pos = 0    # the end of the already examined data
        match_len = 0  # the length of the currently matched prefix of sep

        while True:
            if buf_pos == buf_end:
                if buf_end >= self.buffersize:
                    flushed += bytes(buffer[:buf_end])
                    buf_end = buf_pos = 0
                # Read only what can still belong to the separator,
                # so the file is not consumed beyond its end.
                wanted = lsep - match_len
                data = self.file.read(wanted)
                got = len(data)
                buffer[buf_pos:buf_pos + got] = data
                buf_end += got
                if got < wanted:
                    # end of file: report everything read, including
                    # the last (short) chunk
                    raise asyncio.IncompleteReadError(
                        flushed + bytes(buffer[:buf_end]), sep
                    )
            c = buffer[buf_pos]
            buf_pos += 1
            while match_len >= 0 and sep[match_len] != c:
                match_len -= int(shifts[match_len])
            match_len += 1
            if match_len == lsep:
                return flushed + bytes(buffer[:buf_pos])
async def readline(stdout):
    """ Read one line from the stream and return it decoded as UTF-8.

    Raises
    ------
    EOFError
      If the stream returns no data (i.e. its end has been reached).
    """
    raw = await stdout.readline()
    if raw:
        return raw.decode('utf8')
    raise EOFError()
async def readline_until(stdout, cond, can_end=True):
    """ Read lines from the stream until one satisfies the condition.

    Parameters
    ----------
    stdout:
      The stream to read from (an object with the readline coroutine).

    cond: callable
      It is called with each raw (bytes) line; the first line for which
      it returns a true value ends the reading.

    can_end: bool
      What to do if the stream ends before such a line is found:
      return an empty string if true, raise EOFError otherwise.

    Return
    ------
    line: str
      The found line decoded as UTF-8, or an empty string.
    """
    raw = await stdout.readline()
    while raw:
        if cond(raw):
            return raw.decode('utf8')
        raw = await stdout.readline()
    if can_end:
        return ''
    raise EOFError()
def run_coro_sync(coro_func):
    """ Run a coroutine to its end from a synchronous context, even if
    an event loop is already running in the current thread.

    Parameters
    ----------
    coro_func: coroutine
      Despite its name, this is a coroutine object (the result of calling
      an ``async def`` function), not the function itself: it is passed
      directly to asyncio.run.

    Return
    ------
    result: mixed
      The value returned by the coroutine. An exception raised by
      the coroutine is re-raised in the caller.
    """
    # Is there a running event loop?
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # NO -> the coroutine can be run directly
        return asyncio.run(coro_func)

    # YES -> a nested asyncio.run is not allowed, use another thread
    result = None
    exc = None

    def runner():
        nonlocal result, exc
        try:
            result = asyncio.run(coro_func)
        except BaseException as e:
            # BaseException: asyncio.CancelledError does not derive from
            # Exception and would be lost in the thread otherwise
            exc = e

    t = Thread(target=runner)
    t.start()
    t.join()
    if exc is not None:
        raise exc
    return result