Automated update

This commit is contained in:
klein panic
2025-02-21 22:00:16 -05:00
parent 3b6cc2dc0e
commit a573a508ac
2351 changed files with 522265 additions and 91 deletions

View File

@@ -0,0 +1,97 @@
"""Package with Display implementations for urwid."""
from __future__ import annotations
import importlib
import typing
__all__ = (
"BLACK",
"BROWN",
"DARK_BLUE",
"DARK_CYAN",
"DARK_GRAY",
"DARK_GREEN",
"DARK_MAGENTA",
"DARK_RED",
"DEFAULT",
"LIGHT_BLUE",
"LIGHT_CYAN",
"LIGHT_GRAY",
"LIGHT_GREEN",
"LIGHT_MAGENTA",
"LIGHT_RED",
"UPDATE_PALETTE_ENTRY",
"WHITE",
"YELLOW",
"AttrSpec",
"AttrSpecError",
"BaseScreen",
"RealTerminal",
"ScreenError",
# Lazy imported
"html_fragment",
"lcd",
"raw",
"web",
)
from . import raw
from .common import (
BLACK,
BROWN,
DARK_BLUE,
DARK_CYAN,
DARK_GRAY,
DARK_GREEN,
DARK_MAGENTA,
DARK_RED,
DEFAULT,
LIGHT_BLUE,
LIGHT_CYAN,
LIGHT_GRAY,
LIGHT_GREEN,
LIGHT_MAGENTA,
LIGHT_RED,
UPDATE_PALETTE_ENTRY,
WHITE,
YELLOW,
AttrSpec,
AttrSpecError,
BaseScreen,
RealTerminal,
ScreenError,
)
try:
from . import curses
__all__ += ("curses",)
except ImportError:
pass
# Moved modules handling
__locals: dict[str, typing.Any] = locals() # use mutable access for pure lazy loading
# Lazy load modules
_lazy_load: frozenset[str] = frozenset(
(
"html_fragment",
"lcd",
"web",
)
)
def __getattr__(name: str) -> typing.Any:
"""Get attributes lazy.
:return: attribute by name
:raises AttributeError: attribute is not defined for lazy load
"""
if name in _lazy_load:
mod = importlib.import_module(f"{__package__}.{name}")
__locals[name] = mod
return mod
raise AttributeError(f"{name} not found in {__package__}")

View File

