Coverage for video_grid_merge/__main__.py: 100%

154 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-10 19:13 +0900

1import atexit 

2import io 

3import math 

4import os 

5import re 

6import subprocess 

7import sys 

8import termios 

9import time 

10from concurrent.futures import ThreadPoolExecutor, as_completed 

11from functools import lru_cache 

12from typing import List, Optional, Tuple, Union 

13 

14parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) 

15sys.path.append(parent_dir) 

16 

17from video_grid_merge import delete_files as dlf 

18from video_grid_merge import rename_files as rnf 

19 

20video_extension_list = [".mov", ".mp4"] 

21match_input_resolution_flag = True 

22temporarily_data_list = ["_TV", ".txt"] 

23ffmpeg_loglevel = "error" 

24ffmpeg_cmd_version = "v1" 

25 

26original_terminal_settings = None 

27 

28 

29def init_terminal_settings() -> None: 

30 """ 

31 Initialize and save the original terminal settings. 

32 

33 This function attempts to save the current terminal settings. If successful, 

34 these settings can be used later to reset the terminal to its original state. 

35 

36 Global Variables: 

37 original_terminal_settings (termios.tcgetattr): Stores the original terminal settings. 

38 

39 Raises: 

40 termios.error: If there's an error getting the terminal attributes. 

41 io.UnsupportedOperation: If the operation is not supported (e.g., in a non-interactive environment). 

42 """ 

43 global original_terminal_settings 

44 try: 

45 original_terminal_settings = termios.tcgetattr(sys.stdin) 

46 except (termios.error, io.UnsupportedOperation): 

47 original_terminal_settings = None 

48 

49 

50init_terminal_settings() 

51 

52 

53def reset_terminal() -> None: 

54 """ 

55 Reset terminal settings to their original state. 

56 

57 This function restores the terminal settings to the state they were in 

58 when the program started. It uses the global variable 

59 'original_terminal_settings' to achieve this. 

60 

61 The function specifically: 

62 1. Uses termios.tcsetattr to apply the original settings. 

63 2. Applies the settings immediately but waits for output to drain first. 

64 

65 Side Effects: 

66 - Modifies the current terminal settings. 

67 - May affect how the terminal handles input and output after this call. 

68 

69 Notes: 

70 - This function should typically be called before the program exits, 

71 to ensure the terminal is left in a usable state. 

72 - The global variable 'original_terminal_settings' must be properly 

73 initialized before this function is called. 

74 - This function is specifically for use on Unix-like systems and 

75 may not work on other operating systems. 

76 

77 Raises: 

78 termios.error: If there's an error setting the terminal attributes. 

79 """ 

80 if original_terminal_settings is not None: 

81 try: 

82 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, original_terminal_settings) 

83 except termios.error: 

84 # Silently pass if we're in a non-interactive environment 

85 pass 

86 

87 

88# Ensure terminal settings are reset when the program exits 

89atexit.register(reset_terminal) 

90 

91 

92def safe_input(prompt: str) -> str: 

93 """ 

94 Safely get input from user after resetting the input buffer. 

95 

96 This function clears the input buffer before prompting for user input, 

97 which can help prevent unwanted input from being processed. The buffer 

98 is only cleared in an interactive environment. 

99 

100 Args: 

101 prompt (str): The prompt to display to the user. 

102 

103 Returns: 

104 str: The user's input. 

105 

106 Notes: 

107 - The input buffer is only cleared if original_terminal_settings is not None, 

108 indicating we're in an interactive environment. 

109 - If clearing the buffer fails, the function will still attempt to get user input. 

110 """ 

111 if original_terminal_settings is not None: 

112 try: 

113 termios.tcflush(sys.stdin, termios.TCIFLUSH) 

114 except termios.error: 

115 # If flushing fails, continue to input anyway 

116 pass 

117 

118 return input(prompt) 

119 

120 

