from __future__ import annotations
import sys
import os
import inspect
import threading
import contextlib
import io
import functools
from fractions import Fraction
from typing import Optional, Union, Callable, Any, Iterator, Protocol, Iterable, Sequence, Type, Tuple, List, Dict, IO, Set, Literal, Generator
import typing
from abc import ABC, abstractmethod
from pathlib import Path
from dataclasses import dataclass
import tempfile
import signal
import traceback
import re
import collections
from collections import defaultdict
import enum
from weakref import WeakKeyDictionary
import weakref
import itertools
import string
import numbers
import random
import linecache
from .engine import Engine, default_engine, default_engine as engine, ParentProcessEngine, EngineStatus, TeXProcessError, TeXProcessExited, ChildProcessEngine
T1 = typing.TypeVar("T1")
is_sphinx_build = "SPHINX_BUILD" in os.environ
expansion_only_can_call_Python=False # normally. May be different in LuaTeX etc.
debugging: bool=True
if os.environ.get("pythonimmediatenodebug", "").lower() in ["true", "1"]:
debugging=False
def surround_delimiter(block: str)->str:
while True:
delimiter=str(random.randint(0, 10**12))
if delimiter not in block: break
return delimiter + "\n" + block + "\n" + delimiter + "\n"
EngineDependentCode=Callable[[Engine], str]
bootstrap_code_functions: list[EngineDependentCode]=[]
"""
Internal constant.
Contains functions that takes an engine object and returns some code before :meth:`substitute_private` is applied on it.
:meta hide-value:
"""
def mark_bootstrap(code: str|EngineDependentCode)->None:
if isinstance(code, str):
bootstrap_code_functions.append(lambda _engine: code)
else:
bootstrap_code_functions.append(code)
# check TeX package version.
mark_bootstrap(r"""
\exp_args:Nx \str_if_in:nnF {\csname ver@pythonimmediate.sty\endcsname} {%} {
\msg_new:nnn {pythonimmediate} {incompatible-version} {Incompatible~ TeX~ package~ version~ (#1)~ installed!~ Need~ at~ least~ %.}
\msg_error:nnx {pythonimmediate} {incompatible-version} {\csname ver@pythonimmediate.sty\endcsname}
}
""".replace("%", "v0.5.0"))
def substitute_private(code: str)->str:
assert "_pythonimmediate_" not in code # avoid double-apply this function
return (code
#.replace("\n", ' ') # because there are comments in code, cannot
.replace("__", "_" + "pythonimmediate" + "_")
)
def postprocess_send_code(s: str, put_sync: bool)->str:
assert have_naive_replace(s), s
if put_sync:
# keep the naive-sync operation
pass
else:
# remove it
s=naive_replace(s, False)
return s
# ========
# as the name implies, this reads one "command" from Python side and execute it.
# the command might do additional tasks e.g. read more [TeX]-code.
#
# e.g. if ``block`` is read from the communication channel, run ``\__run_block:``.
@mark_bootstrap
def _naive_flush_data_define(engine: Engine)->str:
if engine.config.naive_flush:
# the function x-expand to something in a single line that is at least 4095 bytes long and ignored by Python
# (plus the newline would be 4096)
return r"""
\cs_new:Npn \__naive_flush_data: {
pythonimmediate-naive-flush-line \prg_replicate:nn {4063} {~}
}
"""
return ""
@typing.overload
def naive_replace(code: str, naive_flush: bool, /)->str: ...
@typing.overload
def naive_replace(code: str, engine: Engine, /)->str: ...
def naive_replace(code: str, x: Union[Engine, bool])->str:
code1=code
if (x.config.naive_flush if isinstance(x, Engine) else x):
code1=code1.replace("%naive_inline%", r"^^J \__naive_flush_data: ")
code1=code1.replace("%naive_flush%", r"\__send_content:e {\__naive_flush_data:}")
code1=code1.replace("%naive_send%", r"_naive_flush")
else:
code1=code1.replace("%naive_inline%", "")
code1=code1.replace("%naive_flush%", "")
code1=code1.replace("%naive_send%", "")
code1=code1.replace("%naive_ignore%", "")
return code1
def have_naive_replace(code: str)->bool:
return naive_replace(code, False)!=code
def wrap_naive_replace(code: str)->EngineDependentCode:
return functools.partial(naive_replace, code)
[docs]def mark_bootstrap_naive_replace(code: str)->None:
r"""
Similar to :func:`mark_bootstrap`, but code may contain one of the following:
- ``%naive_inline``: replaced with ``^^J \__naive_flush_data:``
if :attr:`Engine.config.naive_flush` is ``True``, else become empty
- ``%naive_flush%``: replaced with ``\__send_content:e {\__naive_flush_data:}``
if :attr:`Engine.config.naive_flush` is ``True``
"""
mark_bootstrap(wrap_naive_replace(code))
mark_bootstrap_naive_replace(
r"""
\cs_new_protected:Npn \pythonimmediatelisten {
\begingroup
\endlinechar=-1~
\readline \__read_file to \__line
\expandafter
\endgroup % also this will give an error instead of silently do nothing when command is invalid
\csname __run_ \__line :\endcsname
}
\cs_new_protected:Npn \pythonimmediatecallhandlerasync #1 {
\__send_content:e {i #1 %naive_inline% }
}
\cs_new_protected:Npn \pythonimmediatecallhandler #1 {
\pythonimmediatecallhandlerasync {#1} \pythonimmediatelisten
}
% read documentation of ``_peek`` commands for details what this command does.
\cs_new_protected:Npn \pythonimmediatecontinue #1 {
\__send_content:e {r #1 %naive_inline% }
\pythonimmediatelisten
}
\cs_new_protected:Npn \pythonimmediatecontinuenoarg {
\pythonimmediatecontinue {}
}
\cs_new_protected:Npn \__send_content:n #1 {
\__send_content:e { \unexpanded{#1} }
}
\cs_new_protected:Npn \__send_content_naive_flush:e #1 {
\__send_content:e { #1 %naive_inline% }
}
\cs_new_protected:Npn \__send_content_naive_flush:n #1 {
\__send_content_naive_flush:e { \unexpanded{#1} }
}
% the names are such that \__send_content%naive_send%:n {something} results in the correct content
% internal function. Just send an arbitrary block of data to Python.
% this function only works properly when newlinechar = 10.
\cs_new_protected:Npn \__send_block:e #1 {
\__send_content:e {
#1 ^^J
pythonimm?""" + '"""' + r"""?'''? % following character will be newline
}
}
\cs_new_protected:Npn \__send_block:n #1 {
\__send_block:e {\unexpanded{#1}}
}
\cs_new_protected:Npn \__send_block_naive_flush:e #1 {
\__send_content:e {
#1 ^^J
pythonimm?""" + '"""' + r"""?'''? % following character will be newline
%naive_inline%
}
}
\cs_new_protected:Npn \__send_block_naive_flush:n #1 {
\__send_block_naive_flush:e {\unexpanded{#1}}
}
\cs_generate_variant:Nn \__send_block:n {V}
\cs_generate_variant:Nn \__send_block_naive_flush:n {V}
\bool_if:NF \__child_process {
\AtEndDocument{
\__send_content:e {r %naive_inline%}
\pythonimmediatelisten
\__close_write:
}
}
""")
# the last one don't need to flush because will close anyway (right?)
TeXToPyObjectType=Optional[str]
def run_main_loop()->TeXToPyObjectType:
assert engine.status==EngineStatus.running
while True:
line=_readline()
if line[0]=="i":
identifier=line[1:]
f=_handlers.get(identifier)
if f is None: _per_engine_handlers[default_engine.get_engine()][identifier]()
else: f()
elif line[0]=="r":
return line[1:]
else:
raise RuntimeError("Internal error: unexpected line "+line)
def run_main_loop_get_return_one()->str:
assert engine.status==EngineStatus.running
line=_readline()
assert line[0]=="r", line
return line[1:]
def _run_block_finish(block: str)->None:
assert default_engine.status==EngineStatus.waiting
engine.write(("block\n" + surround_delimiter(block)).encode('u8'))
default_engine.status=EngineStatus.running
[docs]def check_line(line: str, *, braces: bool, newline: bool, continue_: Optional[bool])->None:
"""
check user-provided line before sending to TeX for execution
"""
if braces:
assert line.count("{") == line.count("}")
if newline:
assert '\n' not in line
assert '\r' not in line # this is not the line separator but just in case
if continue_==True: assert "pythonimmediatecontinue" in line
elif continue_==False: assert "pythonimmediatecontinue" not in line
def _readline()->str:
line=engine.read().decode('u8')
return line
block_delimiter: str="pythonimm?\"\"\"?'''?"
def _read_block()->str:
r"""
Internal function to read one block sent from [TeX]
(including the final delimiter line, but the delimiter line is not returned)
"""
lines: List[str]=[]
while True:
line=_readline()
if line==block_delimiter:
return '\n'.join(lines)
else:
lines.append(line)
[docs]class TeXToPyData(ABC):
"""
Internal class (for now). Represent a data type that can be sent from [TeX] to Python.
"""
[docs] @staticmethod
@abstractmethod
def read()->"TeXToPyData":
"""
Given that [TeX] has just sent the data, read into a Python object.
"""
...
[docs] @staticmethod
@abstractmethod
def send_code(arg: str)->str:
"""
Return some [TeX] code that sends the argument to Python, where *arg* represents a token list or equivalent (such as ``#1``).
"""
pass
[docs] @staticmethod
@abstractmethod
def send_code_var(var: str)->str:
r"""
Return some [TeX] code that sends the argument to Python, where *var* represents a token list variable
(such as ``\l__my_var_tl``) that contains the content to be sent.
"""
pass
# tried and failed
#@typing.runtime_checkable
#class TeXToPyData(Protocol):
# @staticmethod
# def read()->"TeXToPyData":
# ...
#
# #send_code: str
#
# #@staticmethod
# #@property
# #def send_code()->str:
# # ...
def _format(s: str)->Callable:
def _result(*args: str)->str:
"""
"""
return s.format(*args)
return _result
[docs]class TTPRawLine(TeXToPyData, bytes):
send_code=_format(r"\__send_content%naive_send%:n {{ {} }}")
send_code_var=_format(r"\__send_content%naive_send%:n {{ {} }}")
[docs] @staticmethod
def read()->"TTPRawLine":
line=engine.read()
return TTPRawLine(line)
[docs]class TTPLine(TeXToPyData, str):
send_code=_format(r"\__send_content%naive_send%:n {{ {} }}")
send_code_var=_format(r"\__send_content%naive_send%:n {{ {} }}")
[docs] @staticmethod
def read()->"TTPLine":
return TTPLine(_readline())
[docs]class TTPELine(TeXToPyData, str):
"""
Same as :class:`TTPEBlock`, but for a single line only.
"""
send_code=_format(r"\__begingroup_setup_estr: \__send_content%naive_send%:e {{ {} }} \endgroup")
send_code_var=_format(r"\__begingroup_setup_estr: \__send_content%naive_send%:e {{ {} }} \endgroup")
[docs] @staticmethod
def read()->"TTPELine":
return TTPELine(_readline())
[docs]class TTPEmbeddedLine(TeXToPyData, str):
[docs] @staticmethod
def send_code(arg: str)->str:
raise RuntimeError("Must be manually handled")
[docs] @staticmethod
def send_code_var(arg: str)->str:
raise RuntimeError("Must be manually handled")
[docs] @staticmethod
def read()->"TTPEmbeddedLine":
raise RuntimeError("Must be manually handled")
[docs]class TTPBlock(TeXToPyData, str):
send_code=_format(r"\__send_block:n {{ {} }} %naive_flush%")
send_code_var=_format(r"\__send_block:V {} %naive_flush%")
[docs] @staticmethod
def read()->"TTPBlock":
return TTPBlock(_read_block())
[docs]class TTPEBlock(TeXToPyData, str):
r"""
A kind of argument that interprets "escaped string" and fully expand anything inside.
For example, ``{\\}`` sends a single backslash to Python, ``{\{}`` sends a single ``{`` to Python.
Done by fully expand the argument in ``\escapechar=-1`` and convert it to a string.
Additional precaution is needed, see the note above (TODO write documentation).
Refer to :ref:`estr-expansion` for more details.
"""
send_code=_format(r"\__begingroup_setup_estr: \__send_block%naive_send%:e {{ {} }} \endgroup")
send_code_var=_format(r"\__begingroup_setup_estr: \__send_block%naive_send%:e {{ {} }} \endgroup")
[docs] @staticmethod
def read()->"TTPEBlock":
return TTPEBlock(_read_block())
@mark_bootstrap
def _send_balanced_tl(engine: Engine)->str:
if engine.name=="luatex": return ""
return naive_replace(r"""
\cs_new_protected:Npn \__send_balanced_tl:n #1 {
\__tlserialize_nodot:Nn \__tmp { #1 }
\__send_content%naive_send%:e {\unexpanded\expandafter{ \__tmp } }
}
""", engine)
[docs]class PyToTeXData(ABC):
"""
Internal class (for now). Represent a data type that can be sent from Python to [TeX].
"""
[docs] @staticmethod
@abstractmethod
def read_code(var: str)->str:
r"""
Takes an argument, the variable name (with backslash prefixed such as ``"\abc"``.)
:return: some [TeX] code that when executed in expl3 category code regime,
will read a value of the specified data type and assign it to the variable.
"""
...
[docs] @abstractmethod
def serialize(self)->bytes:
"""
Return a bytes object that can be passed to ``engine.write()`` directly.
"""
...
[docs]@dataclass
class PTTVerbatimRawLine(PyToTeXData):
r"""
Represents a line to be tokenized verbatim. Internally the ``\readline`` primitive is used, as such, any trailing spaces are stripped.
The trailing newline is not included, i.e. it's read under ``\endlinechar=-1``.
"""
data: bytes
read_code=_format(r"\__str_get:N {} ")
def valid(self)->bool:
return b"\n" not in self.data and self.data.rstrip()==self.data
[docs] def serialize(self)->bytes:
assert self.valid()
return self.data+b"\n"
[docs]@dataclass
class PTTVerbatimLine(PyToTeXData):
data: str
read_code=PTTVerbatimRawLine.read_code
@property
def _raw(self)->PTTVerbatimRawLine:
return PTTVerbatimRawLine(self.data.encode('u8'))
def valid(self)->bool:
return self._raw.valid()
[docs] def serialize(self)->bytes:
return self._raw.serialize()
[docs]@dataclass
class PTTInt(PyToTeXData):
data: int
read_code=PTTVerbatimLine.read_code
[docs] def serialize(self)->bytes:
return PTTVerbatimLine(str(self.data)).serialize()
[docs]@dataclass
class PTTTeXLine(PyToTeXData):
r"""
Represents a line to be tokenized in \TeX's current catcode regime.
The trailing newline is not included, i.e. it's tokenized under ``\endlinechar=-1``.
"""
data: str
read_code=_format(r"\__get:N {}")
[docs] def serialize(self)->bytes:
assert "\n" not in self.data
return (self.data+"\n").encode('u8')
[docs]@dataclass
class PTTBlock(PyToTeXData):
data: str
read_code=_format(r"\__read_block:N {}")
[docs] @staticmethod
def ignore_last_space(s: str)->PTTBlock:
"""
Construct a block from arbitrary string, deleting trailing spaces on each line.
"""
return PTTBlock("\n".join(line.rstrip() for line in s.split("\n")))
[docs] @staticmethod
def coerce(s: str)->PTTBlock:
"""
Construct a block from arbitrary string, delete some content if needed.
"""
return PTTBlock("\n".join(line.rstrip() for line in s.split("\n")))
def valid(self)->bool:
return "\r" not in self.data and all(line==line.rstrip() for line in self.data.splitlines())
[docs] def serialize(self)->bytes:
assert self.valid(), self
return surround_delimiter(self.data).encode('u8')
[docs]@dataclass
class PTTBalancedTokenList(PyToTeXData):
data: BalancedTokenList
read_code=_format(r"\__str_get:N {0} \__tldeserialize_dot:NV {0} {0}")
[docs] def serialize(self)->bytes:
return PTTVerbatimRawLine(self.data.serialize_bytes()+b".").serialize()
# ======== define TeX functions that execute Python code ========
# ======== implementation of ``\py`` etc. Doesn't support verbatim argument yet. ========
def random_TeX_identifiers()->Iterator[str]: # do this to avoid TeX hash collision while keeping the length short
for len_ in itertools.count(0):
for value in range(1<<len_):
for initial in string.ascii_letters:
identifier = initial
if len_>0:
identifier += f"{value:0{len_}b}".translate({ord("0"): "a", ord("1"): "b"})
yield identifier
def random_Python_identifiers()->Iterator[str]: # these are used for keys in
for len_ in itertools.count(0):
for s in itertools.product(string.ascii_letters, repeat=len_):
yield "".join(s)
random_TeX_identifier_iterable=random_TeX_identifiers()
random_Python_identifier_iterable=random_Python_identifiers()
def get_random_TeX_identifier()->str: return next(random_TeX_identifier_iterable)
def get_random_Python_identifier()->str: return next(random_Python_identifier_iterable)
_handlers: Dict[str, Callable[[], None]]={}
_per_engine_handlers: WeakKeyDictionary[Engine, Dict[str, Callable[[], None]]]=WeakKeyDictionary()
[docs]def define_TeX_call_Python(f: Callable[..., None], name: Optional[str]=None, argtypes: Optional[List[Type[TeXToPyData]]]=None, identifier: Optional[str]=None)->EngineDependentCode:
r"""
This function setups some internal data structure, and
returns the [TeX]-code to be executed on the [TeX]-side to define the macro.
:param f: the Python function to be executed.
It should take some arguments plus a keyword argument ``engine`` and eventually (optionally) call one of the ``_finish`` functions.
:param name: the macro name on the [TeX]-side. This should only consist of letter characters in ``expl3`` catcode regime.
:param argtypes: list of argument types. If it's None it will be automatically deduced from the function ``f``'s signature.
:param identifier: should be obtained by :func:`get_random_Python_identifier`.
:returns: some code (to be executed in ``expl3`` catcode regime) as explained above.
"""
if argtypes is None:
argtypes=[p.annotation for p in inspect.signature(f).parameters.values()]
for i, argtype in enumerate(argtypes):
if isinstance(argtype, str):
assert argtype in globals(), f"cannot resolve string annotation {argtype}"
argtypes[i]=argtype=globals()[argtype]
argtypes=[t for t in argtypes if t is not Engine] # temporary hack
for argtype in argtypes:
if not issubclass(argtype, TeXToPyData):
raise RuntimeError(f"Argument type {argtype} is incorrect, should be a subclass of TeXToPyData")
if name is None: name=f.__name__
if identifier is None: identifier=get_random_Python_identifier()
assert identifier not in _handlers, identifier
@functools.wraps(f)
def g()->None:
if engine.config.debug>=5:
print("TeX macro", name, "called")
assert argtypes is not None
args=[argtype.read() for argtype in argtypes]
assert engine.status==EngineStatus.running # this is the status just before the handler is called
engine.status=EngineStatus.waiting
f(*args)
if engine.status==EngineStatus.waiting:
run_none_finish()
assert engine.status==EngineStatus.running, engine.status
assert engine.status==EngineStatus.running, engine.status
_handlers[identifier]=g
TeX_argspec = ""
TeX_send_input_commands = ""
for i, argtype in enumerate(argtypes):
arg = f"#{i+1}"
TeX_send_input_commands += postprocess_send_code(argtype.send_code(arg), put_sync=i==len(argtypes)-1)
TeX_argspec += arg
if not argtypes:
TeX_send_input_commands += "%naive_flush%"
assert have_naive_replace(TeX_send_input_commands)
return wrap_naive_replace(r"""
\cs_new_protected:Npn """ + "\\"+name + TeX_argspec + r""" {
\__send_content:e { i """ + identifier + """ }
""" + TeX_send_input_commands + r"""
\pythonimmediatelisten
}
""")
FunctionType = typing.TypeVar("FunctionType", bound=Callable)
[docs]def define_internal_handler(f: FunctionType)->FunctionType:
"""
Define a TeX function with TeX name = ``f.__name__`` that calls f().
This does not define the specified function in any particular engine, just add them to the :const:`bootstrap_code`.
"""
mark_bootstrap(define_TeX_call_Python(f))
return f
# https://stackoverflow.com/questions/47183305/file-string-traceback-with-line-preview
def exec_or_eval_with_linecache(code: str, globals: dict, mode: str)->Any:
sourcename: str="<usercode>"
i=0
while sourcename in linecache.cache:
sourcename="<usercode" + str(i) + ">"
i+=1
lines=code.splitlines(keepends=True)
linecache.cache[sourcename] = len(code), None, lines, sourcename
compiled_code=compile(code, sourcename, mode)
if mode=="exec": exec(compiled_code, globals)
else: eval(compiled_code, globals)
#del linecache.cache[sourcename]
# we never delete the cache, in case some function is defined here then later are called...
def exec_with_linecache(code: str, globals: Dict[str, Any])->None:
exec_or_eval_with_linecache(code, globals, "exec")
def eval_with_linecache(code: str, globals: Dict[str, Any])->Any:
return exec_or_eval_with_linecache(code, globals, "eval")
"""
In some engine, when -8bit option is not enabled, the code will be escaped before being sent to Python.
So for example if the original code contains a literal tab character, ``^^I`` might be sent to Python instead.
This do a fuzzy-normalization over these so that the sourcecode can be correctly matched.
"""
potentially_escaped_characters=str.maketrans({
chr(i): "^^" + chr(i^0x40)
for i in [*range(0, 32), 127]
})
def normalize_line(line: str)->str:
assert line.endswith("\n")
line=line[:-1]
while line.endswith("^^I"): line=line[:-3]
return line.rstrip(" \t").translate(potentially_escaped_characters)
[docs]def can_be_mangled_to(original: str, mangled: str)->bool:
r"""
Internal functions, used to implemented :func:`.pycode` environment.
If *original* is put in a [TeX] file, read in other catcode regime (possibly drop trailing spaces/tabs),
and then sent through ``\write`` (possibly convert control characters to ``^^``-notation),
is it possible that the written content is equal to *mangled*?
The function is somewhat tolerant (might return ``True`` in some cases where ``False`` should be returned), but not too tolerant.
Example::
>>> can_be_mangled_to("a\n", "a\n")
True
>>> can_be_mangled_to("\n", "\n")
True
>>> can_be_mangled_to("\t\n", "\n")
True
>>> can_be_mangled_to("\t\n", "\t\n")
True
>>> can_be_mangled_to("\t\n", "^^I\n")
True
>>> can_be_mangled_to("\ta\n", "^^Ia\n")
True
>>> can_be_mangled_to("a b\n", "a b\n")
True
>>> can_be_mangled_to("a b \n", "a b\n")
True
>>> can_be_mangled_to("a\n", "b\n")
False
"""
return normalize_line(original)==normalize_line(mangled)
def _template_substitute(template: str, pattern: str, substitute: Union[str, Callable[[re.Match], str]], optional: bool=False)->str:
if not optional:
#assert template.count(pattern)==1
assert len(re.findall(pattern, template))==1
return re.sub(pattern, substitute, template)
#typing.TypeVarTuple(PyToTeXData)
#PythonCallTeXFunctionType=Callable[[PyToTeXData], Optional[Tuple[TeXToPyData, ...]]]
[docs]class PythonCallTeXFunctionType(Protocol): # https://stackoverflow.com/questions/57658879/python-type-hint-for-callable-with-variable-number-of-str-same-type-arguments
def __call__(self, *args: PyToTeXData)->Optional[Tuple[TeXToPyData, ...]]: ...
[docs]class PythonCallTeXSyncFunctionType(PythonCallTeXFunctionType, Protocol): # https://stackoverflow.com/questions/57658879/python-type-hint-for-callable-with-variable-number-of-str-same-type-arguments
def __call__(self, *args: PyToTeXData)->Tuple[TeXToPyData, ...]: ...
[docs]@dataclass(frozen=True)
class Python_call_TeX_data:
TeX_code: str
recursive: bool
finish: bool
sync: Optional[bool]
Python_call_TeX_defined: Dict[Python_call_TeX_data, Tuple[Python_call_TeX_extra, Callable]]={}
[docs]def Python_call_TeX_local(TeX_code: str, *, recursive: bool=True, sync: Optional[bool]=None, finish: bool=False)->Callable:
"""
Internal function. See :func:`scan_Python_call_TeX`.
"""
data=Python_call_TeX_data(
TeX_code=TeX_code, recursive=recursive, sync=sync, finish=finish
)
return Python_call_TeX_defined[data][1]
[docs]def build_Python_call_TeX(T: Type, TeX_code: str, *, recursive: bool=True, sync: Optional[bool]=None, finish: bool=False)->None:
"""
Internal function. See :func:`scan_Python_call_TeX`.
T has the form Callable[[T1, T2], Tuple[U1, U2]]
where the Tx are subclasses of PyToTeXData and the Ux are subclasses of TeXToPyData
The Tuple[...] can optionally be a single type, then it is almost equivalent to a tuple of one element
It can also be None
"""
assert T.__origin__ == typing.Callable[[], None].__origin__ # type: ignore
# might be typing.Callable or collections.abc.Callable depends on Python version
data=Python_call_TeX_data(
TeX_code=TeX_code, recursive=recursive, sync=sync, finish=finish
)
# T.__args__ consist of the argument types int
Tx=T.__args__[:-1]
for Ti in Tx: assert issubclass(Ti, PyToTeXData), Ti
result_type: Any = T.__args__[-1] # Tuple[U1, U2]
ttp_argtypes: Union[Type[TeXToPyData], Tuple[Type[TeXToPyData], ...]]
if result_type is type(None):
ttp_argtypes = ()
elif isinstance(result_type, type) and issubclass(result_type, TeXToPyData):
# special case, return a single object instead of a tuple of length 1
ttp_argtypes = result_type
else:
ttp_argtypes = result_type.__args__
extra=Python_call_TeX_extra(
ptt_argtypes=Tx,
ttp_argtypes=ttp_argtypes
)
if data in Python_call_TeX_defined:
assert Python_call_TeX_defined[data][0]==extra, "different function with exact same code is not supported for now"
else:
if isinstance(ttp_argtypes, type) and issubclass(ttp_argtypes, TeXToPyData):
# special case, return a single object instead of a tuple of length 1
code, result1=define_Python_call_TeX(TeX_code=TeX_code, ptt_argtypes=[*extra.ptt_argtypes], ttp_argtypes=[ttp_argtypes],
recursive=recursive, sync=sync, finish=finish,
)
def result(*args: Any)->Any:
tmp=result1(*args)
assert tmp is not None
assert len(tmp)==1
return tmp[0]
else:
for t in ttp_argtypes:
assert issubclass(t, TeXToPyData)
code, result=define_Python_call_TeX(TeX_code=TeX_code, ptt_argtypes=[*extra.ptt_argtypes], ttp_argtypes=[*ttp_argtypes],
recursive=recursive, sync=sync, finish=finish,
)
mark_bootstrap(code)
Python_call_TeX_defined[data]=extra, result
[docs]def scan_Python_call_TeX(sourcecode: str, filename: Optional[str]=None)->None:
"""
Internal function.
Scan the file in filename for occurrences of ``typing.cast(T, Python_call_TeX_local(...))``,
then call ``build_Python_call_TeX(T, ...)`` for each occurrence.
The way the whole thing work is:
- In the Python code, some ``typing.cast(T, Python_call_TeX_local(...))`` are used.
- This function is called on all the library source codes to scan for those occurrences,
build necessary data structures for the :meth:`Python_call_TeX_local` function calls to work correctly.
- When :meth:`Python_call_TeX_local` is actually called, it does some magic to return the correct function.
Done this way, the type checking works correctly and it's not necessary to define global
temporary variables.
Don't use this function on untrusted code.
"""
import ast
from copy import deepcopy
from . import TTPBalancedTokenList # as explained in TTPBalancedTokenList sourcecode, this is caused by abuse of inherentance
for node in ast.walk(ast.parse(sourcecode, mode="exec")):
try:
if isinstance(node, ast.Call):
if (
isinstance(node.func, ast.Attribute) and
isinstance(node.func.value, ast.Name) and
node.func.value.id == "typing" and
node.func.attr == "cast"
):
T = node.args[0]
if isinstance(node.args[1], ast.Call):
f_call = node.args[1]
if isinstance(f_call.func, ast.Name):
if f_call.func.id == "Python_call_TeX_local":
f_call=deepcopy(f_call)
assert isinstance(f_call.func, ast.Name)
f_call.func.id="build_Python_call_TeX"
f_call.args=[T]+f_call.args
eval(compile(ast.Expression(body=f_call), "<string>", "eval"))
except:
print(f"======== while scanning file for Python_call_TeX_local(...) -- error on line {node.lineno} of file {filename} ========", file=sys.stderr)
raise
[docs]def scan_Python_call_TeX_module(name: str)->None:
"""
Internal function.
Can be used as ``scan_Python_call_TeX_module(__name__)`` to scan the current module.
"""
assert name != "__main__" # https://github.com/python/cpython/issues/86291
scan_Python_call_TeX(inspect.getsource(sys.modules[name]), name)
[docs]def define_Python_call_TeX(TeX_code: str, ptt_argtypes: List[Type[PyToTeXData]], ttp_argtypes: List[Type[TeXToPyData]],
*,
recursive: bool=True,
sync: Optional[bool]=None,
finish: bool=False,
)->Tuple[EngineDependentCode, PythonCallTeXFunctionType]:
r"""
Internal function.
``TeX_code`` should be some expl3 code that defines a function with name ``%name%`` that when called should:
* run some [TeX]-code (which includes reading the arguments, if any)
* do the following if ``sync``:
* send ``r`` to Python (equivalently write %sync%)
* send whatever needed for the output (as in ``ttp_argtypes``)
* call ``\pythonimmediatelisten`` iff not ``finish``.
This is allowed to contain the following:
* %name%: the name of the function to be defined as explained above.
* %read_arg0(\var_name)%, %read_arg1(...)%: will be expanded to code that reads the input.
* %send_arg0(...)%, %send_arg1(...)%: will be expanded to code that sends the content.
* %send_arg0_var(\var_name)%, %send_arg1_var(...)%: will be expanded to code that sends the content in the variable.
* %optional_sync%: expanded to code that writes ``r`` (to sync), if ``sync`` is True.
* %naive_flush% and %naive_inline%: as explained in :func:`mark_bootstrap_naive_replace`.
(although usually you don't need to explicitly write this, it's embedded in the ``send*()`` command
of the last argument, or ``%sync%``)
:param ptt_argtypes: list of argument types to be sent from Python to TeX (i.e. input of the TeX function)
:param ttp_argtypes: list of argument types to be sent from TeX to Python (i.e. output of the TeX function)
:param recursive: whether the TeX_code might call another Python function. Default to True.
It does not hurt to always specify True, but performance would be a bit slower.
:param sync: whether the Python function need to wait for the TeX function to finish.
Required if ``ttp_argtypes`` is not empty.
This should be left to be the default None most of the time. (which will make it always sync if ``debugging``,
otherwise only sync if needed i.e. there's some output)
:param finish: Include this if and only if ``\pythonimmediatelisten`` is omitted.
Normally this is not needed, but it can be used as a slight optimization; and it's needed internally to implement
``run_none_finish`` among others.
For each TeX-call-Python layer, \emph{exactly one} ``finish`` call can be made. If the function itself doesn't call
any ``finish`` call (which happens most of the time), then the wrapper will call ``run_none_finish``.
:returns: some TeX code to be executed, and a Python function object that when called will call the TeX function
on the passed-in TeX engine and return the result.
Note that the TeX_code must eventually be executed on the corresponding engine for the program to work correctly.
Possible optimizations:
* the ``r`` is not needed if not recursive and ``ttp_argtypes`` is nonempty
(the output itself tells Python when the [TeX]-code finished)
* the first line of the output may be on the same line as the ``r`` itself (done, use :class:`TTPEmbeddedLine` type, although a bit hacky)
"""
if ttp_argtypes!=[]:
assert sync!=False
sync=True
if sync is None:
sync=debugging
assert not ttp_argtypes
TeX_code=_template_substitute(TeX_code, "%optional_sync%",
lambda _: r'\__send_content%naive_send%:e { r }' if sync else '',)
if sync:
sync_code=r'\__send_content%naive_send%:e { r }'
if ttp_argtypes is not None:
# then don't need to sync here, can sync when the last argument is sent
sync_code=naive_replace(sync_code, False)
else:
sync_code=""
TeX_code=_template_substitute(TeX_code, "%sync%", lambda _: sync_code, optional=True)
assert sync is not None
if ttp_argtypes: assert sync
assert ttp_argtypes.count(TTPEmbeddedLine)<=1
identifier=get_random_TeX_identifier()
TeX_code=_template_substitute(TeX_code, "%name%", lambda _: r"\__run_" + identifier + ":")
for i, argtype_ in enumerate(ptt_argtypes):
TeX_code=_template_substitute(TeX_code, r"%read_arg" + str(i) + r"\(([^)]*)\)%",
lambda match: argtype_.read_code(match[1]),
optional=True)
for i, argtype in enumerate(ttp_argtypes):
TeX_code=_template_substitute(TeX_code, f"%send_arg{i}" + r"\(([^)]*)\)%",
lambda match: postprocess_send_code(argtype.send_code(match[1]), i==len(ttp_argtypes)-1),
optional=True)
TeX_code=_template_substitute(TeX_code, f"%send_arg{i}_var" + r"\(([^)]*)\)%",
lambda match: postprocess_send_code(argtype.send_code_var(match[1]), i==len(ttp_argtypes)-1),
optional=True)
def f(*args: Any)->Optional[Tuple[TeXToPyData, ...]]:
assert len(args)==len(ptt_argtypes), f"passed in {len(args)} = {args}, expect {len(ptt_argtypes)}"
if engine.status==EngineStatus.error:
raise TeXProcessError("error already happened")
assert engine.status==EngineStatus.waiting, engine.status
sending_content=(identifier+"\n").encode('u8') # function header
# function args. We build all the arguments before sending anything, just in case some serialize() error out
for arg, argtype in zip(args, ptt_argtypes):
assert isinstance(arg, argtype)
sending_content+=arg.serialize()
engine.write(sending_content)
if sync:
# wait for the result
engine.status=EngineStatus.running
if recursive:
result_=run_main_loop()
else:
result_=run_main_loop_get_return_one()
assert engine.status==EngineStatus.running, engine.status
result: List[TeXToPyData]=[]
if TTPEmbeddedLine not in ttp_argtypes:
assert not result_
for argtype_ in ttp_argtypes:
if argtype_==TTPEmbeddedLine:
result.append(TTPEmbeddedLine(result_))
else:
result.append(argtype_.read())
if finish:
engine.status=EngineStatus.running
else:
engine.status=EngineStatus.waiting
if sync: return tuple(result)
else: return None
return wrap_naive_replace(TeX_code), f
def run_none_finish()->None:
typing.cast(Callable[[], None], Python_call_TeX_local(
r"""
\cs_new_eq:NN %name% \relax
""", finish=True, sync=False))()
[docs]def run_error_finish(full_error: PTTBlock, short_error: PTTBlock)->None:
"""
Internal function.
``run_error_finish`` is fatal to [TeX], so we only run it when it's fatal to Python.
We want to make sure the Python traceback is printed strictly before run_error_finish() is called,
so that the Python traceback is not interleaved with [TeX] error messages.
"""
typing.cast(Callable[[PTTBlock, PTTBlock], None], Python_call_TeX_local(
r"""
\msg_new:nnn {pythonimmediate} {python-error} {Python~error:~#1.}
\cs_new_protected:Npn %name% {
%read_arg0(\__data)%
%read_arg1(\__summary)%
\wlog{^^JPython~error~traceback:^^J\__data^^J}
\msg_error:nnx {pythonimmediate} {python-error} {\__summary}
\__close_write:
}
""", finish=True, sync=False))(full_error, short_error)
# normally the close_write above is not necessary but sometimes error can be skipped through
# in which case we must make sure the pipe is not written to anymore
# https://github.com/user202729/pythonimmediate-tex/issues/1
def run_tokenized_line_peek(line: str, *, check_braces: bool=True, check_newline: bool=True, check_continue: bool=True)->str:
check_line(line, braces=check_braces, newline=check_newline, continue_=(True if check_continue else None))
return typing.cast(
Callable[[PTTTeXLine], Tuple[TTPEmbeddedLine]],
Python_call_TeX_local(
r"""
\cs_new_protected:Npn %name% {
%read_arg0(\__data)%
\__data
}
""")
)(PTTTeXLine(line))[0]
def run_block_local(block: str)->None:
typing.cast(Callable[[PTTBlock], None], Python_call_TeX_local(
r"""
\cs_new_protected:Npn %name% {
%read_arg0(\__data)%
\begingroup \newlinechar=10~ \expandafter \endgroup
\scantokens \expandafter{\__data}
% trick described in https://tex.stackexchange.com/q/640274 to scantokens the code with \newlinechar=10
%optional_sync%
\pythonimmediatelisten
}
"""))(PTTBlock.ignore_last_space(block))
[docs]def get_bootstrap_code(engine: Engine)->str:
"""
Return the bootstrap code for an engine.
This is before the call to :meth:`substitute_private`.
"""
return "\n".join(
f(engine)
for f in bootstrap_code_functions)
if typing.TYPE_CHECKING:
from . import BalancedTokenList