@@ -0,0 +1,424 @@
# Urwid raw display module
# Copyright (C) 2004-2009 Ian Ward
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Urwid web site: https://urwid.org/
"""
Direct terminal UI implementation
"""
from __future__ import annotations
import contextlib
import fcntl
import functools
import os
import selectors
import signal
import struct
import sys
import termios
import tty
import typing
from subprocess import PIPE, Popen
from urwid import signals
from . import _raw_display_base, escape
from .common import INPUT_DESCRIPTORS_CHANGED
if typing.TYPE_CHECKING:
import socket
from collections.abc import Callable
from types import FrameType
from urwid.event_loop import EventLoop
class Screen(_raw_display_base.Screen):
def __init__(
self,
input: typing.TextIO = sys.stdin, # noqa: A002 # pylint: disable=redefined-builtin
output: typing.TextIO = sys.stdout,
bracketed_paste_mode=False,
focus_reporting=False,
) -> None:
"""Initialize a screen that directly prints escape codes to an output
terminal.
bracketed_paste_mode -- enable bracketed paste mode in the host terminal.
If the host terminal supports it, the application will receive `begin paste`
and `end paste` keystrokes when the user pastes text.
focus_reporting -- enable focus reporting in the host terminal.
If the host terminal supports it, the application will receive `focus in`
and `focus out` keystrokes when the application gains and loses focus.
"""
super().__init__(input, output)
self.gpm_mev: Popen | None = None
self.gpm_event_pending: bool = False
self.bracketed_paste_mode = bracketed_paste_mode
self.focus_reporting = focus_reporting
# These store the previous signal handlers after setting ours
self._prev_sigcont_handler = None
self._prev_sigtstp_handler = None
self._prev_sigwinch_handler = None
def __repr__(self) -> str:
return (
f"<{self.__class__.__name__}("
f"input={self._term_input_file}, "
f"output={self._term_output_file}, "
f"bracketed_paste_mode={self.bracketed_paste_mode}, "
f"focus_reporting={self.focus_reporting})>"
)
def _sigwinch_handler(self, signum: int = 28, frame: FrameType | None = None) -> None:
"""
frame -- will always be None when the GLib event loop is being used.
"""
super()._sigwinch_handler(signum, frame)
if callable(self._prev_sigwinch_handler):
self._prev_sigwinch_handler(signum, frame)
def _sigtstp_handler(self, signum: int, frame: FrameType | None = None) -> None:
self.stop() # Restores the previous signal handlers
self._prev_sigcont_handler = self.signal_handler_setter(signal.SIGCONT, self._sigcont_handler)
# Handled by the previous handler.
# If non-default, it may set its own SIGCONT handler which should hopefully call our own.
os.kill(os.getpid(), signal.SIGTSTP)
def _sigcont_handler(self, signum: int, frame: FrameType | None = None) -> None:
"""
frame -- will always be None when the GLib event loop is being used.
"""
self.signal_restore()
if callable(self._prev_sigcont_handler):
# May set its own SIGTSTP handler which would be stored and replaced in
# `signal_init()` (via `start()`).
self._prev_sigcont_handler(signum, frame)
self.start()
self._sigwinch_handler(28, None)
def signal_init(self) -> None:
"""
Called in the startup of run wrapper to set the SIGWINCH
and SIGTSTP signal handlers.
Override this function to call from main thread in threaded
applications.
"""
self._prev_sigwinch_handler = self.signal_handler_setter(signal.SIGWINCH, self._sigwinch_handler)
self._prev_sigtstp_handler = self.signal_handler_setter(signal.SIGTSTP, self._sigtstp_handler)
def signal_restore(self) -> None:
"""
Called in the finally block of run wrapper to restore the
SIGTSTP, SIGCONT and SIGWINCH signal handlers.
Override this function to call from main thread in threaded
applications.
"""
self.signal_handler_setter(signal.SIGTSTP, self._prev_sigtstp_handler or signal.SIG_DFL)
self.signal_handler_setter(signal.SIGCONT, self._prev_sigcont_handler or signal.SIG_DFL)
self.signal_handler_setter(signal.SIGWINCH, self._prev_sigwinch_handler or signal.SIG_DFL)
def _mouse_tracking(self, enable: bool) -> None:
super()._mouse_tracking(enable)
if enable:
self._start_gpm_tracking()
else:
self._stop_gpm_tracking()
def _start_gpm_tracking(self) -> None:
if not os.path.isfile("/usr/bin/mev"):
return
if not os.environ.get("TERM", "").lower().startswith("linux"):
return
m = Popen( # noqa: S603 # pylint: disable=consider-using-with
["/usr/bin/mev", "-e", "158"],
stdin=PIPE,
stdout=PIPE,
close_fds=True,
encoding="ascii",
)
fcntl.fcntl(m.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
self.gpm_mev = m
def _stop_gpm_tracking(self) -> None:
if not self.gpm_mev:
return
os.kill(self.gpm_mev.pid, signal.SIGINT)
os.waitpid(self.gpm_mev.pid, 0)
self.gpm_mev = None
def _start(self, alternate_buffer: bool = True) -> None:
"""
Initialize the screen and input mode.
alternate_buffer -- use alternate screen buffer
"""
if alternate_buffer:
self.write(escape.SWITCH_TO_ALTERNATE_BUFFER)
self._rows_used = None
else:
self._rows_used = 0
if self.bracketed_paste_mode:
self.write(escape.ENABLE_BRACKETED_PASTE_MODE)
if self.focus_reporting:
self.write(escape.ENABLE_FOCUS_REPORTING)
fd = self._input_fileno()
if fd is not None and os.isatty(fd):
self._old_termios_settings = termios.tcgetattr(fd)
tty.setcbreak(fd)
self.signal_init()
self._alternate_buffer = alternate_buffer
self._next_timeout = self.max_wait
if not self._signal_keys_set:
self._old_signal_keys = self.tty_signal_keys(fileno=fd)
signals.emit_signal(self, INPUT_DESCRIPTORS_CHANGED)
# restore mouse tracking to previous state
self._mouse_tracking(self._mouse_tracking_enabled)
return super()._start()
def _stop(self) -> None:
"""
Restore the screen.
"""
self.clear()
if self.bracketed_paste_mode:
self.write(escape.DISABLE_BRACKETED_PASTE_MODE)
if self.focus_reporting:
self.write(escape.DISABLE_FOCUS_REPORTING)
signals.emit_signal(self, INPUT_DESCRIPTORS_CHANGED)
self.signal_restore()
self._stop_mouse_restore_buffer()
fd = self._input_fileno()
if fd is not None and os.isatty(fd):
termios.tcsetattr(fd, termios.TCSAFLUSH, self._old_termios_settings)
if self._old_signal_keys:
self.tty_signal_keys(*self._old_signal_keys, fd)
super()._stop()
def get_input_descriptors(self) -> list[socket.socket | typing.IO | int]:
"""
Return a list of integer file descriptors that should be
polled in external event loops to check for user input.
Use this method if you are implementing your own event loop.
This method is only called by `hook_event_loop`, so if you override
that, you can safely ignore this.
"""
if not self._started:
return []
fd_list = super().get_input_descriptors()
if self.gpm_mev is not None and self.gpm_mev.stdout is not None:
fd_list.append(self.gpm_mev.stdout)
return fd_list
def unhook_event_loop(self, event_loop: EventLoop) -> None:
"""
Remove any hooks added by hook_event_loop.
"""
for handle in self._current_event_loop_handles:
event_loop.remove_watch_file(handle)
if self._input_timeout:
event_loop.remove_alarm(self._input_timeout)
self._input_timeout = None
def hook_event_loop(
self,
event_loop: EventLoop,
callback: Callable[[list[str], list[int]], typing.Any],
) -> None:
"""
Register the given callback with the event loop, to be called with new
input whenever it's available. The callback should be passed a list of
processed keys and a list of unprocessed keycodes.
Subclasses may wish to use parse_input to wrap the callback.
"""
if hasattr(self, "get_input_nonblocking"):
wrapper = self._make_legacy_input_wrapper(event_loop, callback)
else:
@functools.wraps(callback)
def wrapper() -> tuple[list[str], typing.Any] | None:
self.logger.debug('Calling callback for "watch file"')
return self.parse_input(event_loop, callback, self.get_available_raw_input())
fds = self.get_input_descriptors()
handles = [event_loop.watch_file(fd if isinstance(fd, int) else fd.fileno(), wrapper) for fd in fds]
self._current_event_loop_handles = handles
def _get_input_codes(self) -> list[int]:
return super()._get_input_codes() + self._get_gpm_codes()
def _get_gpm_codes(self) -> list[int]:
codes = []
try:
while self.gpm_mev is not None and self.gpm_event_pending:
codes.extend(self._encode_gpm_event())
except OSError as e:
if e.args[0] != 11:
raise
return codes
def _read_raw_input(self, timeout: int) -> bytearray:
ready = self._wait_for_input_ready(timeout)
if self.gpm_mev is not None and self.gpm_mev.stdout.fileno() in ready:
self.gpm_event_pending = True
fd = self._input_fileno()
chars = bytearray()
if fd is None or fd not in ready:
return chars
with selectors.DefaultSelector() as selector:
selector.register(fd, selectors.EVENT_READ)
input_ready = selector.select(0)
while input_ready:
chars.extend(os.read(fd, 1024))
input_ready = selector.select(0)
return chars
def _encode_gpm_event(self) -> list[int]:
self.gpm_event_pending = False
s = self.gpm_mev.stdout.readline()
result = s.split(", ")
if len(result) != 6:
# unexpected output, stop tracking
self._stop_gpm_tracking()
signals.emit_signal(self, INPUT_DESCRIPTORS_CHANGED)
return []
ev, x, y, _ign, b, m = s.split(",")
ev = int(ev.split("x")[-1], 16)
x = int(x.split(" ")[-1])
y = int(y.lstrip().split(" ")[0])
b = int(b.split(" ")[-1])
m = int(m.split("x")[-1].rstrip(), 16)
# convert to xterm-like escape sequence
last_state = next_state = self.last_bstate
result = []
mod = 0
if m & 1:
mod |= 4 # shift
if m & 10:
mod |= 8 # alt
if m & 4:
mod |= 16 # ctrl
def append_button(b: int) -> None:
b |= mod
result.extend([27, ord("["), ord("M"), b + 32, x + 32, y + 32])
if ev in {20, 36, 52}: # press
if b & 4 and last_state & 1 == 0:
append_button(0)
next_state |= 1
if b & 2 and last_state & 2 == 0:
append_button(1)
next_state |= 2
if b & 1 and last_state & 4 == 0:
append_button(2)
next_state |= 4
elif ev == 146: # drag
if b & 4:
append_button(0 + escape.MOUSE_DRAG_FLAG)
elif b & 2:
append_button(1 + escape.MOUSE_DRAG_FLAG)
elif b & 1:
append_button(2 + escape.MOUSE_DRAG_FLAG)
else: # release
if b & 4 and last_state & 1:
append_button(0 + escape.MOUSE_RELEASE_FLAG)
next_state &= ~1
if b & 2 and last_state & 2:
append_button(1 + escape.MOUSE_RELEASE_FLAG)
next_state &= ~2
if b & 1 and last_state & 4:
append_button(2 + escape.MOUSE_RELEASE_FLAG)
next_state &= ~4
if ev == 40: # double click (release)
if b & 4 and last_state & 1:
append_button(0 + escape.MOUSE_MULTIPLE_CLICK_FLAG)
if b & 2 and last_state & 2:
append_button(1 + escape.MOUSE_MULTIPLE_CLICK_FLAG)
if b & 1 and last_state & 4:
append_button(2 + escape.MOUSE_MULTIPLE_CLICK_FLAG)
elif ev == 52:
if b & 4 and last_state & 1:
append_button(0 + escape.MOUSE_MULTIPLE_CLICK_FLAG * 2)
if b & 2 and last_state & 2:
append_button(1 + escape.MOUSE_MULTIPLE_CLICK_FLAG * 2)
if b & 1 and last_state & 4:
append_button(2 + escape.MOUSE_MULTIPLE_CLICK_FLAG * 2)
self.last_bstate = next_state
return result
def get_cols_rows(self) -> tuple[int, int]:
"""Return the terminal dimensions (num columns, num rows)."""
y, x = super().get_cols_rows()
with contextlib.suppress(OSError): # Term size could not be determined
if hasattr(self._term_output_file, "fileno"):
buf = fcntl.ioctl(self._term_output_file.fileno(), termios.TIOCGWINSZ, b" " * 4)
y, x = struct.unpack("hh", buf)
# Provide some lightweight fallbacks in case the TIOCWINSZ doesn't
# give sane answers
if (x <= 0 or y <= 0) and self.term in {"ansi", "vt100"}:
y, x = 24, 80
self.maxrow = y
return x, y
def _test():
import doctest
doctest.testmod()
if __name__ == "__main__":
_test()

View File

@@ -0,0 +1,917 @@
# Urwid raw display module
# Copyright (C) 2004-2009 Ian Ward
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Urwid web site: https://urwid.org/
"""
Direct terminal UI implementation
"""
from __future__ import annotations
import abc
import contextlib
import functools
import os
import platform
import selectors
import signal
import socket
import sys
import typing
from urwid import signals, str_util, util
from . import escape
from .common import UNPRINTABLE_TRANS_TABLE, UPDATE_PALETTE_ENTRY, AttrSpec, BaseScreen, RealTerminal
if typing.TYPE_CHECKING:
from collections.abc import Callable, Iterable
from types import FrameType
from typing_extensions import Literal
from urwid import Canvas, EventLoop
IS_WINDOWS = sys.platform == "win32"
IS_WSL = (sys.platform == "linux") and ("wsl" in platform.platform().lower())
class Screen(BaseScreen, RealTerminal):
def __init__(self, input: typing.IO, output: typing.IO) -> None: # noqa: A002 # pylint: disable=redefined-builtin
"""Initialize a screen that directly prints escape codes to an output
terminal.
"""
super().__init__()
self._partial_codes: list[int] = []
self._pal_escape: dict[str | None, str] = {}
self._pal_attrspec: dict[str | None, AttrSpec] = {}
self._alternate_buffer: bool = False
signals.connect_signal(self, UPDATE_PALETTE_ENTRY, self._on_update_palette_entry)
self.colors: Literal[1, 16, 88, 256, 16777216] = 16 # FIXME: detect this
self.has_underline = True # FIXME: detect this
self.prev_input_resize = 0
self.set_input_timeouts()
self.screen_buf = None
self._screen_buf_canvas = None
self._resized = False
self.maxrow = None
self._mouse_tracking_enabled = False
self.last_bstate = 0
self._setup_G1_done = False
self._rows_used = None
self._cy = 0
self.term = os.environ.get("TERM", "")
self.fg_bright_is_bold = not self.term.startswith("xterm")
self.bg_bright_is_blink = self.term == "linux"
self.back_color_erase = not self.term.startswith("screen")
self.register_palette_entry(None, "default", "default")
self._next_timeout = None
self.signal_handler_setter = signal.signal
# Our connections to the world
self._term_output_file = output
self._term_input_file = input
# pipe for signalling external event loops about resize events
self._resize_pipe_rd, self._resize_pipe_wr = socket.socketpair()
self._resize_pipe_rd.setblocking(False)
def __del__(self) -> None:
self._resize_pipe_rd.close()
self._resize_pipe_wr.close()
def __repr__(self) -> str:
return f"<{self.__class__.__name__}(input={self._term_input_file}, output={self._term_output_file})>"
def _sigwinch_handler(self, signum: int = 28, frame: FrameType | None = None) -> None:
"""
frame -- will always be None when the GLib event loop is being used.
"""
logger = self.logger.getChild("signal_handlers")
logger.debug(f"SIGWINCH handler called with signum={signum!r}, frame={frame!r}")
if IS_WINDOWS or not self._resized:
self._resize_pipe_wr.send(b"R")
logger.debug("Sent fake resize input to the pipe")
self._resized = True
self.screen_buf = None
@property
def _term_input_io(self) -> typing.IO | None:
if hasattr(self._term_input_file, "fileno"):
return self._term_input_file
return None
def _input_fileno(self) -> int | None:
"""Returns the fileno of the input stream, or None if it doesn't have one.
A stream without a fileno can't participate in whatever.
"""
if hasattr(self._term_input_file, "fileno"):
return self._term_input_file.fileno()
return None
def _on_update_palette_entry(self, name: str | None, *attrspecs: AttrSpec):
# copy the attribute to a dictionary containing the escape seqences
a: AttrSpec = attrspecs[{16: 0, 1: 1, 88: 2, 256: 3, 2**24: 4}[self.colors]]
self._pal_attrspec[name] = a
self._pal_escape[name] = self._attrspec_to_escape(a)
def set_input_timeouts(
self,
max_wait: float | None = None,
complete_wait: float = 0.125,
resize_wait: float = 0.125,
) -> None:
"""
Set the get_input timeout values. All values are in floating
point numbers of seconds.
max_wait -- amount of time in seconds to wait for input when
there is no input pending, wait forever if None
complete_wait -- amount of time in seconds to wait when
get_input detects an incomplete escape sequence at the
end of the available input
resize_wait -- amount of time in seconds to wait for more input
after receiving two screen resize requests in a row to
stop Urwid from consuming 100% cpu during a gradual
window resize operation
"""
self.max_wait = max_wait
if max_wait is not None:
if self._next_timeout is None:
self._next_timeout = max_wait
else:
self._next_timeout = min(self._next_timeout, self.max_wait)
self.complete_wait = complete_wait
self.resize_wait = resize_wait
def set_mouse_tracking(self, enable: bool = True) -> None:
"""
Enable (or disable) mouse tracking.
After calling this function get_input will include mouse
click events along with keystrokes.
"""
enable = bool(enable) # noqa: FURB123,RUF100
if enable == self._mouse_tracking_enabled:
return
self._mouse_tracking(enable)
self._mouse_tracking_enabled = enable
def _mouse_tracking(self, enable: bool) -> None:
if enable:
self.write(escape.MOUSE_TRACKING_ON)
else:
self.write(escape.MOUSE_TRACKING_OFF)
@abc.abstractmethod
def _start(self, alternate_buffer: bool = True) -> None:
"""
Initialize the screen and input mode.
alternate_buffer -- use alternate screen buffer
"""
def _stop_mouse_restore_buffer(self) -> None:
"""Stop mouse tracking and restore the screen."""
self._mouse_tracking(False)
move_cursor = ""
if self._alternate_buffer:
move_cursor = escape.RESTORE_NORMAL_BUFFER
elif self.maxrow is not None:
move_cursor = escape.set_cursor_position(0, self.maxrow)
self.write(self._attrspec_to_escape(AttrSpec("", "")) + escape.SI + move_cursor + escape.SHOW_CURSOR)
self.flush()
@abc.abstractmethod
def _stop(self) -> None:
"""
Restore the screen.
"""
def write(self, data):
"""Write some data to the terminal.
You may wish to override this if you're using something other than
regular files for input and output.
"""
self._term_output_file.write(data)
def flush(self):
"""Flush the output buffer.
You may wish to override this if you're using something other than
regular files for input and output.
"""
self._term_output_file.flush()
@typing.overload
def get_input(self, raw_keys: Literal[False]) -> list[str]: ...
@typing.overload
def get_input(self, raw_keys: Literal[True]) -> tuple[list[str], list[int]]: ...
def get_input(self, raw_keys: bool = False) -> list[str] | tuple[list[str], list[int]]:
"""Return pending input as a list.
raw_keys -- return raw keycodes as well as translated versions
This function will immediately return all the input since the
last time it was called. If there is no input pending it will
wait before returning an empty list. The wait time may be
configured with the set_input_timeouts function.
If raw_keys is False (default) this function will return a list
of keys pressed. If raw_keys is True this function will return
a ( keys pressed, raw keycodes ) tuple instead.
Examples of keys returned:
* ASCII printable characters: " ", "a", "0", "A", "-", "/"
* ASCII control characters: "tab", "enter"
* Escape sequences: "up", "page up", "home", "insert", "f1"
* Key combinations: "shift f1", "meta a", "ctrl b"
* Window events: "window resize"
When a narrow encoding is not enabled:
* "Extended ASCII" characters: "\\xa1", "\\xb2", "\\xfe"
When a wide encoding is enabled:
* Double-byte characters: "\\xa1\\xea", "\\xb2\\xd4"
When utf8 encoding is enabled:
* Unicode characters: u"\\u00a5", u'\\u253c"
Examples of mouse events returned:
* Mouse button press: ('mouse press', 1, 15, 13),
('meta mouse press', 2, 17, 23)
* Mouse drag: ('mouse drag', 1, 16, 13),
('mouse drag', 1, 17, 13),
('ctrl mouse drag', 1, 18, 13)
* Mouse button release: ('mouse release', 0, 18, 13),
('ctrl mouse release', 0, 17, 23)
"""
logger = self.logger.getChild("get_input")
if not self._started:
raise RuntimeError
self._wait_for_input_ready(self._next_timeout)
keys, raw = self.parse_input(None, None, self.get_available_raw_input())
# Avoid pegging CPU at 100% when slowly resizing
if keys == ["window resize"] and self.prev_input_resize:
logger.debug('get_input: got "window resize" > 1 times. Enable throttling for resize.')
for _ in range(2):
self._wait_for_input_ready(self.resize_wait)
new_keys, new_raw = self.parse_input(None, None, self.get_available_raw_input())
raw += new_raw
if new_keys and new_keys != ["window resize"]:
if "window resize" in new_keys:
keys = new_keys
else:
keys.extend(new_keys)
break
if keys == ["window resize"]:
self.prev_input_resize = 2
elif self.prev_input_resize == 2 and not keys:
self.prev_input_resize = 1
else:
self.prev_input_resize = 0
if raw_keys:
return keys, raw
return keys
def get_input_descriptors(self) -> list[socket.socket | typing.IO | int]:
"""
Return a list of integer file descriptors that should be
polled in external event loops to check for user input.
Use this method if you are implementing your own event loop.
This method is only called by `hook_event_loop`, so if you override
that, you can safely ignore this.
"""
if not self._started:
return []
fd_list: list[socket.socket | typing.IO | int] = [self._resize_pipe_rd]
input_io = self._term_input_io
if input_io is not None:
fd_list.append(input_io)
return fd_list
_current_event_loop_handles = ()
@abc.abstractmethod
def unhook_event_loop(self, event_loop: EventLoop) -> None:
"""
Remove any hooks added by hook_event_loop.
"""
@abc.abstractmethod
def hook_event_loop(
self,
event_loop: EventLoop,
callback: Callable[[list[str], list[int]], typing.Any],
) -> None:
"""
Register the given callback with the event loop, to be called with new
input whenever it's available. The callback should be passed a list of
processed keys and a list of unprocessed keycodes.
Subclasses may wish to use parse_input to wrap the callback.
"""
_input_timeout = None
def _make_legacy_input_wrapper(self, event_loop, callback):
"""
Support old Screen classes that still have a get_input_nonblocking and expect it to work.
"""
@functools.wraps(callback)
def wrapper():
if self._input_timeout:
event_loop.remove_alarm(self._input_timeout)
self._input_timeout = None
timeout, keys, raw = self.get_input_nonblocking() # pylint: disable=no-member # should we deprecate?
if timeout is not None:
self._input_timeout = event_loop.alarm(timeout, wrapper)
callback(keys, raw)
return wrapper
def _get_input_codes(self) -> list[int]:
return list(self._get_keyboard_codes())
def get_available_raw_input(self) -> list[int]:
"""
Return any currently available input. Does not block.
This method is only used by the default `hook_event_loop`
implementation; you can safely ignore it if you implement your own.
"""
logger = self.logger.getChild("get_available_raw_input")
codes = [*self._partial_codes, *self._get_input_codes()]
self._partial_codes = []
# clean out the pipe used to signal external event loops
# that a resize has occurred
with selectors.DefaultSelector() as selector:
selector.register(self._resize_pipe_rd, selectors.EVENT_READ)
present_resize_flag = selector.select(0) # nonblocking
while present_resize_flag:
logger.debug("Resize signal received. Cleaning socket.")
# Argument "size" is maximum buffer size to read. Since we're emptying, set it reasonably big.
self._resize_pipe_rd.recv(128)
present_resize_flag = selector.select(0)
return codes
@typing.overload
def parse_input(
self,
event_loop: None,
callback: None,
codes: list[int],
wait_for_more: bool = ...,
) -> tuple[list[str], list[int]]: ...
@typing.overload
def parse_input(
self,
event_loop: EventLoop,
callback: None,
codes: list[int],
wait_for_more: bool = ...,
) -> tuple[list[str], list[int]]: ...
@typing.overload
def parse_input(
self,
event_loop: EventLoop,
callback: Callable[[list[str], list[int]], typing.Any],
codes: list[int],
wait_for_more: bool = ...,
) -> None: ...
def parse_input(
self,
event_loop: EventLoop | None,
callback: Callable[[list[str], list[int]], typing.Any] | None,
codes: list[int],
wait_for_more: bool = True,
) -> tuple[list[str], list[int]] | None:
"""
Read any available input from get_available_raw_input, parses it into
keys, and calls the given callback.
The current implementation tries to avoid any assumptions about what
the screen or event loop look like; it only deals with parsing keycodes
and setting a timeout when an incomplete one is detected.
`codes` should be a sequence of keycodes, i.e. bytes. A bytearray is
appropriate, but beware of using bytes, which only iterates as integers
on Python 3.
"""
logger = self.logger.getChild("parse_input")
# Note: event_loop may be None for 100% synchronous support, only used
# by get_input. Not documented because you shouldn't be doing it.
if self._input_timeout and event_loop:
event_loop.remove_alarm(self._input_timeout)
self._input_timeout = None
original_codes = codes
decoded_codes = []
try:
while codes:
run, codes = escape.process_keyqueue(codes, wait_for_more)
decoded_codes.extend(run)
except escape.MoreInputRequired:
# Set a timer to wait for the rest of the input; if it goes off
# without any new input having come in, use the partial input
k = len(original_codes) - len(codes)
raw_codes = original_codes[:k]
self._partial_codes = codes
def _parse_incomplete_input():
self._input_timeout = None
self._partial_codes = []
self.parse_input(event_loop, callback, codes, wait_for_more=False)
if event_loop:
self._input_timeout = event_loop.alarm(self.complete_wait, _parse_incomplete_input)
else:
raw_codes = original_codes
self._partial_codes = []
logger.debug(f"Decoded codes: {decoded_codes!r}, raw codes: {raw_codes!r}")
if self._resized:
decoded_codes.append("window resize")
logger.debug('Added "window resize" to the codes')
self._resized = False
if callback:
callback(decoded_codes, raw_codes)
return None
# For get_input
return decoded_codes, raw_codes
def _wait_for_input_ready(self, timeout: float | None) -> list[int]:
logger = self.logger.getChild("wait_for_input_ready")
fd_list = self.get_input_descriptors()
logger.debug(f"Waiting for input: descriptors={fd_list!r}, timeout={timeout!r}")
with selectors.DefaultSelector() as selector:
for fd in fd_list:
selector.register(fd, selectors.EVENT_READ)
ready = selector.select(timeout)
logger.debug(f"Input ready: {ready}")
return [event.fd for event, _ in ready]
@abc.abstractmethod
def _read_raw_input(self, timeout: int) -> Iterable[int]: ...
def _get_keyboard_codes(self) -> Iterable[int]:
return self._read_raw_input(0)
def _setup_G1(self) -> None:
"""
Initialize the G1 character set to graphics mode if required.
"""
if self._setup_G1_done:
return
while True:
with contextlib.suppress(OSError):
self.write(escape.DESIGNATE_G1_SPECIAL)
self.flush()
break
self._setup_G1_done = True
def draw_screen(self, size: tuple[int, int], canvas: Canvas) -> None:
"""Paint screen with rendered canvas."""
def set_cursor_home() -> str:
if not partial_display():
return escape.set_cursor_position(0, 0)
return escape.CURSOR_HOME_COL + escape.move_cursor_up(cy)
def set_cursor_position(x: int, y: int) -> str:
if not partial_display():
return escape.set_cursor_position(x, y)
if cy > y:
return "\b" + escape.CURSOR_HOME_COL + escape.move_cursor_up(cy - y) + escape.move_cursor_right(x)
return "\b" + escape.CURSOR_HOME_COL + escape.move_cursor_down(y - cy) + escape.move_cursor_right(x)
def is_blank_row(row: list[tuple[object, Literal["0", "U"] | None], bytes]) -> bool:
if len(row) > 1:
return False
return not row[0][2].strip()
def attr_to_escape(a: AttrSpec | str | None) -> str:
if a in self._pal_escape:
return self._pal_escape[a]
if isinstance(a, AttrSpec):
return self._attrspec_to_escape(a)
if a is None:
return self._attrspec_to_escape(AttrSpec("default", "default"))
# undefined attributes use default/default
self.logger.debug(f"Undefined attribute: {a!r}")
return self._attrspec_to_escape(AttrSpec("default", "default"))
def using_standout_or_underline(a: AttrSpec | str) -> bool:
a = self._pal_attrspec.get(a, a)
return isinstance(a, AttrSpec) and (a.standout or a.underline)
encoding = util.get_encoding()
logger = self.logger.getChild("draw_screen")
(maxcol, maxrow) = size
if not self._started:
raise RuntimeError
if maxrow != canvas.rows():
raise ValueError(maxrow)
# quick return if nothing has changed
if self.screen_buf and canvas is self._screen_buf_canvas:
return
self._setup_G1()
if self._resized:
# handle resize before trying to draw screen
logger.debug("Not drawing screen: screen resized and resize was not handled")
return
logger.debug(f"Drawing screen with size {size!r}")
last_attributes = None # Default = empty
output: list[str] = [escape.HIDE_CURSOR, attr_to_escape(last_attributes)]
def partial_display() -> bool:
# returns True if the screen is in partial display mode ie. only some rows belong to the display
return self._rows_used is not None
if not partial_display():
output.append(escape.CURSOR_HOME)
if self.screen_buf:
osb = self.screen_buf
else:
osb = []
sb: list[list[tuple[object, Literal["0", "U"] | None, bytes]]] = []
cy = self._cy
y = -1
ins = None
output.append(set_cursor_home())
cy = 0
first = True
last_charset_flag = None
for row in canvas.content():
y += 1
if osb and y < len(osb) and osb[y] == row:
# this row of the screen buffer matches what is
# currently displayed, so we can skip this line
sb.append(osb[y])
continue
sb.append(row)
# leave blank lines off display when we are using
# the default screen buffer (allows partial screen)
if partial_display() and y > self._rows_used:
if is_blank_row(row):
continue
self._rows_used = y
if y or partial_display():
output.append(set_cursor_position(0, y))
# after updating the line we will be just over the
# edge, but terminals still treat this as being
# on the same line
cy = y
whitespace_at_end = False
if row:
a, cs, run = row[-1]
if run[-1:] == b" " and self.back_color_erase and not using_standout_or_underline(a):
whitespace_at_end = True
row = row[:-1] + [(a, cs, run.rstrip(b" "))] # noqa: PLW2901
elif y == maxrow - 1 and maxcol > 1:
row, back, ins = self._last_row(row) # noqa: PLW2901
for a, cs, run in row:
if not isinstance(run, bytes): # canvases render with bytes
raise TypeError(run)
if cs != "U":
run = run.translate(UNPRINTABLE_TRANS_TABLE) # noqa: PLW2901
if last_attributes != a:
output.append(attr_to_escape(a))
last_attributes = a
if encoding != "utf-8" and (first or last_charset_flag != cs):
if cs not in {None, "0", "U"}:
raise ValueError(cs)
if last_charset_flag == "U":
output.append(escape.IBMPC_OFF)
if cs is None:
output.append(escape.SI)
elif cs == "U":
output.append(escape.IBMPC_ON)
else:
output.append(escape.SO)
last_charset_flag = cs
output.append(run.decode(encoding, "replace"))
first = False
if ins:
(inserta, insertcs, inserttext) = ins
ias = attr_to_escape(inserta)
if insertcs not in {None, "0", "U"}:
raise ValueError(insertcs)
if isinstance(inserttext, bytes):
inserttext = inserttext.decode(encoding)
output.extend(("\x08" * back, ias)) # pylint: disable=used-before-assignment # defined in `if row`
if encoding != "utf-8":
if cs is None:
icss = escape.SI
elif cs == "U":
icss = escape.IBMPC_ON
else:
icss = escape.SO
output.append(icss)
if not IS_WINDOWS:
output += [escape.INSERT_ON, inserttext, escape.INSERT_OFF]
else:
output += [f"{escape.ESC}[{str_util.calc_width(inserttext, 0, len(inserttext))}@", inserttext]
if encoding != "utf-8" and cs == "U":
output.append(escape.IBMPC_OFF)
if whitespace_at_end:
output.append(escape.ERASE_IN_LINE_RIGHT)
if canvas.cursor is not None:
x, y = canvas.cursor
output += [set_cursor_position(x, y), escape.SHOW_CURSOR]
self._cy = y
if self._resized:
# handle resize before trying to draw screen
return
try:
for line in output:
if isinstance(line, bytes):
line = line.decode(encoding, "replace") # noqa: PLW2901
self.write(line)
self.flush()
except OSError as e:
# ignore interrupted syscall
if e.args[0] != 4:
raise
self.screen_buf = sb
self._screen_buf_canvas = canvas
def _last_row(self, row: list[tuple[object, Literal["0", "U"] | None, bytes]]) -> tuple[
list[tuple[object, Literal["0", "U"] | None, bytes]],
int,
tuple[object, Literal["0", "U"] | None, bytes],
]:
"""On the last row we need to slide the bottom right character
into place. Calculate the new line, attr and an insert sequence
to do that.
eg. last row:
XXXXXXXXXXXXXXXXXXXXYZ
Y will be drawn after Z, shifting Z into position.
"""
new_row = row[:-1]
z_attr, z_cs, last_text = row[-1]
last_cols = str_util.calc_width(last_text, 0, len(last_text))
last_offs, z_col = str_util.calc_text_pos(last_text, 0, len(last_text), last_cols - 1)
if last_offs == 0:
z_text = last_text
del new_row[-1]
# we need another segment
y_attr, y_cs, nlast_text = row[-2]
nlast_cols = str_util.calc_width(nlast_text, 0, len(nlast_text))
z_col += nlast_cols
nlast_offs, y_col = str_util.calc_text_pos(nlast_text, 0, len(nlast_text), nlast_cols - 1)
y_text = nlast_text[nlast_offs:]
if nlast_offs:
new_row.append((y_attr, y_cs, nlast_text[:nlast_offs]))
else:
z_text = last_text[last_offs:]
y_attr, y_cs = z_attr, z_cs
nlast_cols = str_util.calc_width(last_text, 0, last_offs)
nlast_offs, y_col = str_util.calc_text_pos(last_text, 0, last_offs, nlast_cols - 1)
y_text = last_text[nlast_offs:last_offs]
if nlast_offs:
new_row.append((y_attr, y_cs, last_text[:nlast_offs]))
new_row.append((z_attr, z_cs, z_text))
return new_row, z_col - y_col, (y_attr, y_cs, y_text)
def clear(self) -> None:
"""
Force the screen to be completely repainted on the next
call to draw_screen().
"""
self.screen_buf = None
def _attrspec_to_escape(self, a: AttrSpec) -> str:
"""
Convert AttrSpec instance a to an escape sequence for the terminal
>>> s = Screen()
>>> s.set_terminal_properties(colors=256)
>>> a2e = s._attrspec_to_escape
>>> a2e(s.AttrSpec('brown', 'dark green'))
'\\x1b[0;33;42m'
>>> a2e(s.AttrSpec('#fea,underline', '#d0d'))
'\\x1b[0;38;5;229;4;48;5;164m'
"""
if self.term == "fbterm":
fg = escape.ESC + f"[1;{a.foreground_number:d}}}"
bg = escape.ESC + f"[2;{a.background_number:d}}}"
return fg + bg
if a.foreground_true:
fg = f"38;2;{';'.join(str(part) for part in a.get_rgb_values()[0:3])}"
elif a.foreground_high:
fg = f"38;5;{a.foreground_number:d}"
elif a.foreground_basic:
if a.foreground_number > 7:
if self.fg_bright_is_bold:
fg = f"1;{a.foreground_number - 8 + 30:d}"
else:
fg = f"{a.foreground_number - 8 + 90:d}"
else:
fg = f"{a.foreground_number + 30:d}"
else:
fg = "39"
st = (
"1;" * a.bold
+ "3;" * a.italics
+ "4;" * a.underline
+ "5;" * a.blink
+ "7;" * a.standout
+ "9;" * a.strikethrough
)
if a.background_true:
bg = f"48;2;{';'.join(str(part) for part in a.get_rgb_values()[3:6])}"
elif a.background_high:
bg = f"48;5;{a.background_number:d}"
elif a.background_basic:
if a.background_number > 7:
if self.bg_bright_is_blink:
bg = f"5;{a.background_number - 8 + 40:d}"
else:
# this doesn't work on most terminals
bg = f"{a.background_number - 8 + 100:d}"
else:
bg = f"{a.background_number + 40:d}"
else:
bg = "49"
return f"{escape.ESC}[0;{fg};{st}{bg}m"
def set_terminal_properties(
self,
colors: Literal[1, 16, 88, 256, 16777216] | None = None,
bright_is_bold: bool | None = None,
has_underline: bool | None = None,
) -> None:
"""
colors -- number of colors terminal supports (1, 16, 88, 256, or 2**24)
or None to leave unchanged
bright_is_bold -- set to True if this terminal uses the bold
setting to create bright colors (numbers 8-15), set to False
if this Terminal can create bright colors without bold or
None to leave unchanged
has_underline -- set to True if this terminal can use the
underline setting, False if it cannot or None to leave
unchanged
"""
if colors is None:
colors = self.colors
if bright_is_bold is None:
bright_is_bold = self.fg_bright_is_bold
if has_underline is None:
has_underline = self.has_underline
if colors == self.colors and bright_is_bold == self.fg_bright_is_bold and has_underline == self.has_underline:
return
self.colors = colors
self.fg_bright_is_bold = bright_is_bold
self.has_underline = has_underline
self.clear()
self._pal_escape = {}
for p, v in self._palette.items():
self._on_update_palette_entry(p, *v)
def reset_default_terminal_palette(self) -> None:
"""
Attempt to set the terminal palette to default values as taken
from xterm. Uses number of colors from current
set_terminal_properties() screen setting.
"""
if self.colors == 1:
return
if self.colors == 2**24:
colors = 256
else:
colors = self.colors
def rgb_values(n) -> tuple[int | None, int | None, int | None]:
if colors == 16:
aspec = AttrSpec(f"h{n:d}", "", 256)
else:
aspec = AttrSpec(f"h{n:d}", "", colors)
return aspec.get_rgb_values()[:3]
entries = [(n, *rgb_values(n)) for n in range(min(colors, 256))]
self.modify_terminal_palette(entries)
def modify_terminal_palette(self, entries: list[tuple[int, int | None, int | None, int | None]]):
"""
entries - list of (index, red, green, blue) tuples.
Attempt to set part of the terminal palette (this does not work
on all terminals.) The changes are sent as a single escape
sequence so they should all take effect at the same time.
0 <= index < 256 (some terminals will only have 16 or 88 colors)
0 <= red, green, blue < 256
"""
if self.term == "fbterm":
modify = [f"{index:d};{red:d};{green:d};{blue:d}" for index, red, green, blue in entries]
self.write(f"\x1b[3;{';'.join(modify)}}}")
else:
modify = [f"{index:d};rgb:{red:02x}/{green:02x}/{blue:02x}" for index, red, green, blue in entries]
self.write(f"\x1b]4;{';'.join(modify)}\x1b\\")
self.flush()
# shortcut for creating an AttrSpec with this screen object's
# number of colors
def AttrSpec(self, fg, bg) -> AttrSpec:
return AttrSpec(fg, bg, self.colors)

View File

@@ -0,0 +1,12 @@
body { margin: 8px 8px 8px 8px; border: 0;
color: black; background-color: silver;
font-family: fixed; overflow: hidden; }
form { margin: 0 0 8px 0; }
#text { position: relative;
background-color: silver;
width: 100%; height: 100%;
margin: 3px 0 0 0; border: 1px solid #999; }
#page { position: relative; width: 100%;height: 100%;}

View File

@@ -0,0 +1,462 @@
// Urwid web (CGI/Asynchronous Javascript) display module
// Copyright (C) 2004-2005 Ian Ward
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
// Urwid web site: https://urwid.org/
colours = new Object();
colours = {
'0': "black",
'1': "#c00000",
'2': "green",
'3': "#804000",
'4': "#0000c0",
'5': "#c000c0",
'6': "teal",
'7': "silver",
'8': "gray",
'9': "#ff6060",
'A': "lime",
'B': "yellow",
'C': "#8080ff",
'D': "#ff40ff",
'E': "aqua",
'F': "white"
};
keycodes = new Object();
keycodes = {
8: "backspace", 9: "tab", 13: "enter", 27: "esc",
33: "page up", 34: "page down", 35: "end", 36: "home",
37: "left", 38: "up", 39: "right", 40: "down",
45: "insert", 46: "delete",
112: "f1", 113: "f2", 114: "f3", 115: "f4",
116: "f5", 117: "f6", 118: "f7", 119: "f8",
120: "f9", 121: "f10", 122: "f11", 123: "f12"
};
var conn = null;
var char_width = null;
var char_height = null;
var screen_x = null;
var screen_y = null;
var urwid_id = null;
var send_conn = null;
var send_queue_max = 32;
var send_queue = new Array(send_queue_max);
var send_queue_in = 0;
var send_queue_out = 0;
var check_font_delay = 1000;
var send_more_delay = 100;
var poll_again_delay = 500;
var document_location = null;
var update_method = "multipart";
var sending = false;
var lastkeydown = null;
function setup_connection() {
if (window.XMLHttpRequest) {
conn = new XMLHttpRequest();
} else if (window.ActiveXObject) {
conn = new ActiveXObject("Microsoft.XMLHTTP");
}
if (conn == null) {
set_status("Connection Failed");
alert( "Can't figure out how to send request." );
return;
}
try{
conn.multipart = true;
}catch(e){
update_method = "polling";
}
conn.onreadystatechange = handle_recv;
conn.open("POST", document_location, true);
conn.setRequestHeader("X-Urwid-Method",update_method);
conn.setRequestHeader("Content-type","text/plain");
conn.send("window resize " +screen_x+" "+screen_y+"\n");
}
function do_poll() {
if (urwid_id == null){
alert("that's unpossible!");
return;
}
if (window.XMLHttpRequest) {
conn = new XMLHttpRequest();
} else if (window.ActiveXObject) {
conn = new ActiveXObject("Microsoft.XMLHTTP");
}
conn.onreadystatechange = handle_recv;
conn.open("POST", document_location, true);
conn.setRequestHeader("X-Urwid-Method","polling");
conn.setRequestHeader("X-Urwid-ID",urwid_id);
conn.setRequestHeader("Content-type","text/plain");
conn.send("eh?");
}
function handle_recv() {
if( ! conn ){ return;}
if( conn.readyState != 4) {
return;
}
if( conn.status == 404 && urwid_id != null) {
set_status("Connection Closed");
return;
}
if( conn.status == 403 && update_method == "polling" ) {
set_status("Server Refused Connection");
alert("This server does not allow polling clients.\n\n" +
"Please use a web browser with multipart support " +
"such as Mozilla Firefox");
return;
}
if( conn.status == 503 ) {
set_status("Connection Failed");
alert("The server has reached its maximum number of "+
"connections.\n\nPlease try again later.");
return;
}
if( conn.status != 200) {
set_status("Connection Failed");
alert("Error from server: "+conn.statusText);
return;
}
if( urwid_id == null ){
urwid_id = conn.getResponseHeader("X-Urwid-ID");
if( send_queue_in != send_queue_out ){
// keys waiting
do_send();
}
if(update_method=="polling"){
set_status("Polling");
}else if(update_method=="multipart"){
set_status("Connected");
}
}
if( conn.responseText == "" ){
if(update_method=="polling"){
poll_again();
}
return; // keepalive
}
if( conn.responseText == "Z" ){
set_status("Connection Closed");
update_method = null;
return;
}
var text = document.getElementById('text');
var last_screen = Array(text.childNodes.length);
for( var i=0; i<text.childNodes.length; i++ ){
last_screen[i] = text.childNodes[i];
}
var frags = conn.responseText.split("\n");
var ln = document.createElement('span');
var k = 0;
for( var i=0; i<frags.length; i++ ){
var f = frags[i];
if( f == "" ){
var br = document.getElementById('br').cloneNode(true);
ln.appendChild( br );
if( text.childNodes.length > k ){
text.replaceChild(ln, text.childNodes[k]);
}else{
text.appendChild(ln);
}
k = k+1;
ln = document.createElement('span');
}else if( f.charAt(0) == "<" ){
line_number = parseInt(f.substr(1));
if( line_number == k ){
k = k +1;
continue;
}
var clone = last_screen[line_number].cloneNode(true);
if( text.childNodes.length > k ){
text.replaceChild(clone, text.childNodes[k]);
}else{
text.appendChild(clone);
}
k = k+1;
}else{
var span=make_span(f.substr(2),f.charAt(0),f.charAt(1));
ln.appendChild( span );
}
}
for( var i=k; i < text.childNodes.length; i++ ){
text.removeChild(last_screen[i]);
}
if(update_method=="polling"){
poll_again();
}
}
function poll_again(){
if(conn.status == 200){
setTimeout("do_poll();",poll_again_delay);
}
}
function load_web_display(){
if( document.documentURI ){
document_location = document.documentURI;
}else{
document_location = document.location;
}
document.onkeypress = body_keypress;
document.onkeydown = body_keydown;
document.onresize = body_resize;
body_resize();
send_queue_out = send_queue_in; // don't queue the first resize
set_status("Connecting");
setup_connection();
setTimeout("check_fontsize();",check_font_delay);
}
function set_status( status ){
var s = document.getElementById('status');
var t = document.createTextNode(status);
s.replaceChild(t, s.firstChild);
}
function make_span(s, fg, bg){
d = document.createElement('span');
d.style.backgroundColor = colours[bg];
d.style.color = colours[fg];
d.appendChild(document.createTextNode(s));
return d;
}
function body_keydown(e){
if (conn == null){
return;
}
if (!e) var e = window.event;
if (e.keyCode) code = e.keyCode;
else if (e.which) code = e.which;
var mod = "";
var key;
if( e.ctrlKey ){ mod = "ctrl " + mod; }
if( e.altKey || e.metaKey ){ mod = "meta " + mod; }
if( e.shiftKey && e.charCode == 0 ){ mod = "shift " + mod; }
key = keycodes[code];
if( key != undefined ){
lastkeydown = key;
send_key( mod + key );
stop_key_event(e);
return false;
}
}
function body_keypress(e){
if (conn == null){
return;
}
if (!e) var e = window.event;
if (e.keyCode) code = e.keyCode;
else if (e.which) code = e.which;
var mod = "";
var key;
if( e.ctrlKey ){ mod = "ctrl " + mod; }
if( e.altKey || e.metaKey ){ mod = "meta " + mod; }
if( e.shiftKey && e.charCode == 0 ){ mod = "shift " + mod; }
if( e.charCode != null && e.charCode != 0 ){
key = String.fromCharCode(e.charCode);
}else if( e.charCode == null ){
key = String.fromCharCode(code);
}else{
key = keycodes[code];
if( key == undefined || lastkeydown == key ){
lastkeydown = null;
stop_key_event(e);
return false;
}
}
send_key( mod + key );
stop_key_event(e);
return false;
}
function stop_key_event(e){
e.cancelBubble = true;
if( e.stopPropagation ){
e.stopPropagation();
}
if( e.preventDefault ){
e.preventDefault();
}
}
function send_key( key ){
if( (send_queue_in+1)%send_queue_max == send_queue_out ){
// buffer overrun
return;
}
send_queue[send_queue_in] = key;
send_queue_in = (send_queue_in+1)%send_queue_max;
if( urwid_id != null ){
if (send_conn == undefined || send_conn.ready_state != 4 ){
send_more();
return;
}
do_send();
}
}
function do_send() {
if( ! urwid_id ){ return; }
if( ! update_method ){ return; } // connection closed
if( send_queue_in == send_queue_out ){ return; }
if( sending ){
//var queue_delta = send_queue_in - send_queue_out;
//if( queue_delta < 0 ){ queue_delta += send_queue_max; }
//set_status("Sending (queued "+queue_delta+")");
return;
}
try{
sending = true;
//set_status("starting send");
if( send_conn == null ){
if (window.XMLHttpRequest) {
send_conn = new XMLHttpRequest();
} else if (window.ActiveXObject) {
send_conn = new ActiveXObject("Microsoft.XMLHTTP");
}
}else if( send_conn.status != 200) {
alert("Error from server: "+send_conn.statusText);
return;
}else if(send_conn.readyState != 4 ){
alert("not ready on send connection");
return;
}
} catch(e) {
alert(e);
sending = false;
return;
}
send_conn.open("POST", document_location, true);
send_conn.onreadystatechange = send_handle_recv;
send_conn.setRequestHeader("Content-type","text/plain");
send_conn.setRequestHeader("X-Urwid-ID",urwid_id);
var tmp_send_queue_in = send_queue_in;
var out = null;
if( send_queue_out > tmp_send_queue_in ){
out = send_queue.slice(send_queue_out).join("\n")
if( tmp_send_queue_in > 0 ){
out += "\n" + send_queue.slice(0,tmp_send_queue_in).join("\n");
}
}else{
out = send_queue.slice(send_queue_out,
tmp_send_queue_in).join("\n");
}
send_queue_out = tmp_send_queue_in;
//set_status("Sending");
send_conn.send( out +"\n" );
}
function send_handle_recv() {
if( send_conn.readyState != 4) {
return;
}
if( send_conn.status == 404) {
set_status("Connection Closed");
update_method = null;
return;
}
if( send_conn.status != 200) {
alert("Error from server: "+send_conn.statusText);
return;
}
sending = false;
if( send_queue_out != send_queue_in ){
send_more();
}
}
function send_more(){
setTimeout("do_send();",send_more_delay);
}
function check_fontsize(){
body_resize()
setTimeout("check_fontsize();",check_font_delay);
}
function body_resize(){
var t = document.getElementById('testchar');
var t2 = document.getElementById('testchar2');
var text = document.getElementById('text');
var window_width;
var window_height;
if (window.innerHeight) {
window_width = window.innerWidth;
window_height = window.innerHeight;
}else{
window_width = document.documentElement.clientWidth;
window_height = document.documentElement.clientHeight;
//var z = "CI:"; for(var i in bod){z = z + " " + i;} alert(z);
}
char_width = t.offsetLeft / 44;
var avail_width = window_width-18;
var avail_width_mod = avail_width % char_width;
var x_size = (avail_width - avail_width_mod)/char_width;
char_height = t2.offsetTop - t.offsetTop;
var avail_height = window_height-text.offsetTop-10;
var avail_height_mod = avail_height % char_height;
var y_size = (avail_height - avail_height_mod)/char_height;
text.style.width = x_size*char_width+"px";
text.style.height = y_size*char_height+"px";
if( screen_x != x_size || screen_y != y_size ){
send_key("window resize "+x_size+" "+y_size);
}
screen_x = x_size;
screen_y = y_size;
}

View File

@@ -0,0 +1,171 @@
from __future__ import annotations
import enum
import typing
from ctypes import POINTER, Structure, Union, windll
from ctypes.wintypes import BOOL, CHAR, DWORD, HANDLE, LPDWORD, SHORT, UINT, WCHAR, WORD
# https://docs.microsoft.com/de-de/windows/console/getstdhandle
STD_INPUT_HANDLE = -10
STD_OUTPUT_HANDLE = -11
# https://docs.microsoft.com/de-de/windows/console/setconsolemode
ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
DISABLE_NEWLINE_AUTO_RETURN = 0x0008
ENABLE_VIRTUAL_TERMINAL_INPUT = 0x0200
ENABLE_WINDOW_INPUT = 0x0008
class COORD(Structure):
"""https://docs.microsoft.com/en-us/windows/console/coord-str"""
_fields_: typing.ClassVar[list[tuple[str, type]]] = [
("X", SHORT),
("Y", SHORT),
]
class SMALL_RECT(Structure):
"""https://docs.microsoft.com/en-us/windows/console/small-rect-str"""
_fields_: typing.ClassVar[list[tuple[str, type]]] = [
("Left", SHORT),
("Top", SHORT),
("Right", SHORT),
("Bottom", SHORT),
]
class CONSOLE_SCREEN_BUFFER_INFO(Structure):
"""https://docs.microsoft.com/en-us/windows/console/console-screen-buffer-info-str"""
_fields_: typing.ClassVar[list[tuple[str, type]]] = [
("dwSize", COORD),
("dwCursorPosition", COORD),
("wAttributes", WORD),
("srWindow", SMALL_RECT),
("dwMaximumWindowSize", COORD),
]
class uChar(Union):
"""https://docs.microsoft.com/en-us/windows/console/key-event-record-str"""
_fields_: typing.ClassVar[list[tuple[str, type]]] = [
("AsciiChar", CHAR),
("UnicodeChar", WCHAR),
]
class KEY_EVENT_RECORD(Structure):
"""https://docs.microsoft.com/en-us/windows/console/key-event-record-str"""
_fields_: typing.ClassVar[list[tuple[str, type]]] = [
("bKeyDown", BOOL),
("wRepeatCount", WORD),
("wVirtualKeyCode", WORD),
("wVirtualScanCode", WORD),
("uChar", uChar),
("dwControlKeyState", DWORD),
]
class MOUSE_EVENT_RECORD(Structure):
"""https://docs.microsoft.com/en-us/windows/console/mouse-event-record-str"""
_fields_: typing.ClassVar[list[tuple[str, type]]] = [
("dwMousePosition", COORD),
("dwButtonState", DWORD),
("dwControlKeyState", DWORD),
("dwEventFlags", DWORD),
]
class MouseButtonState(enum.IntFlag):
"""https://learn.microsoft.com/en-us/windows/console/mouse-event-record-str"""
FROM_LEFT_1ST_BUTTON_PRESSED = 0x0001
RIGHTMOST_BUTTON_PRESSED = 0x0002
FROM_LEFT_2ND_BUTTON_PRESSED = 0x0004
FROM_LEFT_3RD_BUTTON_PRESSED = 0x0008
FROM_LEFT_4TH_BUTTON_PRESSED = 0x0010
class MouseEventFlags(enum.IntFlag):
"""https://learn.microsoft.com/en-us/windows/console/mouse-event-record-str"""
BUTTON_PRESSED = 0x0000 # Default action, used in examples, but not in official enum
MOUSE_MOVED = 0x0001
DOUBLE_CLICK = 0x0002
MOUSE_WHEELED = 0x0004
MOUSE_HWHEELED = 0x0008
class WINDOW_BUFFER_SIZE_RECORD(Structure):
"""https://docs.microsoft.com/en-us/windows/console/window-buffer-size-record-str"""
_fields_: typing.ClassVar[list[tuple[str, type]]] = [("dwSize", COORD)]
class MENU_EVENT_RECORD(Structure):
"""https://docs.microsoft.com/en-us/windows/console/menu-event-record-str"""
_fields_: typing.ClassVar[list[tuple[str, type]]] = [("dwCommandId", UINT)]
class FOCUS_EVENT_RECORD(Structure):
"""https://docs.microsoft.com/en-us/windows/console/focus-event-record-str"""
_fields_: typing.ClassVar[list[tuple[str, type]]] = [("bSetFocus", BOOL)]
class Event(Union):
"""https://docs.microsoft.com/en-us/windows/console/input-record-str"""
_fields_: typing.ClassVar[list[tuple[str, type]]] = [
("KeyEvent", KEY_EVENT_RECORD),
("MouseEvent", MOUSE_EVENT_RECORD),
("WindowBufferSizeEvent", WINDOW_BUFFER_SIZE_RECORD),
("MenuEvent", MENU_EVENT_RECORD),
("FocusEvent", FOCUS_EVENT_RECORD),
]
class INPUT_RECORD(Structure):
"""https://docs.microsoft.com/en-us/windows/console/input-record-str"""
_fields_: typing.ClassVar[list[tuple[str, type]]] = [("EventType", WORD), ("Event", Event)]
class EventType(enum.IntFlag):
KEY_EVENT = 0x0001
MOUSE_EVENT = 0x0002
WINDOW_BUFFER_SIZE_EVENT = 0x0004
MENU_EVENT = 0x0008
FOCUS_EVENT = 0x0010
# https://docs.microsoft.com/de-de/windows/console/getstdhandle
GetStdHandle = windll.kernel32.GetStdHandle
GetStdHandle.argtypes = [DWORD]
GetStdHandle.restype = HANDLE
# https://docs.microsoft.com/de-de/windows/console/getconsolemode
GetConsoleMode = windll.kernel32.GetConsoleMode
GetConsoleMode.argtypes = [HANDLE, LPDWORD]
GetConsoleMode.restype = BOOL
# https://docs.microsoft.com/de-de/windows/console/setconsolemode
SetConsoleMode = windll.kernel32.SetConsoleMode
SetConsoleMode.argtypes = [HANDLE, DWORD]
SetConsoleMode.restype = BOOL
# https://docs.microsoft.com/de-de/windows/console/readconsoleinput
ReadConsoleInputW = windll.kernel32.ReadConsoleInputW
# ReadConsoleInputW.argtypes = [HANDLE, POINTER(INPUT_RECORD), DWORD, LPDWORD]
ReadConsoleInputW.restype = BOOL
# https://docs.microsoft.com/en-us/windows/console/getconsolescreenbufferinfo
GetConsoleScreenBufferInfo = windll.kernel32.GetConsoleScreenBufferInfo
GetConsoleScreenBufferInfo.argtypes = [HANDLE, POINTER(CONSOLE_SCREEN_BUFFER_INFO)]
GetConsoleScreenBufferInfo.restype = BOOL

View File

@@ -0,0 +1,269 @@
# Urwid raw display module
# Copyright (C) 2004-2009 Ian Ward
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Urwid web site: https://urwid.org/
"""
Direct terminal UI implementation
"""
from __future__ import annotations
import contextlib
import functools
import logging
import selectors
import socket
import sys
import threading
import typing
from ctypes import byref
from ctypes.wintypes import DWORD
from urwid import signals
from . import _raw_display_base, _win32, escape
from .common import INPUT_DESCRIPTORS_CHANGED
if typing.TYPE_CHECKING:
from collections.abc import Callable
from urwid.event_loop import EventLoop
class Screen(_raw_display_base.Screen):
_term_input_file: socket.socket
def __init__(
self,
input: socket.socket | None = None, # noqa: A002 # pylint: disable=redefined-builtin
output: typing.TextIO = sys.stdout,
) -> None:
"""Initialize a screen that directly prints escape codes to an output
terminal.
"""
if input is None:
input, self._send_input = socket.socketpair() # noqa: A001
super().__init__(input, output)
_dwOriginalOutMode = None
_dwOriginalInMode = None
def _start(self, alternate_buffer: bool = True) -> None:
"""
Initialize the screen and input mode.
alternate_buffer -- use alternate screen buffer
"""
if alternate_buffer:
self.write(escape.SWITCH_TO_ALTERNATE_BUFFER)
self._rows_used = None
else:
self._rows_used = 0
handle_out = _win32.GetStdHandle(_win32.STD_OUTPUT_HANDLE)
handle_in = _win32.GetStdHandle(_win32.STD_INPUT_HANDLE)
self._dwOriginalOutMode = DWORD()
self._dwOriginalInMode = DWORD()
_win32.GetConsoleMode(handle_out, byref(self._dwOriginalOutMode))
_win32.GetConsoleMode(handle_in, byref(self._dwOriginalInMode))
# TODO: Restore on exit
dword_out_mode = DWORD(
self._dwOriginalOutMode.value
| _win32.ENABLE_VIRTUAL_TERMINAL_PROCESSING
| _win32.DISABLE_NEWLINE_AUTO_RETURN
)
dword_in_mode = DWORD(
self._dwOriginalInMode.value | _win32.ENABLE_WINDOW_INPUT | _win32.ENABLE_VIRTUAL_TERMINAL_INPUT
)
ok = _win32.SetConsoleMode(handle_out, dword_out_mode)
if not ok:
raise RuntimeError(f"ConsoleMode set failed for output. Err: {ok!r}")
ok = _win32.SetConsoleMode(handle_in, dword_in_mode)
if not ok:
raise RuntimeError(f"ConsoleMode set failed for input. Err: {ok!r}")
self._alternate_buffer = alternate_buffer
self._next_timeout = self.max_wait
signals.emit_signal(self, INPUT_DESCRIPTORS_CHANGED)
# restore mouse tracking to previous state
self._mouse_tracking(self._mouse_tracking_enabled)
return super()._start()
def _stop(self) -> None:
"""
Restore the screen.
"""
self.clear()
signals.emit_signal(self, INPUT_DESCRIPTORS_CHANGED)
self._stop_mouse_restore_buffer()
handle_out = _win32.GetStdHandle(_win32.STD_OUTPUT_HANDLE)
handle_in = _win32.GetStdHandle(_win32.STD_INPUT_HANDLE)
ok = _win32.SetConsoleMode(handle_out, self._dwOriginalOutMode)
if not ok:
raise RuntimeError(f"ConsoleMode set failed for output. Err: {ok!r}")
ok = _win32.SetConsoleMode(handle_in, self._dwOriginalInMode)
if not ok:
raise RuntimeError(f"ConsoleMode set failed for input. Err: {ok!r}")
super()._stop()
def unhook_event_loop(self, event_loop: EventLoop) -> None:
"""
Remove any hooks added by hook_event_loop.
"""
if self._input_thread is not None:
self._input_thread.should_exit = True
with contextlib.suppress(RuntimeError):
self._input_thread.join(5)
self._input_thread = None
for handle in self._current_event_loop_handles:
event_loop.remove_watch_file(handle)
if self._input_timeout:
event_loop.remove_alarm(self._input_timeout)
self._input_timeout = None
def hook_event_loop(
self,
event_loop: EventLoop,
callback: Callable[[list[str], list[int]], typing.Any],
) -> None:
"""
Register the given callback with the event loop, to be called with new
input whenever it's available. The callback should be passed a list of
processed keys and a list of unprocessed keycodes.
Subclasses may wish to use parse_input to wrap the callback.
"""
self._input_thread = ReadInputThread(self._send_input, lambda: self._sigwinch_handler(28))
self._input_thread.start()
if hasattr(self, "get_input_nonblocking"):
wrapper = self._make_legacy_input_wrapper(event_loop, callback)
else:
@functools.wraps(callback)
def wrapper() -> tuple[list[str], typing.Any] | None:
return self.parse_input(event_loop, callback, self.get_available_raw_input())
fds = self.get_input_descriptors()
handles = [event_loop.watch_file(fd if isinstance(fd, int) else fd.fileno(), wrapper) for fd in fds]
self._current_event_loop_handles = handles
_input_thread: ReadInputThread | None = None
def _read_raw_input(self, timeout: int) -> bytearray:
ready = self._wait_for_input_ready(timeout)
fd = self._input_fileno()
chars = bytearray()
if fd is None or fd not in ready:
return chars
with selectors.DefaultSelector() as selector:
selector.register(fd, selectors.EVENT_READ)
input_ready = selector.select(0)
while input_ready:
chars.extend(self._term_input_file.recv(1024))
input_ready = selector.select(0)
return chars
def get_cols_rows(self) -> tuple[int, int]:
"""Return the terminal dimensions (num columns, num rows)."""
y, x = super().get_cols_rows()
with contextlib.suppress(OSError): # Term size could not be determined
if hasattr(self._term_output_file, "fileno"):
if self._term_output_file != sys.stdout:
raise RuntimeError("Unexpected terminal output file")
handle = _win32.GetStdHandle(_win32.STD_OUTPUT_HANDLE)
info = _win32.CONSOLE_SCREEN_BUFFER_INFO()
ok = _win32.GetConsoleScreenBufferInfo(handle, byref(info))
if ok:
# Fallback will be used in case of term size could not be determined
y, x = info.dwSize.Y, info.dwSize.X
self.maxrow = y
return x, y
class ReadInputThread(threading.Thread):
name = "urwid Windows input reader"
daemon = True
should_exit: bool = False
def __init__(
self,
input_socket: socket.socket,
resize: Callable[[], typing.Any],
) -> None:
self._input = input_socket
self._resize = resize
self.logger = logging.getLogger(__name__).getChild(self.__class__.__name__)
super().__init__()
def run(self) -> None:
hIn = _win32.GetStdHandle(_win32.STD_INPUT_HANDLE)
MAX = 2048
read = DWORD(0)
arrtype = _win32.INPUT_RECORD * MAX
input_records = arrtype()
while True:
_win32.ReadConsoleInputW(hIn, byref(input_records), MAX, byref(read))
if self.should_exit:
return
for i in range(read.value):
inp = input_records[i]
if inp.EventType == _win32.EventType.KEY_EVENT:
if not inp.Event.KeyEvent.bKeyDown:
continue
input_data = inp.Event.KeyEvent.uChar.UnicodeChar
# On Windows atomic press/release of modifier keys produce phantom input with code NULL.
# This input cannot be decoded and should be handled as garbage.
input_bytes = input_data.encode("utf-8")
if input_bytes != b"\x00":
self._input.send(input_bytes)
elif inp.EventType == _win32.EventType.WINDOW_BUFFER_SIZE_EVENT:
self._resize()
else:
pass # TODO: handle mouse events
def _test():
import doctest
doctest.testmod()
if __name__ == "__main__":
_test()

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,690 @@
# Urwid curses output wrapper.. the horror..
# Copyright (C) 2004-2011 Ian Ward
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Urwid web site: https://urwid.org/
"""
Curses-based UI implementation
"""
from __future__ import annotations
import curses
import sys
import typing
from contextlib import suppress
from urwid import util
from . import escape
from .common import UNPRINTABLE_TRANS_TABLE, AttrSpec, BaseScreen, RealTerminal
if typing.TYPE_CHECKING:
from typing_extensions import Literal
from urwid import Canvas
IS_WINDOWS = sys.platform == "win32"
# curses.KEY_RESIZE (sometimes not defined)
if IS_WINDOWS:
KEY_MOUSE = 539 # under Windows key mouse is different
KEY_RESIZE = 546
COLOR_CORRECTION: dict[int, int] = dict(
enumerate(
(
curses.COLOR_BLACK,
curses.COLOR_RED,
curses.COLOR_GREEN,
curses.COLOR_YELLOW,
curses.COLOR_BLUE,
curses.COLOR_MAGENTA,
curses.COLOR_CYAN,
curses.COLOR_WHITE,
)
)
)
def initscr():
import curses # noqa: I001 # pylint: disable=redefined-outer-name,reimported # special case for monkeypatch
import _curses
stdscr = _curses.initscr()
for key, value in _curses.__dict__.items():
if key[:4] == "ACS_" or key in {"LINES", "COLS"}:
setattr(curses, key, value)
return stdscr
curses.initscr = initscr
else:
KEY_MOUSE = 409 # curses.KEY_MOUSE
KEY_RESIZE = 410
COLOR_CORRECTION = {}
_curses_colours = { # pylint: disable=consider-using-namedtuple-or-dataclass # historic test/debug data
"default": (-1, 0),
"black": (curses.COLOR_BLACK, 0),
"dark red": (curses.COLOR_RED, 0),
"dark green": (curses.COLOR_GREEN, 0),
"brown": (curses.COLOR_YELLOW, 0),
"dark blue": (curses.COLOR_BLUE, 0),
"dark magenta": (curses.COLOR_MAGENTA, 0),
"dark cyan": (curses.COLOR_CYAN, 0),
"light gray": (curses.COLOR_WHITE, 0),
"dark gray": (curses.COLOR_BLACK, 1),
"light red": (curses.COLOR_RED, 1),
"light green": (curses.COLOR_GREEN, 1),
"yellow": (curses.COLOR_YELLOW, 1),
"light blue": (curses.COLOR_BLUE, 1),
"light magenta": (curses.COLOR_MAGENTA, 1),
"light cyan": (curses.COLOR_CYAN, 1),
"white": (curses.COLOR_WHITE, 1),
}
class Screen(BaseScreen, RealTerminal):
def __init__(self) -> None:
super().__init__()
self.curses_pairs = [(None, None)] # Can't be sure what pair 0 will default to
self.palette = {}
self.has_color = False
self.s = None
self.cursor_state = None
self.prev_input_resize = 0
self.set_input_timeouts()
self.last_bstate = 0
self._mouse_tracking_enabled = False
self.register_palette_entry(None, "default", "default")
def set_mouse_tracking(self, enable: bool = True) -> None:
"""
Enable mouse tracking.
After calling this function get_input will include mouse
click events along with keystrokes.
"""
enable = bool(enable) # noqa: FURB123,RUF100
if enable == self._mouse_tracking_enabled:
return
if enable:
curses.mousemask(
0
| curses.BUTTON1_PRESSED
| curses.BUTTON1_RELEASED
| curses.BUTTON2_PRESSED
| curses.BUTTON2_RELEASED
| curses.BUTTON3_PRESSED
| curses.BUTTON3_RELEASED
| curses.BUTTON4_PRESSED
| curses.BUTTON4_RELEASED
| curses.BUTTON1_DOUBLE_CLICKED
| curses.BUTTON1_TRIPLE_CLICKED
| curses.BUTTON2_DOUBLE_CLICKED
| curses.BUTTON2_TRIPLE_CLICKED
| curses.BUTTON3_DOUBLE_CLICKED
| curses.BUTTON3_TRIPLE_CLICKED
| curses.BUTTON4_DOUBLE_CLICKED
| curses.BUTTON4_TRIPLE_CLICKED
| curses.BUTTON_SHIFT
| curses.BUTTON_ALT
| curses.BUTTON_CTRL
)
else:
raise NotImplementedError()
self._mouse_tracking_enabled = enable
def _start(self) -> None:
"""
Initialize the screen and input mode.
"""
self.s = curses.initscr()
self.has_color = curses.has_colors()
if self.has_color:
curses.start_color()
if curses.COLORS < 8:
# not colourful enough
self.has_color = False
if self.has_color:
try:
curses.use_default_colors()
self.has_default_colors = True
except curses.error:
self.has_default_colors = False
self._setup_colour_pairs()
curses.noecho()
curses.meta(True)
curses.halfdelay(10) # use set_input_timeouts to adjust
self.s.keypad(False)
if not self._signal_keys_set:
self._old_signal_keys = self.tty_signal_keys()
super()._start()
if IS_WINDOWS:
# halfdelay() seems unnecessary and causes everything to slow down a lot.
curses.nocbreak() # exits halfdelay mode
# keypad(1) is needed, or we get no special keys (cursor keys, etc.)
self.s.keypad(True)
def _stop(self) -> None:
"""
Restore the screen.
"""
curses.echo()
self._curs_set(1)
with suppress(curses.error):
curses.endwin()
# don't block original error with curses error
if self._old_signal_keys:
self.tty_signal_keys(*self._old_signal_keys)
super()._stop()
def _setup_colour_pairs(self) -> None:
"""
Initialize all 63 color pairs based on the term:
bg * 8 + 7 - fg
So to get a color, we just need to use that term and get the right color
pair number.
"""
if not self.has_color:
return
if IS_WINDOWS:
self.has_default_colors = False
for fg in range(8):
for bg in range(8):
# leave out white on black
if fg == curses.COLOR_WHITE and bg == curses.COLOR_BLACK:
continue
curses.init_pair(bg * 8 + 7 - fg, COLOR_CORRECTION.get(fg, fg), COLOR_CORRECTION.get(bg, bg))
def _curs_set(self, x: int):
if self.cursor_state in {"fixed", x}:
return
try:
curses.curs_set(x)
self.cursor_state = x
except curses.error:
self.cursor_state = "fixed"
def _clear(self) -> None:
self.s.clear()
self.s.refresh()
def _getch(self, wait_tenths: int | None) -> int:
if wait_tenths == 0:
return self._getch_nodelay()
if not IS_WINDOWS:
if wait_tenths is None:
curses.cbreak()
else:
curses.halfdelay(wait_tenths)
self.s.nodelay(False)
return self.s.getch()
def _getch_nodelay(self) -> int:
self.s.nodelay(True)
if not IS_WINDOWS:
while True:
# this call fails sometimes, but seems to work when I try again
with suppress(curses.error):
curses.cbreak()
break
return self.s.getch()
def set_input_timeouts(
self,
max_wait: float | None = None,
complete_wait: float = 0.1,
resize_wait: float = 0.1,
):
"""
Set the get_input timeout values. All values have a granularity
of 0.1s, ie. any value between 0.15 and 0.05 will be treated as
0.1 and any value less than 0.05 will be treated as 0. The
maximum timeout value for this module is 25.5 seconds.
max_wait -- amount of time in seconds to wait for input when
there is no input pending, wait forever if None
complete_wait -- amount of time in seconds to wait when
get_input detects an incomplete escape sequence at the
end of the available input
resize_wait -- amount of time in seconds to wait for more input
after receiving two screen resize requests in a row to
stop urwid from consuming 100% cpu during a gradual
window resize operation
"""
def convert_to_tenths(s):
if s is None:
return None
return int((s + 0.05) * 10)
self.max_tenths = convert_to_tenths(max_wait)
self.complete_tenths = convert_to_tenths(complete_wait)
self.resize_tenths = convert_to_tenths(resize_wait)
@typing.overload
def get_input(self, raw_keys: Literal[False]) -> list[str]: ...
@typing.overload
def get_input(self, raw_keys: Literal[True]) -> tuple[list[str], list[int]]: ...
def get_input(self, raw_keys: bool = False) -> list[str] | tuple[list[str], list[int]]:
"""Return pending input as a list.
raw_keys -- return raw keycodes as well as translated versions
This function will immediately return all the input since the
last time it was called. If there is no input pending it will
wait before returning an empty list. The wait time may be
configured with the set_input_timeouts function.
If raw_keys is False (default) this function will return a list
of keys pressed. If raw_keys is True this function will return
a ( keys pressed, raw keycodes ) tuple instead.
Examples of keys returned:
* ASCII printable characters: " ", "a", "0", "A", "-", "/"
* ASCII control characters: "tab", "enter"
* Escape sequences: "up", "page up", "home", "insert", "f1"
* Key combinations: "shift f1", "meta a", "ctrl b"
* Window events: "window resize"
When a narrow encoding is not enabled:
* "Extended ASCII" characters: "\\xa1", "\\xb2", "\\xfe"
When a wide encoding is enabled:
* Double-byte characters: "\\xa1\\xea", "\\xb2\\xd4"
When utf8 encoding is enabled:
* Unicode characters: u"\\u00a5", u'\\u253c"
Examples of mouse events returned:
* Mouse button press: ('mouse press', 1, 15, 13),
('meta mouse press', 2, 17, 23)
* Mouse button release: ('mouse release', 0, 18, 13),
('ctrl mouse release', 0, 17, 23)
"""
if not self._started:
raise RuntimeError
keys, raw = self._get_input(self.max_tenths)
# Avoid pegging CPU at 100% when slowly resizing, and work
# around a bug with some braindead curses implementations that
# return "no key" between "window resize" commands
if keys == ["window resize"] and self.prev_input_resize:
for _ in range(2):
new_keys, new_raw = self._get_input(self.resize_tenths)
raw += new_raw
if new_keys and new_keys != ["window resize"]:
if "window resize" in new_keys:
keys = new_keys
else:
keys.extend(new_keys)
break
if keys == ["window resize"]:
self.prev_input_resize = 2
elif self.prev_input_resize == 2 and not keys:
self.prev_input_resize = 1
else:
self.prev_input_resize = 0
if raw_keys:
return keys, raw
return keys
def _get_input(self, wait_tenths: int | None) -> tuple[list[str], list[int]]:
# this works around a strange curses bug with window resizing
# not being reported correctly with repeated calls to this
# function without a doupdate call in between
curses.doupdate()
key = self._getch(wait_tenths)
resize = False
raw = []
keys = []
while key >= 0:
raw.append(key)
if key == KEY_RESIZE:
resize = True
elif key == KEY_MOUSE:
keys += self._encode_mouse_event()
else:
keys.append(key)
key = self._getch_nodelay()
processed = []
try:
while keys:
run, keys = escape.process_keyqueue(keys, True)
processed += run
except escape.MoreInputRequired:
key = self._getch(self.complete_tenths)
while key >= 0:
raw.append(key)
if key == KEY_RESIZE:
resize = True
elif key == KEY_MOUSE:
keys += self._encode_mouse_event()
else:
keys.append(key)
key = self._getch_nodelay()
while keys:
run, keys = escape.process_keyqueue(keys, False)
processed += run
if resize:
processed.append("window resize")
return processed, raw
def _encode_mouse_event(self) -> list[int]:
# convert to escape sequence
last_state = next_state = self.last_bstate
(_id, x, y, _z, bstate) = curses.getmouse()
mod = 0
if bstate & curses.BUTTON_SHIFT:
mod |= 4
if bstate & curses.BUTTON_ALT:
mod |= 8
if bstate & curses.BUTTON_CTRL:
mod |= 16
result = []
def append_button(b: int) -> None:
b |= mod
result.extend([27, ord("["), ord("M"), b + 32, x + 33, y + 33])
if bstate & curses.BUTTON1_PRESSED and last_state & 1 == 0:
append_button(0)
next_state |= 1
if bstate & curses.BUTTON2_PRESSED and last_state & 2 == 0:
append_button(1)
next_state |= 2
if bstate & curses.BUTTON3_PRESSED and last_state & 4 == 0:
append_button(2)
next_state |= 4
if bstate & curses.BUTTON4_PRESSED and last_state & 8 == 0:
append_button(64)
next_state |= 8
if bstate & curses.BUTTON1_RELEASED and last_state & 1:
append_button(0 + escape.MOUSE_RELEASE_FLAG)
next_state &= ~1
if bstate & curses.BUTTON2_RELEASED and last_state & 2:
append_button(1 + escape.MOUSE_RELEASE_FLAG)
next_state &= ~2
if bstate & curses.BUTTON3_RELEASED and last_state & 4:
append_button(2 + escape.MOUSE_RELEASE_FLAG)
next_state &= ~4
if bstate & curses.BUTTON4_RELEASED and last_state & 8:
append_button(64 + escape.MOUSE_RELEASE_FLAG)
next_state &= ~8
if bstate & curses.BUTTON1_DOUBLE_CLICKED:
append_button(0 + escape.MOUSE_MULTIPLE_CLICK_FLAG)
if bstate & curses.BUTTON2_DOUBLE_CLICKED:
append_button(1 + escape.MOUSE_MULTIPLE_CLICK_FLAG)
if bstate & curses.BUTTON3_DOUBLE_CLICKED:
append_button(2 + escape.MOUSE_MULTIPLE_CLICK_FLAG)
if bstate & curses.BUTTON4_DOUBLE_CLICKED:
append_button(64 + escape.MOUSE_MULTIPLE_CLICK_FLAG)
if bstate & curses.BUTTON1_TRIPLE_CLICKED:
append_button(0 + escape.MOUSE_MULTIPLE_CLICK_FLAG * 2)
if bstate & curses.BUTTON2_TRIPLE_CLICKED:
append_button(1 + escape.MOUSE_MULTIPLE_CLICK_FLAG * 2)
if bstate & curses.BUTTON3_TRIPLE_CLICKED:
append_button(2 + escape.MOUSE_MULTIPLE_CLICK_FLAG * 2)
if bstate & curses.BUTTON4_TRIPLE_CLICKED:
append_button(64 + escape.MOUSE_MULTIPLE_CLICK_FLAG * 2)
self.last_bstate = next_state
return result
def _dbg_instr(self): # messy input string (intended for debugging)
curses.echo()
self.s.nodelay(0)
curses.halfdelay(100)
string = self.s.getstr()
curses.noecho()
return string
def _dbg_out(self, string) -> None: # messy output function (intended for debugging)
self.s.clrtoeol()
self.s.addstr(string)
self.s.refresh()
self._curs_set(1)
def _dbg_query(self, question): # messy query (intended for debugging)
self._dbg_out(question)
return self._dbg_instr()
def _dbg_refresh(self) -> None:
self.s.refresh()
def get_cols_rows(self) -> tuple[int, int]:
"""Return the terminal dimensions (num columns, num rows)."""
rows, cols = self.s.getmaxyx()
return cols, rows
def _setattr(self, a):
if a is None:
self.s.attrset(0)
return
if not isinstance(a, AttrSpec):
p = self._palette.get(a, (AttrSpec("default", "default"),))
a = p[0]
if self.has_color:
if a.foreground_basic:
if a.foreground_number >= 8:
fg = a.foreground_number - 8
else:
fg = a.foreground_number
else:
fg = 7
if a.background_basic:
bg = a.background_number
else:
bg = 0
attr = curses.color_pair(bg * 8 + 7 - fg)
else:
attr = 0
if a.bold:
attr |= curses.A_BOLD
if a.standout:
attr |= curses.A_STANDOUT
if a.underline:
attr |= curses.A_UNDERLINE
if a.blink:
attr |= curses.A_BLINK
self.s.attrset(attr)
def draw_screen(self, size: tuple[int, int], canvas: Canvas) -> None:
"""Paint screen with rendered canvas."""
logger = self.logger.getChild("draw_screen")
if not self._started:
raise RuntimeError
_cols, rows = size
if canvas.rows() != rows:
raise ValueError("canvas size and passed size don't match")
logger.debug(f"Drawing screen with size {size!r}")
y = -1
for row in canvas.content():
y += 1
try:
self.s.move(y, 0)
except curses.error:
# terminal shrunk?
# move failed so stop rendering.
return
first = True
lasta = None
for nr, (a, cs, seg) in enumerate(row):
if cs != "U":
seg = seg.translate(UNPRINTABLE_TRANS_TABLE) # noqa: PLW2901
if not isinstance(seg, bytes):
raise TypeError(seg)
if first or lasta != a:
self._setattr(a)
lasta = a
try:
if cs in {"0", "U"}:
for segment in seg:
self.s.addch(0x400000 + segment)
else:
if cs is not None:
raise ValueError(f"cs not in ('0', 'U' ,'None'): {cs!r}")
if not isinstance(seg, bytes):
raise TypeError(seg)
self.s.addstr(seg.decode(util.get_encoding()))
except curses.error:
# it's ok to get out of the
# screen on the lower right
if y != rows - 1 or nr != len(row) - 1:
# perhaps screen size changed
# quietly abort.
return
if canvas.cursor is not None:
x, y = canvas.cursor
self._curs_set(1)
with suppress(curses.error):
self.s.move(y, x)
else:
self._curs_set(0)
self.s.move(0, 0)
self.s.refresh()
self.keep_cache_alive_link = canvas
def clear(self) -> None:
"""
Force the screen to be completely repainted on the next call to draw_screen().
"""
self.s.clear()
class _test:
def __init__(self):
self.ui = Screen()
self.l = sorted(_curses_colours)
for c in self.l:
self.ui.register_palette(
[
(f"{c} on black", c, "black", "underline"),
(f"{c} on dark blue", c, "dark blue", "bold"),
(f"{c} on light gray", c, "light gray", "standout"),
]
)
with self.ui.start():
self.run()
def run(self) -> None:
class FakeRender:
pass
r = FakeRender()
text = [f" has_color = {self.ui.has_color!r}", ""]
attr = [[], []]
r.coords = {}
r.cursor = None
for c in self.l:
t = ""
a = []
for p in f"{c} on black", f"{c} on dark blue", f"{c} on light gray":
a.append((p, 27))
t += (p + 27 * " ")[:27]
text.append(t)
attr.append(a)
text += ["", "return values from get_input(): (q exits)", ""]
attr += [[], [], []]
cols, rows = self.ui.get_cols_rows()
keys = None
while keys != ["q"]:
r.text = ([t.ljust(cols) for t in text] + [""] * rows)[:rows]
r.attr = (attr + [[] for _ in range(rows)])[:rows]
self.ui.draw_screen((cols, rows), r)
keys, raw = self.ui.get_input(raw_keys=True)
if "window resize" in keys:
cols, rows = self.ui.get_cols_rows()
if not keys:
continue
t = ""
a = []
for k in keys:
if isinstance(k, str):
k = k.encode(util.get_encoding()) # noqa: PLW2901
t += f"'{k}' "
a += [(None, 1), ("yellow on dark blue", len(k)), (None, 2)]
text.append(f"{t}: {raw!r}")
attr.append(a)
text = text[-rows:]
attr = attr[-rows:]
if __name__ == "__main__":
_test()

View File

@@ -0,0 +1,627 @@
# Urwid escape sequences common to curses_display and raw_display
# Copyright (C) 2004-2011 Ian Ward
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Urwid web site: https://urwid.org/
"""
Terminal Escape Sequences for input and display
"""
from __future__ import annotations
import re
import sys
import typing
from collections.abc import MutableMapping, Sequence
from urwid import str_util
if typing.TYPE_CHECKING:
from collections.abc import Iterable
# NOTE: because of circular imports (urwid.util -> urwid.escape -> urwid.util)
# from urwid.util import is_mouse_event -- will not work here
import urwid.util # isort: skip # pylint: disable=wrong-import-position
IS_WINDOWS = sys.platform == "win32"
within_double_byte = str_util.within_double_byte
SO = "\x0e"
SI = "\x0f"
IBMPC_ON = "\x1b[11m"
IBMPC_OFF = "\x1b[10m"
DEC_TAG = "0"
DEC_SPECIAL_CHARS = "▮◆▒␉␌␍␊°±␤␋┘┐┌└┼⎺⎻─⎼⎽├┤┴┬│≤≥π≠£·"
ALT_DEC_SPECIAL_CHARS = "_`abcdefghijklmnopqrstuvwxyz{|}~"
DEC_SPECIAL_CHARMAP = {}
if len(DEC_SPECIAL_CHARS) != len(ALT_DEC_SPECIAL_CHARS):
raise RuntimeError(repr((DEC_SPECIAL_CHARS, ALT_DEC_SPECIAL_CHARS)))
for c, alt in zip(DEC_SPECIAL_CHARS, ALT_DEC_SPECIAL_CHARS):
DEC_SPECIAL_CHARMAP[ord(c)] = SO + alt + SI
SAFE_ASCII_DEC_SPECIAL_RE = re.compile(f"^[ -~{DEC_SPECIAL_CHARS}]*$")
DEC_SPECIAL_RE = re.compile(f"[{DEC_SPECIAL_CHARS}]")
###################
# Input sequences
###################
class MoreInputRequired(Exception):
pass
def escape_modifier(digit: str) -> str:
mode = ord(digit) - ord("1")
return "shift " * (mode & 1) + "meta " * ((mode & 2) // 2) + "ctrl " * ((mode & 4) // 4)
input_sequences = [
("[A", "up"),
("[B", "down"),
("[C", "right"),
("[D", "left"),
("[E", "5"),
("[F", "end"),
("[G", "5"),
("[H", "home"),
("[I", "focus in"),
("[O", "focus out"),
("[1~", "home"),
("[2~", "insert"),
("[3~", "delete"),
("[4~", "end"),
("[5~", "page up"),
("[6~", "page down"),
("[7~", "home"),
("[8~", "end"),
("[[A", "f1"),
("[[B", "f2"),
("[[C", "f3"),
("[[D", "f4"),
("[[E", "f5"),
("[11~", "f1"),
("[12~", "f2"),
("[13~", "f3"),
("[14~", "f4"),
("[15~", "f5"),
("[17~", "f6"),
("[18~", "f7"),
("[19~", "f8"),
("[20~", "f9"),
("[21~", "f10"),
("[23~", "f11"),
("[24~", "f12"),
("[25~", "f13"),
("[26~", "f14"),
("[28~", "f15"),
("[29~", "f16"),
("[31~", "f17"),
("[32~", "f18"),
("[33~", "f19"),
("[34~", "f20"),
("OA", "up"),
("OB", "down"),
("OC", "right"),
("OD", "left"),
("OH", "home"),
("OF", "end"),
("OP", "f1"),
("OQ", "f2"),
("OR", "f3"),
("OS", "f4"),
("Oo", "/"),
("Oj", "*"),
("Om", "-"),
("Ok", "+"),
("[Z", "shift tab"),
("On", "."),
("[200~", "begin paste"),
("[201~", "end paste"),
*(
(prefix + letter, modifier + key)
for prefix, modifier in zip("O[", ("meta ", "shift "))
for letter, key in zip("abcd", ("up", "down", "right", "left"))
),
*(
(f"[{digit}{symbol}", modifier + key)
for modifier, symbol in zip(("shift ", "meta "), "$^")
for digit, key in zip("235678", ("insert", "delete", "page up", "page down", "home", "end"))
),
*((f"O{ord('p') + n:c}", str(n)) for n in range(10)),
*(
# modified cursor keys + home, end, 5 -- [#X and [1;#X forms
(prefix + digit + letter, escape_modifier(digit) + key)
for prefix in ("[", "[1;")
for digit in "12345678"
for letter, key in zip("ABCDEFGH", ("up", "down", "right", "left", "5", "end", "5", "home"))
),
*(
# modified F1-F4 keys - O#X form and [1;#X form
(prefix + digit + letter, escape_modifier(digit) + f"f{number}")
for prefix in ("O", "[1;")
for digit in "12345678"
for number, letter in enumerate("PQRS", start=1)
),
*(
# modified F1-F13 keys -- [XX;#~ form
(f"[{num!s};{digit}~", escape_modifier(digit) + key)
for digit in "12345678"
for num, key in zip(
(3, 5, 6, 11, 12, 13, 14, 15, 17, 18, 19, 20, 21, 23, 24, 25, 26, 28, 29, 31, 32, 33, 34),
(
"delete",
"page up",
"page down",
*(f"f{idx}" for idx in range(1, 21)),
),
)
),
# mouse reporting (special handling done in KeyqueueTrie)
("[M", "mouse"),
# mouse reporting for SGR 1006
("[<", "sgrmouse"),
# report status response
("[0n", "status ok"),
]
class KeyqueueTrie:
__slots__ = ("data",)
def __init__(self, sequences: Iterable[tuple[str, str]]) -> None:
self.data: dict[int, str | dict[int, str | dict[int, str]]] = {}
for s, result in sequences:
if isinstance(result, dict):
raise TypeError(result)
self.add(self.data, s, result)
def add(
self,
root: MutableMapping[int, str | MutableMapping[int, str | MutableMapping[int, str]]],
s: str,
result: str,
) -> None:
if not isinstance(root, MutableMapping) or not s:
raise RuntimeError("trie conflict detected")
if ord(s[0]) in root:
self.add(root[ord(s[0])], s[1:], result)
return
if len(s) > 1:
d = {}
root[ord(s[0])] = d
self.add(d, s[1:], result)
return
root[ord(s)] = result
def get(self, keys, more_available: bool):
result = self.get_recurse(self.data, keys, more_available)
if not result:
result = self.read_cursor_position(keys, more_available)
return result
def get_recurse(
self,
root: (
MutableMapping[int, str | MutableMapping[int, str | MutableMapping[int, str]]]
| typing.Literal["mouse", "sgrmouse"]
),
keys: Sequence[int],
more_available: bool,
):
if not isinstance(root, MutableMapping):
if root == "mouse":
return self.read_mouse_info(keys, more_available)
if root == "sgrmouse":
return self.read_sgrmouse_info(keys, more_available)
return (root, keys)
if not keys:
# get more keys
if more_available:
raise MoreInputRequired()
return None
if keys[0] not in root:
return None
return self.get_recurse(root[keys[0]], keys[1:], more_available)
def read_mouse_info(self, keys: Sequence[int], more_available: bool):
if len(keys) < 3:
if more_available:
raise MoreInputRequired()
return None
b = keys[0] - 32
x, y = (keys[1] - 33) % 256, (keys[2] - 33) % 256 # supports 0-255
prefixes = []
if b & 4:
prefixes.append("shift ")
if b & 8:
prefixes.append("meta ")
if b & 16:
prefixes.append("ctrl ")
if (b & MOUSE_MULTIPLE_CLICK_MASK) >> 9 == 1:
prefixes.append("double ")
if (b & MOUSE_MULTIPLE_CLICK_MASK) >> 9 == 2:
prefixes.append("triple ")
prefix = "".join(prefixes)
# 0->1, 1->2, 2->3, 64->4, 65->5
button = ((b & 64) // 64 * 3) + (b & 3) + 1
if b & 3 == 3:
action = "release"
button = 0
elif b & MOUSE_RELEASE_FLAG:
action = "release"
elif b & MOUSE_DRAG_FLAG:
action = "drag"
elif b & MOUSE_MULTIPLE_CLICK_MASK:
action = "click"
else:
action = "press"
return ((f"{prefix}mouse {action}", button, x, y), keys[3:])
def read_sgrmouse_info(self, keys: Sequence[int], more_available: bool):
# Helpful links:
# https://stackoverflow.com/questions/5966903/how-to-get-mousemove-and-mouseclick-in-bash
# http://invisible-island.net/xterm/ctlseqs/ctlseqs.pdf
if not keys:
if more_available:
raise MoreInputRequired()
return None
value = ""
pos_m = 0
found_m = False
for k in keys:
value += chr(k)
if k in {ord("M"), ord("m")}:
found_m = True
break
pos_m += 1
if not found_m:
if more_available:
raise MoreInputRequired()
return None
(b, x, y) = (int(val) for val in value[:-1].split(";"))
action = value[-1]
# Double and triple clicks are not supported.
# They can be implemented by using a timer.
# This timer can check if the last registered click is below a certain threshold.
# This threshold is normally set in the operating system itself,
# so setting one here will cause an inconsistent behaviour.
prefixes = []
if b & 4:
prefixes.append("shift ")
if b & 8:
prefixes.append("meta ")
if b & 16:
prefixes.append("ctrl ")
prefix = "".join(prefixes)
wheel_used: typing.Literal[0, 1] = (b & 64) >> 6
button = (wheel_used * 3) + (b & 3) + 1
x -= 1
y -= 1
if action == "M":
if b & MOUSE_DRAG_FLAG:
action = "drag"
else:
action = "press"
elif action == "m":
action = "release"
else:
raise ValueError(f"Unknown mouse action: {action!r}")
return ((f"{prefix}mouse {action}", button, x, y), keys[pos_m + 1 :])
def read_cursor_position(self, keys, more_available: bool):
"""
Interpret cursor position information being sent by the
user's terminal. Returned as ('cursor position', x, y)
where (x, y) == (0, 0) is the top left of the screen.
"""
if not keys:
if more_available:
raise MoreInputRequired()
return None
if keys[0] != ord("["):
return None
# read y value
y = 0
i = 1
for k in keys[i:]:
i += 1
if k == ord(";"):
if not y:
return None
break
if k < ord("0") or k > ord("9"):
return None
if not y and k == ord("0"):
return None
y = y * 10 + k - ord("0")
if not keys[i:]:
if more_available:
raise MoreInputRequired()
return None
# read x value
x = 0
for k in keys[i:]:
i += 1
if k == ord("R"):
if not x:
return None
return (("cursor position", x - 1, y - 1), keys[i:])
if k < ord("0") or k > ord("9"):
return None
if not x and k == ord("0"):
return None
x = x * 10 + k - ord("0")
if not keys[i:] and more_available:
raise MoreInputRequired()
return None
# This is added to button value to signal mouse release by curses_display
# and raw_display when we know which button was released. NON-STANDARD
MOUSE_RELEASE_FLAG = 2048
# This 2-bit mask is used to check if the mouse release from curses or gpm
# is a double or triple release. 00 means single click, 01 double,
# 10 triple. NON-STANDARD
MOUSE_MULTIPLE_CLICK_MASK = 1536
# This is added to button value at mouse release to differentiate between
# single, double and triple press. Double release adds this times one,
# triple release adds this times two. NON-STANDARD
MOUSE_MULTIPLE_CLICK_FLAG = 512
# xterm adds this to the button value to signal a mouse drag event
MOUSE_DRAG_FLAG = 32
#################################################
# Build the input trie from input_sequences list
input_trie = KeyqueueTrie(input_sequences)
#################################################
_keyconv = {
-1: None,
8: "backspace",
9: "tab",
10: "enter",
13: "enter",
127: "backspace",
# curses-only keycodes follow.. (XXX: are these used anymore?)
258: "down",
259: "up",
260: "left",
261: "right",
262: "home",
263: "backspace",
265: "f1",
266: "f2",
267: "f3",
268: "f4",
269: "f5",
270: "f6",
271: "f7",
272: "f8",
273: "f9",
274: "f10",
275: "f11",
276: "f12",
277: "shift f1",
278: "shift f2",
279: "shift f3",
280: "shift f4",
281: "shift f5",
282: "shift f6",
283: "shift f7",
284: "shift f8",
285: "shift f9",
286: "shift f10",
287: "shift f11",
288: "shift f12",
330: "delete",
331: "insert",
338: "page down",
339: "page up",
343: "enter", # on numpad
350: "5", # on numpad
360: "end",
}
if IS_WINDOWS:
_keyconv[351] = "shift tab"
_keyconv[358] = "end"
def process_keyqueue(codes: Sequence[int], more_available: bool) -> tuple[list[str], Sequence[int]]:
"""
codes -- list of key codes
more_available -- if True then raise MoreInputRequired when in the
middle of a character sequence (escape/utf8/wide) and caller
will attempt to send more key codes on the next call.
returns (list of input, list of remaining key codes).
"""
code = codes[0]
if 32 <= code <= 126:
key = chr(code)
return [key], codes[1:]
if code in _keyconv:
return [_keyconv[code]], codes[1:]
if 0 < code < 27:
return [f"ctrl {ord('a') + code - 1:c}"], codes[1:]
if 27 < code < 32:
return [f"ctrl {ord('A') + code - 1:c}"], codes[1:]
em = str_util.get_byte_encoding()
if (
em == "wide"
and code < 256
and within_double_byte(
code.to_bytes(1, "little"),
0,
0,
)
):
if not codes[1:] and more_available:
raise MoreInputRequired()
if codes[1:] and codes[1] < 256:
db = chr(code) + chr(codes[1])
if within_double_byte(db, 0, 1):
return [db], codes[2:]
if em == "utf8" and 127 < code < 256:
if code & 0xE0 == 0xC0: # 2-byte form
need_more = 1
elif code & 0xF0 == 0xE0: # 3-byte form
need_more = 2
elif code & 0xF8 == 0xF0: # 4-byte form
need_more = 3
else:
return [f"<{code:d}>"], codes[1:]
for i in range(1, need_more + 1):
if len(codes) <= i:
if more_available:
raise MoreInputRequired()
return [f"<{code:d}>"], codes[1:]
k = codes[i]
if k > 256 or k & 0xC0 != 0x80:
return [f"<{code:d}>"], codes[1:]
s = bytes(codes[: need_more + 1])
try:
return [s.decode("utf-8")], codes[need_more + 1 :]
except UnicodeDecodeError:
return [f"<{code:d}>"], codes[1:]
if 127 < code < 256:
key = chr(code)
return [key], codes[1:]
if code != 27:
return [f"<{code:d}>"], codes[1:]
result = input_trie.get(codes[1:], more_available)
if result is not None:
result, remaining_codes = result
return [result], remaining_codes
if codes[1:]:
# Meta keys -- ESC+Key form
run, remaining_codes = process_keyqueue(codes[1:], more_available)
if urwid.util.is_mouse_event(run[0]):
return ["esc", *run], remaining_codes
if run[0] == "esc" or run[0].find("meta ") >= 0:
return ["esc", *run], remaining_codes
return [f"meta {run[0]}"] + run[1:], remaining_codes
return ["esc"], codes[1:]
####################
# Output sequences
####################
ESC = "\x1b"
CURSOR_HOME = f"{ESC}[H"
CURSOR_HOME_COL = "\r"
APP_KEYPAD_MODE = f"{ESC}="
NUM_KEYPAD_MODE = f"{ESC}>"
SWITCH_TO_ALTERNATE_BUFFER = f"{ESC}[?1049h"
RESTORE_NORMAL_BUFFER = f"{ESC}[?1049l"
ENABLE_BRACKETED_PASTE_MODE = f"{ESC}[?2004h"
DISABLE_BRACKETED_PASTE_MODE = f"{ESC}[?2004l"
ENABLE_FOCUS_REPORTING = f"{ESC}[?1004h"
DISABLE_FOCUS_REPORTING = f"{ESC}[?1004l"
# RESET_SCROLL_REGION = ESC+"[;r"
# RESET = ESC+"c"
REPORT_STATUS = f"{ESC}[5n"
REPORT_CURSOR_POSITION = f"{ESC}[6n"
INSERT_ON = f"{ESC}[4h"
INSERT_OFF = f"{ESC}[4l"
def set_cursor_position(x: int, y: int) -> str:
if not isinstance(x, int):
raise TypeError(x)
if not isinstance(y, int):
raise TypeError(y)
return ESC + f"[{y + 1:d};{x + 1:d}H"
def move_cursor_right(x: int) -> str:
if x < 1:
return ""
return ESC + f"[{x:d}C"
def move_cursor_up(x: int) -> str:
if x < 1:
return ""
return ESC + f"[{x:d}A"
def move_cursor_down(x: int) -> str:
if x < 1:
return ""
return ESC + f"[{x:d}B"
HIDE_CURSOR = f"{ESC}[?25l"
SHOW_CURSOR = f"{ESC}[?25h"
MOUSE_TRACKING_ON = f"{ESC}[?1000h{ESC}[?1002h{ESC}[?1006h"
MOUSE_TRACKING_OFF = f"{ESC}[?1006l{ESC}[?1002l{ESC}[?1000l"
DESIGNATE_G1_SPECIAL = f"{ESC})0"
ERASE_IN_LINE_RIGHT = f"{ESC}[K"

View File

@@ -0,0 +1,244 @@
# Urwid html fragment output wrapper for "screen shots"
# Copyright (C) 2004-2007 Ian Ward
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Urwid web site: https://urwid.org/
"""
HTML PRE-based UI implementation
"""
from __future__ import annotations
import html
import typing
from urwid import str_util
from urwid.event_loop import ExitMainLoop
from urwid.util import get_encoding
from .common import AttrSpec, BaseScreen
if typing.TYPE_CHECKING:
from typing_extensions import Literal
from urwid import Canvas
# replace control characters with ?'s
_trans_table = "?" * 32 + "".join(chr(x) for x in range(32, 256))
_default_foreground = "black"
_default_background = "light gray"
class HtmlGeneratorSimulationError(Exception):
pass
class HtmlGenerator(BaseScreen):
# class variables
fragments: typing.ClassVar[list[str]] = []
sizes: typing.ClassVar[list[tuple[int, int]]] = []
keys: typing.ClassVar[list[list[str]]] = []
started = True
def __init__(self) -> None:
super().__init__()
self.colors = 16
self.bright_is_bold = False # ignored
self.has_underline = True # ignored
self.register_palette_entry(None, _default_foreground, _default_background)
def set_terminal_properties(
self,
colors: int | None = None,
bright_is_bold: bool | None = None,
has_underline: bool | None = None,
) -> None:
if colors is None:
colors = self.colors
if bright_is_bold is None:
bright_is_bold = self.bright_is_bold
if has_underline is None:
has_underline = self.has_underline
self.colors = colors
self.bright_is_bold = bright_is_bold
self.has_underline = has_underline
def set_input_timeouts(self, *args: typing.Any) -> None:
pass
def reset_default_terminal_palette(self, *args: typing.Any) -> None:
pass
def draw_screen(self, size: tuple[int, int], canvas: Canvas) -> None:
"""Create an html fragment from the render object.
Append it to HtmlGenerator.fragments list.
"""
# collect output in l
lines = []
_cols, rows = size
if canvas.rows() != rows:
raise ValueError(rows)
if canvas.cursor is not None:
cx, cy = canvas.cursor
else:
cx = cy = None
for y, row in enumerate(canvas.content()):
col = 0
for a, _cs, run in row:
t_run = run.decode(get_encoding()).translate(_trans_table)
if isinstance(a, AttrSpec):
aspec = a
else:
aspec = self._palette[a][{1: 1, 16: 0, 88: 2, 256: 3}[self.colors]]
if y == cy and col <= cx:
run_width = str_util.calc_width(t_run, 0, len(t_run))
if col + run_width > cx:
lines.append(html_span(t_run, aspec, cx - col))
else:
lines.append(html_span(t_run, aspec))
col += run_width
else:
lines.append(html_span(t_run, aspec))
lines.append("\n")
# add the fragment to the list
self.fragments.append(f"<pre>{''.join(lines)}</pre>")
def get_cols_rows(self):
"""Return the next screen size in HtmlGenerator.sizes."""
if not self.sizes:
raise HtmlGeneratorSimulationError("Ran out of screen sizes to return!")
return self.sizes.pop(0)
@typing.overload
def get_input(self, raw_keys: Literal[False]) -> list[str]: ...
@typing.overload
def get_input(self, raw_keys: Literal[True]) -> tuple[list[str], list[int]]: ...
def get_input(self, raw_keys: bool = False) -> list[str] | tuple[list[str], list[int]]:
"""Return the next list of keypresses in HtmlGenerator.keys."""
if not self.keys:
raise ExitMainLoop()
if raw_keys:
return (self.keys.pop(0), [])
return self.keys.pop(0)
_default_aspec = AttrSpec(_default_foreground, _default_background)
(_d_fg_r, _d_fg_g, _d_fg_b, _d_bg_r, _d_bg_g, _d_bg_b) = _default_aspec.get_rgb_values()
def html_span(s: str, aspec: AttrSpec, cursor: int = -1) -> str:
fg_r, fg_g, fg_b, bg_r, bg_g, bg_b = aspec.get_rgb_values()
# use real colours instead of default fg/bg
if fg_r is None:
fg_r, fg_g, fg_b = _d_fg_r, _d_fg_g, _d_fg_b
if bg_r is None:
bg_r, bg_g, bg_b = _d_bg_r, _d_bg_g, _d_bg_b
html_fg = f"#{fg_r:02x}{fg_g:02x}{fg_b:02x}"
html_bg = f"#{bg_r:02x}{bg_g:02x}{bg_b:02x}"
if aspec.standout:
html_fg, html_bg = html_bg, html_fg
extra = ";text-decoration:underline" * aspec.underline + ";font-weight:bold" * aspec.bold
def _span(fg: str, bg: str, string: str) -> str:
if not s:
return ""
return f'<span style="color:{fg};background:{bg}{extra}">{html.escape(string)}</span>'
if cursor >= 0:
c_off, _ign = str_util.calc_text_pos(s, 0, len(s), cursor)
c2_off = str_util.move_next_char(s, c_off, len(s))
return (
_span(html_fg, html_bg, s[:c_off])
+ _span(html_bg, html_fg, s[c_off:c2_off])
+ _span(html_fg, html_bg, s[c2_off:])
)
return _span(html_fg, html_bg, s)
def screenshot_init(sizes: list[tuple[int, int]], keys: list[list[str]]) -> None:
"""
Replace curses_display.Screen and raw_display.Screen class with
HtmlGenerator.
Call this function before executing an application that uses
curses_display.Screen to have that code use HtmlGenerator instead.
sizes -- list of ( columns, rows ) tuples to be returned by each call
to HtmlGenerator.get_cols_rows()
keys -- list of lists of keys to be returned by each call to
HtmlGenerator.get_input()
Lists of keys may include "window resize" to force the application to
call get_cols_rows and read a new screen size.
For example, the following call will prepare an application to:
1. start in 80x25 with its first call to get_cols_rows()
2. take a screenshot when it calls draw_screen(..)
3. simulate 5 "down" keys from get_input()
4. take a screenshot when it calls draw_screen(..)
5. simulate keys "a", "b", "c" and a "window resize"
6. resize to 20x10 on its second call to get_cols_rows()
7. take a screenshot when it calls draw_screen(..)
8. simulate a "Q" keypress to quit the application
screenshot_init( [ (80,25), (20,10) ],
[ ["down"]*5, ["a","b","c","window resize"], ["Q"] ] )
"""
for row, col in sizes:
if not isinstance(row, int):
raise TypeError(f"sizes must be list[tuple[int, int]], with values >0 : {row!r}")
if row <= 0:
raise ValueError(f"sizes must be list[tuple[int, int]], with values >0 : {row!r}")
if not isinstance(col, int):
raise TypeError(f"sizes must be list[tuple[int, int]], with values >0 : {col!r}")
if col <= 0:
raise ValueError(f"sizes must be list[tuple[int, int]], with values >0 : {col!r}")
for line in keys:
if not isinstance(line, list):
raise TypeError(f"keys must be list[list[str]]: {line!r}")
for k in line:
if not isinstance(k, str):
raise TypeError(f"keys must be list[list[str]]: {k!r}")
from . import curses, raw
curses.Screen = HtmlGenerator
raw.Screen = HtmlGenerator
HtmlGenerator.sizes = sizes
HtmlGenerator.keys = keys
def screenshot_collect() -> list[str]:
"""Return screenshots as a list of HTML fragments."""
fragments, HtmlGenerator.fragments = HtmlGenerator.fragments, []
return fragments

View File

@@ -0,0 +1,526 @@
# Urwid LCD display module
# Copyright (C) 2010 Ian Ward
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Urwid web site: https://urwid.org/
from __future__ import annotations
import abc
import time
import typing
from .common import BaseScreen
if typing.TYPE_CHECKING:
from collections.abc import Iterable, Sequence
from typing_extensions import Literal
from urwid import Canvas
class LCDScreen(BaseScreen, abc.ABC):
"""Base class for LCD-based screens."""
DISPLAY_SIZE: tuple[int, int]
def set_terminal_properties(
self,
colors: Literal[1, 16, 88, 256, 16777216] | None = None,
bright_is_bold: bool | None = None,
has_underline: bool | None = None,
) -> None:
pass
def set_input_timeouts(self, *args):
pass
def reset_default_terminal_palette(self, *args):
pass
def get_cols_rows(self):
return self.DISPLAY_SIZE
class CFLCDScreen(LCDScreen, abc.ABC):
"""
Common methods for Crystal Fonts LCD displays
"""
KEYS: typing.ClassVar[list[str | None]] = [
None, # no key with code 0
"up_press",
"down_press",
"left_press",
"right_press",
"enter_press",
"exit_press",
"up_release",
"down_release",
"left_release",
"right_release",
"enter_release",
"exit_release",
"ul_press",
"ur_press",
"ll_press",
"lr_press",
"ul_release",
"ur_release",
"ll_release",
"lr_release",
]
CMD_PING = 0
CMD_VERSION = 1
CMD_CLEAR = 6
CMD_CGRAM = 9
CMD_CURSOR_POSITION = 11 # data = [col, row]
CMD_CURSOR_STYLE = 12 # data = [style (0-4)]
CMD_LCD_CONTRAST = 13 # data = [contrast (0-255)]
CMD_BACKLIGHT = 14 # data = [power (0-100)]
CMD_LCD_DATA = 31 # data = [col, row] + text
CMD_GPO = 34 # data = [pin(0-12), value(0-100)]
# sent from device
CMD_KEY_ACTIVITY = 0x80
CMD_ACK = 0x40 # in high two bits ie. & 0xc0
CURSOR_NONE = 0
CURSOR_BLINKING_BLOCK = 1
CURSOR_UNDERSCORE = 2
CURSOR_BLINKING_BLOCK_UNDERSCORE = 3
CURSOR_INVERTING_BLINKING_BLOCK = 4
MAX_PACKET_DATA_LENGTH = 22
colors = 1
has_underline = False
def __init__(self, device_path: str, baud: int) -> None:
"""
device_path -- eg. '/dev/ttyUSB0'
baud -- baud rate
"""
super().__init__()
self.device_path = device_path
from serial import Serial
self._device = Serial(device_path, baud, timeout=0)
self._unprocessed = bytearray()
@classmethod
def get_crc(cls, buf: Iterable[int]) -> bytes:
# This seed makes the output of this shift based algorithm match
# the table based algorithm. The center 16 bits of the 32-bit
# "newCRC" are used for the CRC. The MSB of the lower byte is used
# to see what bit was shifted out of the center 16 bit CRC
# accumulator ("carry flag analog");
new_crc = 0x00F32100
for byte in buf:
# Push this byte's bits through a software
# implementation of a hardware shift & xor.
for bit_count in range(8):
# Shift the CRC accumulator
new_crc >>= 1
# The new MSB of the CRC accumulator comes
# from the LSB of the current data byte.
if byte & (0x01 << bit_count):
new_crc |= 0x00800000
# If the low bit of the current CRC accumulator was set
# before the shift, then we need to XOR the accumulator
# with the polynomial (center 16 bits of 0x00840800)
if new_crc & 0x00000080:
new_crc ^= 0x00840800
# All the data has been done. Do 16 more bits of 0 data.
for _bit_count in range(16):
# Shift the CRC accumulator
new_crc >>= 1
# If the low bit of the current CRC accumulator was set
# before the shift we need to XOR the accumulator with
# 0x00840800.
if new_crc & 0x00000080:
new_crc ^= 0x00840800
# Return the center 16 bits, making this CRC match the one's
# complement that is sent in the packet.
return (((~new_crc) >> 8) & 0xFFFF).to_bytes(2, "little")
def _send_packet(self, command: int, data: bytes) -> None:
"""
low-level packet sending.
Following the protocol requires waiting for ack packet between
sending each packet to the device.
"""
buf = bytearray([command, len(data)])
buf.extend(data)
buf.extend(self.get_crc(buf))
self._device.write(buf)
def _read_packet(self) -> tuple[int, bytearray] | None:
"""
low-level packet reading.
returns (command/report code, data) or None
This method stored data read and tries to resync when bad data
is received.
"""
# pull in any new data available
self._unprocessed += self._device.read()
while True:
try:
command, data, unprocessed = self._parse_data(self._unprocessed)
self._unprocessed = unprocessed
except self.MoreDataRequired: # noqa: PERF203
return None
except self.InvalidPacket:
# throw out a byte and try to parse again
self._unprocessed = self._unprocessed[1:]
else:
return command, data
class InvalidPacket(Exception):
pass
class MoreDataRequired(Exception):
pass
@classmethod
def _parse_data(cls, data: bytearray) -> tuple[int, bytearray, bytearray]:
"""
Try to read a packet from the start of data, returning
(command/report code, packet_data, remaining_data)
or raising InvalidPacket or MoreDataRequired
"""
if len(data) < 2:
raise cls.MoreDataRequired
command: int = data[0]
packet_len: int = data[1]
if packet_len > cls.MAX_PACKET_DATA_LENGTH:
raise cls.InvalidPacket("length value too large")
if len(data) < packet_len + 4:
raise cls.MoreDataRequired
data_end = 2 + packet_len
crc = cls.get_crc(data[:data_end])
pcrc = data[data_end : data_end + 2]
if crc != pcrc:
raise cls.InvalidPacket("CRC doesn't match")
return command, data[2:data_end], data[data_end + 2 :]
class KeyRepeatSimulator:
"""
Provide simulated repeat key events when given press and
release events.
If two or more keys are pressed disable repeating until all
keys are released.
"""
def __init__(self, repeat_delay: float, repeat_next: float) -> None:
"""
repeat_delay -- seconds to wait before starting to repeat keys
repeat_next -- time between each repeated key
"""
self.repeat_delay = repeat_delay
self.repeat_next = repeat_next
self.pressed: dict[str, float] = {}
self.multiple_pressed = False
def press(self, key: str) -> None:
if self.pressed:
self.multiple_pressed = True
self.pressed[key] = time.time()
def release(self, key: str) -> None:
if key not in self.pressed:
return # ignore extra release events
del self.pressed[key]
if not self.pressed:
self.multiple_pressed = False
def next_event(self) -> tuple[float, str] | None:
"""
Return (remaining, key) where remaining is the number of seconds
(float) until the key repeat event should be sent, or None if no
events are pending.
"""
if len(self.pressed) != 1 or self.multiple_pressed:
return None
for key, val in self.pressed.items():
return max(0.0, val + self.repeat_delay - time.time()), key
return None
def sent_event(self) -> None:
"""
Cakk this method when you have sent a key repeat event so the
timer will be reset for the next event
"""
if len(self.pressed) != 1:
return # ignore event that shouldn't have been sent
for key in self.pressed:
self.pressed[key] = time.time() - self.repeat_delay + self.repeat_next
return
class CF635Screen(CFLCDScreen):
"""
Crystal Fontz 635 display
20x4 character display + cursor
no foreground/background colors or settings supported
see CGROM for list of close unicode matches to characters available
6 button input
up, down, left, right, enter (check mark), exit (cross)
"""
DISPLAY_SIZE = (20, 4)
# ① through ⑧ are programmable CGRAM (chars 0-7, repeated at 8-15)
# double arrows (⇑⇓) appear as double arrowheads (chars 18, 19)
# ⑴ resembles a bell
# ⑵ resembles a filled-in "Y"
# ⑶ is the letters "Pt" together
# partial blocks (▇▆▄▃▁) are actually shorter versions of (▉▋▌▍▏)
# both groups are intended to draw horizontal bars with pixel
# precision, use ▇*[▆▄▃▁]? for a thin bar or ▉*[▋▌▍▏]? for a thick bar
CGROM = (
"①②③④⑤⑥⑦⑧①②③④⑤⑥⑦⑧"
"►◄⇑⇓«»↖↗↙↘▲▼↲^ˇ█"
" !\"%&'()*+,-./"
"0123456789:;<=>?"
"¡ABCDEFGHIJKLMNO"
"PQRSTUVWXYZÄÖÑܧ"
"¿abcdefghijklmno"
"pqrstuvwxyzäöñüà"
"⁰¹²³⁴⁵⁶⁷⁸⁹½¼±≥≤μ"
"♪♫⑴♥♦⑵⌜⌟“”()αɛδ∞"
"@£$¥èéùìòÇᴾØøʳÅå"
"⌂¢ΦτλΩπΨΣθΞ♈ÆæßÉ"
"ΓΛΠϒ_ÈÊêçğŞşİι~◊"
"▇▆▄▃▁ƒ▉▋▌▍▏⑶◽▪↑→"
"↓←ÁÍÓÚÝáíóúýÔôŮů"
r"ČĔŘŠŽčĕřšž[\]{|}"
)
cursor_style = CFLCDScreen.CURSOR_INVERTING_BLINKING_BLOCK
def __init__(
self,
device_path: str,
baud: int = 115200,
repeat_delay: float = 0.5,
repeat_next: float = 0.125,
key_map: Iterable[str] = ("up", "down", "left", "right", "enter", "esc"),
):
"""
device_path -- eg. '/dev/ttyUSB0'
baud -- baud rate
repeat_delay -- seconds to wait before starting to repeat keys
repeat_next -- time between each repeated key
key_map -- the keys to send for this device's buttons
"""
super().__init__(device_path, baud)
self.repeat_delay = repeat_delay
self.repeat_next = repeat_next
self.key_repeat = KeyRepeatSimulator(repeat_delay, repeat_next)
self.key_map = tuple(key_map)
self._last_command = None
self._last_command_time = 0
self._command_queue: list[tuple[int, bytearray]] = []
self._screen_buf = None
self._previous_canvas = None
self._update_cursor = False
def get_input_descriptors(self) -> list[int]:
"""
return the fd from our serial device so we get called
on input and responses
"""
return [self._device.fd]
def get_input_nonblocking(self) -> tuple[None, list[str], list[int]]:
"""
Return a (next_input_timeout, keys_pressed, raw_keycodes)
tuple.
The protocol for our device requires waiting for acks between
each command, so this method responds to those as well as key
press and release events.
Key repeat events are simulated here as the device doesn't send
any for us.
raw_keycodes are the bytes of messages we received, which might
not seem to have any correspondence to keys_pressed.
"""
data_input: list[str] = []
raw_data_input: list[int] = []
timeout = None
packet = self._read_packet()
while packet:
command, data = packet
if command == self.CMD_KEY_ACTIVITY and data:
d0 = data[0]
if 1 <= d0 <= 12:
release = d0 > 6
keycode = d0 - (release * 6) - 1
key = self.key_map[keycode]
if release:
self.key_repeat.release(key)
else:
data_input.append(key)
self.key_repeat.press(key)
raw_data_input.append(d0)
elif command & 0xC0 == 0x40 and command & 0x3F == self._last_command: # "ACK"
self._send_next_command()
packet = self._read_packet()
next_repeat = self.key_repeat.next_event()
if next_repeat:
timeout, key = next_repeat
if not timeout:
data_input.append(key)
self.key_repeat.sent_event()
timeout = None
return timeout, data_input, raw_data_input
def _send_next_command(self) -> None:
"""
send out the next command in the queue
"""
if not self._command_queue:
self._last_command = None
return
command, data = self._command_queue.pop(0)
self._send_packet(command, data)
self._last_command = command # record command for ACK
self._last_command_time = time.time()
def queue_command(self, command: int, data: bytearray) -> None:
self._command_queue.append((command, data))
# not waiting? send away!
if self._last_command is None:
self._send_next_command()
def draw_screen(self, size: tuple[int, int], canvas: Canvas) -> None:
if size != self.DISPLAY_SIZE:
raise ValueError(size)
if self._screen_buf:
osb = self._screen_buf
else:
osb = []
sb = []
for y, row in enumerate(canvas.content()):
text = [run for _a, _cs, run in row]
if not osb or osb[y] != text:
data = bytearray([0, y])
for elem in text:
data.extend(elem)
self.queue_command(self.CMD_LCD_DATA, data)
sb.append(text)
if (
self._previous_canvas
and self._previous_canvas.cursor == canvas.cursor
and (not self._update_cursor or not canvas.cursor)
):
pass
elif canvas.cursor is None:
self.queue_command(self.CMD_CURSOR_STYLE, bytearray([self.CURSOR_NONE]))
else:
x, y = canvas.cursor
self.queue_command(self.CMD_CURSOR_POSITION, bytearray([x, y]))
self.queue_command(self.CMD_CURSOR_STYLE, bytearray([self.cursor_style]))
self._update_cursor = False
self._screen_buf = sb
self._previous_canvas = canvas
def program_cgram(self, index: int, data: Sequence[int]) -> None:
"""
Program character data.
Characters available as chr(0) through chr(7), and repeated as chr(8) through chr(15).
index -- 0 to 7 index of character to program
data -- list of 8, 6-bit integer values top to bottom with MSB on the left side of the character.
"""
if not 0 <= index <= 7:
raise ValueError(index)
if len(data) != 8:
raise ValueError(data)
self.queue_command(self.CMD_CGRAM, bytearray([index]) + bytearray(data))
def set_cursor_style(self, style: Literal[1, 2, 3, 4]) -> None:
"""
style -- CURSOR_BLINKING_BLOCK, CURSOR_UNDERSCORE,
CURSOR_BLINKING_BLOCK_UNDERSCORE or
CURSOR_INVERTING_BLINKING_BLOCK
"""
if not 1 <= style <= 4:
raise ValueError(style)
self.cursor_style = style
self._update_cursor = True
def set_backlight(self, value: int) -> None:
"""
Set backlight brightness
value -- 0 to 100
"""
if not 0 <= value <= 100:
raise ValueError(value)
self.queue_command(self.CMD_BACKLIGHT, bytearray([value]))
def set_lcd_contrast(self, value: int) -> None:
"""
value -- 0 to 255
"""
if not 0 <= value <= 255:
raise ValueError(value)
self.queue_command(self.CMD_LCD_CONTRAST, bytearray([value]))
def set_led_pin(self, led: Literal[0, 1, 2, 3], rg: Literal[0, 1], value: int) -> None:
"""
led -- 0 to 3
rg -- 0 for red, 1 for green
value -- 0 to 100
"""
if not 0 <= led <= 3:
raise ValueError(led)
if rg not in {0, 1}:
raise ValueError(rg)
if not 0 <= value <= 100:
raise ValueError(value)
self.queue_command(self.CMD_GPO, bytearray([12 - 2 * led - rg, value]))

View File

@@ -0,0 +1,37 @@
# Urwid raw display module
# Copyright (C) 2004-2009 Ian Ward
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Urwid web site: https://urwid.org/
"""
Direct terminal UI implementation
"""
from __future__ import annotations
import sys
__all__ = ("Screen",)
IS_WINDOWS = sys.platform == "win32"
if IS_WINDOWS:
from ._win32_raw_display import Screen
else:
from ._posix_raw_display import Screen

View File

@@ -0,0 +1,631 @@
# Urwid web (CGI/Asynchronous Javascript) display module
# Copyright (C) 2004-2007 Ian Ward
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Urwid web site: https://urwid.org/
"""
Urwid web application display module
"""
from __future__ import annotations
import dataclasses
import glob
import html
import os
import pathlib
import random
import selectors
import signal
import socket
import string
import sys
import tempfile
import typing
from contextlib import suppress
from urwid.str_util import calc_text_pos, calc_width, move_next_char
from urwid.util import StoppingContext, get_encoding
from .common import BaseScreen
if typing.TYPE_CHECKING:
from typing_extensions import Literal
from urwid import Canvas
TEMP_DIR = tempfile.gettempdir()
CURRENT_DIR = pathlib.Path(__file__).parent
_js_code = CURRENT_DIR.joinpath("_web.js").read_text("utf-8")
ALARM_DELAY = 60
POLL_CONNECT = 3
MAX_COLS = 200
MAX_ROWS = 100
MAX_READ = 4096
BUF_SZ = 16384
_code_colours = {
"black": "0",
"dark red": "1",
"dark green": "2",
"brown": "3",
"dark blue": "4",
"dark magenta": "5",
"dark cyan": "6",
"light gray": "7",
"dark gray": "8",
"light red": "9",
"light green": "A",
"yellow": "B",
"light blue": "C",
"light magenta": "D",
"light cyan": "E",
"white": "F",
}
# replace control characters with ?'s
_trans_table = "?" * 32 + "".join([chr(x) for x in range(32, 256)])
_css_style = CURRENT_DIR.joinpath("_web.css").read_text("utf-8")
# HTML Initial Page
_html_page = [
"""<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
"http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<title>Urwid Web Display - """,
"""</title>
<style type="text/css">
"""
+ _css_style
+ r"""
</style>
</head>
<body id="body" onload="load_web_display()">
<div style="position:absolute; visibility:hidden;">
<br id="br"\>
<pre>The quick brown fox jumps over the lazy dog.<span id="testchar">X</span>
<span id="testchar2">Y</span></pre>
</div>
Urwid Web Display - <b>""",
"""</b> -
Status: <span id="status">Set up</span>
<script type="text/javascript">
//<![CDATA[
"""
+ _js_code
+ """
//]]>
</script>
<pre id="text"></pre>
</body>
</html>
""",
]
class Screen(BaseScreen):
def __init__(self) -> None:
super().__init__()
self.palette = {}
self.has_color = True
self._started = False
@property
def started(self) -> bool:
return self._started
def register_palette(self, palette) -> None:
"""Register a list of palette entries.
palette -- list of (name, foreground, background) or
(name, same_as_other_name) palette entries.
calls self.register_palette_entry for each item in l
"""
for item in palette:
if len(item) in {3, 4}:
self.register_palette_entry(*item)
continue
if len(item) != 2:
raise ValueError(f"Invalid register_palette usage: {item!r}")
name, like_name = item
if like_name not in self.palette:
raise KeyError(f"palette entry '{like_name}' doesn't exist")
self.palette[name] = self.palette[like_name]
def register_palette_entry(
self,
name: str | None,
foreground: str,
background: str,
mono: str | None = None,
foreground_high: str | None = None,
background_high: str | None = None,
) -> None:
"""Register a single palette entry.
name -- new entry/attribute name
foreground -- foreground colour
background -- background colour
mono -- monochrome terminal attribute
See curses_display.register_palette_entry for more info.
"""
if foreground == "default":
foreground = "black"
if background == "default":
background = "light gray"
self.palette[name] = (foreground, background, mono)
def set_mouse_tracking(self, enable: bool = True) -> None:
"""Not yet implemented"""
def tty_signal_keys(self, *args, **vargs):
"""Do nothing."""
def start(self) -> StoppingContext:
"""
This function reads the initial screen size, generates a
unique id and handles cleanup when fn exits.
web_display.set_preferences(..) must be called before calling
this function for the preferences to take effect
"""
if self._started:
return StoppingContext(self)
client_init = sys.stdin.read(50)
if not client_init.startswith("window resize "):
raise ValueError(client_init)
_ignore1, _ignore2, x, y = client_init.split(" ", 3)
x = int(x)
y = int(y)
self._set_screen_size(x, y)
self.last_screen = {}
self.last_screen_width = 0
self.update_method = os.environ["HTTP_X_URWID_METHOD"]
if self.update_method not in {"multipart", "polling"}:
raise ValueError(self.update_method)
if self.update_method == "polling" and not _prefs.allow_polling:
sys.stdout.write("Status: 403 Forbidden\r\n\r\n")
sys.exit(0)
clients = glob.glob(os.path.join(_prefs.pipe_dir, "urwid*.in"))
if len(clients) >= _prefs.max_clients:
sys.stdout.write("Status: 503 Sever Busy\r\n\r\n")
sys.exit(0)
urwid_id = f"{random.randrange(10 ** 9):09d}{random.randrange(10 ** 9):09d}" # noqa: S311
self.pipe_name = os.path.join(_prefs.pipe_dir, f"urwid{urwid_id}")
os.mkfifo(f"{self.pipe_name}.in", 0o600)
signal.signal(signal.SIGTERM, self._cleanup_pipe)
self.input_fd = os.open(f"{self.pipe_name}.in", os.O_NONBLOCK | os.O_RDONLY)
self.input_tail = ""
self.content_head = (
"Content-type: "
"multipart/x-mixed-replace;boundary=ZZ\r\n"
"X-Urwid-ID: " + urwid_id + "\r\n"
"\r\n\r\n"
"--ZZ\r\n"
)
if self.update_method == "polling":
self.content_head = f"Content-type: text/plain\r\nX-Urwid-ID: {urwid_id}\r\n\r\n\r\n"
signal.signal(signal.SIGALRM, self._handle_alarm)
signal.alarm(ALARM_DELAY)
self._started = True
return StoppingContext(self)
def stop(self) -> None:
"""
Restore settings and clean up.
"""
if not self._started:
return
# XXX which exceptions does this actually raise? EnvironmentError?
with suppress(Exception):
self._close_connection()
signal.signal(signal.SIGTERM, signal.SIG_DFL)
self._cleanup_pipe()
self._started = False
def set_input_timeouts(self, *args: typing.Any) -> None:
pass
def _close_connection(self) -> None:
if self.update_method == "polling child":
self.server_socket.settimeout(0)
sock, _addr = self.server_socket.accept()
sock.sendall(b"Z")
sock.close()
if self.update_method == "multipart":
sys.stdout.write("\r\nZ\r\n--ZZ--\r\n")
sys.stdout.flush()
def _cleanup_pipe(self, *args) -> None:
if not self.pipe_name:
return
# XXX which exceptions does this actually raise? EnvironmentError?
with suppress(Exception):
os.remove(f"{self.pipe_name}.in")
os.remove(f"{self.pipe_name}.update")
def _set_screen_size(self, cols: int, rows: int) -> None:
"""Set the screen size (within max size)."""
cols = min(cols, MAX_COLS)
rows = min(rows, MAX_ROWS)
self.screen_size = cols, rows
def draw_screen(self, size: tuple[int, int], canvas: Canvas) -> None:
"""Send a screen update to the client."""
(cols, rows) = size
if cols != self.last_screen_width:
self.last_screen = {}
sendq = [self.content_head]
if self.update_method == "polling":
send = sendq.append
elif self.update_method == "polling child":
signal.alarm(0)
try:
s, _addr = self.server_socket.accept()
except socket.timeout:
sys.exit(0)
send = s.sendall
else:
signal.alarm(0)
send = sendq.append
send("\r\n")
self.content_head = ""
if canvas.rows() != rows:
raise ValueError(rows)
if canvas.cursor is not None:
cx, cy = canvas.cursor
else:
cx = cy = None
new_screen = {}
y = -1
for row in canvas.content():
y += 1
l_row = tuple((attr_, line.decode(get_encoding())) for attr_, _, line in row)
line = []
sig = l_row
if y == cy:
sig = (*sig, cx)
new_screen[sig] = [*new_screen.get(sig, []), y]
old_line_numbers = self.last_screen.get(sig, None)
if old_line_numbers is not None:
if y in old_line_numbers:
old_line = y
else:
old_line = old_line_numbers[0]
send(f"<{old_line:d}\n")
continue
col = 0
for a, run in l_row:
t_run = run.translate(_trans_table)
if a is None:
fg, bg, _mono = "black", "light gray", None
else:
fg, bg, _mono = self.palette[a]
if y == cy and col <= cx:
run_width = calc_width(t_run, 0, len(t_run))
if col + run_width > cx:
line.append(code_span(t_run, fg, bg, cx - col))
else:
line.append(code_span(t_run, fg, bg))
col += run_width
else:
line.append(code_span(t_run, fg, bg))
send(f"{''.join(line)}\n")
self.last_screen = new_screen
self.last_screen_width = cols
if self.update_method == "polling":
sys.stdout.write("".join(sendq))
sys.stdout.flush()
sys.stdout.close()
self._fork_child()
elif self.update_method == "polling child":
s.close()
else: # update_method == "multipart"
send("\r\n--ZZ\r\n")
sys.stdout.write("".join(sendq))
sys.stdout.flush()
signal.alarm(ALARM_DELAY)
def clear(self) -> None:
"""
Force the screen to be completely repainted on the next
call to draw_screen().
(does nothing for web_display)
"""
def _fork_child(self) -> None:
"""
Fork a child to run CGI disconnected for polling update method.
Force parent process to exit.
"""
daemonize(f"{self.pipe_name}.err")
self.input_fd = os.open(f"{self.pipe_name}.in", os.O_NONBLOCK | os.O_RDONLY)
self.update_method = "polling child"
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.bind(f"{self.pipe_name}.update")
s.listen(1)
s.settimeout(POLL_CONNECT)
self.server_socket = s
def _handle_alarm(self, sig, frame) -> None:
if self.update_method not in {"multipart", "polling child"}:
raise ValueError(self.update_method)
if self.update_method == "polling child":
# send empty update
try:
s, _addr = self.server_socket.accept()
s.close()
except socket.timeout:
sys.exit(0)
else:
# send empty update
sys.stdout.write("\r\n\r\n--ZZ\r\n")
sys.stdout.flush()
signal.alarm(ALARM_DELAY)
def get_cols_rows(self) -> tuple[int, int]:
"""Return the screen size."""
return self.screen_size
@typing.overload
def get_input(self, raw_keys: Literal[False]) -> list[str]: ...
@typing.overload
def get_input(self, raw_keys: Literal[True]) -> tuple[list[str], list[int]]: ...
def get_input(self, raw_keys: bool = False) -> list[str] | tuple[list[str], list[int]]:
"""Return pending input as a list."""
pending_input = []
resized = False
with selectors.DefaultSelector() as selector:
selector.register(self.input_fd, selectors.EVENT_READ)
iready = [event.fd for event, _ in selector.select(0.5)]
if not iready:
if raw_keys:
return [], []
return []
keydata = os.read(self.input_fd, MAX_READ).decode(get_encoding())
os.close(self.input_fd)
self.input_fd = os.open(f"{self.pipe_name}.in", os.O_NONBLOCK | os.O_RDONLY)
# sys.stderr.write( repr((keydata,self.input_tail))+"\n" )
keys = keydata.split("\n")
keys[0] = self.input_tail + keys[0]
self.input_tail = keys[-1]
for k in keys[:-1]:
if k.startswith("window resize "):
_ign1, _ign2, x, y = k.split(" ", 3)
x = int(x)
y = int(y)
self._set_screen_size(x, y)
resized = True
else:
pending_input.append(k)
if resized:
pending_input.append("window resize")
if raw_keys:
return pending_input, []
return pending_input
def code_span(s, fg, bg, cursor=-1) -> str:
code_fg = _code_colours[fg]
code_bg = _code_colours[bg]
if cursor >= 0:
c_off, _ign = calc_text_pos(s, 0, len(s), cursor)
c2_off = move_next_char(s, c_off, len(s))
return (
code_fg
+ code_bg
+ s[:c_off]
+ "\n"
+ code_bg
+ code_fg
+ s[c_off:c2_off]
+ "\n"
+ code_fg
+ code_bg
+ s[c2_off:]
+ "\n"
)
return f"{code_fg + code_bg + s}\n"
def is_web_request() -> bool:
"""
Return True if this is a CGI web request.
"""
return "REQUEST_METHOD" in os.environ
def handle_short_request() -> bool:
"""
Handle short requests such as passing keystrokes to the application
or sending the initial html page. If returns True, then this
function recognised and handled a short request, and the calling
script should immediately exit.
web_display.set_preferences(..) should be called before calling this
function for the preferences to take effect
"""
if not is_web_request():
return False
if os.environ["REQUEST_METHOD"] == "GET":
# Initial request, send the HTML and javascript.
sys.stdout.write("Content-type: text/html\r\n\r\n" + html.escape(_prefs.app_name).join(_html_page))
return True
if os.environ["REQUEST_METHOD"] != "POST":
# Don't know what to do with head requests etc.
return False
if "HTTP_X_URWID_ID" not in os.environ:
# If no urwid id, then the application should be started.
return False
urwid_id = os.environ["HTTP_X_URWID_ID"]
if len(urwid_id) > 20:
# invalid. handle by ignoring
# assert 0, "urwid id too long!"
sys.stdout.write("Status: 414 URI Too Long\r\n\r\n")
return True
for c in urwid_id:
if c not in string.digits:
# invald. handle by ignoring
# assert 0, "invalid chars in id!"
sys.stdout.write("Status: 403 Forbidden\r\n\r\n")
return True
if os.environ.get("HTTP_X_URWID_METHOD", None) == "polling":
# this is a screen update request
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
s.connect(os.path.join(_prefs.pipe_dir, f"urwid{urwid_id}.update"))
data = f"Content-type: text/plain\r\n\r\n{s.recv(BUF_SZ)}"
while data:
sys.stdout.write(data)
data = s.recv(BUF_SZ)
except OSError:
sys.stdout.write("Status: 404 Not Found\r\n\r\n")
return True
return True
# this is a keyboard input request
try:
fd = os.open((os.path.join(_prefs.pipe_dir, f"urwid{urwid_id}.in")), os.O_WRONLY)
except OSError:
sys.stdout.write("Status: 404 Not Found\r\n\r\n")
return True
# FIXME: use the correct encoding based on the request
keydata = sys.stdin.read(MAX_READ)
os.write(fd, keydata.encode("ascii"))
os.close(fd)
sys.stdout.write("Content-type: text/plain\r\n\r\n")
return True
@dataclasses.dataclass
class _Preferences:
app_name: str = "Unnamed Application"
pipe_dir: str = TEMP_DIR
allow_polling: bool = True
max_clients: int = 20
_prefs = _Preferences()
def set_preferences(
app_name: str,
pipe_dir: str = TEMP_DIR,
allow_polling: bool = True,
max_clients: int = 20,
) -> None:
"""
Set web_display preferences.
app_name -- application name to appear in html interface
pipe_dir -- directory for input pipes, daemon update sockets
and daemon error logs
allow_polling -- allow creation of daemon processes for
browsers without multipart support
max_clients -- maximum concurrent client connections. This
pool is shared by all urwid applications
using the same pipe_dir
"""
_prefs.app_name = app_name
_prefs.pipe_dir = pipe_dir
_prefs.allow_polling = allow_polling
_prefs.max_clients = max_clients
class ErrorLog:
def __init__(self, errfile: str | pathlib.PurePath) -> None:
self.errfile = errfile
def write(self, err: str) -> None:
with open(self.errfile, "a", encoding="utf-8") as f:
f.write(err)
def daemonize(errfile: str) -> None:
"""
Detach process and become a daemon.
"""
pid = os.fork()
if pid:
os._exit(0)
os.setsid()
signal.signal(signal.SIGHUP, signal.SIG_IGN)
os.umask(0)
pid = os.fork()
if pid:
os._exit(0)
os.chdir("/")
for fd in range(0, 20):
with suppress(OSError):
os.close(fd)
sys.stdin = open("/dev/null", encoding="utf-8") # noqa: SIM115 # pylint: disable=consider-using-with
sys.stdout = open("/dev/null", "w", encoding="utf-8") # noqa: SIM115 # pylint: disable=consider-using-with
sys.stderr = ErrorLog(errfile)