from __future__ import annotations import base64 import sys import gi from .file_model import FileItem gi.require_version("Gst", "1.0") from gi.repository import Gst # NOQA: E402 class Thumbnailer: pipeline: Gst.Pipeline uridecodebin: Gst.Element sink: Gst.Element queue: list[FileItem] current_item: FileItem | None def __init__(self): self.queue = [] self.current_item = None pipeline_str = ( "uridecodebin name=uridecodebin ! " "videoconvert ! " "videoscale ! video/x-raw,width=480,height=270 ! " "videobox name=box ! " "jpegenc ! " "appsink name=sink" ) pipeline = Gst.parse_launch(pipeline_str) if not isinstance(pipeline, Gst.Pipeline): return self.pipeline = pipeline uridecodebin = self.pipeline.get_by_name("uridecodebin") assert uridecodebin is not None self.uridecodebin = uridecodebin # Get elements box = self.pipeline.get_by_name("box") assert box is not None sink = self.pipeline.get_by_name("sink") assert sink is not None self.sink = sink # Set up bus message handler bus = self.pipeline.get_bus() bus.add_signal_watch() bus.connect("message", self._on_message) def generate_thumbnail(self, file_item: FileItem) -> None: """Add a file item to the thumbnail queue""" if not file_item.full_path.is_file(): return self.queue.append(file_item) if self.current_item is None: self._process_next() def _on_message(self, bus: Gst.Bus, message: Gst.Message) -> None: if message.type == Gst.MessageType.ERROR: err, _ = message.parse_error() print(f"Error: {err.message}", file=sys.stderr) self._cleanup() return if message.type == Gst.MessageType.STATE_CHANGED: if self.pipeline and message.src == self.pipeline: _, new_state, _ = message.parse_state_changed() if new_state == Gst.State.PAUSED: self._on_pipeline_ready() return if message.type == Gst.MessageType.ASYNC_DONE: self._on_seek_complete() return if message.type == Gst.MessageType.EOS: self._on_capture_complete() return def _on_pipeline_ready(self) -> None: """Called when pipeline is ready to seek""" success, duration = self.pipeline.query_duration(Gst.Format.TIME) if not success: self._cleanup() return seek_pos = duration // 3 self.pipeline.seek_simple( Gst.Format.TIME, Gst.SeekFlags.FLUSH | Gst.SeekFlags.ACCURATE, seek_pos, ) def _on_seek_complete(self) -> None: """Called when seek operation completes""" # Let the pipeline run to capture the frame self.pipeline.set_state(Gst.State.PLAYING) def _on_capture_complete(self) -> None: """Called when capture is complete""" if not (self.sink and self.current_item): self._cleanup() return sample = self.sink.emit("pull-sample") if sample: buffer = sample.get_buffer() success, map_info = buffer.map(Gst.MapFlags.READ) if success: try: jpeg_bytes = bytes(map_info.data) base64_data = base64.b64encode(jpeg_bytes).decode("utf-8") data_url = f"data:image/jpeg;base64,{base64_data}" self.current_item.saved_thumbnail = data_url finally: buffer.unmap(map_info) self._cleanup() def _cleanup(self) -> None: """Clean up resources and process next item""" self.pipeline.set_state(Gst.State.NULL) self.current_item = None self._process_next() def _process_next(self) -> None: """Start processing the next item in the queue""" if not self.queue: return self.current_item = self.queue.pop(0) # Update URI and start pipeline video_uri = Gst.filename_to_uri(str(self.current_item.full_path.resolve())) self.uridecodebin.set_property("uri", video_uri) self.pipeline.set_state(Gst.State.PLAYING)