# Source code for pythonimmediate.communicate

"""
Classes to facilitate communication between this process and the pytotex half.
"""

# The docstring has to be the first statement of the module to become its
# ``__doc__``; a future statement is still allowed to follow it.
from __future__ import annotations

from abc import ABC, abstractmethod
import typing
import sys
from typing import List, Any, Optional, Dict, Type, TypeVar, IO, Callable
from dataclasses import dataclass
from io import StringIO
import argparse


# Registry mapping a string key to a Communicator subclass. It starts out empty
# and nothing in the visible code adds entries (presumably it is keyed by each
# subclass's `character` attribute -- TODO confirm against the code that fills it).
char_to_communicator: Dict[str, Type[Communicator]]={}

# Type variable restricted to Communicator and its subclasses.
T1=TypeVar("T1", bound="Communicator")

ListenForwarder=Callable[[], None]  # zero-argument function that listens for data sent by a communicator and forwards it to stdout

class Communicator(ABC):
    """
    Abstract base for the one-way channels used to deliver bytes to this process.

    A concrete subclass has to provide :meth:`send`, :meth:`setup` and
    :meth:`is_available`.
    """

    # Placeholder typed as ``str`` whose runtime value is ``None``.
    # Presumably subclasses override it with the key they are registered under
    # in ``char_to_communicator`` -- TODO confirm, nothing visible here sets it.
    character = typing.cast(str, None)

    @staticmethod
    @abstractmethod
    def is_available()->bool:
        """
        Tell whether this kind of communicator can be used on the current system.
        """

    @staticmethod
    @abstractmethod
    def setup()->tuple[Communicator, ListenForwarder]:
        """
        Build a communicator object that can be used to send information to this process.

        Two values are returned:

        * the Communicator, which should be handed to the other process as part
          of the GlobalConfiguration object;
        * the ListenForwarder, which should be called in this process.
        """

    @abstractmethod
    def send(self, data: bytes)->None:
        """
        Send data.

        This function is called in the textopy part.
        """
from multiprocessing.connection import Client, Connection import dataclasses
@dataclass
class MultiprocessingNetworkCommunicator(Communicator):
    """
    Communicator that delivers data through a ``multiprocessing.connection``
    socket bound to ``localhost``.
    """

    # TCP port on localhost that the listening side is bound to.
    port: int
    # Client connection, opened lazily by the first send(); left out of repr().
    connection: Optional[Connection]=dataclasses.field(default=None, repr=False)

    def send(self, data: bytes)->None:
        """
        Send ``data`` as one message, connecting to ``localhost:port`` on first use.
        """
        if self.connection is None:
            self.address=("localhost", self.port)
            self.connection=Client(self.address)
        self.connection.send_bytes(data)

    @staticmethod
    def setup()->tuple[MultiprocessingNetworkCommunicator, ListenForwarder]:
        """
        Start listening on a free localhost port.

        Returns the communicator describing that port, together with a
        function that accepts a single connection and copies every message
        received on it to ``sys.__stdout__`` until the peer closes it.
        """
        from multiprocessing.connection import Listener

        # Port 0 asks the operating system for a free port. Unlike guessing
        # random port numbers in a retry loop, this cannot spin forever, and a
        # genuine bind failure is raised instead of being silently retried.
        # NOTE(review): no authkey is passed, so the connection is unauthenticated.
        listener=Listener(("localhost", 0))
        bound_address: Any=listener.address  # (host, port) for a TCP listener
        port: int=bound_address[1]

        def listen_forwarder()->None:
            with listener, listener.accept() as connection:
                while True:
                    try:
                        data=connection.recv_bytes()
                    except EOFError:
                        # the other side closed the connection
                        break
                    sys.__stdout__.buffer.write(data)  # will go to TeX
                    sys.__stdout__.buffer.flush()

        return MultiprocessingNetworkCommunicator(port), listen_forwarder

    @staticmethod
    def is_available()->bool:
        """
        This communicator is always reported as usable.
        """
        return True
@dataclass
class UnnamedPipeCommunicator(Communicator):
    """
    Communicator that writes into an anonymous pipe owned by another process,
    reaching it through ``/proc/<pid>/fd/<fileno>``.
    """

    # Process id of the process that created the pipe.
    pid: int
    # File descriptor number of the pipe's write end inside that process.
    fileno: int
    # Binary file opened lazily by the first send(); left out of repr().
    connection: Optional[IO[bytes]]=dataclasses.field(default=None, repr=False)

    def send(self, data: bytes)->None:
        """
        Write ``data`` to the pipe and flush, opening the pipe on first use.
        """
        if self.connection is None:
            self.connection=open(f"/proc/{self.pid}/fd/{self.fileno}", "wb")
        self.connection.write(data)
        self.connection.flush()  # just in case

    @staticmethod
    def setup()->tuple[UnnamedPipeCommunicator, ListenForwarder]:
        """
        Create a pipe in the current process.

        Returns a communicator naming this process and the pipe's write end,
        together with a function that copies every line read from the pipe to
        ``sys.__stdout__``.
        """
        import os
        r, w = os.pipe()

        def listen_forwarder()->None:
            closed_w=False
            # TODO if the other process never write anything, this will block forever
            # The `with` block closes the read end once forwarding is over.
            with os.fdopen(r, "rb") as reader:
                for line in reader:
                    if not closed_w:
                        os.close(w)  # so that the loop will end when the other process closes the pipe
                        closed_w=True
                    # Write to the original stdout, as MultiprocessingNetworkCommunicator
                    # does, so a rebound sys.stdout cannot divert or break the forwarding.
                    sys.__stdout__.buffer.write(line)
                    sys.__stdout__.buffer.flush()

        return UnnamedPipeCommunicator(os.getpid(), w), listen_forwarder

    @staticmethod
    def is_available()->bool:
        """
        Return True on a POSIX system where ``/proc`` is a directory.
        """
        import os
        from pathlib import Path
        return os.name=="posix" and Path("/proc").is_dir()
@dataclass
class GlobalConfiguration:
    """
    Bundle of settings, including the :class:`Communicator` that is handed to
    the other process.
    """

    # Debug level; __post_init__ asserts that it lies within 0..9.
    debug: int = 0
    # Typed placeholder whose runtime default is None.
    communicator: Communicator = typing.cast(Communicator, None)
    # The three flags below are copied verbatim from the parsed arguments by
    # from_args(); their effect is implemented outside this block.
    sanity_check_extra_line: bool = False
    debug_force_buffered: bool = False
    naive_flush: bool = False

    def __post_init__(self)->None:
        level = self.debug
        assert 0<=level<=9

    @staticmethod
    def from_args(args: argparse.Namespace, communicator: Communicator)->GlobalConfiguration:
        """
        Build a configuration from parsed command-line arguments.

        Reads ``debug``, ``sanity_check_extra_line``, ``debug_force_buffered``
        and ``naive_flush`` from ``args`` and attaches ``communicator``.
        """
        copied_options = (
            "debug",
            "sanity_check_extra_line",
            "debug_force_buffered",
            "naive_flush",
        )
        settings = {name: getattr(args, name) for name in copied_options}
        return GlobalConfiguration(communicator=communicator, **settings)