Compare commits

..

No commits in common. "5503e48db8edcec7a48239e56bc21207e0cc498f" and "0d2d628eef407008ea67a42f6df552d652c81e00" have entirely different histories.

3 changed files with 21 additions and 130 deletions

View file

@ -5,15 +5,14 @@ from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Any, cast from typing import Any, cast
from gi.repository import Gdk, GLib, Gtk, Pango from gi.repository import Gdk, GLib, GObject, Gtk, Pango
from .file_model import FileItem, FileListModel, FileType from .file_model import FileItem, FileListModel, FileType
from .reactive import Watcher, update_all_computed
from .thumbnailer import Thumbnailer from .thumbnailer import Thumbnailer
from .video_player import VideoPlayer from .video_player import VideoPlayer
class MainWindow(Gtk.ApplicationWindow, Watcher): class MainWindow(Gtk.ApplicationWindow):
stack: Gtk.Stack stack: Gtk.Stack
list_view: Gtk.ListView list_view: Gtk.ListView
list_model: FileListModel list_model: FileListModel
@ -129,7 +128,8 @@ class MainWindow(Gtk.ApplicationWindow, Watcher):
# Setup video player # Setup video player
self.video_player = VideoPlayer(self.video_widget) self.video_player = VideoPlayer(self.video_widget)
self.watch(self._sync_overlay_grid) self.video_player.connect("notify::is-playing", self._on_player_state_changed)
self.video_player.connect("notify::is-paused", self._on_player_state_changed)
# Add both main menu and overlay to stack # Add both main menu and overlay to stack
self.stack.add_named(main_box, "menu") self.stack.add_named(main_box, "menu")
@ -493,17 +493,16 @@ class MainWindow(Gtk.ApplicationWindow, Watcher):
# Set absolute time when overlay should hide # Set absolute time when overlay should hide
self.overlay_hide_time = self.now + timeout_seconds self.overlay_hide_time = self.now + timeout_seconds
def _sync_overlay_grid(self): def _on_player_state_changed(self, player: VideoPlayer, param: GObject.ParamSpec) -> None:
"""Update grid visibility based on player state.""" """Update grid visibility based on player state"""
is_playing = player.get_property("is-playing")
is_paused = player.get_property("is-paused")
is_playing = self.video_player.is_playing.value # Show grid only when paused and playing
is_paused = self.video_player.is_paused.value if self.grid_overlay:
self.grid_overlay.set_visible(is_playing and is_paused) self.grid_overlay.set_visible(is_playing and is_paused)
def _on_tick(self, widget: Gtk.Widget, frame_clock: Gdk.FrameClock, data: Any) -> bool: def _on_tick(self, widget: Gtk.Widget, frame_clock: Gdk.FrameClock, data: Any) -> bool:
# Update all reactive values.
update_all_computed()
current_time = frame_clock.get_frame_time() / 1_000_000 current_time = frame_clock.get_frame_time() / 1_000_000
current_time_str = datetime.now().strftime("%H:%M") current_time_str = datetime.now().strftime("%H:%M")

View file

@ -1,99 +0,0 @@
from __future__ import annotations
from contextvars import ContextVar
from typing import Any, Callable, ClassVar, Generic, Self, TypeVar
from weakref import WeakSet
__all__ = ["Ref", "Computed", "update_all_computed"]
T = TypeVar("T")
class Ref(Generic[T]):
_users: WeakSet[Computed[Any]]
_value: T
def __init__(self, initial: T):
self._value = initial
self._users = WeakSet()
@property
def value(self):
computed = Computed._stack.get()
if computed is not None:
self._users.add(computed)
return self._value
@value.setter
def value(self, new_value: T):
self._value = new_value
self.trigger()
def trigger(self):
Computed._dirty.update(self._users)
def __repr__(self):
return f"Ref({self._value!r})"
def __str__(self):
return repr(self)
class Computed(Ref[T]):
_dirty: ClassVar[WeakSet[Self]] = WeakSet()
_stack: ClassVar[ContextVar[Computed[Any] | None]] = ContextVar("stack", default=None)
_update: Callable[[], T]
def __init__(self, update: Callable[[], T]):
self._update = update
self._users = WeakSet()
self.update()
def update(self):
token = self._stack.set(self)
try:
self.value = self._update()
finally:
self._stack.reset(token)
def __repr__(self):
if hasattr(self, "_value"):
return f"Computed({self._value!r})"
else:
return "Computed(...)"
class Watcher:
_watches: set[Computed[Any]]
def watch(self, handler: Callable[[], Any]):
if not hasattr(self, "_watches"):
self._watches = set()
def run_handler():
handler()
self._watches.add(Computed(run_handler))
def update_all_computed(max_iters: int = 100):
"""
Update all computed values.
If they do not settle in given number of iterations, raise RuntimeError.
"""
for _ in range(max_iters):
if not Computed._dirty:
return
dirty = set(Computed._dirty)
Computed._dirty.clear()
for computed in dirty:
computed.update()
raise RuntimeError("Infinite loop in computed values")

