from __future__ import annotations
"""
Classes to facilitate communication between this process and the pytotex half.
"""
from abc import ABC, abstractmethod
import typing
import sys
from typing import List, Any, Optional, Dict, Type, TypeVar, IO, Callable
import dataclasses
from dataclasses import dataclass
from io import StringIO
import argparse
from pathlib import Path
from multiprocessing.connection import Client, Connection
char_to_communicator: Dict[str, Type[Communicator]]={}
T1=TypeVar("T1", bound="Communicator")
ListenForwarder=Callable[[], None] # function that listens to data sent by a communicator and forward them to stdout
[docs]class Communicator(ABC):
character = typing.cast(str, None)
[docs] @abstractmethod
def send(self, data: bytes)->None:
"""
Send data. This function is called in the textopy part.
"""
...
[docs] @staticmethod
@abstractmethod
def setup()->tuple[Communicator, ListenForwarder]:
"""
Constructs an communicator object, that can be used to send information to this process.
The Communicator should be sent to the other process as part of the GlobalConfiguration object.
The ListenForwarder should be called in this process.
"""
...
@staticmethod
@abstractmethod
def is_available()->bool: ...
[docs]@dataclass
class MultiprocessingNetworkCommunicator(Communicator):
port: int
connection: Optional[Connection]=dataclasses.field(default=None, repr=False)
[docs] def send(self, data: bytes)->None:
if self.connection is None:
self.address=("localhost", self.port)
self.connection=Client(self.address)
self.connection.send_bytes(data)
[docs] @staticmethod
def setup()->tuple[MultiprocessingNetworkCommunicator, ListenForwarder]:
from multiprocessing.connection import Listener
# pick address randomly and create listener with it until it succeeds
import socket
import random
while True:
try:
port = random.randint(1024, 65535)
address=("localhost", port)
listener=Listener(address)
break
except socket.error:
pass
def listen_forwarder()->None:
with listener:
with listener.accept() as connection:
while True:
try:
data=connection.recv_bytes()
sys.__stdout__.buffer.write(data) # will go to TeX
sys.__stdout__.buffer.flush()
except EOFError: break
return MultiprocessingNetworkCommunicator(port), listen_forwarder
@staticmethod
def is_available()->bool:
return True
[docs]@dataclass
class UnnamedPipeCommunicator(Communicator):
pid: int
fileno: int
connection: Optional[IO[bytes]]=dataclasses.field(default=None, repr=False)
[docs] def send(self, data: bytes)->None:
if self.connection is None:
self.connection=open(f"/proc/{self.pid}/fd/{self.fileno}", "wb")
self.connection.write(data)
self.connection.flush() # just in case
[docs] @staticmethod
def setup()->tuple[UnnamedPipeCommunicator, ListenForwarder]:
import os
r, w = os.pipe()
def listen_forwarder()->None:
closed_w=False
# TODO if the other process never write anything, this will block forever
for line in os.fdopen(r, "rb"):
if not closed_w:
os.close(w) # so that the loop will end when the other process closes the pipe
closed_w=True
sys.stdout.buffer.write(line)
sys.stdout.buffer.flush()
return UnnamedPipeCommunicator(os.getpid(), w), listen_forwarder
@staticmethod
def is_available()->bool:
import os
from pathlib import Path
return os.name=="posix" and Path("/proc").is_dir()
[docs]@dataclass
class GlobalConfiguration:
"""
Represents the configuration.
This will be parsed from command-line argument in pytotex
using :meth:`from_args`, then sent
to textopy side (which is where most of the processing is done)
with a base64 of pickle encoding,
on the textopy side a pseudo-config is created from the passed command-line arguments,
then the real config is read in :class:`.ParentProcessEngine`'s constructor.
Which means it's preferable to avoid any mutable state in the configuration object.
"""
debug: int=0
communicator: Communicator=typing.cast(Communicator, None)
sanity_check_extra_line: bool=False
debug_force_buffered: bool=False
debug_log_communication: Optional[str]=None
naive_flush: bool=False
def __post_init__(self)->None:
assert -9<=self.debug<=9
@staticmethod
def from_args(args: argparse.Namespace, communicator: Communicator)->GlobalConfiguration:
return GlobalConfiguration(
debug=args.debug,
communicator=communicator,
sanity_check_extra_line=args.sanity_check_extra_line,
debug_force_buffered=args.debug_force_buffered,
debug_log_communication=args.debug_log_communication,
naive_flush=args.naive_flush,
)