Compare commits

...

4 commits

4 changed files with 260 additions and 18 deletions

View file

@ -8,13 +8,14 @@ from typing import Any, cast
import gi import gi
from .file_model import FileItem, FileListModel, FileType from .file_model import FileItem, FileListModel, FileType
from .thumbnailer import Thumbnailer
from .video_player import VideoPlayer from .video_player import VideoPlayer
gi.require_version("Gdk", "4.0") gi.require_version("Gdk", "4.0")
gi.require_version("Gtk", "4.0") gi.require_version("Gtk", "4.0")
gi.require_version("Gst", "1.0") gi.require_version("Gst", "1.0")
gi.require_version("Pango", "1.0") gi.require_version("Pango", "1.0")
from gi.repository import Gdk, Gst, Gtk, Pango # NOQA: E402 from gi.repository import Gdk, GLib, Gst, Gtk, Pango # NOQA: E402
class MainWindow(Gtk.ApplicationWindow): class MainWindow(Gtk.ApplicationWindow):
@ -29,8 +30,9 @@ class MainWindow(Gtk.ApplicationWindow):
overlay_hide_time: float overlay_hide_time: float
last_position_save: float last_position_save: float
def __init__(self, *args: Any, **kwargs: Any): def __init__(self, *args: Any, thumbnailer: Thumbnailer, **kwargs: Any):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.thumbnailer = thumbnailer
# Directory history stack # Directory history stack
self.directory_history: list[Path] = [] self.directory_history: list[Path] = []
@ -99,6 +101,15 @@ class MainWindow(Gtk.ApplicationWindow):
left_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL) left_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
left_box.set_valign(Gtk.Align.CENTER) left_box.set_valign(Gtk.Align.CENTER)
left_box.set_halign(Gtk.Align.FILL) left_box.set_halign(Gtk.Align.FILL)
# Create image widget for thumbnail with fixed 16:9 aspect
self.thumbnail_image = Gtk.Picture()
self.thumbnail_image.set_size_request(384, 216) # 16:9 aspect ratio
self.thumbnail_image.set_can_shrink(True)
self.thumbnail_image.set_keep_aspect_ratio(True)
self.thumbnail_image.set_resource(None)
left_box.append(self.thumbnail_image)
self.file_info_label = Gtk.Label(label="") self.file_info_label = Gtk.Label(label="")
self.file_info_label.set_wrap(True) self.file_info_label.set_wrap(True)
left_box.append(self.file_info_label) left_box.append(self.file_info_label)
@ -266,10 +277,17 @@ class MainWindow(Gtk.ApplicationWindow):
if selection_model.get_selected() == Gtk.INVALID_LIST_POSITION: if selection_model.get_selected() == Gtk.INVALID_LIST_POSITION:
self.file_info_label.set_text("") self.file_info_label.set_text("")
else: else:
selected_item = selection_model.get_selected_item() file_item = self.selection
if selected_item: self.file_info_label.set_text(file_item.name)
file_item = cast(FileItem, selected_item)
self.file_info_label.set_text(file_item.name) # Update thumbnail if available
if file_item.thumbnail:
gbytes = GLib.Bytes.new(cast(Any, file_item.thumbnail))
texture = Gdk.Texture.new_from_bytes(gbytes)
self.thumbnail_image.set_paintable(texture)
else:
self.thumbnailer.generate_thumbnail(file_item)
self.thumbnail_image.set_paintable(None)
def _toggle_play_pause(self) -> None: def _toggle_play_pause(self) -> None:
"""Toggle between play and pause states""" """Toggle between play and pause states"""
@ -424,6 +442,13 @@ class MainWindow(Gtk.ApplicationWindow):
self._save_position() self._save_position()
self.last_position_save = frame_time self.last_position_save = frame_time
# Update thumbnail if available
file_item = self.selection
if file_item.thumbnail and not self.thumbnail_image.get_paintable():
gbytes = GLib.Bytes.new(cast(Any, file_item.thumbnail))
texture = Gdk.Texture.new_from_bytes(gbytes)
self.thumbnail_image.set_paintable(texture)
return True return True
def _populate_file_list(self) -> None: def _populate_file_list(self) -> None:
@ -441,7 +466,8 @@ class MainWindow(Gtk.ApplicationWindow):
if path.is_dir(): if path.is_dir():
items.append(FileItem(path.name, FileType.DIRECTORY, path.resolve())) items.append(FileItem(path.name, FileType.DIRECTORY, path.resolve()))
elif path.suffix in (".mkv", ".mp4", ".avi"): elif path.suffix in (".mkv", ".mp4", ".avi"):
items.append(FileItem(path.name, FileType.VIDEO, path.resolve())) file_item = FileItem(path.name, FileType.VIDEO, path.resolve())
items.append(file_item)
# Sort directories first, then files, both alphabetically # Sort directories first, then files, both alphabetically
items.sort(key=lambda x: (x.file_type != FileType.DIRECTORY, x.name.lower())) items.sort(key=lambda x: (x.file_type != FileType.DIRECTORY, x.name.lower()))
@ -459,6 +485,9 @@ class App(Gtk.Application):
# Initialize GStreamer # Initialize GStreamer
Gst.init(None) Gst.init(None)
# Create thumbnailer
self.thumbnailer = Thumbnailer()
# Load CSS # Load CSS
css_provider = Gtk.CssProvider() css_provider = Gtk.CssProvider()
css_file = Path(__file__).parent / "style.css" css_file = Path(__file__).parent / "style.css"
@ -475,7 +504,7 @@ class App(Gtk.Application):
) )
def do_activate(self): def do_activate(self):
win = MainWindow(application=self) win = MainWindow(application=self, thumbnailer=self.thumbnailer)
win.present() win.present()