121def get_video_files(input_folder: str) -> List[str]: 

122 """ 

123 Get a list of video files from the specified input folder. 

124 

125 Args: 

126 input_folder (str): The path to the folder containing video files. 

127 

128 Returns: 

129 List[str]: A list of video file names with extensions in the video_extension_list. 

130 """ 

131 return [ 

132 file 

133 for file in os.listdir(input_folder) 

134 if os.path.splitext(file)[1] in video_extension_list 

135 ] 

136 

137 

138@lru_cache(maxsize=None) 

139def get_video_length_ffmpeg(file_path: str) -> Union[float, None]: 

140 """ 

141 Get the duration of a video file using ffmpeg. 

142 

143 This function is cached to improve performance for repeated calls. 

144 

145 Args: 

146 file_path (str): The path to the video file. 

147 

148 Returns: 

149 Union[float, None]: The duration of the video in seconds, or None if the duration 

150 cannot be determined. 

151 """ 

152 command = ["ffmpeg", "-i", file_path] 

153 try: 

154 result = subprocess.Popen( 

155 command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT 

156 ) 

157 output = result.communicate()[0].decode("utf-8") 

158 

159 match = re.search(r"Duration: (\d+):(\d+):(\d+\.\d+)", output) 

160 if match: 

161 h, m, s = map(float, match.groups()) 

162 return h * 3600 + m * 60 + s 

163 else: 

164 print(f"Failed to extract duration from FFmpeg output for {file_path}.") 

165 return None 

166 except Exception as e: 

167 print(f"Error getting video length for {file_path}: {e}") 

168 return None 

169 

170 

171def process_video(input_folder: str, file: str, max_length: float) -> None: 

172 """ 

173 Process a single video file, either by linking or concatenating it to match the max_length. 

174 

175 Args: 

176 input_folder (str): The path to the folder containing the input video. 

177 file (str): The name of the video file to process. 

178 max_length (float): The target length for the processed video. 

179 """ 

180 length = get_video_length_ffmpeg(os.path.join(input_folder, file)) 

181 if length == max_length: 

182 os.link( 

183 os.path.join(input_folder, file), 

184 os.path.join( 

185 input_folder, 

186 f"{os.path.splitext(file)[0]}_TV{os.path.splitext(file)[1]}", 

187 ), 

188 ) 

189 elif length and max_length and length < max_length: 

190 count = int(max_length / length) 

191 output_file = os.path.join( 

192 input_folder, f"list_{os.path.splitext(file)[0]}.txt" 

193 ) 

194 

195 with open(output_file, "w") as f: 

196 f.write(f"file '{file}'\n" * count) 

197 if max_length % length != 0: 

198 f.write(f"file '{file}'\n") 

199 

200 tv_file = os.path.join( 

201 input_folder, f"{os.path.splitext(file)[0]}_TV{os.path.splitext(file)[1]}" 

202 ) 

203 subprocess.run( 

204 f"ffmpeg -f concat -safe 0 -i {output_file} -c copy -t {max_length} {tv_file} -loglevel {ffmpeg_loglevel}", 

205 shell=True, 

206 ) 

207 

208 

209def create_target_video(input_folder: str, video_files: List[str]) -> None: 

210 """ 

211 Create target videos by processing all input video files. 

212 

213 This function determines the maximum length among all input videos and processes 

214 each video to match this length. 

215 

216 Args: 

217 input_folder (str): The path to the folder containing the input videos. 

218 video_files (List[str]): A list of video file names to process. 

219 """ 

220 lengths = [ 

221 get_video_length_ffmpeg(os.path.join(input_folder, file)) 

222 for file in video_files 

223 ] 

224 print(f"Input Video Time List: {lengths}") 

225 

226 max_length = max((length for length in lengths if length is not None), default=None) 

227 if max_length is None: 

228 return 

229 

230 with ThreadPoolExecutor() as executor: 

