#!/bin/python3
from __future__ import annotations
import sys
import os
import inspect
import contextlib
import io
import functools
from typing import Optional, Union, Callable, Any, Iterator, Protocol, Iterable, Sequence, Type, Tuple, List, Dict
import typing
from abc import ABC, abstractmethod
from pathlib import Path
from dataclasses import dataclass
import tempfile
import signal
import traceback
import re
import collections
import enum
T1 = typing.TypeVar("T1")
[docs]def user_documentation(x: T1)->T1:
return x
#debug_file=open(Path(tempfile.gettempdir())/"pythonimmediate_debug_textopy.txt", "w", encoding='u8', buffering=2)
#debug=functools.partial(print, file=debug_file, flush=True)
debug=functools.partial(print, file=sys.stderr, flush=True)
debug=lambda *args, **kwargs: None # type: ignore
expansion_only_can_call_Python=False # normally. May be different in LuaTeX etc.
from .engine import Engine, default_engine, ParentProcessEngine
pythonimmediate: Any
import pythonimmediate # type: ignore
pythonimmediate.debugging=True
pythonimmediate.debug=debug
FunctionType = typing.TypeVar("FunctionType", bound=Callable)
[docs]def export_function_to_module(f: FunctionType)->FunctionType:
"""
the functions decorated with this decorator are accessible from user code with
import pythonimmediate
pythonimmediate.⟨function name⟩(...)
"""
assert not hasattr(pythonimmediate, f.__name__), f.__name__
setattr(pythonimmediate, f.__name__, f)
return f
[docs]def send_raw(s: str, engine: Engine)->None:
debug("======== sending", s)
engine.write(s.encode('u8'))
[docs]def send_finish(s: str, engine: Engine)->None:
engine.check_not_finished()
engine.action_done=True
assert s.endswith("\n")
send_raw(s, engine=engine)
import random
[docs]def surround_delimiter(block: str)->str:
while True:
delimiter=str(random.randint(0, 10**12))
if delimiter not in block: break
return delimiter + "\n" + block + "\n" + delimiter + "\n"
EngineDependentCode=Callable[[Engine], str]
bootstrap_code_functions: list[EngineDependentCode]=[]
"""
Internal constant.
Contains functions that takes an engine object and returns some code before :meth:`substitute_private` is applied on it.
"""
[docs]def mark_bootstrap(code: str)->None:
bootstrap_code_functions.append(lambda engine: code)
[docs]def substitute_private(code: str)->str:
assert "_pythonimmediate_" not in code # avoid double-apply this function
return (code
#.replace("\n", ' ') # because there are comments in code, cannot
.replace("__", "_" + "pythonimmediate" + "_")
)
[docs]def send_bootstrap_code(engine: Engine)->None:
send_raw(surround_delimiter(substitute_private(get_bootstrap_code(engine))), engine=engine)
[docs]def postprocess_send_code(s: str, put_sync: bool)->str:
assert have_naive_replace(s)
if put_sync:
# keep the naive-sync operation
pass
else:
# remove it
s=naive_replace(s, False)
return s
# ========
# as the name implies, this reads one "command" from Python side and execute it.
# the command might do additional tasks e.g. read more [TeX]-code.
#
# e.g. if `block' is read from the communication channel, run ``\__run_block:``.
@bootstrap_code_functions.append
def _naive_flush_data_define(engine: Engine)->str:
if engine.config.naive_flush:
# the function x-expand to something in a single line that is at least 4095 bytes long and ignored by Python
# (plus the newline would be 4096)
return r"""
\cs_new:Npn \__naive_flush_data: {
pythonimmediate-naive-flush-line \prg_replicate:nn {4063} {~}
}
"""
return ""
@typing.overload
def naive_replace(code: str, naive_flush: bool, /)->str: ...
@typing.overload
def naive_replace(code: str, engine: Engine, /)->str: ...
[docs]def naive_replace(code: str, x: Union[Engine, bool])->str:
code1=code
if (x.config.naive_flush if isinstance(x, Engine) else x):
code1=code1.replace("%naive_inline%", r"^^J \__naive_flush_data: ")
code1=code1.replace("%naive_flush%", r"\__send_content:e {\__naive_flush_data:}")
code1=code1.replace("%naive_send%", r"_naive_flush")
else:
code1=code1.replace("%naive_inline%", "")
code1=code1.replace("%naive_flush%", "")
code1=code1.replace("%naive_send%", "")
return code1
[docs]def have_naive_replace(code: str)->bool:
return naive_replace(code, False)!=code
[docs]def wrap_naive_replace(code: str)->EngineDependentCode:
return functools.partial(naive_replace, code)
[docs]def mark_bootstrap_naive_replace(code: str)->None:
r"""
Similar to :func:`mark_bootstrap`, but code may contain one of the following:
- ``%naive_inline``: replaced with ``^^J \__naive_flush_data:``
if :attr:`Engine.config.naive_flush` is ``True``, else become empty
- ``%naive_flush%``: replaced with ``\__send_content:e {\__naive_flush_data:}``
if :attr:`Engine.config.naive_flush` is ``True``
"""
bootstrap_code_functions.append(wrap_naive_replace(code))
mark_bootstrap_naive_replace(
r"""
\cs_new_protected:Npn \__read_do_one_command: {
\begingroup
\endlinechar=-1~
\readline \__read_file to \__line
\expandafter
\endgroup % also this will give an error instead of silently do nothing when command is invalid
\csname __run_ \__line :\endcsname
}
% read documentation of ``_peek`` commands for details what this command does.
\cs_new_protected:Npn \pythonimmediatecontinue #1 {
\__send_content:e {r #1 %naive_inline% }
\__read_do_one_command:
}
\cs_new_protected:Npn \pythonimmediatecontinuenoarg {
\pythonimmediatecontinue {}
}
\cs_new_protected:Npn \__send_content:n #1 {
\__send_content:e { \unexpanded{#1} }
}
\cs_new_protected:Npn \__send_content_naive_flush:e #1 {
\__send_content:e { #1 %naive_inline% }
}
\cs_new_protected:Npn \__send_content_naive_flush:n #1 {
\__send_content_naive_flush:e { \unexpanded{#1} }
}
% the names are such that \__send_content%naive_send%:n {something} results in the correct content
% internal function. Just send an arbitrary block of data to Python.
% this function only works properly when newlinechar = 10.
\cs_new_protected:Npn \__send_block:e #1 {
\__send_content:e {
#1 ^^J
pythonimm?""" + '"""' + r"""?'''? % following character will be newline
}
}
\cs_new_protected:Npn \__send_block:n #1 {
\__send_block:e {\unexpanded{#1}}
}
\cs_new_protected:Npn \__send_block_naive_flush:e #1 {
\__send_content:e {
#1 ^^J
pythonimm?""" + '"""' + r"""?'''? % following character will be newline
%naive_inline%
}
}
\cs_new_protected:Npn \__send_block_naive_flush:n #1 {
\__send_block_naive_flush:e {\unexpanded{#1}}
}
\cs_generate_variant:Nn \__send_block:n {V}
\cs_generate_variant:Nn \__send_block_naive_flush:n {V}
\AtEndDocument{
\__send_content:e {r %naive_inline%}
\__close_write:
}
""")
# the last one don't need to flush because will close anyway (right?)
# ========
# when 'i⟨string⟩' is sent from TeX to Python, the function with index ⟨string⟩ in this dict is called
TeX_handlers: Dict[str, Callable[[Engine], None]]={}
TeXToPyObjectType=Optional[str]
[docs]def run_main_loop(engine: Engine)->TeXToPyObjectType:
while True:
line=readline(engine=engine)
if not line: return None
if line[0]=="i":
TeX_handlers[line[1:]](engine)
elif line[0]=="r":
return line[1:]
else:
raise RuntimeError("Internal error: unexpected line "+line)
[docs]def run_main_loop_get_return_one(engine: Engine)->str:
line=readline(engine=engine)
assert line[0]=="r"
return line[1:]
user_documentation(
r"""
All exported functions can be accessed through the module as ``import pythonimmediate``.
The ``_finish`` functions are internal functions, which must be called \emph{at most} once in each
``\pythonimmediate:n`` call from [TeX]-to tell [TeX]-what to do.
The ``_local`` functions simply execute the code. These functions will only return when
the [TeX]-code finishes executing; nevertheless, the [TeX]-code might recursively execute some Python code
inside it.
A simple example is ``pythonimmediate.run_block_local('123')`` which simply typesets ``123``.
The ``_peek`` functions is the same as above; however, the [TeX]-code must contain an explicit command
``\pythonimmediatecontinue{...}``.
The argument of ``\pythonimmediatecontinue`` will be ``e``-expanded
by ``\write`` (note that the written content must not contain any newline character,
otherwise the behavior is undefined), then returned as a string by the Python code.
The Python function will only return when ``\pythonimmediatecontinue`` is called.
In other words, ``run_*_local(code)`` is almost identical to ``run_*_peek(code + "\pythonimmediatecontinue {}")``.
""")
#@export_function_to_module
[docs]def run_block_finish(block: str, engine: Engine= default_engine)->None:
send_finish("block\n" + surround_delimiter(block), engine=engine)
[docs]def check_line(line: str, *, braces: bool, newline: bool, continue_: Optional[bool])->None:
"""
check user-provided line before sending to TeX for execution
"""
if braces:
assert line.count("{") == line.count("}")
if newline:
assert '\n' not in line
assert '\r' not in line # this is not the line separator but just in case
if continue_==True: assert "pythonimmediatecontinue" in line
elif continue_==False: assert "pythonimmediatecontinue" not in line
user_scope: Dict[str, Any]={} # consist of user's local variables etc.
[docs]def readline(engine: Engine)->str:
line=engine.read().decode('u8')
debug("======== saw line", line)
return line
block_delimiter: str="pythonimm?\"\"\"?'''?"
[docs]def read_block(engine: Engine)->str:
r"""
Internal function to read one block sent from [TeX](including the final delimiter line,
but the delimiter line is not returned)
"""
lines: List[str]=[]
while True:
line=readline(engine=engine)
if line==block_delimiter:
return '\n'.join(lines)
else:
lines.append(line)
#@export_function_to_module
[docs]class NToken(ABC):
"""
Represent a possibly-notexpanded token.
For convenience, a notexpanded token is called a blue token.
It's not always possible to determine the notexpanded status of a following token in the input stream.
Implementation note: Token objects must be frozen.
"""
@abstractmethod
def __str__(self)->str: ...
[docs] @abstractmethod
def repr1(self)->str: ...
[docs] def meaning_str(self, engine: Engine= default_engine)->str:
r"""
get the meaning of this token as a string.
Note that all blue tokens have the meaning equal to ``\relax``
(or ``[unknown command code! (0, 1)]`` in a buggy LuaTeX implementation)
with the backslash replaced
by the current ``escapechar``.
"""
return NTokenList([T.meaning, self]).expand_x(engine=engine).str(engine=engine)
@property
@abstractmethod
def noexpand(self)->"NToken":
r"""
Return the result of ``\noexpand`` applied on this token.
"""
...
@property
@abstractmethod
def no_blue(self)->"Token":
r"""
Return the result of this token after being "touched", which drops its blue status if any.
"""
...
[docs] @abstractmethod
def put_next(self, engine: Engine=default_engine):
"""
Put this token forward in the input stream.
"""
...
[docs] def meaning_equal(self, other: "NToken", engine: Engine= default_engine)->bool:
"""
Whether this token is the same in meaning as the token specified in the parameter *other*.
Note that two tokens might have different meaning despite having equal :meth:`meaning_str`.
"""
return NTokenList([T.ifx, self, other, Catcode.other("1"), T["else"], Catcode.other("0"), T.fi]).expand_x(engine=engine).bool()
[docs] def str_unicode(self)->str:
"""
``self`` must represent a character of a [TeX] string. (i.e. equal to itself when detokenized)
:return: the string content.
.. note::
See :meth:`NTokenList.str_unicode`.
"""
# default implementation, might not be correct. Subclass overrides as needed.
raise ValueError("Token does not represent a string!")
[docs] def degree(self)->int:
"""
return the imbalance degree for this token (``{`` -> 1, ``}`` -> -1, everything else -> 0)
"""
# default implementation, might not be correct. Subclass overrides as needed.
return 0
#@export_function_to_module
[docs]class Token(NToken):
"""
Represent a [TeX] token, excluding the notexpanded possibility.
See also documentation of :class:`NToken`.
"""
@property
@abstractmethod
def can_blue(self)->bool:
"""
Return whether this token can possibly be blue i.e. expandable.
"""
...
@property
def blue(self)->"BlueToken":
r"""
Return a :class:`BlueToken` containing ``self``. :attr:`can_blue` must be true.
"""
if not self.can_blue:
raise ValueError("Token cannot be blue!")
return BlueToken(self)
@property
def noexpand(self)->"NToken":
if not self.can_blue:
return self
return BlueToken(self)
[docs] @abstractmethod
def serialize(self)->str:
"""
Internal function, serialize this token to be able to pass to [TeX].
"""
...
[docs] @abstractmethod
def simple_detokenize(self, get_catcode: Callable[[int], Catcode])->str:
"""
Simple approximate detokenizer, implemented in Python.
"""
...
@property
@abstractmethod
def assignable(self)->bool:
"""
Whether this token can be assigned to i.e. it's control sequence or active character.
"""
...
[docs] def assign(self, other: "NToken", engine: Engine=default_engine)->None:
"""
Assign the meaning of this token to be equivalent to that of the other token.
"""
assert self.assignable
NTokenList([T.let, self, C.other("="), C.space(' '), other]).execute(engine=engine)
[docs] def assign_future(self, engine: Engine= default_engine)->None:
r"""
Assign the meaning of this token to be equivalent to that of the following token in the input stream.
For example if this token is ``\a``, and the input stream starts with ``bcde``, then ``\a``'s meaning
will be assigned to that of the explicit character ``b``.
.. note::
Tokenizes one more token in the input stream, and remove its blue status if any.
"""
assert self.assignable
typing.cast(Callable[[PTTBalancedTokenList, Engine], None], Python_call_TeX_local(
r"""
\cs_new_protected:Npn %name% {
%read_arg0(\__data)%
\expandafter \futurelet \__data \pythonimmediatecontinuenoarg
}
""" , sync=True))(PTTBalancedTokenList(BalancedTokenList([self])), engine)
[docs] def assign_futurenext(self, engine: Engine= default_engine)->None:
r"""
Assign the meaning of this token to be equivalent to that of the second-next token in the input stream.
For example if this token is ``\a``, and the input stream starts with ``bcde``, then ``\a``'s meaning
will be assigned to that of the explicit character ``c``.
.. note::
Tokenizes two more tokens in the input stream, and remove their blue status if any.
"""
assert self.assignable
typing.cast(Callable[[PTTBalancedTokenList, Engine], None], Python_call_TeX_local(
r"""
\cs_new_protected:Npn %name% {
%read_arg0(\__data)%
\afterassignment \pythonimmediatecontinuenoarg \expandafter \futurelet \__data
}
""" , sync=True))(PTTBalancedTokenList(BalancedTokenList([self])), engine)
[docs] def assign_value(self, content: "BalancedTokenList")->None:
"""
Given ``self`` is an expl3 ``tl``-variable, assign *content* to it locally.
"""
BalancedTokenList([T.edef, self, [T.unexpanded, content]]).execute()
[docs] def value(self, engine: Engine= default_engine)->"BalancedTokenList":
"""
given ``self`` is a expl3 ``tl``-variable, return the content.
"""
return BalancedTokenList([self]).expand_o(engine=engine)
[docs] def value_str(self, engine: Engine= default_engine)->str:
"""
given ``self`` is a expl3 ``str``-variable, return the content.
"""
return self.value(engine=engine).str(engine=engine)
@property
def no_blue(self)->"Token": return self
def __repr__(self)->str:
return f"<Token: {self.repr1()}>"
[docs] @staticmethod
def deserialize(s: str|bytes)->"Token":
"""
See documentation of :meth:`TokenList.deserialize`.
Always return a single token.
"""
t=TokenList.deserialize(s)
assert len(t)==1
return t[0]
[docs] @staticmethod
def deserialize_bytes(data: bytes, engine: Engine)->"Token":
"""
See documentation of :meth:`TokenList.deserialize_bytes`.
Always return a single token.
"""
if engine.is_unicode:
return Token.deserialize(data.decode('u8'))
else:
return Token.deserialize(data)
[docs] @staticmethod
def get_next(engine: Engine= default_engine)->"Token":
r"""
Get the following token.
.. note::
in LaTeX3 versions without the commit https://github.com/latex3/latex3/commit/24f7188904d6
sometimes this may error out.
.. note::
because of the internal implementation of ``\peek_analysis_map_inline:n``, this may
tokenize up to 2 tokens ahead (including the returned token),
as well as occasionally return the wrong token in unavoidable cases.
"""
return Token.deserialize_bytes(
typing.cast(Callable[[Engine], TTPRawLine], Python_call_TeX_local(
r"""
\cs_new_protected:Npn \__get_next_callback #1 {
\peek_analysis_map_break:n { \pythonimmediatecontinue {^^J#1} }
}
\cs_new_protected:Npn %name% {
\peek_analysis_map_inline:n {
\__tlserialize_char_unchecked:nNnN {##2}##3{##1} \__get_next_callback
}
}
""", recursive=False))(engine), engine)
[docs] @staticmethod
def peek_next(engine: Engine= default_engine)->"Token":
"""
Get the following token without removing it from the input stream.
Equivalent to :meth:`get_next` then :meth:`put_next` immediately. See documentation of :meth:`get_next` for some notes.
"""
t=Token.get_next(engine=engine)
t.put_next(engine=engine)
return t
[docs] def put_next(self, engine: Engine= default_engine)->None:
d=self.degree()
if d==0:
BalancedTokenList([self]).put_next(engine=engine)
else:
assert isinstance(self, CharacterToken)
if not engine.is_unicode and self.index>=256:
raise ValueError("Cannot put this token for non-Unicode engine!")
if d==1:
typing.cast(Callable[[PTTInt, Engine], None], Python_call_TeX_local(
r"""
\cs_new_protected:Npn %name% {
%read_arg0(\__index)%
\expandafter \expandafter \expandafter \pythonimmediatecontinuenoarg
\char_generate:nn {\__index} {1}
}
""", recursive=False, sync=True))(PTTInt(self.index), engine)
else:
assert d==-1
typing.cast(Callable[[PTTInt, Engine], None], Python_call_TeX_local(
r"""
\cs_new_protected:Npn %name% {
%read_arg0(\__index)%
\expandafter \expandafter \expandafter \pythonimmediatecontinuenoarg
\char_generate:nn {\__index} {2}
}
""", recursive=False, sync=True))(PTTInt(self.index), engine)
# TeX code for serializing and deserializing a token list.
# Convert a token list from/to a string.
mark_bootstrap(
r"""
\precattl_exec:n {
% here #1 is the target token list to store the result to, #2 is a string with the final '.'.
\cs_new_protected:Npn \__tldeserialize_dot:Nn #1 #2 {
\begingroup
\tl_gset:Nn \__gtmp {#2}
\tl_greplace_all:Nnn \__gtmp {~} {\cO\ }
\def \start ##1 { \csname ##1 \endcsname }
\def \^ ##1 ##2 { \csname ##1 \expandafter \expandafter \expandafter \endcsname \char_generate:nn {`##2-64} {12} }
\def \> ##1 ##2 \cO\ { \csname ##1 \endcsname ##2 \cU\ }
\def \* ##1 ##2 \cO\ ##3 { \csname ##1 \endcsname ##2 \char_generate:nn {`##3-64} {12} }
\def \\ ##1 \cO\ ##2 { \expandafter \noexpand \csname ##1 \endcsname \csname ##2 \endcsname }
\def \1 ##1 ##2 { \char_generate:nn {`##1} {1} \csname ##2 \endcsname }
\def \2 ##1 ##2 { \char_generate:nn {`##1} {2} \csname ##2 \endcsname }
\def \3 ##1 ##2 { \char_generate:nn {`##1} {3} \csname ##2 \endcsname }
\def \4 ##1 ##2 { \char_generate:nn {`##1} {4} \csname ##2 \endcsname }
\def \6 ##1 ##2 { #### \char_generate:nn {`##1} {6} \csname ##2 \endcsname }
\def \7 ##1 ##2 { \char_generate:nn {`##1} {7} \csname ##2 \endcsname }
\def \8 ##1 ##2 { \char_generate:nn {`##1} {8} \csname ##2 \endcsname }
\def \A ##1 ##2 { \char_generate:nn {`##1} {10} \csname ##2 \endcsname }
\def \B ##1 ##2 { \char_generate:nn {`##1} {11} \csname ##2 \endcsname }
\def \C ##1 ##2 { \char_generate:nn {`##1} {12} \csname ##2 \endcsname }
\def \D ##1 ##2 { \expandafter \expandafter \expandafter \noexpand \char_generate:nn {`##1} {13} \csname ##2 \endcsname }
\def \R ##1 { \cFrozenRelax \csname ##1 \endcsname }
\let \. \empty
\tl_gset:Nx \__gtmp {\expandafter \start \__gtmp}
\endgroup
\tl_set_eq:NN #1 \__gtmp
}
"""
+
# callback will be called exactly once with the serialized result (either other or space catcode)
# and, as usual, with nothing leftover following in the input stream
# the token itself can be gobbled or \edef-ed to discard it.
# if it's active outer or control sequence outer then gobble fails.
# if it's { or } then edef fails.
(
r"""
\cs_new_protected:Npn \__char_unchecked:nNnN #char #cat {
\int_compare:nNnTF {
\if #cat 1 1 \fi
\if #cat 2 1 \fi
0
} = {0} {
% it's neither 1 nor 2, can edef
\tl_set:Nn \__process_after_edef { \__continue_after_edef {#char} #cat }
\afterassignment \__process_after_edef
\edef \__the_token
} {
% it's either 1 or 2
% might not be able to edef, but can gobble
\__process_gobble {#char} #cat
}
}
\def \__frozen_relax_container { \cFrozenRelax }
\def \__null_cs_container { \cC{} }
%\edef \__endwrite_container { \noexpand \cEndwrite }
%\tl_if_eq:NnT \__endwrite_container { \cC{cEndwrite} } {
% \errmessage { endwrite~token~not~supported }
%}
\cs_new:Npn \__prefix_escaper #1 {
\ifnum 0<\__if_weird_charcode_or_space:n {`#1} ~
*
\fi
}
\cs_new:Npn \__content_escaper #1 {
\ifnum 0<\__if_weird_charcode_or_space:n {`#1} ~
\cO\ \char_generate:nn {`#1+64} {12}
\else
#1
\fi
}
% fully expand to zero if #1 is not weird, otherwise expand to nonzero
% weird means as can be seen below <32 or =127 (those that will be ^^-escaped without -8bit)
% XeLaTeX also make 80..9f weird
\cs_new:Npn \__if_weird_charcode:n #1 {
\ifnum #1 < 32 ~ 1 \fi
\ifnum #1 > 126 ~ \ifnum #1 < 160 ~ 1 \fi \fi
0
}
\cs_new:Npn \__if_weird_charcode_or_space:n #1 {
\ifnum #1 < 33 ~ 1 \fi
\ifnum #1 > 126 ~ \ifnum #1 < 160 ~ 1 \fi \fi
0
}
\cs_new_protected:Npn \__continue_after_edef #char #cat #callback {
\token_if_eq_charcode:NNTF #cat 0 {
\tl_if_eq:NNTF \__the_token \__frozen_relax_container {
#callback {\cO{ R }}
} {
\tl_if_eq:NNTF \__the_token \__null_cs_container {
#callback {\cO{ \\\ }}
} {
\tl_set:Nx \__name { \expandafter \cs_to_str:N \__the_token }
\exp_args:Nx #callback {
\str_map_function:NN \__name \__prefix_escaper
\cO\\
\str_map_function:NN \__name \__content_escaper
\cO\ }
}
}
} {
\exp_args:Nx #callback {
\ifnum 0<\__if_weird_charcode:n {#char} ~
\cO{^} #cat \char_generate:nn {#char+64} {12}
\else
#cat \expandafter \string \__the_token
\fi
}
}
}
"""
.replace("#char", "#1")
.replace("#cat", "#2")
.replace("#callback", "#3")
+
r"""
\cs_new_protected:Npn \__process_gobble #char #cat #token #callback {
\exp_args:Nx #callback {
\ifnum 0<\__if_weird_charcode:n {#char} ~
\cO{^} #cat \char_generate:nn {#char+64} {12}
\else
#cat \expandafter \string #token
\fi
}
}
"""
.replace("#char", "#1")
.replace("#cat", "#2")
.replace("#token", "#3")
.replace("#callback", "#4")
).replace("__", "__tlserialize_")
+
r"""
}
% deserialize as above but #2 does not end with '.'.
\cs_new_protected:Npn \__tldeserialize_nodot:Nn #1 #2 {
\__tldeserialize_dot:Nn #1 {#2 .}
}
% serialize token list in #2 store to #1.
\cs_new_protected:Npn \__tlserialize_nodot_unchecked:Nn #1 #2 {
\tl_build_begin:N #1
\tl_set:Nn \__tlserialize_callback { \tl_build_put_right:Nn #1 }
\tl_analysis_map_inline:nn {#2} {
\__tlserialize_char_unchecked:nNnN {##2}##3{##1} \__tlserialize_callback
}
\tl_build_end:N #1
}
% serialize token list in #2 store to #1. Call T or F branch depends on whether serialize is successful.
% #1 must be different from \__tlserialize_tmp.
\cs_new_protected:Npn \__tlserialize_nodot:NnTF #1 #2 {
\__tlserialize_nodot_unchecked:Nn #1 {#2}
\__tldeserialize_nodot:NV \__tlserialize_nodot_tmp #1
\tl_if_eq:NnTF \__tlserialize_nodot_tmp {#2} % dangling
}
\cs_new_protected:Npn \__tlserialize_nodot:NnF #1 #2 {
\__tlserialize_nodot:NnTF #1 {#2} {} % dangling
}
\cs_new_protected:Npn \__tlserialize_nodot:NnT #1 #2 #3 { \__tlserialize_nodot:NnTF #1 {#2} {#3} {} }
\msg_new:nnn {pythonimmediate} {cannot-serialize} {Token~list~cannot~be~serialized}
\cs_new_protected:Npn \__tlserialize_nodot:Nn #1 #2{
\__tlserialize_nodot:NnF #1 {#2} {
\msg_error:nn {pythonimmediate} {cannot-serialize}
}
}
\cs_generate_variant:Nn \__tldeserialize_dot:Nn {NV}
\cs_generate_variant:Nn \__tldeserialize_nodot:Nn {NV}
\cs_generate_variant:Nn \__tlserialize_nodot:Nn {NV}
""")
enable_get_attribute=True
if os.environ.get("SPHINX_BUILD"):
enable_get_attribute=False # otherwise it conflicts with sphinx-autodoc's mechanism to inspect the objects
[docs]class ControlSequenceTokenMaker:
r"""
Shorthand to create control sequence objects in Python easier.
There's a default one that can be used as, if you assign ``T=ControlSequenceToken.make``,
then ``T.hello`` returns the token ``\hello``.
"""
def __init__(self, prefix: str)->None:
self.prefix=prefix
if enable_get_attribute:
def __getattribute__(self, a: str)->"ControlSequenceToken":
return ControlSequenceToken(object.__getattribute__(self, "prefix")+a)
def __getitem__(self, a: str)->"ControlSequenceToken":
return ControlSequenceToken(object.__getattribute__(self, "prefix")+a)
#@export_function_to_module
[docs]@dataclass(repr=False, frozen=True)
class ControlSequenceToken(Token):
r"""
Represents a control sequence.
Note that currently, on non-Unicode engines, the ``csname`` field is represented in a particular way: each
character represents a byte in the TokenList, and thus it has character code no more than 255.
So for example, the control sequence obtained by expanding ``\csname ℝ\endcsname`` once
has ``.csname`` field equal to ``"\xe2\x84\x9d"`` (which has ``len=3``).
"""
csname: str
make=typing.cast(ControlSequenceTokenMaker, None) # some interference makes this incorrect. Manually assign below
"""
Refer to the documentation of :class:`ControlSequenceTokenMaker`.
"""
can_blue=True
@property
def assignable(self)->bool:
return True
def __str__(self)->str:
if self.csname=="": return r"\csname\endcsname"
return "\\"+self.csname
[docs] def serialize(self)->str:
return (
"*"*sum(1 for x in self.csname if ord(x)<33) +
"\\" +
"".join(' '+chr(ord(x)+64) if ord(x)<33 else x for x in self.csname)
+ " ")
[docs] def repr1(self)->str:
return f"\\" + repr(self.csname.replace(' ', "␣"))[1:-1]
[docs] def simple_detokenize(self, get_catcode: Callable[[int], Catcode])->str:
if not self.csname:
raise NotImplementedError("This isn't simple!")
if len(self.csname)>1 or get_catcode(ord(self.csname))==Catcode.letter:
for ch in self.csname:
if get_catcode(ord(ch))!=Catcode.letter:
raise NotImplementedError("This isn't simple!")
return "\\"+self.csname+" "
return "\\"+self.csname
ControlSequenceToken.make=ControlSequenceTokenMaker("")
T=ControlSequenceToken.make
P=ControlSequenceTokenMaker("_pythonimmediate_") # create private tokens
if enable_get_attribute:
assert isinstance(T.testa, ControlSequenceToken)
#@export_function_to_module
[docs]class Catcode(enum.Enum):
"""
This class contains a shorthand to allow creating a token with little Python code.
The individual :class:`Catcode` objects
can be called with either a character or a character code to create the object::
Catcode.letter("a") # creates a token with category code letter and character code "a"=chr(97)
Catcode.letter(97) # same as above
Both of the above forms are equivalent to ``CharacterToken(index=97, catcode=Catcode.letter)``.
See also :ref:`token-list-construction` for more ways of constructing token lists.
"""
begin_group=bgroup=1
end_group=egroup=2
math_toggle=math=3
alignment=4
parameter=param=6
math_superscript=superscript=7
math_subscript=subscript=8
space=10
letter=11
other=12
active=13
escape=0
end_of_line=paragraph=line=5
ignored=9
comment=14
invalid=15
@property
def for_token(self)->bool:
"""
Return whether a :class:`CharacterToken` may have this catcode.
"""
return self not in (Catcode.escape, Catcode.line, Catcode.ignored, Catcode.comment, Catcode.invalid)
def __call__(self, ch: Union[str, int])->"CharacterToken":
"""
Shorthand:
Catcode.letter("a") = Catcode.letter(97) = CharacterToken(index=97, catcode=Catcode.letter)
"""
if isinstance(ch, str): ch=ord(ch)
return CharacterToken(ch, self)
C=Catcode
#@export_function_to_module
[docs]@dataclass(repr=False, frozen=True) # must be frozen because bgroup and egroup below are reused
class CharacterToken(Token):
index: int
"""
The character code of this token. For example ``Catcode.letter("a").index==97``.
"""
catcode: Catcode
@property
def can_blue(self)->bool:
return self.catcode==Catcode.active
@property
def chr(self)->str:
"""
The character of this token. For example ``Catcode.letter("a").chr=="a"``.
"""
return chr(self.index)
def __post_init__(self)->None:
assert isinstance(self.index, int)
assert self.index>=0
assert self.catcode.for_token
def __str__(self)->str:
return self.chr
[docs] def serialize(self)->str:
if self.index<0x10:
return f"^{self.catcode.value:X}{chr(self.index+0x40)}"
else:
return f"{self.catcode.value:X}{self.chr}"
[docs] def repr1(self)->str:
cat=str(self.catcode.value).translate(str.maketrans("0123456789", "₀₁₂₃₄₅₆₇₈₉"))
return f"{repr(self.chr)[1:-1]}{cat}"
@property
def assignable(self)->bool:
return self.catcode==Catcode.active
[docs] def degree(self)->int:
if self.catcode==Catcode.bgroup:
return 1
elif self.catcode==Catcode.egroup:
return -1
else:
return 0
[docs] def str_unicode(self)->str:
catcode=Catcode.space if self.index==32 else Catcode.other
if catcode!=self.catcode:
raise ValueError("this CharacterToken does not represent a string!")
return self.chr
[docs] def simple_detokenize(self, get_catcode: Callable[[int], Catcode])->str:
return self.chr
[docs]class FrozenRelaxToken(Token):
can_blue=False
assignable=False
def __str__(self)->str:
return r"\relax"
[docs] def serialize(self)->str:
return "R"
[docs] def repr1(self)->str:
return r"[frozen]\relax"
[docs] def simple_detokenize(self, get_catcode: Callable[[int], Catcode])->str:
raise NotImplementedError("This isn't simple!")
frozen_relax_token=FrozenRelaxToken()
pythonimmediate.frozen_relax_token=frozen_relax_token
# other special tokens later...
bgroup=Catcode.bgroup("{")
egroup=Catcode.egroup("}")
space=Catcode.space(" ")
#@export_function_to_module
[docs]@dataclass(frozen=True)
class BlueToken(NToken):
"""
Represents a blue token (see documentation of :class:`NToken`).
"""
token: Token
@property
def noexpand(self)->"BlueToken": return self
@property
def no_blue(self)->"Token": return self.token
def __str__(self)->str: return str(self.token)
[docs] def repr1(self)->str: return "notexpanded:"+self.token.repr1()
[docs] def put_next(self, engine: Engine= default_engine)->None:
typing.cast(Callable[[PTTBalancedTokenList, Engine], None], Python_call_TeX_local(
r"""
\cs_new_protected:Npn \__put_next_blue_tmp {
%optional_sync%
\expandafter \__read_do_one_command: \noexpand
}
\cs_new_protected:Npn %name% {
%read_arg0(\__target)%
\expandafter \__put_next_blue_tmp \__target
}
""", recursive=False))(PTTBalancedTokenList(BalancedTokenList([self.token])), engine)
doc_catcode_table: Dict[int, Catcode]={}
doc_catcode_table[ord("{")]=Catcode.begin_group
doc_catcode_table[ord("}")]=Catcode.end_group
doc_catcode_table[ord("$")]=Catcode.math_toggle
doc_catcode_table[ord("&")]=Catcode.alignment
doc_catcode_table[ord("#")]=Catcode.parameter
doc_catcode_table[ord("^")]=Catcode.math_superscript
doc_catcode_table[ord("_")]=Catcode.math_subscript
doc_catcode_table[ord(" ")]=Catcode.space
doc_catcode_table[ord("~")]=Catcode.active
for ch in range(ord('a'), ord('z')+1): doc_catcode_table[ch]=Catcode.letter
for ch in range(ord('A'), ord('Z')+1): doc_catcode_table[ch]=Catcode.letter
doc_catcode_table[ord("\\")]=Catcode.escape
doc_catcode_table[ord("%")]=Catcode.comment
e3_catcode_table=dict(doc_catcode_table)
e3_catcode_table[ord("_")]=Catcode.letter
e3_catcode_table[ord(":")]=Catcode.letter
e3_catcode_table[ord(" ")]=Catcode.ignored
e3_catcode_table[ord("~")]=Catcode.space
TokenListType = typing.TypeVar("TokenListType", bound="TokenList")
if typing.TYPE_CHECKING:
TokenListBaseClass = collections.UserList[Token]
else: # Python 3.8 compatibility
TokenListBaseClass = collections.UserList
#@export_function_to_module
[docs]class TokenList(TokenListBaseClass):
r"""
Represent a [TeX] token list, none of which can contain a blue token.
The class can be used identical to a Python list consist of :class:`Token` objects,
plus some additional methods to operate on token lists.
The list of tokens represented by this class does not need to be balanced.
Usually you would want to use :class:`BalancedTokenList` instead.
.. _token-list-construction:
Token list construction
-----------------------
There are some functions to quickly construct token lists, see :meth:`from_string`
and :meth:`__init__`.
:meth:`__init__` is the constructor of the class, for a :class:`TokenList` it can be used as follows:
- Given an existing token list, construct a copy (this usage is identical to that of the Python list constructor)::
a=TokenList.doc("hello world")
b=TokenList(a)
- Construct a token list from a list of tokens::
a=TokenList([Catcode.letter("a"), Catcode.other("b"), T.test])
The above will define ``a`` to be ``ab\test``, provided ``T`` is
the object referred to in :class:`ControlSequenceTokenMaker`.
See also :class:`Catcode` for the explanation of the ``Catcode.letter("a")`` form.
- Construct a token list, using lists to represent nesting levels::
a=TokenList([T.edef, T.a, [T.x, T.y]])
The above creates a token list with content ``\edef\a{\x\y}``.
The constructor of other classes such as :class:`BalancedTokenList` and :class:`NTokenList`
works the same way.
"""
[docs] @staticmethod
def force_token_list(a: Iterable)->Iterable[Token]:
for x in a:
if isinstance(x, Token):
yield x
elif isinstance(x, Sequence):
yield bgroup
child=BalancedTokenList(x)
assert child.is_balanced()
yield from child
yield egroup
else:
raise RuntimeError(f"Cannot make TokenList from object {x} of type {type(x)}")
[docs] def is_balanced(self)->bool:
"""
See :meth:`NTokenList.is_balanced`.
"""
degree=0
for x in self:
degree+=x.degree()
if degree<0: return False
return degree==0
[docs] def check_balanced(self)->None:
"""
ensure that this is balanced.
:raises ValueError: if this is not balanced.
"""
if not self.is_balanced():
raise ValueError(f"Token list {self} is not balanced")
[docs] def balanced_parts(self)->"List[Union[BalancedTokenList, Token]]":
"""
Internal function, used for serialization and sending to [TeX].
Split this :class:`TokenList` into a list of balanced parts and unbalanced ``{``/``}`` tokens.
"""
degree=0
min_degree=0, 0
for i, token in enumerate(self):
degree+=token.degree()
min_degree=min(min_degree, (degree, i+1))
min_degree_pos=min_degree[1]
left_half: List[Union[BalancedTokenList, Token]]=[]
degree=0
last_pos=0
for i in range(min_degree_pos):
d=self[i].degree()
degree+=d
if degree<0:
degree=0
if last_pos!=i:
left_half.append(BalancedTokenList(self[last_pos:i]))
left_half.append(self[i])
last_pos=i+1
if min_degree_pos!=last_pos:
left_half.append(BalancedTokenList(self[last_pos:min_degree_pos]))
right_half: List[Union[BalancedTokenList, Token]]=[]
degree=0
last_pos=len(self)
for i in range(len(self)-1, min_degree_pos-1, -1):
d=self[i].degree()
degree-=d
if degree<0:
degree=0
if i+1!=last_pos:
right_half.append(BalancedTokenList(self[i+1:last_pos]))
right_half.append(self[i])
last_pos=i
if min_degree_pos!=last_pos:
right_half.append(BalancedTokenList(self[min_degree_pos:last_pos]))
return left_half+right_half[::-1]
[docs] def put_next(self, engine: Engine= default_engine)->None:
"""
Put this token list forward in the input stream.
"""
for part in reversed(self.balanced_parts()): part.put_next(engine=engine)
@property
def balanced(self)->"BalancedTokenList":
"""
``self`` must be balanced.
:return: a :class:`BalancedTokenList` containing the content of this object.
"""
return BalancedTokenList(self)
def __init__(self, a: Iterable=())->None:
"""
Refer to :class:`TokenList` on how to use this function.
"""
super().__init__(TokenList.force_token_list(a))
[docs] @staticmethod
def iterable_from_string(s: str, get_catcode: Callable[[int], Catcode])->Iterable[Token]:
"""
refer to documentation of from_string() for details.
"""
i=0
while i<len(s):
ch=s[i]
i+=1
cat=get_catcode(ord(ch))
if cat==Catcode.space:
yield space
# special case: collapse multiple spaces into one but only if character code is space
if get_catcode(32) in (Catcode.space, Catcode.ignored):
while i<len(s) and s[i]==' ':
i+=1
elif cat.for_token:
yield cat(ch)
elif cat==Catcode.ignored:
continue
else:
assert cat==Catcode.escape, f"cannot create TokenList from string containing catcode {cat}"
cat=get_catcode(ord(s[i]))
if cat!=Catcode.letter:
yield ControlSequenceToken(s[i])
i+=1
else:
csname=s[i]
i+=1
while i<len(s) and get_catcode(ord(s[i]))==Catcode.letter:
csname+=s[i]
i+=1
yield ControlSequenceToken(csname)
# special case: remove spaces after control sequence but only if character code is space
if get_catcode(32) in (Catcode.space, Catcode.ignored):
while i<len(s) and s[i]==' ':
i+=1
[docs] @classmethod
def from_string(cls: Type[TokenListType], s: str, get_catcode: Callable[[int], Catcode])->TokenListType:
"""
Approximate tokenizer implemented in Python.
Convert a string to a :class:`TokenList` (or some subclass of it such as :class:`BalancedTokenList` approximately.
This is an internal function and should not be used directly. Use one of :meth:`e3` or :meth:`doc` instead.
These are used to allow constructing a :class:`TokenList` object in Python without being too verbose.
Refer to :ref:`token-list-construction` for alternatives.
The tokenization algorithm is slightly different from [TeX]'s in the following respect:
* multiple spaces are collapsed to one space, but only if it has character code space (32).
i.e. in expl3 catcode, ``~~`` get tokenized to two spaces.
* spaces with character code different from space (32) after a control sequence is not ignored.
i.e. in expl3 catcode, ``~`` always become a space.
* ``^^`` syntax are not supported. Use Python's escape syntax (e.g. ``\x01``) as usual
(of course that does not work in raw Python strings ``r"..."``).
:param get_catcode: A function that given a character code, return its desired category code.
"""
return cls(TokenList.iterable_from_string(s, get_catcode))
[docs] @classmethod
def e3(cls: Type[TokenListType], s: str)->TokenListType:
r"""
Approximate tokenizer in expl3 (``\ExplSyntaxOn``) catcode regime.
Refer to documentation of :meth:`from_string` for details.
Usage example::
BalancedTokenList.e3(r'\cs_new_protected:Npn \__mymodule_myfunction:n #1 { #1 #1 }')
# returns an instance of BalancedTokenList with the expected content
"""
return cls.from_string(s, lambda x: e3_catcode_table.get(x, Catcode.other))
[docs] @classmethod
def doc(cls: Type[TokenListType], s: str)->TokenListType:
r"""
Approximate tokenizer in document (normal) catcode regime.
Refer to documentation of :meth:`from_string` for details.
Usage example::
BalancedTokenList.doc(r'\def\a{b}') # returns an instance of BalancedTokenList with the expected content
BalancedTokenList.doc('}') # raises an error
TokenList.doc('}') # returns an instance of TokenList with the expected content
"""
return cls.from_string(s, lambda x: doc_catcode_table.get(x, Catcode.other))
[docs] def serialize(self)->str:
return "".join(t.serialize() for t in self)
[docs] def serialize_bytes(self, engine: Engine)->bytes:
"""
Internal function.
Given an engine, serialize it in a form that is suitable for writing directly to the engine.
"""
if engine.is_unicode:
return self.serialize().encode('u8')
else:
result=self.serialize()
try:
return bytes(ord(ch) for ch in result)
except ValueError:
raise ValueError("Cannot serialize TokenList for non-Unicode engine!")
[docs] @classmethod
def deserialize(cls: Type[TokenListType], data: str|bytes)->TokenListType:
result: List[Token]=[]
i=0
# hack
if isinstance(data, bytes):
data="".join(chr(i) for i in data)
while i<len(data):
if data[i] in "\\>*":
start=data.find("\\", i)
pos=start+1
csname=""
for op in data[i:start]:
if op==">":
assert False
elif op=="*":
n=data.find(' ', pos)+2
csname+=data[pos:n-2]+chr(ord(data[n-1])-64)
pos=n
else:
assert False
i=data.find(' ', pos)+1
csname+=data[pos:i-1]
result.append(ControlSequenceToken(csname))
elif data[i]=="R":
result.append(frozen_relax_token)
i+=1
elif data[i]=="^":
result.append(CharacterToken(index=ord(data[i+2])-0x40, catcode=Catcode(int(data[i+1], 16))))
i+=3
else:
result.append(CharacterToken(index=ord(data[i+1]), catcode=Catcode(int(data[i], 16))))
i+=2
return cls(result)
[docs] @classmethod
def deserialize_bytes(cls: Type[TokenListType], data: bytes, engine: Engine)->TokenListType:
"""
Internal function.
Given a bytes object read directly from the engine, deserialize it.
"""
if engine.is_unicode:
return cls.deserialize(data.decode('u8'))
else:
return cls.deserialize(data)
def __repr__(self)->str:
return '<' + type(self).__name__ + ': ' + ' '.join(t.repr1() for t in self) + '>'
[docs] def execute(self, engine: Engine= default_engine)->None:
r"""
Execute this token list. It must not "peek ahead" in the input stream.
For example the token list ``\catcode1=2\relax`` can be executed safely
(and sets the corresponding category code),
but there's no guarantee what will be assigned to ``\tmp`` when ``\futurelet\tmp`` is executed.
"""
NTokenList(self).execute(engine=engine)
[docs] def expand_x(self, engine: Engine= default_engine)->"BalancedTokenList":
"""
Return the ``x``-expansion of this token list.
The result must be balanced, otherwise the behavior is undefined.
"""
return NTokenList(self).expand_x(engine=engine)
[docs] def bool(self)->bool:
"""
See :meth:`NTokenList.bool`.
"""
return NTokenList(self).bool()
[docs] def str_unicode(self)->str:
"""
See :meth:`NTokenList.str_unicode`.
"""
return NTokenList(self).str_unicode()
[docs] def simple_detokenize(self, get_catcode: Callable[[int], Catcode])->str:
return "".join(token.simple_detokenize(get_catcode) for token in self)
[docs] def str(self, engine: Engine=default_engine)->str:
"""
See :meth:`NTokenList.str`.
"""
return NTokenList(self).str(engine)
#@export_function_to_module
[docs]class BalancedTokenList(TokenList):
"""
Represents a balanced token list.
Some useful methods to interact with [TeX]
include :meth:`expand_o`, :meth:`expand_x`, :meth:`get_next` and :meth:`put_next`.
See the corresponding methods' documentation for usage examples.
See also :ref:`token-list-construction` for shorthands to construct token lists in Python code.
.. note::
Runtime checking is not strictly enforced,
use :meth:`~TokenList.is_balanced()` method explicitly if you need to check.
"""
def __init__(self, a: Iterable=())->None:
"""
Constructor.
:raises ValueError: if the token list is not balanced.
"""
super().__init__(a)
self.check_balanced()
[docs] def expand_o(self, engine: Engine= default_engine)->"BalancedTokenList":
"""
Return the ``o``-expansion of this token list.
The result must be balanced, otherwise the behavior is undefined.
"""
return typing.cast(Callable[[PTTBalancedTokenList, Engine], TTPBalancedTokenList], Python_call_TeX_local(
r"""
\cs_new_protected:Npn %name% {
%read_arg0(\__data)%
\exp_args:NNV \tl_set:No \__data \__data
%sync%
%send_arg0_var(\__data)%
\__read_do_one_command:
}
""", recursive=expansion_only_can_call_Python))(PTTBalancedTokenList(self), engine)
[docs] def expand_x(self, engine: Engine= default_engine)->"BalancedTokenList":
return typing.cast(Callable[[PTTBalancedTokenList, Engine], TTPBalancedTokenList], Python_call_TeX_local(
r"""
\cs_new_protected:Npn %name% {
%read_arg0(\__data)%
\tl_set:Nx \__data {\__data}
%sync%
%send_arg0_var(\__data)%
\__read_do_one_command:
}
""", recursive=expansion_only_can_call_Python))(PTTBalancedTokenList(self), engine)
[docs] def execute(self, engine: Engine= default_engine)->None:
typing.cast(Callable[[PTTBalancedTokenList, Engine], None], Python_call_TeX_local(
r"""
\cs_new_protected:Npn %name% {
%read_arg0(\__data)%
\__data
%optional_sync%
\__read_do_one_command:
}
"""))(PTTBalancedTokenList(self), engine)
[docs] def put_next(self, engine: Engine= default_engine)->None:
typing.cast(Callable[[PTTBalancedTokenList, Engine], None], Python_call_TeX_local(
r"""
\cs_new_protected:Npn \__put_next_tmp {
%optional_sync%
\__read_do_one_command:
}
\cs_new_protected:Npn %name% {
%read_arg0(\__target)%
\expandafter \__put_next_tmp \__target
}
""", recursive=False))(PTTBalancedTokenList(self), engine)
[docs] @staticmethod
def get_next(engine: Engine= default_engine)->"BalancedTokenList":
"""
Get an (undelimited) argument from the [TeX] input stream.
"""
return typing.cast(Callable[[Engine], TTPBalancedTokenList], Python_call_TeX_local(
r"""
\cs_new_protected:Npn %name% #1 {
%sync%
%send_arg0(#1)%
\__read_do_one_command:
}
""", recursive=False))(engine)
[docs] def detokenize(self, engine: Engine= default_engine)->str:
r"""
:return: a string, equal to the result of ``\detokenize`` applied to this token list.
"""
return BalancedTokenList([T.detokenize, self]).expand_x(engine=engine).str(engine=engine)
if typing.TYPE_CHECKING:
NTokenListBaseClass = collections.UserList[NToken]
else: # Python 3.8 compatibility
NTokenListBaseClass = collections.UserList
#@export_function_to_module
[docs]class NTokenList(NTokenListBaseClass):
"""
Similar to :class:`TokenList`, but can contain blue tokens.
The class can be used identical to a Python list consist of :class:`NToken` objects,
plus some additional methods to operate on token lists.
Refer to the documentation of :class:`TokenList` for some usage example.
"""
[docs] @staticmethod
def force_token_list(a: Iterable)->Iterable[NToken]:
for x in a:
if isinstance(x, NToken):
yield x
elif isinstance(x, Sequence):
yield bgroup
child=NTokenList(x)
assert child.is_balanced()
yield from child
yield egroup
else:
raise RuntimeError(f"Cannot make NTokenList from object {x} of type {type(x)}")
def __init__(self, a: Iterable=())->None:
super().__init__(NTokenList.force_token_list(a))
[docs] def is_balanced(self)->bool:
"""
Check if this is balanced.
"""
return TokenList(self).is_balanced() # a bit inefficient (need to construct a TokenList) but good enough
[docs] def simple_parts(self)->List[Union[BalancedTokenList, Token, BlueToken]]:
"""
Internal function.
Split this :class:`NTokenList` into a list of balanced non-blue parts,
unbalanced ``{``/``}`` tokens, and blue tokens.
"""
parts: List[Union[TokenList, BlueToken]]=[TokenList()]
for i in self:
if isinstance(i, BlueToken):
parts+=i, TokenList()
else:
assert isinstance(i, Token)
last_part=parts[-1]
assert isinstance(last_part, TokenList)
last_part.append(i)
result: List[Union[BalancedTokenList, Token, BlueToken]]=[]
for large_part in parts:
if isinstance(large_part, BlueToken):
result.append(large_part)
else:
result+=large_part.balanced_parts()
return result
[docs] def put_next(self, engine: Engine= default_engine)->None:
"""
See :meth:`BalancedTokenList.put_next`.
"""
for part in reversed(self.simple_parts()): part.put_next(engine=engine)
[docs] def execute(self, engine: Engine= default_engine)->None:
"""
See :meth:`BalancedTokenList.execute`.
"""
parts=self.simple_parts()
if len(parts)==1:
x=parts[0]
if isinstance(x, BalancedTokenList):
x.execute(engine=engine)
return
NTokenList([*self, T.pythonimmediatecontinue, []]).put_next(engine=engine)
continue_until_passed_back()
[docs] def expand_x(self, engine: Engine= default_engine)->BalancedTokenList:
"""
See :meth:`BalancedTokenList.expand_x`.
"""
NTokenList([T.edef, P.tmp, bgroup, *self, egroup]).execute(engine=engine)
return BalancedTokenList([P.tmp]).expand_o(engine=engine)
[docs] def str_unicode(self)->str:
"""
``self`` must represent a [TeX] string. (i.e. equal to itself when detokenized)
:return: the string content.
.. note::
In non-Unicode engines, each token will be replaced with a character
with character code equal to the character code of that token.
UTF-8 characters with character code ``>=0x80`` will be represented by multiple
characters in the returned string.
"""
return "".join(t.str_unicode() for t in self)
[docs] def str(self, engine: Engine)->str:
"""
``self`` must represent a [TeX] string. (i.e. equal to itself when detokenized)
:return: the string content.
"""
if engine.is_unicode:
return self.str_unicode()
else:
return bytes(ord(ch) for ch in self.str_unicode()).decode('u8')
[docs] def bool(self)->bool:
"""
Internal function. ``self`` must represent a [TeX] string either equal to "0" or "1".
:return: the boolean represented by the string.
"""
s=self.str_unicode()
return {"0": False, "1": True}[s]
[docs]class TeXToPyData(ABC):
"""
Internal class (for now). Represent a data type that can be sent from [TeX] to Python.
"""
[docs] @staticmethod
@abstractmethod
def read(engine: Engine)->"TeXToPyData":
"""
Given that [TeX] has just sent the data, read into a Python object.
"""
...
[docs] @staticmethod
@abstractmethod
def send_code(arg: str)->str:
"""
Return some [TeX] code that sends the argument to Python, where *arg* represents a token list or equivalent (such as ``#1``).
"""
pass
[docs] @staticmethod
@abstractmethod
def send_code_var(var: str)->str:
r"""
Return some [TeX] code that sends the argument to Python, where *var* represents a token list variable
(such as ``\l__my_var_tl``) that contains the content to be sent.
"""
pass
# tried and failed
#@typing.runtime_checkable
#class TeXToPyData(Protocol):
# @staticmethod
# def read()->"TeXToPyData":
# ...
#
# #send_code: str
#
# #@staticmethod
# #@property
# #def send_code()->str:
# # ...
[docs]class TTPRawLine(TeXToPyData, bytes):
send_code=r"\__send_content%naive_send%:n {{ {} }}".format
send_code_var=r"\__send_content%naive_send%:n {{ {} }}".format
[docs] @staticmethod
def read(engine: Engine)->"TTPRawLine":
line=engine.read()
return TTPRawLine(line)
[docs]class TTPLine(TeXToPyData, str):
send_code=r"\__send_content%naive_send%:n {{ {} }}".format
send_code_var=r"\__send_content%naive_send%:n {{ {} }}".format
[docs] @staticmethod
def read(engine: Engine)->"TTPLine":
return TTPLine(readline(engine=engine))
[docs]class TTPELine(TeXToPyData, str):
"""
Same as TTPEBlock, but for a single line only.
"""
send_code=r"\__begingroup_setup_estr: \__send_content%naive_send%:e {{ {} }} \endgroup".format
send_code_var=r"\__begingroup_setup_estr: \__send_content%naive_send%:e {{ {} }} \endgroup".format
[docs] @staticmethod
def read(engine: Engine)->"TTPELine":
return TTPELine(readline(engine=engine))
[docs]class TTPEmbeddedLine(TeXToPyData, str):
[docs] @staticmethod
def send_code(self)->str:
raise RuntimeError("Must be manually handled")
[docs] @staticmethod
def send_code_var(self)->str:
raise RuntimeError("Must be manually handled")
[docs] @staticmethod
def read(engine: Engine)->"TTPEmbeddedLine":
raise RuntimeError("Must be manually handled")
[docs]class TTPBlock(TeXToPyData, str):
send_code=r"\__send_block:n {{ {} }} %naive_flush%".format
send_code_var=r"\__send_block:V {} %naive_flush%".format
[docs] @staticmethod
def read(engine: Engine)->"TTPBlock":
return TTPBlock(read_block(engine=engine))
[docs]class TTPEBlock(TeXToPyData, str):
r"""
A kind of argument that interprets "escaped string" and fully expand anything inside.
For example, {\\} sends a single backslash to Python, {\{} sends a single '{' to Python.
Done by fully expand the argument in \escapechar=-1 and convert it to a string.
Additional precaution is needed, see the note above.
"""
send_code=r"\__begingroup_setup_estr: \__send_block%naive_send%:e {{ {} }} \endgroup".format
send_code_var=r"\__begingroup_setup_estr: \__send_block%naive_send%:e {{ {} }} \endgroup".format
[docs] @staticmethod
def read(engine: Engine)->"TTPEBlock":
return TTPEBlock(read_block(engine=engine))
[docs]class TTPBalancedTokenList(TeXToPyData, BalancedTokenList):
send_code=r"\__tlserialize_nodot:Nn \__tmp {{ {} }} \__send_content%naive_send%:e {{\unexpanded\expandafter{{ \__tmp }} }}".format
send_code_var=r"\__tlserialize_nodot:NV \__tmp {} \__send_content%naive_send%:e {{\unexpanded\expandafter{{ \__tmp }} }}".format
[docs] @staticmethod
def read(engine: Engine)->"TTPBalancedTokenList":
if engine.is_unicode:
return TTPBalancedTokenList(BalancedTokenList.deserialize(readline(engine=engine)))
else:
return TTPBalancedTokenList(BalancedTokenList.deserialize(engine.read()))
[docs]class PyToTeXData(ABC):
"""
Internal class (for now). Represent a data type that can be sent from Python to [TeX].
"""
[docs] @staticmethod
@abstractmethod
def read_code(var: str)->str:
r"""
Takes an argument, the variable name (with backslash prefixed such as ``"\abc"``.)
:return: some [TeX] code that when executed in expl3 category code regime,
will read a value of the specified data type and assign it to the variable.
"""
...
[docs] @abstractmethod
def serialize(self, engine: Engine)->bytes:
"""
Return a bytes object that can be passed to ``engine.write()`` directly.
"""
...
[docs]@dataclass
class PTTVerbatimRawLine(PyToTeXData):
r"""
Represents a line to be tokenized verbatim. Internally the ``\readline`` primitive is used, as such, any trailing spaces are stripped.
The trailing newline is not included, i.e. it's read under ``\endlinechar=-1``.
"""
data: bytes
read_code=r"\__str_get:N {} ".format
[docs] def serialize(self, engine: Engine)->bytes:
assert b"\n" not in self.data
assert self.data.rstrip()==self.data, "Cannot send verbatim line with trailing spaces!"
return self.data+b"\n"
[docs]@dataclass
class PTTVerbatimLine(PyToTeXData):
data: str
read_code=PTTVerbatimRawLine.read_code
[docs] def serialize(self, engine: Engine)->bytes:
return PTTVerbatimRawLine(self.data.encode('u8')).serialize(engine)
[docs]@dataclass
class PTTInt(PyToTeXData):
data: int
read_code=PTTVerbatimLine.read_code
[docs] def serialize(self, engine: Engine)->bytes:
return PTTVerbatimLine(str(self.data)).serialize(engine=engine)
[docs]@dataclass
class PTTTeXLine(PyToTeXData):
r"""
Represents a line to be tokenized in \TeX's current catcode regime.
The trailing newline is not included, i.e. it's tokenized under ``\endlinechar=-1``.
"""
data: str
read_code=r"\exp_args:Nno \use:nn {{ \endlinechar-1 \ior_get:NN \__read_file {} \endlinechar}} {{\the\endlinechar\relax}}".format
[docs] def serialize(self, engine: Engine)->bytes:
assert "\n" not in self.data
return (self.data+"\n").encode('u8')
[docs]@dataclass
class PTTBlock(PyToTeXData):
data: str
read_code=r"\__read_block:N {}".format
[docs] def serialize(self, engine: Engine)->bytes:
return surround_delimiter(self.data).encode('u8')
[docs]@dataclass
class PTTBalancedTokenList(PyToTeXData):
data: BalancedTokenList
read_code=r"\__str_get:N {0} \__tldeserialize_dot:NV {0} {0}".format
[docs] def serialize(self, engine: Engine)->bytes:
return PTTVerbatimRawLine(self.data.serialize_bytes(engine)+b".").serialize(engine=engine)
# ======== define TeX functions that execute Python code ========
# ======== implementation of ``\py`` etc. Doesn't support verbatim argument yet. ========
import itertools
import string
[docs]def random_identifiers()->Iterator[str]: # do this to avoid TeX hash collision while keeping the length short
for len_ in itertools.count(0):
for value in range(1<<len_):
for initial in string.ascii_letters:
identifier = initial
if len_>0:
identifier += f"{value:0{len_}b}".translate({ord("0"): "a", ord("1"): "b"})
yield identifier
random_identifier_iterable=random_identifiers()
[docs]def get_random_identifier()->str:
return next(random_identifier_iterable)
[docs]def define_TeX_call_Python(f: Callable[..., None], name: Optional[str]=None, argtypes: Optional[List[Type[TeXToPyData]]]=None, identifier: Optional[str]=None)->EngineDependentCode:
r"""
This function setups some internal data structure, and
returns the [TeX]-code to be executed on the [TeX]-side to define the macro.
f: the Python function to be executed.
It should take some arguments plus a keyword argument `engine` and eventually (optionally) call one of the ``_finish`` functions.
name: the macro name on the [TeX]-side. This should only consist of letter characters in ``expl3`` catcode regime.
argtypes: list of argument types. If it's None it will be automatically deduced from the function ``f``'s signature.
Returns: some code (to be executed in ``expl3`` catcode regime) as explained above.
"""
if argtypes is None:
argtypes=[p.annotation for p in inspect.signature(f).parameters.values()]
for i, argtype in enumerate(argtypes):
if isinstance(argtype, str):
assert argtype in globals(), f"cannot resolve string annotation {argtype}"
argtypes[i]=argtype=globals()[argtype]
argtypes=[t for t in argtypes if t is not Engine] # temporary hack
for argtype in argtypes:
if not issubclass(argtype, TeXToPyData):
raise RuntimeError(f"Argument type {argtype} is incorrect, should be a subclass of TeXToPyData")
if name is None: name=f.__name__
if identifier is None: identifier=get_random_identifier()
assert identifier not in TeX_handlers, identifier
@functools.wraps(f)
def g(engine: Engine)->None:
if engine.config.debug>=5:
print("TeX macro", name, "called")
assert argtypes is not None
args=[argtype.read(engine=engine) for argtype in argtypes]
old_action_done=engine.action_done
engine.action_done=False
try:
f(*args, engine=engine)
except:
if engine.action_done:
# error occurred after 'finish' is called, cannot signal the error to TeX, will just ignore (after printing out the traceback)...
pass
else:
# TODO what should be done here? What if the error raised below is caught
engine.action_done=True
raise
finally:
if not engine.action_done:
run_none_finish(engine)
engine.action_done=old_action_done
TeX_handlers[identifier]=g
TeX_argspec = ""
TeX_send_input_commands = ""
for i, argtype in enumerate(argtypes):
arg = f"#{i+1}"
TeX_send_input_commands += postprocess_send_code(argtype.send_code(arg), put_sync=i==len(argtypes)-1)
TeX_argspec += arg
if not argtypes:
TeX_send_input_commands += "%naive_flush%"
assert have_naive_replace(TeX_send_input_commands)
return wrap_naive_replace(r"""
\cs_new_protected:Npn """ + "\\"+name + TeX_argspec + r""" {
\__send_content:e { i """ + identifier + """ }
""" + TeX_send_input_commands + r"""
\__read_do_one_command:
}
""")
[docs]def define_internal_handler(f: Callable)->Callable:
"""
define a TeX function with TeX name = f.__name__ that calls f().
this does not define the specified function in any particular engine, just add them to the bootstrap_code.
essert self.process is not None, "process is already closed!"
"""
bootstrap_code_functions.append(define_TeX_call_Python(f))
return f
import linecache
# https://stackoverflow.com/questions/47183305/file-string-traceback-with-line-preview
[docs]def exec_or_eval_with_linecache(code: str, globals: dict, mode: str)->Any:
sourcename: str="<usercode>"
i=0
while sourcename in linecache.cache:
sourcename="<usercode" + str(i) + ">"
i+=1
lines=code.splitlines(keepends=True)
linecache.cache[sourcename] = len(code), None, lines, sourcename
compiled_code=compile(code, sourcename, mode)
return (exec if mode=="exec" else eval)(compiled_code, globals)
#del linecache.cache[sourcename]
# we never delete the cache, in case some function is defined here then later are called...
[docs]def exec_with_linecache(code: str, globals: Dict[str, Any])->None:
exec_or_eval_with_linecache(code, globals, "exec")
[docs]def eval_with_linecache(code: str, globals: Dict[str, Any])->Any:
return exec_or_eval_with_linecache(code, globals, "eval")
[docs]@define_internal_handler
def py(code: TTPEBlock, engine: Engine)->None:
pythonimmediate.run_block_finish(str(eval_with_linecache(code, user_scope))+"%", engine=engine)
[docs]@define_internal_handler
def pyfile(filename: TTPELine, engine: Engine)->None:
with open(filename, "r") as f:
source=f.read()
exec(compile(source, filename, "exec"), user_scope)
[docs]class RedirectPrintTeX:
def __init__(self, t)->None:
self.t=t
def __enter__(self)->None:
if hasattr(pythonimmediate, "file"):
self.old=pythonimmediate.file
pythonimmediate.file=self.t
def __exit__(self, exc_type, exc_value, tb)->None:
if hasattr(self, "old"):
pythonimmediate.file=self.old
else:
del pythonimmediate.file
[docs]def run_code_redirect_print_TeX(f: Callable[[], Any], engine: Engine)->None:
with io.StringIO() as t:
with RedirectPrintTeX(t):
result=f()
if result is not None:
t.write(str(result)+"%")
content=t.getvalue()
if content.endswith("\n"):
content=content[:-1]
elif not content:
run_none_finish(engine)
return
else:
#content+=r"\empty" # this works too
content+="%"
run_block_finish(content, engine=engine)
[docs]@define_internal_handler
def pyc(code: TTPEBlock, engine: Engine)->None:
run_code_redirect_print_TeX(lambda: exec_with_linecache(code, user_scope), engine=engine)
[docs]@define_internal_handler
def pycq(code: TTPEBlock, engine: Engine)->None:
with RedirectPrintTeX(None):
exec_with_linecache(code, user_scope)
run_none_finish(engine)
mark_bootstrap(
r"""
\NewDocumentCommand\pyv{v}{\py{#1}}
\NewDocumentCommand\pycv{v}{\pyc{#1}}
""")
# ======== implementation of ``pycode`` environment
mark_bootstrap(
r"""
\NewDocumentEnvironment{pycode}{}{
\saveenvreinsert \__code {
\exp_last_unbraced:Nx \__pycodex {{\__code} {\the\inputlineno} {
\ifdefined\currfilename \currfilename \fi
} {
\ifdefined\currfileabspath \currfileabspath \fi
}}
}
}{
\endsaveenvreinsert
}
""")
"""
In some engine, when -8bit option is not enabled, the code will be escaped before being sent to Python.
So for example if the original code contains a literal tab character, `^^I` might be sent to Python instead.
This do a fuzzy-normalization over these so that the sourcecode can be correctly matched.
"""
potentially_escaped_characters=str.maketrans({
chr(i): "^^" + chr(i^0x40)
for i in [*range(0, 32), 127]
})
[docs]def normalize_line(line: str)->str:
assert line.endswith("\n")
line=line[:-1]
while line.endswith("^^I"): line=line[:-3]
return line.rstrip(" \t").translate(potentially_escaped_characters)
[docs]def can_be_mangled_to(original: str, mangled: str)->bool:
r"""
If *original* is put in a [TeX] file, read in other catcode regime (possibly drop trailing spaces/tabs),
and then sent through ``\write`` (possibly convert control characters to ``^^``-notation),
is it possible that the written content is equal to *mangled*?
The function is somewhat tolerant (might return ``True`` in some cases where ``False`` should be returned), but not too tolerant.
Example::
>>> can_be_mangled_to("a\n", "a\n")
True
>>> can_be_mangled_to("\n", "\n")
True
>>> can_be_mangled_to("\t\n", "\n")
True
>>> can_be_mangled_to("\t\n", "\t\n")
True
>>> can_be_mangled_to("\t\n", "^^I\n")
True
>>> can_be_mangled_to("\ta\n", "^^Ia\n")
True
>>> can_be_mangled_to("a b\n", "a b\n")
True
>>> can_be_mangled_to("a b \n", "a b\n")
True
>>> can_be_mangled_to("a\n", "b\n")
False
"""
return normalize_line(original)==normalize_line(mangled)
@define_internal_handler
def __pycodex(code: TTPBlock, lineno_: TTPLine, filename: TTPLine, fileabspath: TTPLine, engine: Engine)->None:
"""
Auxiliary function to execute TeX_code in a `pycode` environment.
The `code` is not executed immediately, instead we search for the source TeX file that contains the code
(it must be found), then read the sowed from that file.
TeX might mangle the code a bit before passing it to Python, as such we use `can_be_mangled_to`
(might return some false positive)
to compare it with the code read from the sourcecode.
"""
if not code: return
lineno=int(lineno_)
# find where the code comes from... (for easy meaningful traceback)
target_filename: Optional[str] = None
code_lines=code.splitlines(keepends=True)
file_lines=[]
for f in (fileabspath, filename):
if not f: continue
p=Path(f)
if not p.is_file(): continue
file_lines=p.read_text(encoding='u8').splitlines(keepends=True)[lineno-len(code_lines)-1:lineno-1]
if len(file_lines)==len(code_lines) and all(
can_be_mangled_to(file_line, code_line) for file_line, code_line in zip(file_lines, code_lines)
):
target_filename=f
break
if not target_filename:
if len(file_lines)==len(code_lines):
for file_line, code_line in zip(file_lines, code_lines):
if not can_be_mangled_to(file_line, code_line):
debug(f"different {file_line!r} {code_line!r}")
raise RuntimeError(f"Source file not found! (cwd={os.getcwd()}, attempted {(fileabspath, filename)})")
with io.StringIO() as t:
with RedirectPrintTeX(t):
if target_filename:
code_=''.join(file_lines) # restore missing trailing spaces
code_="\n"*(lineno-len(code_lines)-1)+code_
if target_filename:
compiled_code=compile(code_, target_filename, "exec")
exec(compiled_code, user_scope)
else:
exec(code_, user_scope)
run_block_finish(t.getvalue(), engine=engine)
# ======== Python-call-TeX functions
# ======== additional functions...
user_documentation(
r"""
These functions get an argument in the input stream and returns it detokenized.
Which means, for example, ``#`` are doubled, multiple spaces might be collapsed into one, spaces might be introduced
after a control sequence.
It's undefined behavior if the message's "string representation" contains a "newline character".
""")
[docs]def template_substitute(template: str, pattern: str, substitute: Union[str, Callable[[re.Match], str]], optional: bool=False)->str:
"""
pattern is a regex
"""
if not optional:
#assert template.count(pattern)==1
assert len(re.findall(pattern, template))==1
return re.sub(pattern, substitute, template)
#typing.TypeVarTuple(PyToTeXData)
#PythonCallTeXFunctionType=Callable[[PyToTeXData], Optional[Tuple[TeXToPyData, ...]]]
[docs]class PythonCallTeXFunctionType(Protocol): # https://stackoverflow.com/questions/57658879/python-type-hint-for-callable-with-variable-number-of-str-same-type-arguments
def __call__(self, *args: PyToTeXData, engine: Engine)->Optional[Tuple[TeXToPyData, ...]]: ...
[docs]class PythonCallTeXSyncFunctionType(PythonCallTeXFunctionType, Protocol): # https://stackoverflow.com/questions/57658879/python-type-hint-for-callable-with-variable-number-of-str-same-type-arguments
def __call__(self, *args: PyToTeXData, engine: Engine)->Tuple[TeXToPyData, ...]: ...
[docs]@dataclass(frozen=True)
class Python_call_TeX_data:
TeX_code: str
recursive: bool
finish: bool
sync: Optional[bool]
Python_call_TeX_defined: Dict[Python_call_TeX_data, Tuple[Python_call_TeX_extra, Callable]]={}
[docs]def Python_call_TeX_local(TeX_code: str, *, recursive: bool=True, sync: Optional[bool]=None, finish: bool=False)->Callable:
"""
Internal function. See :func:`scan_Python_call_TeX`.
"""
data=Python_call_TeX_data(
TeX_code=TeX_code, recursive=recursive, sync=sync, finish=finish
)
return Python_call_TeX_defined[data][1]
[docs]def build_Python_call_TeX(T: Type, TeX_code: str, *, recursive: bool=True, sync: Optional[bool]=None, finish: bool=False)->None:
"""
Internal function. See :func:`scan_Python_call_TeX`.
T has the form Callable[[T1, T2], Tuple[U1, U2]]
where the Tx are subclasses of PyToTeXData and the Ux are subclasses of TeXToPyData
The Tuple[...] can optionally be a single type, then it is almost equivalent to a tuple of one element
It can also be None
"""
assert T.__origin__ == typing.Callable[[], None].__origin__ # type: ignore
# might be typing.Callable or collections.abc.Callable depends on Python version
data=Python_call_TeX_data(
TeX_code=TeX_code, recursive=recursive, sync=sync, finish=finish
)
# T.__args__ consist of the argument types int
Tx=T.__args__[:-1]
assert Tx and Tx[-1]==Engine
Tx=Tx[:-1]
for Ti in Tx: assert issubclass(Ti, PyToTeXData)
result_type: Any = T.__args__[-1] # Tuple[U1, U2]
ttp_argtypes: Union[Type[TeXToPyData], Tuple[Type[TeXToPyData], ...]]
if result_type is type(None):
ttp_argtypes = ()
elif isinstance(result_type, type) and issubclass(result_type, TeXToPyData):
# special case, return a single object instead of a tuple of length 1
ttp_argtypes = result_type
else:
ttp_argtypes = result_type.__args__ # type: ignore
extra=Python_call_TeX_extra(
ptt_argtypes=Tx,
ttp_argtypes=ttp_argtypes
) # type: ignore
if data in Python_call_TeX_defined:
assert Python_call_TeX_defined[data][0]==extra, "different function with exact same code is not supported for now"
else:
if isinstance(ttp_argtypes, type) and issubclass(ttp_argtypes, TeXToPyData):
# special case, return a single object instead of a tuple of length 1
code, result1=define_Python_call_TeX(TeX_code=TeX_code, ptt_argtypes=[*extra.ptt_argtypes], ttp_argtypes=[ttp_argtypes],
recursive=recursive, sync=sync, finish=finish,
)
def result(*args, engine: Engine):
tmp=result1(*args, engine=engine)
assert tmp is not None
assert len(tmp)==1
return tmp[0]
else:
for t in ttp_argtypes:
assert issubclass(t, TeXToPyData)
code, result=define_Python_call_TeX(TeX_code=TeX_code, ptt_argtypes=[*extra.ptt_argtypes], ttp_argtypes=[*ttp_argtypes],
recursive=recursive, sync=sync, finish=finish,
)
bootstrap_code_functions.append(code)
def result2(*args):
engine=args[-1]
assert isinstance(engine, Engine)
return result(*args[:-1], engine=engine)
Python_call_TeX_defined[data]=extra, result2
[docs]def scan_Python_call_TeX(sourcecode: str)->None:
"""
Internal function.
Scan the file in filename for occurrences of ``typing.cast(T, Python_call_TeX_local(...))``,
then call ``build_Python_call_TeX(T, ...)`` for each occurrence.
The way the whole thing work is:
- In the Python code, some ``typing.cast(T, Python_call_TeX_local(...))`` are used.
- This function is called on all the library source codes to scan for those occurrences,
build necessary data structures for the :meth:`Python_call_TeX_local` function calls to work correctly.
- When :meth:`Python_call_TeX_local` is actually called, it does some magic to return the correct function.
Done this way, the type checking works correctly and it's not necessary to define global
temporary variables.
Don't use this function on untrusted code.
"""
import ast
from copy import deepcopy
for node in ast.walk(ast.parse(sourcecode, mode="exec")):
try:
if isinstance(node, ast.Call):
if (
isinstance(node.func, ast.Attribute) and
isinstance(node.func.value, ast.Name) and
node.func.value.id == "typing" and
node.func.attr == "cast"
):
T = node.args[0]
if isinstance(node.args[1], ast.Call):
f_call = node.args[1]
if isinstance(f_call.func, ast.Name):
if f_call.func.id == "Python_call_TeX_local":
f_call=deepcopy(f_call)
assert isinstance(f_call.func, ast.Name)
f_call.func.id="build_Python_call_TeX"
f_call.args=[T]+f_call.args
eval(compile(ast.Expression(body=f_call), "<string>", "eval"))
except:
print("======== while scanning file for Python_call_TeX_local(...) -- error on line", node.lineno, "of file ========", file=sys.stderr)
raise
[docs]def scan_Python_call_TeX_module(name: str)->None:
"""
Internal function.
Can be used as ``scan_Python_call_TeX_module(__name__)`` to scan the current module.
"""
assert name != "__main__" # https://github.com/python/cpython/issues/86291
scan_Python_call_TeX(inspect.getsource(sys.modules[name]))
[docs]def define_Python_call_TeX(TeX_code: str, ptt_argtypes: List[Type[PyToTeXData]], ttp_argtypes: List[Type[TeXToPyData]],
*,
recursive: bool=True,
sync: Optional[bool]=None,
finish: bool=False,
)->Tuple[EngineDependentCode, PythonCallTeXFunctionType]:
r"""
Internal function.
``TeX_code`` should be some expl3 code that defines a function with name ``%name%`` that when called should:
* run some [TeX]-code (which includes reading the arguments, if any)
* do the following if ``sync``:
* send ``r`` to Python (equivalently write %sync%)
* send whatever needed for the output (as in ``ttp_argtypes``)
* call ``\__read_do_one_command:`` iff not ``finish``.
This is allowed to contain the following:
* %name%: the name of the function to be defined as explained above.
* %read_arg0(\var_name)%, %read_arg1(...)%: will be expanded to code that reads the input.
* %send_arg0(...)%, %send_arg1(...)%: will be expanded to code that sends the content.
* %send_arg0_var(\var_name)%, %send_arg1_var(...)%: will be expanded to code that sends the content in the variable.
* %optional_sync%: expanded to code that writes ``r`` (to sync), if ``sync`` is True.
* %naive_flush% and %naive_inline%: as explained in :func:`mark_bootstrap_naive_replace`.
(although usually you don't need to explicitly write this, it's embedded in the ``send*()`` command
of the last argument, or ``%sync%``)
ptt_argtypes: list of argument types to be sent from Python to TeX (i.e. input of the TeX function)
ttp_argtypes: list of argument types to be sent from TeX to Python (i.e. output of the TeX function)
recursive: whether the TeX_code might call another Python function. Default to True.
It does not hurt to always specify True, but performance would be a bit slower.
sync: whether the Python function need to wait for the TeX function to finish.
Required if ``ttp_argtypes`` is not empty.
This should be left to be the default None most of the time. (which will make it always sync if ``debugging``,
otherwise only sync if needed i.e. there's some output)
finish: Include this if and only if ``\__read_do_one_command:`` is omitted.
Normally this is not needed, but it can be used as a slight optimization; and it's needed internally to implement
``run_none_finish`` among others.
For each TeX-call-Python layer, \emph{exactly one} ``finish`` call can be made. If the function itself doesn't call
any ``finish`` call (which happens most of the time), then the wrapper will call ``run_none_finish``.
Return some TeX code to be executed, and a Python function object that when called will call the TeX function
on the passed-in TeX engine and return the result.
Note that the TeX_code must eventually be executed on the corresponding engine for the program to work correctly.
Possible optimizations:
* the ``r`` is not needed if not recursive and ``ttp_argtypes`` is nonempty
(the output itself tells Python when the [TeX]-code finished)
* the first line of the output may be on the same line as the ``r`` itself (done, use :class:`TTPEmbeddedLine` type, although a bit hacky)
"""
if ttp_argtypes!=[]:
assert sync!=False
sync=True
if sync is None:
sync=pythonimmediate.debugging
assert not ttp_argtypes
TeX_code=template_substitute(TeX_code, "%optional_sync%",
lambda _: r'\__send_content%naive_send%:e { r }' if sync else '',)
if sync:
sync_code=r'\__send_content%naive_send%:e { r }'
if ttp_argtypes is not None:
# then don't need to sync here, can sync when the last argument is sent
sync_code=naive_replace(sync_code, False)
else:
sync_code=""
TeX_code=template_substitute(TeX_code, "%sync%", lambda _: sync_code, optional=True)
assert sync is not None
if ttp_argtypes: assert sync
assert ttp_argtypes.count(TTPEmbeddedLine)<=1
identifier=get_random_identifier() # TODO to be fair it isn't necessary to make the identifier both ways distinct, can reuse
TeX_code=template_substitute(TeX_code, "%name%", lambda _: r"\__run_" + identifier + ":")
for i, argtype_ in enumerate(ptt_argtypes):
TeX_code=template_substitute(TeX_code, r"%read_arg" + str(i) + r"\(([^)]*)\)%",
lambda match: argtype_.read_code(match[1]),
optional=True)
for i, argtype in enumerate(ttp_argtypes):
TeX_code=template_substitute(TeX_code, f"%send_arg{i}" + r"\(([^)]*)\)%",
lambda match: postprocess_send_code(argtype.send_code(match[1]), i==len(ttp_argtypes)-1),
optional=True)
TeX_code=template_substitute(TeX_code, f"%send_arg{i}_var" + r"\(([^)]*)\)%",
lambda match: postprocess_send_code(argtype.send_code_var(match[1]), i==len(ttp_argtypes)-1),
optional=True)
def f(*args, engine: Engine)->Optional[Tuple[TeXToPyData, ...]]:
assert len(args)==len(ptt_argtypes), f"passed in {len(args)} = {args}, expect {len(ptt_argtypes)}"
# send function header
engine.check_not_finished()
if finish:
engine.action_done=True
sending_content=(identifier+"\n").encode('u8')
# function args. We build all the arguments before sending anything, just in case some serialize() error out
for arg, argtype in zip(args, ptt_argtypes):
assert isinstance(arg, argtype)
sending_content+=arg.serialize(engine=engine)
engine.write(sending_content)
if not sync: return None
# wait for the result
if recursive:
result_=run_main_loop(engine=engine)
else:
result_=run_main_loop_get_return_one(engine=engine)
result: List[TeXToPyData]=[]
if TTPEmbeddedLine not in ttp_argtypes:
assert not result_
for argtype_ in ttp_argtypes:
if argtype_==TTPEmbeddedLine:
result.append(TTPEmbeddedLine(result_))
else:
result.append(argtype_.read(engine))
return tuple(result)
return wrap_naive_replace(TeX_code), f
scan_Python_call_TeX_module(__name__)
run_none_finish=typing.cast(Callable[[Engine], None], Python_call_TeX_local(
r"""
\cs_new_eq:NN %name% \relax
""", finish=True, sync=False))
"""
Internal function.
``run_error_finish`` is fatal to [TeX], so we only run it when it's fatal to Python.
We want to make sure the Python traceback is printed strictly before run_error_finish() is called,
so that the Python traceback is not interleaved with [TeX] error messages.
"""
run_error_finish=typing.cast(Callable[[PTTBlock, PTTBlock, Engine], None], Python_call_TeX_local(
r"""
\msg_new:nnn {pythonimmediate} {python-error} {Python~error:~#1.}
\cs_new_protected:Npn %name% {
%read_arg0(\__data)%
%read_arg1(\__summary)%
\wlog{^^JPython~error~traceback:^^J\__data^^J}
\msg_error:nnx {pythonimmediate} {python-error} {\__summary}
}
""", finish=True, sync=False))
#@export_function_to_module
[docs]def run_tokenized_line_peek(line: str, *, check_braces: bool=True, check_newline: bool=True, check_continue: bool=True, engine: Engine= default_engine)->str:
check_line(line, braces=check_braces, newline=check_newline, continue_=(True if check_continue else None))
return typing.cast(
Callable[[PTTTeXLine, Engine], Tuple[TTPEmbeddedLine]],
Python_call_TeX_local(
r"""
\cs_new_protected:Npn %name% {
%read_arg0(\__data)%
\__data
}
""")
)(PTTTeXLine(line), engine)[0]
#@export_function_to_module
[docs]def run_block_local(block: str, engine: Engine= default_engine)->None:
typing.cast(Callable[[PTTBlock, Engine], None], Python_call_TeX_local(
r"""
\cs_new_protected:Npn %name% {
%read_arg0(\__data)%
\begingroup \newlinechar=10~ \expandafter \endgroup
\scantokens \expandafter{\__data}
% trick described in https://tex.stackexchange.com/q/640274 to scantokens the code with \newlinechar=10
%optional_sync%
\__read_do_one_command:
}
"""))(PTTBlock(block), engine)
#@export_function_to_module
[docs]def continue_until_passed_back_str(engine: Engine= default_engine)->str:
r"""
Usage:
First put some tokens in the input stream that includes ``\pythonimmediatecontinue{...}``
(or ``%sync% \__read_do_one_command:``), then call ``continue_until_passed_back()``.
The function will only return when the ``\pythonimmediatecontinue`` is called.
"""
return typing.cast(Callable[[Engine], TTPEmbeddedLine], Python_call_TeX_local(
r"""
\cs_new_eq:NN %name% \relax
"""))(engine)
#@export_function_to_module
[docs]def continue_until_passed_back(engine: Engine= default_engine)->None:
"""
Same as ``continue_until_passed_back_str()`` but nothing can be returned from TeX to Python.
"""
result=continue_until_passed_back_str()
assert not result
#@export_function_to_module
[docs]def expand_once(engine: Engine= default_engine)->None:
r"""
Expand the following content in the input stream once.
For example, if the following tokens in the input stream are ``\iffalse 1 \else 2 \fi``,
then after ``expand_once()`` being called once, the tokens in the input stream will be ``2 \fi``.
"""
typing.cast(Callable[[Engine], None], Python_call_TeX_local(
r"""
\cs_new_protected:Npn %name% { \expandafter \pythonimmediatecontinuenoarg }
""", recursive=False, sync=True))(engine)
# ========
#@export_function_to_module
[docs]@user_documentation
def peek_next_meaning(engine: Engine= default_engine)->str:
r"""
Get the meaning of the following token, as a string, using the current ``\escapechar``.
This is recommended over :func:`peek_next_token` as it will not tokenize an extra token.
It's undefined behavior if there's a newline (``\newlinechar`` or ``^^J``, the latter is OS-specific)
in the meaning string.
"""
return typing.cast(Callable[[Engine], TTPEmbeddedLine], Python_call_TeX_local(
r"""
\cs_new_protected:Npn \__peek_next_meaning_callback: {
\edef \__tmp {\meaning \__tmp} % just in case ``\__tmp`` is outer, ``\write`` will not be able to handle it
\__send_content%naive_send%:e { r \__tmp }
\__read_do_one_command:
}
\cs_new_protected:Npn %name% {
\futurelet \__tmp \__peek_next_meaning_callback:
}
""", recursive=False))(engine)
meaning_str_to_catcode: Dict[str, Catcode]={
"begin-group character ": Catcode.bgroup,
"end-group character ": Catcode.egroup,
"math shift character ": Catcode.math,
"alignment tab character ": Catcode.alignment,
"macro parameter character ": Catcode.parameter,
"superscript character ": Catcode.superscript,
"subscript character ": Catcode.subscript,
"blank space ": Catcode.space,
"the letter ": Catcode.letter,
"the character ": Catcode.other,
}
[docs]def parse_meaning_str(s: str)->Optional[Tuple[Catcode, str]]:
if s and s[:-1] in meaning_str_to_catcode:
return meaning_str_to_catcode[s[:-1]], s[-1]
return None
from .simple import *
# also scan the source code and populate bootstrap_code
[docs]def get_bootstrap_code(engine: Engine)->str:
"""
Return the bootstrap code for an engine.
This is before the call to :meth:`substitute_private`.
"""
return "\n".join(
f(engine)
for f in bootstrap_code_functions)