lazy-player/lazy_player/video_player.py

216 lines
6.7 KiB
Python
Raw Normal View History

2025-03-09 17:07:52 +01:00
from __future__ import annotations
2025-03-11 20:13:15 +01:00
from time import time
2025-03-09 17:07:52 +01:00
2025-03-11 09:58:17 +01:00
from gi.repository import GObject, Gst, Gtk
2025-03-09 17:07:52 +01:00
2025-03-12 15:28:25 +01:00
from .file_model import FileItem
2025-03-11 16:11:47 +01:00
from .reactive import Ref
2025-03-11 20:13:15 +01:00
# Flag sets passed to ``seek_simple`` by the player.
# The two key-unit variants share the same base and differ only in the
# direction they snap to; the accurate variant is used when restoring a
# saved playback position.
_SEEK_KEY_UNIT = Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT
SEEK_FORWARD = _SEEK_KEY_UNIT | Gst.SeekFlags.SNAP_AFTER
SEEK_BACKWARD = _SEEK_KEY_UNIT | Gst.SeekFlags.SNAP_BEFORE
SEEK_ACCURATE = Gst.SeekFlags.FLUSH | Gst.SeekFlags.ACCURATE
2025-03-11 10:05:41 +01:00
2025-03-09 17:07:52 +01:00
class VideoPlayer(GObject.Object):
    """Video player built on a GStreamer ``playbin`` rendering into a ``Gtk.Picture``.

    Playback state is exposed through ``Ref`` wrappers:

    * ``file_item`` - the item currently loaded, or ``None`` once stopped.
    * ``is_playing`` - ``True`` from ``play()`` until ``stop()``.
    * ``is_paused`` - ``True`` while playback is paused.
    * ``last_user_input`` - ``time()`` of the most recent user-driven action.
    """

    __gtype_name__ = "VideoPlayer"

    # Bit of the playbin "flags" property that enables subtitle rendering.
    _PLAY_FLAG_TEXT = 0x00000004

    picture: Gtk.Picture
    pipeline: Gst.Pipeline
    playbin: Gst.Element

    file_item: Ref[FileItem | None]
    is_playing: Ref[bool]
    is_paused: Ref[bool]
    last_user_input: Ref[float]

    def __init__(self, picture: Gtk.Picture):
        """Build the pipeline and show its video output in *picture*.

        Raises:
            RuntimeError: If the ``playbin`` or ``gtk4paintablesink`` element
                cannot be created.
        """
        super().__init__()

        self.picture = picture
        self.file_item = Ref(None)
        self.is_playing = Ref(False)
        self.is_paused = Ref(True)
        self.last_user_input = Ref(time())

        self.pipeline = Gst.Pipeline.new("video-player")

        playbin = Gst.ElementFactory.make("playbin", "playbin")
        if not playbin:
            raise RuntimeError("Failed to create playbin element")
        self.playbin = playbin

        video_sink = Gst.ElementFactory.make("gtk4paintablesink", "gtk4paintablesink")
        if not video_sink:
            raise RuntimeError("Failed to create gtk4paintablesink element")

        self.playbin.set_property("video-sink", video_sink)
        self.pipeline.add(self.playbin)

        # Link picture to sink
        paintable = video_sink.get_property("paintable")
        self.picture.set_paintable(paintable)

    def _set_subtitles_enabled(self, enabled: bool) -> None:
        """Set or clear the TEXT bit in the playbin ``flags`` property."""
        flags = self.playbin.get_property("flags")
        if enabled:
            flags |= self._PLAY_FLAG_TEXT
        else:
            flags &= ~self._PLAY_FLAG_TEXT
        self.playbin.set_property("flags", flags)

    def play(
        self,
        file_item: FileItem,
        position: int = 0,
        subtitle_track: int = -2,
        audio_track: int = 0,
    ) -> None:
        """Start playing a video file.

        Args:
            file_item: Item whose ``full_path`` is loaded into the playbin.
            position: Position to resume from, in the pipeline's TIME format;
                ``0`` starts from the beginning without seeking.
            subtitle_track: ``>= 0`` enables subtitles and selects that track,
                ``-1`` disables subtitles, and any other value (such as the
                default ``-2``) leaves the subtitle flag untouched.
            audio_track: Index of the audio track to select.
        """
        self.file_item.value = file_item

        uri = Gst.filename_to_uri(str(file_item.full_path))
        assert uri is not None
        self.playbin.set_property("uri", uri)

        if subtitle_track >= 0:
            self._set_subtitles_enabled(True)
            self.playbin.set_property("current-text", subtitle_track)
        elif subtitle_track == -1:
            self._set_subtitles_enabled(False)

        self.playbin.set_property("current-audio", audio_track)

        # Pause and wait for it to complete
        self.pipeline.set_state(Gst.State.PAUSED)
        self.pipeline.get_state(Gst.CLOCK_TIME_NONE)

        if position:
            # Seek to saved position
            self.pipeline.seek_simple(Gst.Format.TIME, SEEK_ACCURATE, position)

        # Start playing
        self.pipeline.set_state(Gst.State.PLAYING)
        self.is_playing.value = True
        self.is_paused.value = False
        self.last_user_input.value = time()

    def stop(self) -> None:
        """Stop playback and release resources."""
        self.pipeline.set_state(Gst.State.NULL)
        self.file_item.value = None
        # Nothing is loaded any more, so the player must not report itself
        # as playing (the previous code set this to True here).
        self.is_playing.value = False
        self.is_paused.value = False
        self.last_user_input.value = time()

    def toggle_play_pause(self) -> None:
        """Toggle between play and pause states."""
        self.last_user_input.value = time()

        # A zero timeout returns the state immediately instead of waiting.
        _, state, _ = self.pipeline.get_state(0)

        if state == Gst.State.PLAYING:
            self.pipeline.set_state(Gst.State.PAUSED)
            self.is_paused.value = True
        else:
            self.pipeline.set_state(Gst.State.PLAYING)
            self.is_paused.value = False

    def seek_relative(self, offset: float) -> None:
        """Seek relative to the current position by *offset* seconds.

        Does nothing when the current position cannot be queried. The target
        is clamped so that it never goes below zero.
        """
        self.last_user_input.value = time()

        success, current = self.pipeline.query_position(Gst.Format.TIME)
        if not success:
            return

        new_pos = max(0, current + int(offset * Gst.SECOND))

        self.pipeline.seek_simple(
            Gst.Format.TIME,
            SEEK_FORWARD if offset >= 0 else SEEK_BACKWARD,
            new_pos,
        )

    def seek_start(self) -> None:
        """Seek to the start of the video."""
        self.last_user_input.value = time()
        self.pipeline.seek_simple(Gst.Format.TIME, SEEK_BACKWARD, 0)

    def seek_end(self) -> None:
        """Seek to the end of the video, if its duration is known."""
        self.last_user_input.value = time()

        if duration := self.get_duration():
            self.pipeline.seek_simple(Gst.Format.TIME, SEEK_FORWARD, duration)

    def get_position(self) -> int | None:
        """Get current playback position in nanoseconds, or ``None`` if unavailable."""
        success, position = self.pipeline.query_position(Gst.Format.TIME)
        return position if success else None

    def get_duration(self) -> int | None:
        """Get total duration in nanoseconds, or ``None`` if unavailable."""
        success, duration = self.pipeline.query_duration(Gst.Format.TIME)
        return duration if success else None

    def cycle_subtitles(self) -> tuple[bool, int, str]:
        """Cycle through available subtitle tracks, including the off state.

        Returns:
            ``(changed, track_number, language)``. ``changed`` is ``False``
            when no file is loaded or there are no subtitle tracks.
            ``track_number`` is 1-based, with ``0`` meaning subtitles are off.
        """
        if not self.file_item.value:
            return False, 0, ""

        n_text = self.playbin.get_property("n-text")
        if not n_text:
            return False, 0, ""

        # Subtitles currently off: switch them on, starting at the first track.
        if not (self.playbin.get_property("flags") & self._PLAY_FLAG_TEXT):
            self._set_subtitles_enabled(True)
            self.playbin.set_property("current-text", 0)
            return True, 1, self._get_track_lang("text", 0)

        current = self.playbin.get_property("current-text")

        # Already on the last track: the next step in the cycle is "off".
        if current >= n_text - 1:
            self._set_subtitles_enabled(False)
            return True, 0, ""

        next_track = current + 1
        self.playbin.set_property("current-text", next_track)
        return True, next_track + 1, self._get_track_lang("text", next_track)

    def cycle_audio(self) -> tuple[bool, int, str]:
        """Cycle through available audio tracks, wrapping after the last one.

        Returns:
            ``(changed, track_number, language)`` with a 1-based
            ``track_number``; ``(False, 0, "")`` when there are no audio
            tracks.
        """
        current = self.playbin.get_property("current-audio")
        n_audio = self.playbin.get_property("n-audio")
        if n_audio == 0:
            return False, 0, ""

        next_track = (current + 1) % n_audio
        self.playbin.set_property("current-audio", next_track)
        return True, next_track + 1, self._get_track_lang("audio", next_track)

    def _get_track_lang(self, track_type: str, track_index: int) -> str:
        """Return the ``language-code`` tag of a track, or ``"???"`` if unknown.

        *track_type* is interpolated into the playbin action signal name
        ``get-<track_type>-tags`` (called here with ``"text"`` and ``"audio"``).
        """
        tags: Gst.TagList | None = self.playbin.emit(f"get-{track_type}-tags", track_index)
        if not tags:
            return "???"

        found, lang = tags.get_string("language-code")
        return lang if found else "???"