Coverage for video_grid_merge/__main__.py: 100%
154 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-10 19:13 +0900
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-10 19:13 +0900
1import atexit
2import io
3import math
4import os
5import re
6import subprocess
7import sys
8import termios
9import time
10from concurrent.futures import ThreadPoolExecutor, as_completed
11from functools import lru_cache
12from typing import List, Optional, Tuple, Union
14parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
15sys.path.append(parent_dir)
17from video_grid_merge import delete_files as dlf
18from video_grid_merge import rename_files as rnf
20video_extension_list = [".mov", ".mp4"]
21match_input_resolution_flag = True
22temporarily_data_list = ["_TV", ".txt"]
23ffmpeg_loglevel = "error"
24ffmpeg_cmd_version = "v1"
26original_terminal_settings = None
29def init_terminal_settings() -> None:
30 """
31 Initialize and save the original terminal settings.
33 This function attempts to save the current terminal settings. If successful,
34 these settings can be used later to reset the terminal to its original state.
36 Global Variables:
37 original_terminal_settings (termios.tcgetattr): Stores the original terminal settings.
39 Raises:
40 termios.error: If there's an error getting the terminal attributes.
41 io.UnsupportedOperation: If the operation is not supported (e.g., in a non-interactive environment).
42 """
43 global original_terminal_settings
44 try:
45 original_terminal_settings = termios.tcgetattr(sys.stdin)
46 except (termios.error, io.UnsupportedOperation):
47 original_terminal_settings = None
50init_terminal_settings()
53def reset_terminal() -> None:
54 """
55 Reset terminal settings to their original state.
57 This function restores the terminal settings to the state they were in
58 when the program started. It uses the global variable
59 'original_terminal_settings' to achieve this.
61 The function specifically:
62 1. Uses termios.tcsetattr to apply the original settings.
63 2. Applies the settings immediately but waits for output to drain first.
65 Side Effects:
66 - Modifies the current terminal settings.
67 - May affect how the terminal handles input and output after this call.
69 Notes:
70 - This function should typically be called before the program exits,
71 to ensure the terminal is left in a usable state.
72 - The global variable 'original_terminal_settings' must be properly
73 initialized before this function is called.
74 - This function is specifically for use on Unix-like systems and
75 may not work on other operating systems.
77 Raises:
78 termios.error: If there's an error setting the terminal attributes.
79 """
80 if original_terminal_settings is not None:
81 try:
82 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, original_terminal_settings)
83 except termios.error:
84 # Silently pass if we're in a non-interactive environment
85 pass
88# Ensure terminal settings are reset when the program exits
89atexit.register(reset_terminal)
92def safe_input(prompt: str) -> str:
93 """
94 Safely get input from user after resetting the input buffer.
96 This function clears the input buffer before prompting for user input,
97 which can help prevent unwanted input from being processed. The buffer
98 is only cleared in an interactive environment.
100 Args:
101 prompt (str): The prompt to display to the user.
103 Returns:
104 str: The user's input.
106 Notes:
107 - The input buffer is only cleared if original_terminal_settings is not None,
108 indicating we're in an interactive environment.
109 - If clearing the buffer fails, the function will still attempt to get user input.
110 """
111 if original_terminal_settings is not None:
112 try:
113 termios.tcflush(sys.stdin, termios.TCIFLUSH)
114 except termios.error:
115 # If flushing fails, continue to input anyway
116 pass
118 return input(prompt)
121def get_video_files(input_folder: str) -> List[str]:
122 """
123 Get a list of video files from the specified input folder.
125 Args:
126 input_folder (str): The path to the folder containing video files.
128 Returns:
129 List[str]: A list of video file names with extensions in the video_extension_list.
130 """
131 return [
132 file
133 for file in os.listdir(input_folder)
134 if os.path.splitext(file)[1] in video_extension_list
135 ]
138@lru_cache(maxsize=None)
139def get_video_length_ffmpeg(file_path: str) -> Union[float, None]:
140 """
141 Get the duration of a video file using ffmpeg.
143 This function is cached to improve performance for repeated calls.
145 Args:
146 file_path (str): The path to the video file.
148 Returns:
149 Union[float, None]: The duration of the video in seconds, or None if the duration
150 cannot be determined.
151 """
152 command = ["ffmpeg", "-i", file_path]
153 try:
154 result = subprocess.Popen(
155 command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
156 )
157 output = result.communicate()[0].decode("utf-8")
159 match = re.search(r"Duration: (\d+):(\d+):(\d+\.\d+)", output)
160 if match:
161 h, m, s = map(float, match.groups())
162 return h * 3600 + m * 60 + s
163 else:
164 print(f"Failed to extract duration from FFmpeg output for {file_path}.")
165 return None
166 except Exception as e:
167 print(f"Error getting video length for {file_path}: {e}")
168 return None
171def process_video(input_folder: str, file: str, max_length: float) -> None:
172 """
173 Process a single video file, either by linking or concatenating it to match the max_length.
175 Args:
176 input_folder (str): The path to the folder containing the input video.
177 file (str): The name of the video file to process.
178 max_length (float): The target length for the processed video.
179 """
180 length = get_video_length_ffmpeg(os.path.join(input_folder, file))
181 if length == max_length:
182 os.link(
183 os.path.join(input_folder, file),
184 os.path.join(
185 input_folder,
186 f"{os.path.splitext(file)[0]}_TV{os.path.splitext(file)[1]}",
187 ),
188 )
189 elif length and max_length and length < max_length:
190 count = int(max_length / length)
191 output_file = os.path.join(
192 input_folder, f"list_{os.path.splitext(file)[0]}.txt"
193 )
195 with open(output_file, "w") as f:
196 f.write(f"file '{file}'\n" * count)
197 if max_length % length != 0:
198 f.write(f"file '{file}'\n")
200 tv_file = os.path.join(
201 input_folder, f"{os.path.splitext(file)[0]}_TV{os.path.splitext(file)[1]}"
202 )
203 subprocess.run(
204 f"ffmpeg -f concat -safe 0 -i {output_file} -c copy -t {max_length} {tv_file} -loglevel {ffmpeg_loglevel}",
205 shell=True,
206 )
209def create_target_video(input_folder: str, video_files: List[str]) -> None:
210 """
211 Create target videos by processing all input video files.
213 This function determines the maximum length among all input videos and processes
214 each video to match this length.
216 Args:
217 input_folder (str): The path to the folder containing the input videos.
218 video_files (List[str]): A list of video file names to process.
219 """
220 lengths = [
221 get_video_length_ffmpeg(os.path.join(input_folder, file))
222 for file in video_files
223 ]
224 print(f"Input Video Time List: {lengths}")
226 max_length = max((length for length in lengths if length is not None), default=None)
227 if max_length is None:
228 return
230 with ThreadPoolExecutor() as executor:
231 futures = [
232 executor.submit(process_video, input_folder, file, max_length)
233 for file in video_files
234 ]
235 for future in as_completed(futures):
236 future.result()
239def get_target_files(folder: str, files: List[str]) -> List[str]:
240 """
241 Get a list of processed video files (with '_TV' in the filename) from the specified folder.
243 Args:
244 folder (str): The path to the folder containing the processed video files.
245 files (List[str]): A list of file names to filter.
247 Returns:
248 List[str]: A list of full paths to the processed video files.
249 """
250 return [
251 os.path.join(folder, file)
252 for file in files
253 if "_TV" in file and file.endswith(tuple(video_extension_list))
254 ]
257def get_output_filename_from_user(output_folder: str) -> str:
258 """
259 Prompt the user for an output filename and handle potential file conflicts.
261 Args:
262 output_folder (str): The path to the output folder.
264 Returns:
265 str: The full path to the output file.
266 """
267 while True:
268 output_file = safe_input(
269 f"Enter the name of the output file (default is 'combined_video{video_extension_list[0]}'): "
270 )
271 if not output_file:
272 output_file = f"combined_video{video_extension_list[0]}"
273 if not output_file.endswith(tuple(video_extension_list)):
274 output_file += video_extension_list[0]
275 output_path = os.path.join(output_folder, output_file)
276 if os.path.exists(output_path):
277 overwrite = safe_input(
278 f"File {output_file} already exists. Overwrite? (y/n): "
279 )
280 if overwrite.lower() != "y":
281 continue
282 return output_path
285@lru_cache(maxsize=None)
286def get_video_size(filename: str) -> Optional[Tuple[int, int]]:
287 """
288 Get the width and height of a video file using ffprobe.
290 This function is cached to improve performance for repeated calls.
292 Args:
293 filename (str): The path to the video file.
295 Returns:
296 Optional[Tuple[int, int]]: A tuple containing the width and height of the video,
297 or None if the size cannot be determined.
298 """
299 cmd = [
300 "ffprobe",
301 "-v",
302 f"{ffmpeg_loglevel}",
303 "-select_streams",
304 "v:0",
305 "-count_packets",
306 "-show_entries",
307 "stream=width,height",
308 "-of",
309 "csv=p=0",
310 filename,
311 ]
312 try:
313 output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode().strip()
314 dimensions = output.split(",")
315 if len(dimensions) == 2:
316 width, height = map(int, dimensions)
317 return (width, height)
318 else:
319 print(f"Unexpected output format from ffprobe for {filename}: {output}")
320 return None
321 except subprocess.CalledProcessError as e:
322 print(f"Error running ffprobe on {filename}: {e}")
323 return None
324 except ValueError as e:
325 print(f"Error parsing ffprobe output for {filename}: {e}")
326 return None
329def create_ffmpeg_command_v1(
330 input_files: list[str], output_path: str, match_input_resolution_flag: bool
331) -> str:
332 """
333 Create an advanced ffmpeg command to merge multiple videos into a grid layout with sophisticated audio mixing.
335 This function generates an ffmpeg command that combines multiple input videos
336 into a single output video with a grid layout. It scales all input videos to the
337 same resolution, stacks them into a grid, and applies a sophisticated audio mixing process.
338 The command uses the 'ultrafast' preset of the libx264 codec for rapid encoding,
339 prioritizing speed while maintaining high audio quality.
341 Features:
342 - Scales all input videos to the same size without maintaining aspect ratio
343 - Arranges videos in a grid layout based on the square root of the number of input files
344 - Applies volume normalization to each input audio stream for consistent audio levels
345 - Uses a sophisticated audio mixing process with dropout transition and volume adjustment
346 - Enhances audio clarity and reduces distortion in the final output
347 - Balances audio levels effectively across all inputs
348 - Uses libx264 codec with 'ultrafast' preset for fast encoding
349 - Optionally matches the output resolution to the combined input resolutions if specified
351 Args:
352 input_files (list[str]): A list of paths to input video files.
353 output_path (str): The path for the output video file.
354 match_input_resolution_flag (bool): If True, the output resolution matches
355 the combined input video resolutions; otherwise,
356 it uses the resolution of the first input video.
358 Returns:
359 str: The ffmpeg command string with advanced video layout and audio processing.
361 Note:
362 This function prioritizes both speed in video processing and quality in audio output,
363 making it suitable for projects where quick rendering and audio clarity are important.
364 """
365 if not input_files:
366 return ""
368 video_size = get_video_size(input_files[0])
369 if video_size is None:
370 return ""
372 video_width, video_height = video_size
374 N = len(input_files)
375 sqrt_N = int(math.sqrt(N))
377 if match_input_resolution_flag:
378 output_width = video_width * sqrt_N
379 output_height = video_height * sqrt_N
380 else:
381 output_width = video_width
382 output_height = video_height
384 filter_complex = "".join(
385 [f"[{i}:v]scale={video_width}:{video_height}[v{i}]; " for i in range(N)]
386 )
387 filter_complex += "".join(
388 [
389 f'{"".join([f"[v{i*sqrt_N+j}]" for j in range(sqrt_N)])}hstack=inputs={sqrt_N}[row{i}]; '
390 for i in range(sqrt_N)
391 ]
392 )
393 filter_complex += f'{"".join([f"[row{i}]" for i in range(sqrt_N)])}vstack=inputs={sqrt_N}[vstack]; '
394 filter_complex += "".join([f"[{i}:a]volume=1[a{i}]; " for i in range(N)])
395 filter_complex += "".join([f"[a{i}]" for i in range(N)])
396 filter_complex += f"amix=inputs={N}:dropout_transition=0,volume={N}[aout]"
398 return (
399 f"ffmpeg -y {' '.join([f'-i {input_file}' for input_file in input_files])} "
400 f'-filter_complex "{filter_complex}" '
401 f'-map "[vstack]" -map "[aout]" '
402 f"-c:v libx264 -preset ultrafast "
403 f"-c:a aac -b:a 192k -threads {os.cpu_count()} -loglevel {ffmpeg_loglevel} "
404 f"-s {output_width}x{output_height} {output_path}"
405 )
408def create_ffmpeg_command_v2(
409 input_files: list[str], output_path: str, match_input_resolution_flag: bool
410) -> str:
411 """
412 Create an ffmpeg command to merge multiple videos into a grid layout with balanced efficiency and quality.
414 This function generates an ffmpeg command that combines multiple input videos
415 into a single output video with a grid layout. It scales all input videos to the
416 same resolution, stacks them into a grid, and applies sophisticated audio mixing.
417 The command balances encoding efficiency and output quality, prioritizing file size
418 reduction while maintaining good visual and audio quality.
420 Features:
421 - Scales all input videos to the same size without maintaining aspect ratio
422 - Arranges videos in a grid layout based on the square root of the number of input files
423 - Applies volume normalization to each input audio stream for consistent audio levels
424 - Uses a sophisticated audio mixing process with dropout transition and volume adjustment
425 - Enhances audio clarity and reduces distortion in the final output
426 - Balances audio levels effectively across all inputs
427 - Uses libx264 codec with 'medium' preset for a balance of encoding speed and compression efficiency
428 - Applies Constant Rate Factor (CRF) for efficient size/quality balance
429 - Optionally matches the output resolution to the combined input resolutions if specified
431 Args:
432 input_files (list[str]): A list of paths to input video files.
433 output_path (str): The path for the output video file.
434 match_input_resolution_flag (bool): If True, the output resolution matches
435 the combined input video resolutions; otherwise,
436 it uses the resolution of the first input video.
438 Returns:
439 str: The ffmpeg command string with balanced video and audio processing.
441 Note:
442 This function aims to produce smaller file sizes compared to the fastest encoding
443 options, while still maintaining good visual quality and audio clarity. It's suitable
444 for projects where file size is a concern but quality cannot be significantly compromised.
445 """
446 if not input_files:
447 return ""
449 video_size = get_video_size(input_files[0])
450 if video_size is None:
451 return ""
453 video_width, video_height = video_size
455 N = len(input_files)
456 sqrt_N = int(math.sqrt(N))
458 if match_input_resolution_flag:
459 output_width = video_width * sqrt_N
460 output_height = video_height * sqrt_N
461 else:
462 output_width = video_width
463 output_height = video_height
465 filter_complex = "".join(
466 [f"[{i}:v]scale={video_width}:{video_height}[v{i}]; " for i in range(N)]
467 )
468 filter_complex += "".join(
469 [
470 f'{"".join([f"[v{i*sqrt_N+j}]" for j in range(sqrt_N)])}hstack=inputs={sqrt_N}[row{i}]; '
471 for i in range(sqrt_N)
472 ]
473 )
474 filter_complex += f'{"".join([f"[row{i}]" for i in range(sqrt_N)])}vstack=inputs={sqrt_N}[vstack]; '
475 filter_complex += "".join([f"[{i}:a]volume=1[a{i}]; " for i in range(N)])
476 filter_complex += "".join([f"[a{i}]" for i in range(N)])
477 filter_complex += f"amix=inputs={N}:dropout_transition=0,volume={N}[aout]"
479 return (
480 f"ffmpeg -y {' '.join([f'-i {input_file}' for input_file in input_files])} "
481 f'-filter_complex "{filter_complex}" '
482 f'-map "[vstack]" -map "[aout]" '
483 f"-c:v libx264 -preset medium -crf 23 "
484 f"-c:a aac -b:a 128k -threads {os.cpu_count()} -loglevel {ffmpeg_loglevel} "
485 f"-s {output_width}x{output_height} {output_path}"
486 )
489def create_gpu_ffmpeg_command(
490 input_files: list[str], output_path: str, match_input_resolution_flag: bool
491) -> str: # pragma: no cover
492 if not input_files:
493 return ""
495 video_size = get_video_size(input_files[0])
496 if video_size is None:
497 return ""
499 video_width, video_height = video_size
501 N = len(input_files)
502 sqrt_N = int(math.sqrt(N))
504 if match_input_resolution_flag:
505 output_width = video_width * sqrt_N
506 output_height = video_height * sqrt_N
507 else:
508 output_width = video_width
509 output_height = video_height
511 filter_complex = "".join(
512 [f"[{i}:v]scale={video_width}:{video_height}[v{i}]; " for i in range(N)]
513 )
514 filter_complex += "".join(
515 [
516 f'{"".join([f"[v{i*sqrt_N+j}]" for j in range(sqrt_N)])}hstack=inputs={sqrt_N}[row{i}]; '
517 for i in range(sqrt_N)
518 ]
519 )
520 filter_complex += f'{"".join([f"[row{i}]" for i in range(sqrt_N)])}vstack=inputs={sqrt_N}[vstack]; '
521 filter_complex += "".join([f"[{i}:a]volume=1[a{i}]; " for i in range(N)])
522 filter_complex += "".join([f"[a{i}]" for i in range(N)])
523 filter_complex += f"amix=inputs={N}:dropout_transition=0,volume={N}[aout]"
525 return (
526 f"ffmpeg -y {' '.join([f'-i {input_file}' for input_file in input_files])} "
527 f'-filter_complex "{filter_complex}" '
528 f'-map "[vstack]" -map "[aout]" '
529 f"-c:v h264_nvenc -preset p7 "
530 f"-c:a aac -b:a 192k -threads {os.cpu_count()} -loglevel {ffmpeg_loglevel} "
531 f"-s {output_width}x{output_height} {output_path}"
532 )
535def main(
536 input_folder: Optional[str] = None, output_folder: Optional[str] = None
537) -> None:
538 """
539 Main function to process and merge multiple videos into a grid layout.
541 This function handles the entire workflow, including:
542 - Getting input video files
543 - Creating target videos
544 - Generating the ffmpeg command
545 - Running the ffmpeg command to create the final merged video
547 Args:
548 input_folder (Optional[str]): The path to the input folder. If None, a default path is used.
549 output_folder (Optional[str]): The path to the output folder. If None, a default path is used.
550 """
551 input_folder = input_folder or "./video_grid_merge/media/input"
552 output_folder = output_folder or "./video_grid_merge/media/output"
554 start = time.perf_counter()
555 rnf.rename_files_with_spaces(input_folder)
556 video_files = get_video_files(input_folder)
558 if len(video_files) < 4 or int(math.sqrt(len(video_files))) ** 2 != len(
559 video_files
560 ):
561 sys.exit(
562 f"Error: Please store a perfect square number (>= 4) of video files in the input folder.\ninput_folder: {input_folder}"
563 )
565 os.makedirs(output_folder, exist_ok=True)
566 output_path = get_output_filename_from_user(output_folder)
568 create_target_video(input_folder, video_files)
569 input_files = get_target_files(input_folder, sorted(os.listdir(input_folder)))
571 if ffmpeg_cmd_version == "v1":
572 ffmpeg_command = create_ffmpeg_command_v1(
573 input_files, output_path, match_input_resolution_flag
574 )
575 elif ffmpeg_cmd_version == "v2":
576 ffmpeg_command = create_ffmpeg_command_v2(
577 input_files, output_path, match_input_resolution_flag
578 )
579 elif ffmpeg_cmd_version == "gpu": # pragma: no cover
580 ffmpeg_command = create_gpu_ffmpeg_command(
581 input_files, output_path, match_input_resolution_flag
582 )
583 else:
584 raise ValueError(f"Invalid ffmpeg_cmd_version: {ffmpeg_cmd_version}")
586 print("Video Grid Merge Start")
587 subprocess.run(ffmpeg_command, shell=True)
588 dlf.delete_files_in_folder(temporarily_data_list, input_folder)
589 print("Video Grid Merge End And Output Success")
590 print(f"File Output Complete: {output_path}")
592 elapsed_time = time.perf_counter() - start
593 print(f"Processing Time(s): {elapsed_time:.8f}\n")
596if __name__ == "__main__": # pragma: no cover
597 main()