from __future__ import annotations

import sys
from enum import Enum, auto

import gi

from .file_model import FileItem

gi.require_version("Gst", "1.0")
from gi.repository import Gst  # NOQA: E402

# Name of the application message the appsink callback posts on the bus to ask
# the bus handler to tear the pipeline down.
_CLEANUP_REQUEST = "thumbnailer-cleanup"


class State(Enum):
    """Phases the thumbnailer goes through while handling one queued item."""

    IDLE = auto()
    INITIALIZING = auto()
    SEEKING = auto()
    CAPTURING = auto()
    CLEANING_UP = auto()


class Thumbnailer:
    """Generate JPEG thumbnails for queued files, one at a time.

    A single GStreamer pipeline is reused for every item: the file is decoded,
    the pipeline seeks to one third of the reported duration, and the first
    JPEG-encoded sample delivered to the appsink is stored on the item as
    ``thumbnail`` (with ``has_thumbnail`` set to ``True``).

    Bus messages are delivered through ``add_signal_watch()``, so a GLib main
    loop has to be running for any progress to be made.
    """

    pipeline: Gst.Pipeline
    uridecodebin: Gst.Element
    sink: Gst.Element
    queue: list[FileItem]
    current_item: FileItem | None
    state: State

    def __init__(self):
        """Build the pipeline and hook up the appsink and bus callbacks.

        Raises:
            RuntimeError: if the pipeline description does not yield a
                ``Gst.Pipeline`` or a named element cannot be found.
        """
        self.queue = []
        self.current_item = None
        self.state = State.IDLE

        # NOTE(review): "name=box,autocrop=true" uses a comma where gst-launch
        # syntax normally separates properties with whitespace -- confirm that
        # autocrop is really being applied. Left unchanged on purpose.
        pipeline_str = (
            "uridecodebin name=uridecodebin ! "
            "videoconvert ! "
            "videoscale ! "
            "videobox name=box,autocrop=true ! "
            "video/x-raw,width=384,height=216 ! "
            "jpegenc quality=85 ! "
            "appsink name=sink"
        )

        pipeline = Gst.parse_launch(pipeline_str)
        if not isinstance(pipeline, Gst.Pipeline):
            # Fail loudly instead of returning a half-initialised object whose
            # methods would later die with AttributeError.
            raise RuntimeError("Thumbnail pipeline description did not produce a Gst.Pipeline")
        self.pipeline = pipeline

        self.uridecodebin = self._require_element("uridecodebin")
        self.sink = self._require_element("sink")

        # Configure appsink for better reliability
        self.sink.set_property("emit-signals", True)
        self.sink.set_property("max-buffers", 1)
        self.sink.set_property("drop", True)
        self.sink.connect("new-sample", self._on_new_sample)

        # Set up bus message handler
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", self._on_message)

    def _require_element(self, name: str) -> Gst.Element:
        """Return the pipeline element called *name* or raise RuntimeError."""
        element = self.pipeline.get_by_name(name)
        if element is None:
            # Explicit raise rather than ``assert`` so the check survives -O.
            raise RuntimeError(f"Thumbnail pipeline has no element named {name!r}")
        return element

    def generate_thumbnail(self, file_item: FileItem) -> None:
        """Add a file item to the thumbnail queue.

        Items whose ``full_path`` is not an existing file are ignored.
        Processing starts immediately when no other item is in flight.
        """
        if not file_item.full_path.is_file():
            return

        self.queue.append(file_item)
        if self.current_item is None:
            self._process_next()

    def _on_message(self, bus: Gst.Bus, message: Gst.Message) -> None:
        """Drive the state machine from messages on the pipeline bus."""
        msg_type = message.type

        if msg_type == Gst.MessageType.ERROR:
            err, _ = message.parse_error()
            print(f"Error: {err.message}", file=sys.stderr)
            self._cleanup()
            return

        if msg_type == Gst.MessageType.APPLICATION:
            # Posted by _request_cleanup() from the appsink callback so that
            # the actual teardown happens here rather than on the streaming
            # thread.
            structure = message.get_structure()
            if structure is not None and structure.has_name(_CLEANUP_REQUEST):
                self._cleanup()
            return

        if msg_type == Gst.MessageType.STATE_CHANGED:
            # Only the top-level pipeline's transitions are of interest.
            if message.src != self.pipeline:
                return

            _, new_state, _ = message.parse_state_changed()
            if new_state == Gst.State.PAUSED and self.state == State.INITIALIZING:
                self.state = State.SEEKING
                self._on_pipeline_ready()
            elif new_state == Gst.State.PLAYING:
                # Do not re-arm capturing once a frame has been taken and a
                # cleanup request is already pending.
                if self.state != State.CLEANING_UP:
                    self.state = State.CAPTURING
            elif new_state == Gst.State.NULL:
                if self.state == State.CAPTURING:
                    # _cleanup() may already have started the next item, so
                    # the state it leaves behind must not be overwritten.
                    self._on_capture_complete()
                else:
                    self.state = State.IDLE
            return

        if msg_type == Gst.MessageType.ASYNC_DONE:
            if self.state == State.SEEKING:
                # Let the pipeline run to capture the frame
                self.pipeline.set_state(Gst.State.PLAYING)
            return

        if msg_type == Gst.MessageType.EOS:
            self._on_capture_complete()
            return

    def _on_pipeline_ready(self) -> None:
        """Seek to one third of the duration; give up on the item on failure."""
        success, duration = self.pipeline.query_duration(Gst.Format.TIME)
        if not success:
            self._cleanup()
            return

        seek_pos = duration // 3
        success = self.pipeline.seek_simple(
            Gst.Format.TIME,
            Gst.SeekFlags.FLUSH | Gst.SeekFlags.ACCURATE,
            seek_pos,
        )
        if not success:
            self._cleanup()
            return

    def _on_new_sample(self, sink: Gst.Element) -> Gst.FlowReturn:
        """Store the first sample seen while capturing as the item's thumbnail.

        appsink emits ``new-sample`` from a streaming thread, so the pipeline
        is never torn down from here; a cleanup request is posted on the bus
        instead and handled by ``_on_message``.

        Returns:
            ``Gst.FlowReturn.OK`` when the sample was stored or ignored,
            ``Gst.FlowReturn.ERROR`` when the sample could not be read.
        """
        # Work on a local reference: the bus handler may reset current_item
        # while this callback is still running.
        item = self.current_item
        if self.state != State.CAPTURING or not item:
            return Gst.FlowReturn.OK

        sample = sink.emit("pull-sample")
        buffer = sample.get_buffer() if sample else None
        if not buffer:
            self._request_cleanup()
            return Gst.FlowReturn.ERROR

        success, map_info = buffer.map(Gst.MapFlags.READ)
        if not success:
            self._request_cleanup()
            return Gst.FlowReturn.ERROR

        try:
            item.thumbnail = bytes(map_info.data)
            item.has_thumbnail = True
        finally:
            buffer.unmap(map_info)

        # We got our sample; ask the bus handler to clean up.
        self._request_cleanup()
        return Gst.FlowReturn.OK

    def _request_cleanup(self) -> None:
        """Ask the bus handler to run ``_cleanup()``.

        Changing the pipeline state from a streaming thread can deadlock, so
        the request travels as an application message on the pipeline's bus.
        The state is switched to ``CLEANING_UP`` first so later samples are
        ignored until the teardown has happened.
        """
        self.state = State.CLEANING_UP
        request = Gst.Message.new_application(
            self.pipeline, Gst.Structure.new_empty(_CLEANUP_REQUEST)
        )
        self.pipeline.post_message(request)

    def _on_capture_complete(self) -> None:
        """Called when capture is complete."""
        self._cleanup()

    def _cleanup(self) -> None:
        """Stop the pipeline, reset the state and move on to the next item."""
        # Ensure pipeline is stopped
        self.pipeline.set_state(Gst.State.NULL)

        # Reset state
        self.state = State.IDLE
        self.current_item = None

        # Process next item
        self._process_next()

    def _process_next(self) -> None:
        """Start processing the next item in the queue, if there is one."""
        if not self.queue:
            self.state = State.IDLE
            return

        self.state = State.INITIALIZING
        self.current_item = self.queue.pop(0)

        # Update URI and start pipeline
        video_uri = Gst.filename_to_uri(str(self.current_item.full_path))
        self.uridecodebin.set_property("uri", video_uri)
        self.pipeline.set_state(Gst.State.PLAYING)