Use thread pool thumbnailer

This commit is contained in:
Jan Hamal Dvořák 2025-03-11 11:46:33 +01:00
parent 97af0de6c2
commit fd4e46791c
2 changed files with 26 additions and 44 deletions

View file

@ -29,7 +29,6 @@ def main():
app = Application(thumbnailer=thumbnailer) app = Application(thumbnailer=thumbnailer)
try: try:
thumbnailer.start()
app.run(None) app.run(None)
finally: finally:
thumbnailer.stop() thumbnailer.shutdown()

View file

@ -1,8 +1,8 @@
from __future__ import annotations from __future__ import annotations
import multiprocessing
import sys import sys
import threading from concurrent.futures import ThreadPoolExecutor
from queue import Empty, Full, LifoQueue
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from gi.repository import GLib, Gst from gi.repository import GLib, Gst
@ -22,15 +22,18 @@ DEFAULT_SEEK_FLAGS = (
__all__ = ["Thumbnailer", "generate_thumbnail_sync"] __all__ = ["Thumbnailer", "generate_thumbnail_sync"]
class Thumbnailer(threading.Thread): MAX_WORKERS = max(1, multiprocessing.cpu_count() // 2)
queue: LifoQueue[FileItem | None]
class Thumbnailer(ThreadPoolExecutor):
def __init__(self): def __init__(self):
super().__init__(daemon=True) super().__init__(
self.queue = LifoQueue(maxsize=20) thread_name_prefix="Thumbnailer",
max_workers=MAX_WORKERS,
)
def generate_thumbnail(self, file_item: FileItem): def generate_thumbnail(self, file_item: FileItem):
"""Add a file item to the thumbnail queue""" """Schedule thumbnail generation"""
if not file_item.full_path.is_file(): if not file_item.full_path.is_file():
return return
@ -38,41 +41,19 @@ class Thumbnailer(threading.Thread):
if file_item.attempted_thumbnail: if file_item.attempted_thumbnail:
return return
try: if self._work_queue.qsize() >= MAX_WORKERS:
self.queue.put_nowait(file_item) # print("[thumbnail] Ignoring:", file_item.full_path.name)
except Full: return
pass
self.submit(generate_thumbnail_sync, file_item)
file_item.attempted_thumbnail = True file_item.attempted_thumbnail = True
def stop(self):
"""Stop the thumbnailer thread"""
try:
# Drop all pending items
while True:
self.queue.get_nowait()
except Empty:
pass
self.queue.put_nowait(None)
self.join()
def run(self) -> None:
"""Process items from the queue continuously"""
while True:
file_item = self.queue.get(block=True)
if file_item is None:
break
generate_thumbnail_sync(file_item)
def generate_thumbnail_sync(file_item: FileItem): def generate_thumbnail_sync(file_item: FileItem):
"""Generate thumbnail for a single file""" """Generate thumbnail for a single file"""
# print("[thumbnailer] Generate:", file_item.full_path.name)
pipeline_str = ( pipeline_str = (
"uridecodebin name=uridecodebin ! " "uridecodebin name=uridecodebin ! "
"videoconvert ! " "videoconvert ! "
@ -95,13 +76,14 @@ def generate_thumbnail_sync(file_item: FileItem):
try: try:
# Set pipeline to PAUSED to get duration # Set pipeline to PAUSED to get duration
pipeline.set_state(Gst.State.PAUSED) pipeline.set_state(Gst.State.PAUSED)
pipeline.get_state(Gst.SECOND) pipeline.get_state(Gst.CLOCK_TIME_NONE)
# Seek to 1/3 of duration # Obtain total duration
success, duration = pipeline.query_duration(Gst.Format.TIME) success, duration = pipeline.query_duration(Gst.Format.TIME)
if not success: if not success:
return raise RuntimeError("Failed to query duration")
# Seek to 1/3 of duration
seek_pos = duration // 3 seek_pos = duration // 3
pipeline.seek_simple(Gst.Format.TIME, DEFAULT_SEEK_FLAGS, seek_pos) pipeline.seek_simple(Gst.Format.TIME, DEFAULT_SEEK_FLAGS, seek_pos)
@ -110,16 +92,16 @@ def generate_thumbnail_sync(file_item: FileItem):
sample = sink.emit("pull-sample") sample = sink.emit("pull-sample")
if not sample: if not sample:
return raise RuntimeError("Failed to pull sample")
# Extract image data # Extract image data
buffer = sample.get_buffer() buffer = sample.get_buffer()
if not buffer: if not buffer:
return raise RuntimeError("Failed to get buffer")
success, map_info = buffer.map(Gst.MapFlags.READ) success, map_info = buffer.map(Gst.MapFlags.READ)
if not success: if not success:
return raise RuntimeError("Failed to map buffer")
try: try:
thumbnail = bytes(map_info.data) thumbnail = bytes(map_info.data)
@ -133,7 +115,8 @@ def generate_thumbnail_sync(file_item: FileItem):
GLib.idle_add(set_thumbnail) GLib.idle_add(set_thumbnail)
except Exception as err: except Exception as err:
print("[thumbnailer] Error:", file_item.full_path, err, file=sys.stderr) print("[thumbnailer] Error:", file_item.full_path.name, file=sys.stderr)
print("[thumbnailer]", err, file=sys.stderr)
finally: finally:
pipeline.set_state(Gst.State.NULL) pipeline.set_state(Gst.State.NULL)