from __future__ import annotations import sys import threading from queue import Empty, Full, LifoQueue from typing import TYPE_CHECKING from gi.repository import GLib, Gst if TYPE_CHECKING: from .file_model import FileItem DEFAULT_SEEK_FLAGS = ( Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT | Gst.SeekFlags.SNAP_NEAREST | Gst.SeekFlags.TRICKMODE | Gst.SeekFlags.TRICKMODE_KEY_UNITS | Gst.SeekFlags.TRICKMODE_NO_AUDIO ) __all__ = ["Thumbnailer", "generate_thumbnail_sync"] class Thumbnailer(threading.Thread): queue: LifoQueue[FileItem | None] def __init__(self): super().__init__(daemon=True) self.queue = LifoQueue(maxsize=20) def generate_thumbnail(self, file_item: FileItem): """Add a file item to the thumbnail queue""" if not file_item.full_path.is_file(): return if file_item.attempted_thumbnail: return try: self.queue.put_nowait(file_item) except Full: pass file_item.attempted_thumbnail = True def stop(self): """Stop the thumbnailer thread""" try: # Drop all pending items while True: self.queue.get_nowait() except Empty: pass self.queue.put_nowait(None) self.join() def run(self) -> None: """Process items from the queue continuously""" while True: file_item = self.queue.get(block=True) if file_item is None: break generate_thumbnail_sync(file_item) def generate_thumbnail_sync(file_item: FileItem): """Generate thumbnail for a single file""" pipeline_str = ( "uridecodebin name=uridecodebin ! " "videoconvert ! " "jpegenc quality=85 ! " "appsink sync=false name=sink" ) pipeline = Gst.parse_launch(pipeline_str) assert isinstance(pipeline, Gst.Pipeline) sink = pipeline.get_by_name("sink") assert isinstance(sink, Gst.Element) uridecodebin = pipeline.get_by_name("uridecodebin") assert isinstance(uridecodebin, Gst.Element) # Set file URI uridecodebin.set_property("uri", Gst.filename_to_uri(str(file_item.full_path))) try: # Set pipeline to PAUSED to get duration pipeline.set_state(Gst.State.PAUSED) pipeline.get_state(Gst.SECOND) # Seek to 1/3 of duration success, duration = pipeline.query_duration(Gst.Format.TIME) if not success: return seek_pos = duration // 3 pipeline.seek_simple(Gst.Format.TIME, DEFAULT_SEEK_FLAGS, seek_pos) # Start playing to capture frame pipeline.set_state(Gst.State.PLAYING) sample = sink.emit("pull-sample") if not sample: return # Extract image data buffer = sample.get_buffer() if not buffer: return success, map_info = buffer.map(Gst.MapFlags.READ) if not success: return try: thumbnail = bytes(map_info.data) finally: buffer.unmap(map_info) def set_thumbnail(): file_item.thumbnail = thumbnail file_item.has_thumbnail = True GLib.idle_add(set_thumbnail) except Exception as err: print("[thumbnailer] Error:", file_item.full_path, err, file=sys.stderr) finally: pipeline.set_state(Gst.State.NULL)