from __future__ import annotations
"""
Classes to facilitate communication between this process and the pytotex half.
"""
from abc import ABC, abstractmethod
import typing
import sys
from typing import List, Any, Optional, Dict, Type, TypeVar, IO, Callable
from dataclasses import dataclass
from io import StringIO
import argparse
char_to_communicator: Dict[str, Type[Communicator]]={}
T1=TypeVar("T1", bound="Communicator")
ListenForwarder=Callable[[], None] # function that listens to data sent by a communicator and forward them to stdout
[docs]class Communicator(ABC):
character = typing.cast(str, None)
[docs] @abstractmethod
def send(self, data: bytes)->None:
"""
Send data. This function is called in the textopy part.
"""
...
[docs] @staticmethod
@abstractmethod
def setup()->tuple[Communicator, ListenForwarder]:
"""
Constructs an communicator object, that can be used to send information to this process.
The Communicator should be sent to the other process as part of the GlobalConfiguration object.
The ListenForwarder should be called in this process.
"""
...
[docs] @staticmethod
@abstractmethod
def is_available()->bool: ...
from multiprocessing.connection import Client, Connection
import dataclasses
[docs]@dataclass
class MultiprocessingNetworkCommunicator(Communicator):
port: int
connection: Optional[Connection]=dataclasses.field(default=None, repr=False)
[docs] def send(self, data: bytes)->None:
if self.connection is None:
self.address=("localhost", self.port)
self.connection=Client(self.address)
self.connection.send_bytes(data)
[docs] @staticmethod
def setup()->tuple[MultiprocessingNetworkCommunicator, ListenForwarder]:
from multiprocessing.connection import Listener
# pick address randomly and create listener with it until it succeeds
import socket
import random
while True:
try:
port = random.randint(1024, 65535)
address=("localhost", port)
listener=Listener(address)
break
except socket.error:
pass
def listen_forwarder()->None:
with listener:
with listener.accept() as connection:
while True:
try:
data=connection.recv_bytes()
sys.__stdout__.buffer.write(data) # will go to TeX
sys.__stdout__.buffer.flush()
except EOFError: break
return MultiprocessingNetworkCommunicator(port), listen_forwarder
[docs] @staticmethod
def is_available()->bool:
return True
[docs]@dataclass
class UnnamedPipeCommunicator(Communicator):
pid: int
fileno: int
connection: Optional[IO[bytes]]=dataclasses.field(default=None, repr=False)
[docs] def send(self, data: bytes)->None:
if self.connection is None:
self.connection=open(f"/proc/{self.pid}/fd/{self.fileno}", "wb")
self.connection.write(data)
self.connection.flush() # just in case
[docs] @staticmethod
def setup()->tuple[UnnamedPipeCommunicator, ListenForwarder]:
import os
r, w = os.pipe()
def listen_forwarder()->None:
closed_w=False
# TODO if the other process never write anything, this will block forever
for line in os.fdopen(r, "rb"):
if not closed_w:
os.close(w) # so that the loop will end when the other process closes the pipe
closed_w=True
sys.stdout.buffer.write(line)
sys.stdout.buffer.flush()
return UnnamedPipeCommunicator(os.getpid(), w), listen_forwarder
[docs] @staticmethod
def is_available()->bool:
import os
from pathlib import Path
return os.name=="posix" and Path("/proc").is_dir()
[docs]@dataclass
class GlobalConfiguration:
debug: int=0
communicator: Communicator=typing.cast(Communicator, None)
sanity_check_extra_line: bool=False
debug_force_buffered: bool=False
naive_flush: bool=False
def __post_init__(self)->None:
assert 0<=self.debug<=9
[docs] @staticmethod
def from_args(args: argparse.Namespace, communicator: Communicator)->GlobalConfiguration:
return GlobalConfiguration(
debug=args.debug,
communicator=communicator,
sanity_check_extra_line=args.sanity_check_extra_line,
debug_force_buffered=args.debug_force_buffered,
naive_flush=args.naive_flush,
)