diff --git a/lazy_player/file_model.py b/lazy_player/file_model.py index 3bba64a..dcff896 100644 --- a/lazy_player/file_model.py +++ b/lazy_player/file_model.py @@ -55,6 +55,15 @@ class FileItem(GObject.Object): self._save_attribute("subtitle_track", value if value >= -1 else None) self.notify("saved-subtitle-track") + @GObject.Property(type=str) + def saved_thumbnail(self) -> str: + return self._load_attribute("thumbnail", "") + + @saved_thumbnail.setter + def set_saved_thumbnail(self, value: str) -> None: + self._save_attribute("thumbnail", value) + self.notify("saved-thumbnail") + @overload def _load_attribute(self, name: str, dfl: str) -> str: ... diff --git a/lazy_player/thumbnailer.py b/lazy_player/thumbnailer.py new file mode 100644 index 0000000..6d70614 --- /dev/null +++ b/lazy_player/thumbnailer.py @@ -0,0 +1,150 @@ +from __future__ import annotations + +import base64 +import sys + +import gi + +from .file_model import FileItem + +gi.require_version("Gst", "1.0") +from gi.repository import Gst # NOQA: E402 + + +class Thumbnailer: + pipeline: Gst.Pipeline + uridecodebin: Gst.Element + sink: Gst.Element + queue: list[FileItem] + current_item: FileItem | None + + def __init__(self): + self.queue = [] + self.current_item = None + + pipeline_str = ( + "uridecodebin name=uridecodebin ! " + "videoconvert ! " + "videoscale ! video/x-raw,width=480,height=270 ! " + "videobox name=box ! " + "jpegenc ! " + "appsink name=sink" + ) + + pipeline = Gst.parse_launch(pipeline_str) + if not isinstance(pipeline, Gst.Pipeline): + return + + self.pipeline = pipeline + uridecodebin = self.pipeline.get_by_name("uridecodebin") + assert uridecodebin is not None + self.uridecodebin = uridecodebin + + # Get elements + box = self.pipeline.get_by_name("box") + assert box is not None + + sink = self.pipeline.get_by_name("sink") + assert sink is not None + self.sink = sink + + # Set up bus message handler + bus = self.pipeline.get_bus() + bus.add_signal_watch() + bus.connect("message", self._on_message) + + def generate_thumbnail(self, file_item: FileItem) -> None: + """Add a file item to the thumbnail queue""" + + if not file_item.full_path.is_file(): + return + + self.queue.append(file_item) + + if self.current_item is None: + self._process_next() + + def _on_message(self, bus: Gst.Bus, message: Gst.Message) -> None: + if message.type == Gst.MessageType.ERROR: + err, _ = message.parse_error() + print(f"Error: {err.message}", file=sys.stderr) + self._cleanup() + return + + if message.type == Gst.MessageType.STATE_CHANGED: + if self.pipeline and message.src == self.pipeline: + _, new_state, _ = message.parse_state_changed() + if new_state == Gst.State.PAUSED: + self._on_pipeline_ready() + return + + if message.type == Gst.MessageType.ASYNC_DONE: + self._on_seek_complete() + return + + if message.type == Gst.MessageType.EOS: + self._on_capture_complete() + return + + def _on_pipeline_ready(self) -> None: + """Called when pipeline is ready to seek""" + + success, duration = self.pipeline.query_duration(Gst.Format.TIME) + if not success: + self._cleanup() + return + + seek_pos = duration // 3 + self.pipeline.seek_simple( + Gst.Format.TIME, + Gst.SeekFlags.FLUSH | Gst.SeekFlags.ACCURATE, + seek_pos, + ) + + def _on_seek_complete(self) -> None: + """Called when seek operation completes""" + + # Let the pipeline run to capture the frame + self.pipeline.set_state(Gst.State.PLAYING) + + def _on_capture_complete(self) -> None: + """Called when capture is complete""" + + if not (self.sink and self.current_item): + self._cleanup() + return + + sample = self.sink.emit("pull-sample") + if sample: + buffer = sample.get_buffer() + success, map_info = buffer.map(Gst.MapFlags.READ) + if success: + try: + jpeg_bytes = bytes(map_info.data) + base64_data = base64.b64encode(jpeg_bytes).decode("utf-8") + data_url = f"data:image/jpeg;base64,{base64_data}" + self.current_item.saved_thumbnail = data_url + finally: + buffer.unmap(map_info) + + self._cleanup() + + def _cleanup(self) -> None: + """Clean up resources and process next item""" + + self.pipeline.set_state(Gst.State.NULL) + self.current_item = None + self._process_next() + + def _process_next(self) -> None: + """Start processing the next item in the queue""" + + if not self.queue: + return + + self.current_item = self.queue.pop(0) + + # Update URI and start pipeline + video_uri = Gst.filename_to_uri(str(self.current_item.full_path.resolve())) + self.uridecodebin.set_property("uri", video_uri) + self.pipeline.set_state(Gst.State.PLAYING)