231 futures = [ 

232 executor.submit(process_video, input_folder, file, max_length) 

233 for file in video_files 

234 ] 

235 for future in as_completed(futures): 

236 future.result() 

237 

238 

239def get_target_files(folder: str, files: List[str]) -> List[str]: 

240 """ 

241 Get a list of processed video files (with '_TV' in the filename) from the specified folder. 

242 

243 Args: 

244 folder (str): The path to the folder containing the processed video files. 

245 files (List[str]): A list of file names to filter. 

246 

247 Returns: 

248 List[str]: A list of full paths to the processed video files. 

249 """ 

250 return [ 

251 os.path.join(folder, file) 

252 for file in files 

253 if "_TV" in file and file.endswith(tuple(video_extension_list)) 

254 ] 

255 

256 

257def get_output_filename_from_user(output_folder: str) -> str: 

258 """ 

259 Prompt the user for an output filename and handle potential file conflicts. 

260 

261 Args: 

262 output_folder (str): The path to the output folder. 

263 

264 Returns: 

265 str: The full path to the output file. 

266 """ 

267 while True: 

268 output_file = safe_input( 

269 f"Enter the name of the output file (default is 'combined_video{video_extension_list[0]}'): " 

270 ) 

271 if not output_file: 

272 output_file = f"combined_video{video_extension_list[0]}" 

273 if not output_file.endswith(tuple(video_extension_list)): 

274 output_file += video_extension_list[0] 

275 output_path = os.path.join(output_folder, output_file) 

276 if os.path.exists(output_path): 

277 overwrite = safe_input( 

278 f"File {output_file} already exists. Overwrite? (y/n): " 

279 ) 

280 if overwrite.lower() != "y": 

281 continue 

282 return output_path 

283 

284 

285@lru_cache(maxsize=None) 

286def get_video_size(filename: str) -> Optional[Tuple[int, int]]: 

287 """ 

288 Get the width and height of a video file using ffprobe. 

289 

290 This function is cached to improve performance for repeated calls. 

291 

292 Args: 

293 filename (str): The path to the video file. 

294 

295 Returns: 

296 Optional[Tuple[int, int]]: A tuple containing the width and height of the video, 

297 or None if the size cannot be determined. 

298 """ 

299 cmd = [ 

300 "ffprobe", 

301 "-v", 

302 f"{ffmpeg_loglevel}", 

303 "-select_streams", 

304 "v:0", 

305 "-count_packets", 

306 "-show_entries", 

307 "stream=width,height", 

308 "-of", 

309 "csv=p=0", 

310 filename, 

311 ] 

312 try: 

313 output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode().strip() 

314 dimensions = output.split(",") 

315 if len(dimensions) == 2: 

316 width, height = map(int, dimensions) 

317 return (width, height) 

318 else: 

319 print(f"Unexpected output format from ffprobe for {filename}: {output}") 

320 return None 

321 except subprocess.CalledProcessError as e: 

322 print(f"Error running ffprobe on {filename}: {e}") 

323 return None 

324 except ValueError as e: 

325 print(f"Error parsing ffprobe output for {filename}: {e}") 

326 return None 

327 

328 

329def create_ffmpeg_command_v1( 

330 input_files: list[str], output_path: str, match_input_resolution_flag: bool 

331) -> str: 

