Compare commits

...

2 commits

Author SHA1 Message Date
5503e48db8 Experiment with reactivity 2025-03-11 16:11:47 +01:00
d845d3c3a9 Add reactivity toolkit 2025-03-11 16:01:30 +01:00
3 changed files with 130 additions and 21 deletions

View file

@ -5,14 +5,15 @@ from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Any, cast from typing import Any, cast
from gi.repository import Gdk, GLib, GObject, Gtk, Pango from gi.repository import Gdk, GLib, Gtk, Pango
from .file_model import FileItem, FileListModel, FileType from .file_model import FileItem, FileListModel, FileType
from .reactive import Watcher, update_all_computed
from .thumbnailer import Thumbnailer from .thumbnailer import Thumbnailer
from .video_player import VideoPlayer from .video_player import VideoPlayer
class MainWindow(Gtk.ApplicationWindow): class MainWindow(Gtk.ApplicationWindow, Watcher):
stack: Gtk.Stack stack: Gtk.Stack
list_view: Gtk.ListView list_view: Gtk.ListView
list_model: FileListModel list_model: FileListModel
@ -128,8 +129,7 @@ class MainWindow(Gtk.ApplicationWindow):
# Setup video player # Setup video player
self.video_player = VideoPlayer(self.video_widget) self.video_player = VideoPlayer(self.video_widget)
self.video_player.connect("notify::is-playing", self._on_player_state_changed) self.watch(self._sync_overlay_grid)
self.video_player.connect("notify::is-paused", self._on_player_state_changed)
# Add both main menu and overlay to stack # Add both main menu and overlay to stack
self.stack.add_named(main_box, "menu") self.stack.add_named(main_box, "menu")
@ -493,16 +493,17 @@ class MainWindow(Gtk.ApplicationWindow):
# Set absolute time when overlay should hide # Set absolute time when overlay should hide
self.overlay_hide_time = self.now + timeout_seconds self.overlay_hide_time = self.now + timeout_seconds
def _on_player_state_changed(self, player: VideoPlayer, param: GObject.ParamSpec) -> None: def _sync_overlay_grid(self):
"""Update grid visibility based on player state""" """Update grid visibility based on player state."""
is_playing = player.get_property("is-playing")
is_paused = player.get_property("is-paused")
# Show grid only when paused and playing is_playing = self.video_player.is_playing.value
if self.grid_overlay: is_paused = self.video_player.is_paused.value
self.grid_overlay.set_visible(is_playing and is_paused) self.grid_overlay.set_visible(is_playing and is_paused)
def _on_tick(self, widget: Gtk.Widget, frame_clock: Gdk.FrameClock, data: Any) -> bool: def _on_tick(self, widget: Gtk.Widget, frame_clock: Gdk.FrameClock, data: Any) -> bool:
# Update all reactive values.
update_all_computed()
current_time = frame_clock.get_frame_time() / 1_000_000 current_time = frame_clock.get_frame_time() / 1_000_000
current_time_str = datetime.now().strftime("%H:%M") current_time_str = datetime.now().strftime("%H:%M")

99
lazy_player/reactive.py Normal file
View file

@ -0,0 +1,99 @@
from __future__ import annotations
from contextvars import ContextVar
from typing import Any, Callable, ClassVar, Generic, Self, TypeVar
from weakref import WeakSet
__all__ = ["Ref", "Computed", "update_all_computed"]
T = TypeVar("T")
class Ref(Generic[T]):
_users: WeakSet[Computed[Any]]
_value: T
def __init__(self, initial: T):
self._value = initial
self._users = WeakSet()
@property
def value(self):
computed = Computed._stack.get()
if computed is not None:
self._users.add(computed)
return self._value
@value.setter
def value(self, new_value: T):
self._value = new_value
self.trigger()
def trigger(self):
Computed._dirty.update(self._users)
def __repr__(self):
return f"Ref({self._value!r})"
def __str__(self):
return repr(self)
class Computed(Ref[T]):
_dirty: ClassVar[WeakSet[Self]] = WeakSet()
_stack: ClassVar[ContextVar[Computed[Any] | None]] = ContextVar("stack", default=None)
_update: Callable[[], T]
def __init__(self, update: Callable[[], T]):
self._update = update
self._users = WeakSet()
self.update()
def update(self):
token = self._stack.set(self)
try:
self.value = self._update()
finally:
self._stack.reset(token)
def __repr__(self):
if hasattr(self, "_value"):
return f"Computed({self._value!r})"
else:
return "Computed(...)"
class Watcher:
_watches: set[Computed[Any]]
def watch(self, handler: Callable[[], Any]):
if not hasattr(self, "_watches"):
self._watches = set()
def run_handler():
handler()
self._watches.add(Computed(run_handler))
def update_all_computed(max_iters: int = 100):
"""
Update all computed values.
If they do not settle in given number of iterations, raise RuntimeError.
"""
for _ in range(max_iters):
if not Computed._dirty:
return
dirty = set(Computed._dirty)
Computed._dirty.clear()
for computed in dirty:
computed.update()
raise RuntimeError("Infinite loop in computed values")