View file

@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
import os import os
import sys
from enum import Enum, auto from enum import Enum, auto
from pathlib import Path from pathlib import Path
from typing import Optional, overload from typing import Optional, overload
@ -19,6 +20,8 @@ class FileType(Enum):
class FileItem(GObject.Object): class FileItem(GObject.Object):
file_type: FileType file_type: FileType
full_path: Path full_path: Path
thumbnail: bytes
_has_thumbnail: bool
__gtype_name__ = "FileItem" __gtype_name__ = "FileItem"
@ -27,6 +30,8 @@ class FileItem(GObject.Object):
self.name = name self.name = name
self.file_type = file_type self.file_type = file_type
self.full_path = full_path self.full_path = full_path
self.thumbnail = b""
self._has_thumbnail = False
@GObject.Property(type=GObject.TYPE_UINT64) @GObject.Property(type=GObject.TYPE_UINT64)
def saved_position(self) -> int: def saved_position(self) -> int:
@ -55,27 +60,44 @@ class FileItem(GObject.Object):
self._save_attribute("subtitle_track", value if value >= -1 else None) self._save_attribute("subtitle_track", value if value >= -1 else None)
self.notify("saved-subtitle-track") self.notify("saved-subtitle-track")
@GObject.Property(type=bool, default=False)
def has_thumbnail(self):
return self._has_thumbnail
@has_thumbnail.setter
def set_has_thumbnail(self, value: bool):
self._has_thumbnail = value
self.notify("has-thumbnail")
@overload @overload
def _load_attribute(self, name: str, dfl: str) -> str: ... def _load_attribute(self, name: str, dfl: str) -> str: ...
@overload
def _load_attribute(self, name: str, dfl: bytes) -> bytes: ...
@overload @overload
def _load_attribute(self, name: str, dfl: int) -> int: ... def _load_attribute(self, name: str, dfl: int) -> int: ...
def _load_attribute(self, name: str, dfl: str | int) -> str | int: def _load_attribute(self, name: str, dfl: str | bytes | int) -> str | bytes | int:
try: try:
strval = os.getxattr(self.full_path, f"user.lazy_player.{name}") strval = os.getxattr(self.full_path, f"user.lazy_player.{name}")
return type(dfl)(strval) return type(dfl)(strval)
except OSError: except OSError:
return dfl return dfl
def _save_attribute(self, name: str, value: str | float | int | None) -> None: def _save_attribute(self, name: str, value: str | bytes | float | int | None) -> None:
try: try:
if value is None: if value is None:
os.removexattr(self.full_path, f"user.lazy_player.{name}") os.removexattr(self.full_path, f"user.lazy_player.{name}")
else: else:
os.setxattr(self.full_path, f"user.lazy_player.{name}", str(value).encode("utf8")) if isinstance(value, bytes):
except OSError: os.setxattr(self.full_path, f"user.lazy_player.{name}", value)
pass else:
os.setxattr(
self.full_path, f"user.lazy_player.{name}", str(value).encode("utf8")
)
except OSError as err:
print(err, file=sys.stderr)
class FileListModel(GObject.Object, Gio.ListModel): class FileListModel(GObject.Object, Gio.ListModel):

195
lazy_player/thumbnailer.py Normal file
View file