View file

@ -4,8 +4,6 @@ from pathlib import Path
from gi.repository import GObject, Gst, Gtk from gi.repository import GObject, Gst, Gtk
from .reactive import Ref
DEFAULT_SEEK_FLAGS = Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT DEFAULT_SEEK_FLAGS = Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT
@ -15,15 +13,12 @@ class VideoPlayer(GObject.Object):
__gtype_name__ = "VideoPlayer" __gtype_name__ = "VideoPlayer"
is_playing: Ref[bool] is_playing = GObject.Property(type=bool, default=False)
is_paused: Ref[bool] is_paused = GObject.Property(type=bool, default=True)
def __init__(self, picture: Gtk.Picture): def __init__(self, picture: Gtk.Picture):
super().__init__() super().__init__()
self.is_playing = Ref(False)
self.is_paused = Ref(True)
self.pipeline = Gst.Pipeline.new("video-player") self.pipeline = Gst.Pipeline.new("video-player")
playbin = Gst.ElementFactory.make("playbin", "playbin") playbin = Gst.ElementFactory.make("playbin", "playbin")
@ -76,26 +71,24 @@ class VideoPlayer(GObject.Object):
# Start playing # Start playing
self.pipeline.set_state(Gst.State.PLAYING) self.pipeline.set_state(Gst.State.PLAYING)
self.is_playing.value = True self.set_property("is-playing", True)
self.is_paused.value = False self.set_property("is-paused", False)
def stop(self) -> None: def stop(self) -> None:
"""Stop playback and release resources""" """Stop playback and release resources"""
self.pipeline.set_state(Gst.State.NULL) self.pipeline.set_state(Gst.State.NULL)
self.set_property("is-paused", True)
self.is_playing.value = True self.set_property("is-playing", False)
self.is_paused.value = False
def toggle_play_pause(self) -> None: def toggle_play_pause(self) -> None:
"""Toggle between play and pause states""" """Toggle between play and pause states"""
_, state, _ = self.pipeline.get_state(0) _, state, _ = self.pipeline.get_state(0)
if state == Gst.State.PLAYING: if state == Gst.State.PLAYING:
self.pipeline.set_state(Gst.State.PAUSED) self.pipeline.set_state(Gst.State.PAUSED)
self.is_paused.value = True self.set_property("is-paused", True)
else: else:
self.pipeline.set_state(Gst.State.PLAYING) self.pipeline.set_state(Gst.State.PLAYING)
self.is_paused.value = False self.set_property("is-paused", False)
def seek_relative(self, offset: float) -> None: def seek_relative(self, offset: float) -> None:
"""Seek relative to current position by offset seconds""" """Seek relative to current position by offset seconds"""
@ -115,19 +108,17 @@ class VideoPlayer(GObject.Object):
def seek_end(self): def seek_end(self):
"""Seek to the end of the video.""" """Seek to the end of the video."""
duration = self.get_duration()
if duration := self.get_duration(): if duration:
self.pipeline.seek_simple(Gst.Format.TIME, DEFAULT_SEEK_FLAGS, duration) self.pipeline.seek_simple(Gst.Format.TIME, DEFAULT_SEEK_FLAGS, duration)
def get_position(self) -> int | None: def get_position(self) -> int | None:
"""Get current playback position in nanoseconds""" """Get current playback position in nanoseconds"""
success, position = self.pipeline.query_position(Gst.Format.TIME) success, position = self.pipeline.query_position(Gst.Format.TIME)
return position if success else None return position if success else None
def get_duration(self) -> int | None: def get_duration(self) -> int | None:
"""Get total duration in nanoseconds""" """Get total duration in nanoseconds"""
success, duration = self.pipeline.query_duration(Gst.Format.TIME) success, duration = self.pipeline.query_duration(Gst.Format.TIME)
return duration if success else None return duration if success else None