View file

@ -4,6 +4,8 @@ from pathlib import Path
from gi.repository import GObject, Gst, Gtk from gi.repository import GObject, Gst, Gtk
from .reactive import Ref
DEFAULT_SEEK_FLAGS = Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT DEFAULT_SEEK_FLAGS = Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT
@ -13,12 +15,15 @@ class VideoPlayer(GObject.Object):
__gtype_name__ = "VideoPlayer" __gtype_name__ = "VideoPlayer"
is_playing = GObject.Property(type=bool, default=False) is_playing: Ref[bool]
is_paused = GObject.Property(type=bool, default=True) is_paused: Ref[bool]
def __init__(self, picture: Gtk.Picture): def __init__(self, picture: Gtk.Picture):
super().__init__() super().__init__()
self.is_playing = Ref(False)
self.is_paused = Ref(True)
self.pipeline = Gst.Pipeline.new("video-player") self.pipeline = Gst.Pipeline.new("video-player")
playbin = Gst.ElementFactory.make("playbin", "playbin") playbin = Gst.ElementFactory.make("playbin", "playbin")
@ -71,24 +76,26 @@ class VideoPlayer(GObject.Object):
# Start playing # Start playing
self.pipeline.set_state(Gst.State.PLAYING) self.pipeline.set_state(Gst.State.PLAYING)
self.set_property("is-playing", True) self.is_playing.value = True
self.set_property("is-paused", False) self.is_paused.value = False
def stop(self) -> None: def stop(self) -> None:
"""Stop playback and release resources""" """Stop playback and release resources"""
self.pipeline.set_state(Gst.State.NULL) self.pipeline.set_state(Gst.State.NULL)
self.set_property("is-paused", True)
self.set_property("is-playing", False) self.is_playing.value = True
self.is_paused.value = False
def toggle_play_pause(self) -> None: def toggle_play_pause(self) -> None:
"""Toggle between play and pause states""" """Toggle between play and pause states"""
_, state, _ = self.pipeline.get_state(0) _, state, _ = self.pipeline.get_state(0)
if state == Gst.State.PLAYING: if state == Gst.State.PLAYING:
self.pipeline.set_state(Gst.State.PAUSED) self.pipeline.set_state(Gst.State.PAUSED)
self.set_property("is-paused", True) self.is_paused.value = True
else: else:
self.pipeline.set_state(Gst.State.PLAYING) self.pipeline.set_state(Gst.State.PLAYING)
self.set_property("is-paused", False) self.is_paused.value = False
def seek_relative(self, offset: float) -> None: def seek_relative(self, offset: float) -> None:
"""Seek relative to current position by offset seconds""" """Seek relative to current position by offset seconds"""
@ -108,17 +115,19 @@ class VideoPlayer(GObject.Object):
def seek_end(self): def seek_end(self):
"""Seek to the end of the video.""" """Seek to the end of the video."""
duration = self.get_duration()
if duration: if duration := self.get_duration():
self.pipeline.seek_simple(Gst.Format.TIME, DEFAULT_SEEK_FLAGS, duration) self.pipeline.seek_simple(Gst.Format.TIME, DEFAULT_SEEK_FLAGS, duration)
def get_position(self) -> int | None: def get_position(self) -> int | None:
"""Get current playback position in nanoseconds""" """Get current playback position in nanoseconds"""
success, position = self.pipeline.query_position(Gst.Format.TIME) success, position = self.pipeline.query_position(Gst.Format.TIME)
return position if success else None return position if success else None
def get_duration(self) -> int | None: def get_duration(self) -> int | None:
"""Get total duration in nanoseconds""" """Get total duration in nanoseconds"""
success, duration = self.pipeline.query_duration(Gst.Format.TIME) success, duration = self.pipeline.query_duration(Gst.Format.TIME)
return duration if success else None return duration if success else None