@ -0,0 +1,195 @@
from __future__ import annotations
import sys
from enum import Enum, auto
import gi
from .file_model import FileItem
gi.require_version("Gst", "1.0")
from gi.repository import Gst # NOQA: E402
class State(Enum):
IDLE = auto()
INITIALIZING = auto()
SEEKING = auto()
CAPTURING = auto()
CLEANING_UP = auto()
class Thumbnailer:
pipeline: Gst.Pipeline
uridecodebin: Gst.Element
sink: Gst.Element
queue: list[FileItem]
current_item: FileItem | None
state: State
def __init__(self):
self.queue = []
self.current_item = None
self.state = State.IDLE
pipeline_str = (
"uridecodebin name=uridecodebin ! "
"videoconvert ! "
"videoscale ! "
"videobox name=box,autocrop=true ! "
"video/x-raw,width=384,height=216 ! "
"jpegenc quality=85 ! "
"appsink name=sink"
)
pipeline = Gst.parse_launch(pipeline_str)
if not isinstance(pipeline, Gst.Pipeline):
return
self.pipeline = pipeline
uridecodebin = self.pipeline.get_by_name("uridecodebin")
assert uridecodebin is not None
self.uridecodebin = uridecodebin
sink = self.pipeline.get_by_name("sink")
assert sink is not None
self.sink = sink
# Configure appsink for better reliability
self.sink.set_property("emit-signals", True)
self.sink.set_property("max-buffers", 1)
self.sink.set_property("drop", True)
self.sink.connect("new-sample", self._on_new_sample)
# Set up bus message handler
bus = self.pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", self._on_message)
def generate_thumbnail(self, file_item: FileItem) -> None:
"""Add a file item to the thumbnail queue"""
if not file_item.full_path.is_file():
return
self.queue.append(file_item)
if self.current_item is None:
self._process_next()
def _on_message(self, bus: Gst.Bus, message: Gst.Message) -> None:
if message.type == Gst.MessageType.ERROR:
err, _ = message.parse_error()
print(f"Error: {err.message}", file=sys.stderr)
self._cleanup()
return
if message.type == Gst.MessageType.STATE_CHANGED:
if message.src != self.pipeline:
return
_, new_state, _ = message.parse_state_changed()
if new_state == Gst.State.PAUSED and self.state == State.INITIALIZING:
self.state = State.SEEKING
self._on_pipeline_ready()
elif new_state == Gst.State.PLAYING:
self.state = State.CAPTURING
elif new_state == Gst.State.NULL:
if self.state == State.CAPTURING:
self._on_capture_complete()
self.state = State.IDLE
return
if message.type == Gst.MessageType.ASYNC_DONE:
if self.state == State.SEEKING:
# Let the pipeline run to capture the frame
self.pipeline.set_state(Gst.State.PLAYING)
return
if message.type == Gst.MessageType.EOS:
self._on_capture_complete()
return
def _on_pipeline_ready(self) -> None:
"""Called when pipeline is ready to seek"""
success, duration = self.pipeline.query_duration(Gst.Format.TIME)
if not success:
self._cleanup()
return
seek_pos = duration // 3
success = self.pipeline.seek_simple(
Gst.Format.TIME,
Gst.SeekFlags.FLUSH | Gst.SeekFlags.ACCURATE,
seek_pos,
)
if not success:
self._cleanup()
return
def _on_new_sample(self, sink: Gst.Element) -> Gst.FlowReturn:
"""Handle new sample from appsink"""
if self.state != State.CAPTURING or not self.current_item:
return Gst.FlowReturn.OK
sample = sink.emit("pull-sample")
if not sample:
self._cleanup()
return Gst.FlowReturn.ERROR
buffer = sample.get_buffer()
if not buffer:
self._cleanup()
return Gst.FlowReturn.ERROR
success, map_info = buffer.map(Gst.MapFlags.READ)
if not success:
self._cleanup()
return Gst.FlowReturn.ERROR
try:
self.current_item.thumbnail = bytes(map_info.data)
self.current_item.has_thumbnail = True
finally:
buffer.unmap(map_info)
# We got our sample, clean up
self._cleanup()
return Gst.FlowReturn.OK
def _on_capture_complete(self) -> None:
"""Called when capture is complete"""
self._cleanup()
def _cleanup(self) -> None:
"""Clean up resources and process next item"""
# Ensure pipeline is stopped
self.pipeline.set_state(Gst.State.NULL)
# Reset state
self.state = State.IDLE
self.current_item = None
# Process next item
self._process_next()
def _process_next(self) -> None:
"""Start processing the next item in the queue"""
if not self.queue:
self.state = State.IDLE
return
self.state = State.INITIALIZING
self.current_item = self.queue.pop(0)
# Update URI and start pipeline
video_uri = Gst.filename_to_uri(str(self.current_item.full_path))
self.uridecodebin.set_property("uri", video_uri)
self.pipeline.set_state(Gst.State.PLAYING)

View file

@ -1,6 +1,5 @@
from __future__ import annotations from __future__ import annotations
import os
from pathlib import Path from pathlib import Path
import gi import gi
@ -45,10 +44,7 @@ class VideoPlayer(GObject.Object):
def play(self, file_path: Path | str, position: int = 0, subtitle_track: int = -2) -> None: def play(self, file_path: Path | str, position: int = 0, subtitle_track: int = -2) -> None:
"""Start playing a video file""" """Start playing a video file"""
if isinstance(file_path, Path): self.playbin.set_property("uri", Gst.filename_to_uri(str(file_path)))
file_path = os.path.abspath(file_path)
self.playbin.set_property("uri", f"file://{file_path}")
if subtitle_track >= 0: if subtitle_track >= 0:
flags = self.playbin.get_property("flags") flags = self.playbin.get_property("flags")