332 """ 

333 Create an advanced ffmpeg command to merge multiple videos into a grid layout with sophisticated audio mixing. 

334 

335 This function generates an ffmpeg command that combines multiple input videos 

336 into a single output video with a grid layout. It scales all input videos to the 

337 same resolution, stacks them into a grid, and applies a sophisticated audio mixing process. 

338 The command uses the 'ultrafast' preset of the libx264 codec for rapid encoding, 

339 prioritizing speed while maintaining high audio quality. 

340 

341 Features: 

342 - Scales all input videos to the same size without maintaining aspect ratio 

343 - Arranges videos in a grid layout based on the square root of the number of input files 

344 - Applies volume normalization to each input audio stream for consistent audio levels 

345 - Uses a sophisticated audio mixing process with dropout transition and volume adjustment 

346 - Enhances audio clarity and reduces distortion in the final output 

347 - Balances audio levels effectively across all inputs 

348 - Uses libx264 codec with 'ultrafast' preset for fast encoding 

349 - Optionally matches the output resolution to the combined input resolutions if specified 

350 

351 Args: 

352 input_files (list[str]): A list of paths to input video files. 

353 output_path (str): The path for the output video file. 

354 match_input_resolution_flag (bool): If True, the output resolution matches 

355 the combined input video resolutions; otherwise, 

356 it uses the resolution of the first input video. 

357 

358 Returns: 

359 str: The ffmpeg command string with advanced video layout and audio processing. 

360 

361 Note: 

362 This function prioritizes both speed in video processing and quality in audio output, 

363 making it suitable for projects where quick rendering and audio clarity are important. 

364 """ 

365 if not input_files: 

366 return "" 

367 

368 video_size = get_video_size(input_files[0]) 

369 if video_size is None: 

370 return "" 

371 

372 video_width, video_height = video_size 

373 

374 N = len(input_files) 

375 sqrt_N = int(math.sqrt(N)) 

376 

377 if match_input_resolution_flag: 

378 output_width = video_width * sqrt_N 

379 output_height = video_height * sqrt_N 

380 else: 

381 output_width = video_width 

382 output_height = video_height 

383 

384 filter_complex = "".join( 

385 [f"[{i}:v]scale={video_width}:{video_height}[v{i}]; " for i in range(N)] 

386 ) 

387 filter_complex += "".join( 

388 [ 

389 f'{"".join([f"[v{i*sqrt_N+j}]" for j in range(sqrt_N)])}hstack=inputs={sqrt_N}[row{i}]; ' 

390 for i in range(sqrt_N) 

391 ] 

392 ) 

393 filter_complex += f'{"".join([f"[row{i}]" for i in range(sqrt_N)])}vstack=inputs={sqrt_N}[vstack]; ' 

394 filter_complex += "".join([f"[{i}:a]volume=1[a{i}]; " for i in range(N)]) 

395 filter_complex += "".join([f"[a{i}]" for i in range(N)]) 

396 filter_complex += f"amix=inputs={N}:dropout_transition=0,volume={N}[aout]" 

397 

398 return ( 

399 f"ffmpeg -y {' '.join([f'-i {input_file}' for input_file in input_files])} " 

400 f'-filter_complex "{filter_complex}" ' 

401 f'-map "[vstack]" -map "[aout]" ' 

402 f"-c:v libx264 -preset ultrafast " 

403 f"-c:a aac -b:a 192k -threads {os.cpu_count()} -loglevel {ffmpeg_loglevel} " 

404 f"-s {output_width}x{output_height} {output_path}" 

405 ) 

406 

407 

408def create_ffmpeg_command_v2( 

409 input_files: list[str], output_path: str, match_input_resolution_flag: bool 

410) -> str: 

