# lazy-player/lazy_player/video_player.py
#
# 215 lines
# 6.7 KiB
# Python

from __future__ import annotations
from time import time
from gi.repository import GObject, Gst, Gtk
from .file_model import FileItem
from .reactive import Ref
# Flag combinations passed to ``seek_simple``.
# The two key-unit seeks share the same base flags and differ only in the
# direction they snap to (after vs. before the requested position).
_SEEK_KEY_UNIT = Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT
SEEK_FORWARD = _SEEK_KEY_UNIT | Gst.SeekFlags.SNAP_AFTER
SEEK_BACKWARD = _SEEK_KEY_UNIT | Gst.SeekFlags.SNAP_BEFORE
# Used when restoring a saved position, where the exact timestamp is requested.
SEEK_ACCURATE = Gst.SeekFlags.FLUSH | Gst.SeekFlags.ACCURATE
class VideoPlayer(GObject.Object):
    """Play video files through a GStreamer ``playbin`` shown in a ``Gtk.Picture``.

    The player owns a ``Gst.Pipeline`` containing a single ``playbin`` whose
    video sink is a ``gtk4paintablesink``; the sink's paintable is attached to
    the picture widget handed to the constructor.

    Observable state is exposed through ``Ref`` objects:

    * ``file_item`` -- the item passed to the last ``play()`` call, or ``None``
      after ``stop()``.
    * ``is_playing`` -- ``True`` after ``play()``, ``False`` after ``stop()``.
    * ``is_paused`` -- toggled by ``toggle_play_pause()``.
    * ``last_user_input`` -- ``time()`` of the most recent play/stop/toggle/seek
      call.
    """

    __gtype_name__ = "VideoPlayer"

    # Bit in playbin's "flags" property that enables subtitle (text) rendering.
    _PLAY_FLAG_TEXT = 0x00000004

    # Placeholder returned when a track carries no language tag.
    _UNKNOWN_LANG = "???"

    picture: Gtk.Picture
    pipeline: Gst.Pipeline
    playbin: Gst.Element
    file_item: Ref[FileItem | None]
    is_playing: Ref[bool]
    is_paused: Ref[bool]
    last_user_input: Ref[float]

    def __init__(self, picture: Gtk.Picture):
        """Build the pipeline and attach its video output to ``picture``.

        Raises:
            RuntimeError: if the ``playbin`` or ``gtk4paintablesink`` element
                cannot be created.
        """
        super().__init__()
        self.picture = picture
        self.file_item = Ref(None)
        self.is_playing = Ref(False)
        self.is_paused = Ref(True)
        self.last_user_input = Ref(time())

        self.pipeline = Gst.Pipeline.new("video-player")

        playbin = Gst.ElementFactory.make("playbin", "playbin")
        if not playbin:
            raise RuntimeError("Failed to create playbin element")
        self.playbin = playbin

        video_sink = Gst.ElementFactory.make("gtk4paintablesink", "gtk4paintablesink")
        if not video_sink:
            raise RuntimeError("Failed to create gtk4paintablesink element")
        self.playbin.set_property("video-sink", video_sink)
        self.pipeline.add(self.playbin)

        # Link picture to sink
        paintable = video_sink.get_property("paintable")
        self.picture.set_paintable(paintable)

    def _set_subtitles_enabled(self, enabled: bool) -> None:
        """Set or clear the TEXT bit in playbin's ``flags`` property."""
        flags = self.playbin.get_property("flags")
        if enabled:
            flags |= self._PLAY_FLAG_TEXT
        else:
            flags &= ~self._PLAY_FLAG_TEXT
        self.playbin.set_property("flags", flags)

    def play(
        self,
        file_item: FileItem,
        position: int = 0,
        subtitle_track: int = -2,
        audio_track: int = 0,
    ) -> None:
        """Start playing a video file.

        Args:
            file_item: item whose ``full_path`` is played.
            position: start position in nanoseconds; ``0`` skips the seek.
            subtitle_track: ``>= 0`` enables subtitles and selects that track,
                ``-1`` disables subtitles, any other value (the default ``-2``)
                leaves the subtitle flag untouched.
            audio_track: index of the audio track to select.
        """
        self.file_item.value = file_item

        uri = Gst.filename_to_uri(str(file_item.full_path))
        assert uri is not None
        self.playbin.set_property("uri", uri)

        if subtitle_track >= 0:
            self._set_subtitles_enabled(True)
            self.playbin.set_property("current-text", subtitle_track)
        elif subtitle_track == -1:
            self._set_subtitles_enabled(False)

        self.playbin.set_property("current-audio", audio_track)

        # Pause and wait for the state change to complete before seeking.
        self.pipeline.set_state(Gst.State.PAUSED)
        self.pipeline.get_state(Gst.CLOCK_TIME_NONE)

        if position:
            # Seek to saved position
            self.pipeline.seek_simple(Gst.Format.TIME, SEEK_ACCURATE, position)

        # Start playing
        self.pipeline.set_state(Gst.State.PLAYING)
        self.is_playing.value = True
        self.is_paused.value = False
        self.last_user_input.value = time()

    def stop(self) -> None:
        """Stop playback and release resources."""
        self.pipeline.set_state(Gst.State.NULL)
        self.file_item.value = None
        # Nothing is playing once the pipeline is in NULL, so observers must
        # see False here (the mirror image of what play() sets).
        self.is_playing.value = False
        # NOTE(review): is_paused is left False after a stop although the
        # constructor starts it at True -- confirm which one observers expect.
        self.is_paused.value = False
        self.last_user_input.value = time()

    def toggle_play_pause(self) -> None:
        """Toggle between play and pause states."""
        self.last_user_input.value = time()
        # Timeout 0: read the current state without waiting.
        _, state, _ = self.pipeline.get_state(0)
        currently_playing = state == Gst.State.PLAYING
        target = Gst.State.PAUSED if currently_playing else Gst.State.PLAYING
        self.pipeline.set_state(target)
        self.is_paused.value = currently_playing

    def seek_relative(self, offset: float) -> None:
        """Seek relative to the current position by ``offset`` seconds.

        Negative offsets seek backwards; the target is clamped at 0. Nothing
        happens if the current position cannot be queried.
        """
        self.last_user_input.value = time()
        success, current = self.pipeline.query_position(Gst.Format.TIME)
        if not success:
            return

        target = max(0, current + int(offset * Gst.SECOND))
        # Snap in the direction of travel.
        seek_flags = SEEK_FORWARD if offset >= 0 else SEEK_BACKWARD
        self.pipeline.seek_simple(Gst.Format.TIME, seek_flags, target)

    def seek_start(self) -> None:
        """Seek to the start of the video."""
        self.last_user_input.value = time()
        self.pipeline.seek_simple(Gst.Format.TIME, SEEK_BACKWARD, 0)

    def seek_end(self) -> None:
        """Seek to the end of the video (no-op if the duration is unknown or 0)."""
        self.last_user_input.value = time()
        if duration := self.get_duration():
            self.pipeline.seek_simple(Gst.Format.TIME, SEEK_FORWARD, duration)

    def get_position(self) -> int | None:
        """Get current playback position in nanoseconds, or None if the query fails."""
        success, position = self.pipeline.query_position(Gst.Format.TIME)
        return position if success else None

    def get_duration(self) -> int | None:
        """Get total duration in nanoseconds, or None if the query fails."""
        success, duration = self.pipeline.query_duration(Gst.Format.TIME)
        return duration if success else None

    def cycle_subtitles(self) -> tuple[bool, int, str]:
        """Cycle through available subtitle tracks, including the off state.

        Order: off -> track 0 -> track 1 -> ... -> last track -> off.

        Returns:
            ``(changed, track_number, language)``. ``changed`` is ``False`` when
            no file is loaded or the stream has no subtitle tracks.
            ``track_number`` is 1-based, with ``0`` meaning subtitles are off.
            ``language`` is the track's language code, ``"???"`` if unknown,
            or ``""`` when off.
        """
        if not self.file_item.value:
            return False, 0, ""

        flags = self.playbin.get_property("flags")
        current = self.playbin.get_property("current-text")
        n_text = self.playbin.get_property("n-text")
        if not n_text:
            return False, 0, ""

        if not (flags & self._PLAY_FLAG_TEXT):
            # Currently off: switch on and start at the first track.
            self._set_subtitles_enabled(True)
            self.playbin.set_property("current-text", 0)
            return True, 1, self._get_track_lang("text", 0)

        if current >= n_text - 1:
            # Past the last track: wrap around to the off state.
            self._set_subtitles_enabled(False)
            return True, 0, ""

        next_track = current + 1
        self.playbin.set_property("current-text", next_track)
        return True, next_track + 1, self._get_track_lang("text", next_track)

    def cycle_audio(self) -> tuple[bool, int, str]:
        """Cycle through available audio tracks, wrapping after the last one.

        Returns:
            ``(changed, track_number, language)``. ``changed`` is ``False`` when
            the stream reports no audio tracks. ``track_number`` is 1-based.
            ``language`` is the track's language code or ``"???"`` if unknown.
        """
        current = self.playbin.get_property("current-audio")
        n_audio = self.playbin.get_property("n-audio")
        if n_audio == 0:
            return False, 0, ""

        next_track = (current + 1) % n_audio
        self.playbin.set_property("current-audio", next_track)
        return True, next_track + 1, self._get_track_lang("audio", next_track)

    def _get_track_lang(self, track_type: str, track_index: int) -> str:
        """Return the ``language-code`` tag of a track, or ``"???"`` if absent.

        Args:
            track_type: used to build the playbin action signal name
                ``get-<track_type>-tags`` (called with ``"text"`` and
                ``"audio"`` in this class).
            track_index: zero-based index of the track.
        """
        tags: Gst.TagList | None = self.playbin.emit(f"get-{track_type}-tags", track_index)
        if not tags:
            return self._UNKNOWN_LANG
        found, lang = tags.get_string("language-code")
        return lang if found else self._UNKNOWN_LANG