"""
This module contains
code related to record scenes for use outside gewel.
For example, it contains the class :py:class:`~Mp4Recorder`
which is used to record scenes to .mp4 files.
"""
import os
from abc import ABC, ABCMeta, abstractmethod
from time import perf_counter
from typing import IO, Optional, Union
import cairocffi as cairo
import cv2
import imageio
import numpy as np
from gewel.draw import Drawable, TransformedDrawable, Scene
[docs]class Recorder(ABC):
def __init__(self):
self._recording_time = 0.0
self._frames_recorded = 0
def reset_metrics(self):
self._recording_time = 0.0
self._frames_recorded = 0
@property
def recording_time(self) -> float:
return self._recording_time
@property
def frames_recorded(self) -> int:
return self._frames_recorded
@abstractmethod
def record_frame(self, frame: int, surface: cairo.ImageSurface):
raise NotImplementedError('Recorder.record_frame is abstract.')
def start_recording(self, fps: float, width: int, height: int):
pass
def end_recording(self):
pass
def record(
self,
scene: Scene,
duration: Optional[float] = None,
t0: float = 0.0,
fps: float = 30.0,
width: Optional[int] = None,
height: Optional[int] = None,
):
if duration is None:
duration = scene.time
if width is None:
width = scene.render_width or 640
if height is None:
height = scene.render_height or 480
self.start_recording(fps, width, height)
frame, frame_time = 0, t0
end_time = t0 + duration
surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, width, height)
ctx = cairo.Context(surface)
while frame_time <= end_time:
with ctx:
scene.draw(ctx, frame_time)
frame_recording_start = perf_counter()
self.record_frame(frame, surface)
frame_recording_end = perf_counter()
frame_recording_time = frame_recording_end - frame_recording_start
frame += 1
frame_time = t0 + frame / fps
self._frames_recorded += 1
self._recording_time += frame_recording_time
self.end_recording()
[docs]class FrameFileRecorder(Recorder):
def __init__(self, dir_path: str, filename='frame'):
super().__init__()
self._dir_path: str = dir_path
self._filename: str = filename
def record_frame(self, frame: int, surface: cairo.ImageSurface):
filename = '{:}_{:06d}.png'.format(self._filename, frame)
filepath = os.path.join(self._dir_path, filename)
surface.write_to_png(filepath)
[docs]class FileRecorder(Recorder, metaclass=ABCMeta):
def __init__(self, file_path: Union[str, IO], extension: str):
super().__init__()
if isinstance(file_path, str):
self._file_path = file_path if file_path.endswith(extension) else file_path + extension
else:
self._file_path = file_path
self._height = 0
self._width = 0
[docs]class Mp4Recorder(FileRecorder):
def __init__(self, file_path: Union[str, IO]):
super().__init__(file_path, '.mp4')
self._writer = None
def start_recording(self, fps: float, width: int, height: int):
super().start_recording(fps, width, height)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
self._writer = cv2.VideoWriter(self._file_path, fourcc, fps, (width, height))
self._height = height
self._width = width
def end_recording(self):
self._writer.release()
self._writer = None
self._height = 0
self._width = 0
super().end_recording()
def record_frame(self, frame: int, surface: cairo.ImageSurface):
surface.flush()
buf = surface.get_data()
rgba_data = np.ndarray(shape=(self._height, self._width, 4), dtype=np.uint8, buffer=buf)
self._writer.write(rgba_data[:, :, :3])
[docs]class GifRecorder(FileRecorder):
def __init__(self, file_path: Union[str, IO]):
super().__init__(file_path, '.gif')
self._writer = None
def start_recording(self, fps: float, width: int, height: int):
super().start_recording(fps, width, height)
self._writer = imageio.get_writer(self._file_path, mode='I', fps=fps)
self._height = height
self._width = width
def record_frame(self, frame: int, surface: cairo.ImageSurface):
surface.flush()
buf = surface.get_data()
rgba_data = np.ndarray(shape=(self._height, self._width, 4), dtype=np.uint8, buffer=buf)
r_data, g_data, b_data = rgba_data[:, :, 0], rgba_data[:, :, 1], rgba_data[:, :, 2]
data = np.stack([b_data, g_data, r_data], axis=2)
self._writer.append_data(data)
def end_recording(self):
self._writer.close()
self._writer = None
self._height = 0
self._width = 0
super().end_recording()
[docs]def scale_to_fit(
drawable: Drawable,
from_width: float, from_height: float,
to_width: float, to_height: float
) -> Drawable:
xf = scale_to_fit_xform(from_width, from_height, to_width, to_height)
return TransformedDrawable(
drawable,
xx=xf.xx, yx=xf.yx, xy=xf.xy, yy=xf.yy, x0=xf.x0, y0=xf.y0,
z=drawable.z
)
RESOLUTION_4K = 3840, 2160
RESOLUTION_8K = 7680, 4320
RESOLUTION_1080P = 1920, 1080
RESOLUTION_720P = 1280, 720
RESOLUTION_VGA = 640, 480
RESOLUTION_VGA_16_9 = 640, 360