"""Background thumbnail generation for media files via a GStreamer pipeline."""

from __future__ import annotations

import multiprocessing
import os
import sys
import threading
from concurrent.futures import ThreadPoolExecutor
from queue import LifoQueue
from typing import TYPE_CHECKING, Any, cast

from gi.repository import GLib, Gst

if TYPE_CHECKING:
    from .file_model import FileItem

# Flags passed to every thumbnail seek (see the Gst.SeekFlags documentation
# for the exact meaning of each flag).
DEFAULT_SEEK_FLAGS = (
    Gst.SeekFlags.FLUSH
    | Gst.SeekFlags.KEY_UNIT
    | Gst.SeekFlags.SNAP_NEAREST
    | Gst.SeekFlags.TRICKMODE
    | Gst.SeekFlags.TRICKMODE_KEY_UNITS
    | Gst.SeekFlags.TRICKMODE_NO_AUDIO
)

__all__ = ["Thumbnailer", "generate_thumbnail_sync"]

# Half of the reported CPU count, but never fewer than one worker.
MAX_WORKERS = max(1, multiprocessing.cpu_count() // 2)

# Niceness increment applied (once) to each thread that runs thumbnail jobs.
_NICE_INCREMENT = 10

# Number of frames sampled per file; the largest encoded one is kept.
_CANDIDATE_COUNT = 3

# Per-thread marker remembering whether the niceness was already raised.
_thread_state = threading.local()


class Thumbnailer(ThreadPoolExecutor):
    """Thread pool that runs thumbnail jobs for ``FileItem`` objects."""

    def __init__(self):
        super().__init__(
            thread_name_prefix="Thumbnailer",
            max_workers=MAX_WORKERS,
        )
        # Swap the executor's internal queue for a LIFO queue, so the most
        # recently submitted job is the next one a worker picks up.
        # NOTE(review): this overrides a private ThreadPoolExecutor attribute
        # and must happen before the first submit(); re-check on Python
        # upgrades.
        self._work_queue = cast(Any, LifoQueue())

    def generate_thumbnail(self, file_item: FileItem):
        """Schedule thumbnail generation.

        Does nothing when the item's path is not a regular file or when a
        thumbnail was already attempted for it. Otherwise the job is
        submitted to the pool and the item is marked as attempted.
        """
        if not file_item.full_path.is_file():
            return
        if file_item.attempted_thumbnail.value:
            return
        self.submit(generate_thumbnail_sync_nicely, file_item)
        file_item.attempted_thumbnail.value = True


def generate_thumbnail_sync_nicely(file_item: FileItem):
    """Run :func:`generate_thumbnail_sync` at reduced scheduling priority.

    ``os.nice`` adds to the current niceness, so calling it for every job
    would keep increasing the value on a reused worker thread. The increment
    is therefore applied only the first time a given thread runs this
    function.
    """
    if not getattr(_thread_state, "niced", False):
        os.nice(_NICE_INCREMENT)
        _thread_state.niced = True
    return generate_thumbnail_sync(file_item)


def _pull_encoded_frame(
    pipeline: Gst.Pipeline, sink: Gst.Element, seek_pos: int
) -> bytes:
    """Seek to *seek_pos* (``Gst.Format.TIME``) and return the next frame.

    Returns a copy of the bytes of the next sample pulled from *sink*.

    Raises:
        RuntimeError: if no sample, no buffer, or no readable mapping could
            be obtained.
    """
    # The seek result is deliberately not checked: if the seek is refused we
    # still take whatever frame the pipeline delivers (best effort).
    pipeline.seek_simple(Gst.Format.TIME, DEFAULT_SEEK_FLAGS, seek_pos)

    # Start playing to capture a frame.
    pipeline.set_state(Gst.State.PLAYING)
    sample = sink.emit("pull-sample")
    if not sample:
        raise RuntimeError("Failed to pull sample")

    # Extract image data.
    buffer = sample.get_buffer()
    if not buffer:
        raise RuntimeError("Failed to get buffer")
    success, map_info = buffer.map(Gst.MapFlags.READ)
    if not success:
        raise RuntimeError("Failed to map buffer")
    try:
        # Copy the data out before the mapping is released.
        return bytes(map_info.data)
    finally:
        buffer.unmap(map_info)


def generate_thumbnail_sync(file_item: FileItem):
    """Generate thumbnail for a single file.

    Builds a ``uridecodebin ! videoconvert ! jpegenc ! appsink`` pipeline for
    ``file_item.full_path``, samples a few frames one second apart starting
    one second before the first third of the duration, keeps the candidate
    with the most bytes, and schedules ``file_item.thumbnail.value`` to be
    set from a GLib idle callback.

    Any exception is reported on stderr and swallowed; the pipeline is always
    returned to the NULL state.
    """
    pipeline_str = (
        "uridecodebin name=uridecodebin force-sw-decoders=true ! "
        "videoconvert ! "
        "jpegenc quality=85 ! "
        "appsink sync=false name=sink"
    )

    pipeline: Gst.Pipeline | None = None
    try:
        pipeline = cast(Gst.Pipeline, Gst.parse_launch(pipeline_str))

        # Explicit checks instead of ``assert`` so they survive ``python -O``
        # and produce a readable message in the error log below.
        sink = pipeline.get_by_name("sink")
        if not isinstance(sink, Gst.Element):
            raise RuntimeError("Pipeline has no 'sink' element")
        uridecodebin = pipeline.get_by_name("uridecodebin")
        if not isinstance(uridecodebin, Gst.Element):
            raise RuntimeError("Pipeline has no 'uridecodebin' element")

        # Set file URI.
        uridecodebin.set_property(
            "uri", Gst.filename_to_uri(str(file_item.full_path))
        )

        # Set pipeline to PAUSED and wait for the state change to settle so
        # the duration can be queried.
        pipeline.set_state(Gst.State.PAUSED)
        pipeline.get_state(Gst.CLOCK_TIME_NONE)

        # Obtain total duration.
        success, duration = pipeline.query_duration(Gst.Format.TIME)
        if not success:
            raise RuntimeError("Failed to query duration")

        # For clips shorter than three seconds the first position would be
        # negative, which is not a valid seek target; clamp it to the start.
        first_pos = duration // 3 - Gst.SECOND
        candidates: list[bytes] = [
            _pull_encoded_frame(
                pipeline, sink, max(0, first_pos + i * Gst.SECOND)
            )
            for i in range(_CANDIDATE_COUNT)
        ]

        # Keep the candidate with the largest encoded size.
        thumbnail = max(candidates, key=len)

        def set_thumbnail():
            file_item.thumbnail.value = thumbnail
            # One-shot callback: do not run again.
            return GLib.SOURCE_REMOVE

        # Publish the result from a GLib idle callback rather than directly
        # from this worker thread.
        GLib.idle_add(set_thumbnail)

    except Exception as err:
        print("[thumbnailer] Error:", file_item.full_path.name, file=sys.stderr)
        print("[thumbnailer]", err, file=sys.stderr)

    finally:
        if pipeline:
            pipeline.set_state(Gst.State.NULL)