411 """ 

412 Create an ffmpeg command to merge multiple videos into a grid layout with balanced efficiency and quality. 

413 

414 This function generates an ffmpeg command that combines multiple input videos 

415 into a single output video with a grid layout. It scales all input videos to the 

416 same resolution, stacks them into a grid, and applies sophisticated audio mixing. 

417 The command balances encoding efficiency and output quality, prioritizing file size 

418 reduction while maintaining good visual and audio quality. 

419 

420 Features: 

421 - Scales all input videos to the same size without maintaining aspect ratio 

422 - Arranges videos in a grid layout based on the square root of the number of input files 

423 - Applies volume normalization to each input audio stream for consistent audio levels 

424 - Uses a sophisticated audio mixing process with dropout transition and volume adjustment 

425 - Enhances audio clarity and reduces distortion in the final output 

426 - Balances audio levels effectively across all inputs 

427 - Uses libx264 codec with 'medium' preset for a balance of encoding speed and compression efficiency 

428 - Applies Constant Rate Factor (CRF) for efficient size/quality balance 

429 - Optionally matches the output resolution to the combined input resolutions if specified 

430 

431 Args: 

432 input_files (list[str]): A list of paths to input video files. 

433 output_path (str): The path for the output video file. 

434 match_input_resolution_flag (bool): If True, the output resolution matches 

435 the combined input video resolutions; otherwise, 

436 it uses the resolution of the first input video. 

437 

438 Returns: 

439 str: The ffmpeg command string with balanced video and audio processing. 

440 

441 Note: 

442 This function aims to produce smaller file sizes compared to the fastest encoding 

443 options, while still maintaining good visual quality and audio clarity. It's suitable 

444 for projects where file size is a concern but quality cannot be significantly compromised. 

445 """ 

446 if not input_files: 

447 return "" 

448 

449 video_size = get_video_size(input_files[0]) 

450 if video_size is None: 

451 return "" 

452 

453 video_width, video_height = video_size 

454 

455 N = len(input_files) 

456 sqrt_N = int(math.sqrt(N)) 

457 

458 if match_input_resolution_flag: 

459 output_width = video_width * sqrt_N 

460 output_height = video_height * sqrt_N 

461 else: 

462 output_width = video_width 

463 output_height = video_height 

464 

465 filter_complex = "".join( 

466 [f"[{i}:v]scale={video_width}:{video_height}[v{i}]; " for i in range(N)] 

467 ) 

468 filter_complex += "".join( 

469 [ 

470 f'{"".join([f"[v{i*sqrt_N+j}]" for j in range(sqrt_N)])}hstack=inputs={sqrt_N}[row{i}]; ' 

471 for i in range(sqrt_N) 

472 ] 

473 ) 

474 filter_complex += f'{"".join([f"[row{i}]" for i in range(sqrt_N)])}vstack=inputs={sqrt_N}[vstack]; ' 

475 filter_complex += "".join([f"[{i}:a]volume=1[a{i}]; " for i in range(N)]) 

476 filter_complex += "".join([f"[a{i}]" for i in range(N)]) 

477 filter_complex += f"amix=inputs={N}:dropout_transition=0,volume={N}[aout]" 

478 

479 return ( 

480 f"ffmpeg -y {' '.join([f'-i {input_file}' for input_file in input_files])} " 

481 f'-filter_complex "{filter_complex}" ' 

482 f'-map "[vstack]" -map "[aout]" ' 

483 f"-c:v libx264 -preset medium -crf 23 " 

484 f"-c:a aac -b:a 128k -threads {os.cpu_count()} -loglevel {ffmpeg_loglevel} " 

485 f"-s {output_width}x{output_height} {output_path}" 

486 ) 

487 

488 

489def create_gpu_ffmpeg_command( 

490 input_files: list[str], output_path: str, match_input_resolution_flag: bool 

491) -> str: # pragma: no cover 

492 if not input_files: 

493 return "" 

494 

495 video_size = get_video_size(input_files[0]) 

496 if video_size is None: 

497 return "" 

498 

499 video_width, video_height = video_size 

500 

501 N = len(input_files) 

502 sqrt_N = int(math.sqrt(N)) 

503 

504 if match_input_resolution_flag: 

505 output_width = video_width * sqrt_N 

506 output_height = video_height * sqrt_N 

507 else: 

508 output_width = video_width 

509 output_height = video_height 

510 

511 filter_complex = "".join( 

512 [f"[{i}:v]scale={video_width}:{video_height}[v{i}]; " for i in range(N)] 

513 ) 

