from __future__ import annotations from time import time from gi.repository import GObject, Gst, Gtk from .file_model import FileItem from .reactive import Ref SEEK_FORWARD = Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT | Gst.SeekFlags.SNAP_AFTER SEEK_BACKWARD = Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT | Gst.SeekFlags.SNAP_BEFORE SEEK_ACCURATE = Gst.SeekFlags.FLUSH | Gst.SeekFlags.ACCURATE class VideoPlayer(GObject.Object): picture: Gtk.Picture pipeline: Gst.Pipeline playbin: Gst.Element __gtype_name__ = "VideoPlayer" file_item: Ref[FileItem | None] is_playing: Ref[bool] is_paused: Ref[bool] last_user_input: Ref[float] def __init__(self, picture: Gtk.Picture): super().__init__() self.picture = picture self.file_item = Ref(None) self.is_playing = Ref(False) self.is_paused = Ref(True) self.last_user_input = Ref(time()) self.pipeline = Gst.Pipeline.new("video-player") playbin = Gst.ElementFactory.make("playbin", "playbin") if not playbin: raise RuntimeError("Failed to create playbin element") self.playbin = playbin video_sink = Gst.ElementFactory.make("gtk4paintablesink", "gtk4paintablesink") if not video_sink: raise RuntimeError("Failed to create gtk4paintablesink element") self.playbin.set_property("video-sink", video_sink) self.pipeline.add(self.playbin) # Link picture to sink paintable = video_sink.get_property("paintable") self.picture.set_paintable(paintable) def play( self, file_item: FileItem, position: int = 0, subtitle_track: int = -2, audio_track: int = 0, ) -> None: """Start playing a video file""" self.file_item.value = file_item uri = Gst.filename_to_uri(str(file_item.full_path)) assert uri is not None self.playbin.set_property("uri", uri) if subtitle_track >= 0: flags = self.playbin.get_property("flags") flags |= 0x00000004 # TEXT flag self.playbin.set_property("flags", flags) self.playbin.set_property("current-text", subtitle_track) elif subtitle_track == -1: flags = self.playbin.get_property("flags") flags &= ~0x00000004 # TEXT flag self.playbin.set_property("flags", flags) self.playbin.set_property("current-audio", audio_track) # Pause and wait for it to complete self.pipeline.set_state(Gst.State.PAUSED) self.pipeline.get_state(Gst.CLOCK_TIME_NONE) if position: # Seek to saved position self.pipeline.seek_simple(Gst.Format.TIME, SEEK_ACCURATE, position) # Start playing self.pipeline.set_state(Gst.State.PLAYING) self.is_playing.value = True self.is_paused.value = False self.last_user_input.value = time() def stop(self) -> None: """Stop playback and release resources""" self.pipeline.set_state(Gst.State.NULL) self.file_item.value = None self.is_playing.value = True self.is_paused.value = False self.last_user_input.value = time() def toggle_play_pause(self) -> None: """Toggle between play and pause states""" self.last_user_input.value = time() _, state, _ = self.pipeline.get_state(0) if state == Gst.State.PLAYING: self.pipeline.set_state(Gst.State.PAUSED) self.is_paused.value = True else: self.pipeline.set_state(Gst.State.PLAYING) self.is_paused.value = False def seek_relative(self, offset: float) -> None: """Seek relative to current position by offset seconds""" self.last_user_input.value = time() success, current = self.pipeline.query_position(Gst.Format.TIME) if not success: return new_pos = current + int(offset * Gst.SECOND) if new_pos < 0: new_pos = 0 self.pipeline.seek_simple( Gst.Format.TIME, SEEK_FORWARD if offset >= 0 else SEEK_BACKWARD, new_pos, ) def seek_start(self): """Seek to the start of the video.""" self.last_user_input.value = time() self.pipeline.seek_simple(Gst.Format.TIME, SEEK_BACKWARD, 0) def seek_end(self): """Seek to the end of the video.""" self.last_user_input.value = time() if duration := self.get_duration(): self.pipeline.seek_simple(Gst.Format.TIME, SEEK_FORWARD, duration) def get_position(self) -> int | None: """Get current playback position in nanoseconds""" success, position = self.pipeline.query_position(Gst.Format.TIME) return position if success else None def get_duration(self) -> int | None: """Get total duration in nanoseconds""" success, duration = self.pipeline.query_duration(Gst.Format.TIME) return duration if success else None def cycle_subtitles(self) -> tuple[bool, int, str]: """Cycle through available subtitle tracks, including off state""" if not self.file_item.value: return False, 0, "" flags = self.playbin.get_property("flags") current = self.playbin.get_property("current-text") n_text = self.playbin.get_property("n-text") if not n_text: return False, 0, "" if not (flags & 0x00000004): # TEXT flag flags |= 0x00000004 self.playbin.set_property("flags", flags) self.playbin.set_property("current-text", 0) return True, 1, self._get_track_lang("text", 0) if current >= n_text - 1: flags &= ~0x00000004 # TEXT flag self.playbin.set_property("flags", flags) return True, 0, "" next_track = current + 1 self.playbin.set_property("current-text", next_track) return True, next_track + 1, self._get_track_lang("text", next_track) def cycle_audio(self) -> tuple[bool, int, str]: """Cycle through available audio tracks""" current = self.playbin.get_property("current-audio") n_audio = self.playbin.get_property("n-audio") if n_audio == 0: return False, 0, "" next_track = (current + 1) % n_audio self.playbin.set_property("current-audio", next_track) return True, next_track + 1, self._get_track_lang("audio", next_track) def _get_track_lang(self, track_type: str, track_index: int) -> str: caps: Gst.TagList | None = self.playbin.emit(f"get-{track_type}-tags", track_index) if not caps: return str("???") found, lang = caps.get_string("language-code") return lang if found else "???"