514 filter_complex += "".join( 

515 [ 

516 f'{"".join([f"[v{i*sqrt_N+j}]" for j in range(sqrt_N)])}hstack=inputs={sqrt_N}[row{i}]; ' 

517 for i in range(sqrt_N) 

518 ] 

519 ) 

520 filter_complex += f'{"".join([f"[row{i}]" for i in range(sqrt_N)])}vstack=inputs={sqrt_N}[vstack]; ' 

521 filter_complex += "".join([f"[{i}:a]volume=1[a{i}]; " for i in range(N)]) 

522 filter_complex += "".join([f"[a{i}]" for i in range(N)]) 

523 filter_complex += f"amix=inputs={N}:dropout_transition=0,volume={N}[aout]" 

524 

525 return ( 

526 f"ffmpeg -y {' '.join([f'-i {input_file}' for input_file in input_files])} " 

527 f'-filter_complex "{filter_complex}" ' 

528 f'-map "[vstack]" -map "[aout]" ' 

529 f"-c:v h264_nvenc -preset p7 " 

530 f"-c:a aac -b:a 192k -threads {os.cpu_count()} -loglevel {ffmpeg_loglevel} " 

531 f"-s {output_width}x{output_height} {output_path}" 

532 ) 

533 

534 

535def main( 

536 input_folder: Optional[str] = None, output_folder: Optional[str] = None 

537) -> None: 

538 """ 

539 Main function to process and merge multiple videos into a grid layout. 

540 

541 This function handles the entire workflow, including: 

542 - Getting input video files 

543 - Creating target videos 

544 - Generating the ffmpeg command 

545 - Running the ffmpeg command to create the final merged video 

546 

547 Args: 

548 input_folder (Optional[str]): The path to the input folder. If None, a default path is used. 

549 output_folder (Optional[str]): The path to the output folder. If None, a default path is used. 

550 """ 

551 input_folder = input_folder or "./video_grid_merge/media/input" 

552 output_folder = output_folder or "./video_grid_merge/media/output" 

553 

554 start = time.perf_counter() 

555 rnf.rename_files_with_spaces(input_folder) 

556 video_files = get_video_files(input_folder) 

557 

558 if len(video_files) < 4 or int(math.sqrt(len(video_files))) ** 2 != len( 

559 video_files 

560 ): 

561 sys.exit( 

562 f"Error: Please store a perfect square number (>= 4) of video files in the input folder.\ninput_folder: {input_folder}" 

563 ) 

564 

565 os.makedirs(output_folder, exist_ok=True) 

566 output_path = get_output_filename_from_user(output_folder) 

567 

568 create_target_video(input_folder, video_files) 

569 input_files = get_target_files(input_folder, sorted(os.listdir(input_folder))) 

570 

571 if ffmpeg_cmd_version == "v1": 

572 ffmpeg_command = create_ffmpeg_command_v1( 

573 input_files, output_path, match_input_resolution_flag 

574 ) 

575 elif ffmpeg_cmd_version == "v2": 

576 ffmpeg_command = create_ffmpeg_command_v2( 

577 input_files, output_path, match_input_resolution_flag 

578 ) 

579 elif ffmpeg_cmd_version == "gpu": # pragma: no cover 

580 ffmpeg_command = create_gpu_ffmpeg_command( 

581 input_files, output_path, match_input_resolution_flag 

582 ) 

583 else: 

584 raise ValueError(f"Invalid ffmpeg_cmd_version: {ffmpeg_cmd_version}") 

585 

586 print("Video Grid Merge Start") 

587 subprocess.run(ffmpeg_command, shell=True) 

588 dlf.delete_files_in_folder(temporarily_data_list, input_folder) 

589 print("Video Grid Merge End And Output Success") 

590 print(f"File Output Complete: {output_path}") 

591 

592 elapsed_time = time.perf_counter() - start 

593 print(f"Processing Time(s): {elapsed_time:.8f}\n") 

594 

595 

596if __name__ == "__main__": # pragma: no cover 

597 main()