generate_movie.py
import io
import json
import re
import sys

import alkana
import cv2
import ffmpeg
import MeCab
import numpy as np
import pandas as pd
import requests
import unidic  # noqa: F401  (marker that the UniDic dictionary package is installed)
from moviepy.editor import AudioFileClip, CompositeVideoClip, VideoClip, VideoFileClip
from moviepy.video.compositing.concatenate import concatenate_videoclips
from moviepy.video.VideoClip import ImageClip
from PIL import Image, ImageDraw, ImageFont
from pydub import AudioSegment
from tqdm import tqdm
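# This script burns timed text comments, synthesized narration, a drawn pointer
# trajectory, and '>'/'<' playback-speed changes into a source video, all driven
# by a sidecar <video>.comments.json file. Speech is fetched from a local
# speech-synthesis engine on port 50021 (VOICEVOX's default).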
# Helper: convert alphabetic words in Japanese text to their katakana readings.
# Based on https://qiita.com/kunishou/items/814e837cf504ce287a13
def alpha_to_kana(text):
    # True if the string consists only of ASCII letters
    alpha_reg = re.compile(r'^[a-zA-Z]+$')

    def is_ascii_alpha(s):
        return alpha_reg.match(s) is not None

    wakati = MeCab.Tagger('-Owakati')
    words = wakati.parse(text).split()
    df = pd.DataFrame(words, columns=["word"])
    df = df[df["word"].apply(is_ascii_alpha)]
    df["katakana"] = df["word"].apply(alkana.get_kana)
    for word, kana in dict(zip(df["word"], df["katakana"])).items():
        if kana:  # keep the original word when alkana has no reading for it
            text = text.replace(word, kana)
    return text
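# Illustrative example (actual output depends on the installed MeCab dictionary
# and on alkana's word list):
#   alpha_to_kana("pythonで動画を作る")  # -> "パイソンで動画を作る"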
TTF_FONTFILE = '/usr/share/fonts/opentype/noto/NotoSansCJK-Bold.ttc'
def draw_trajectory(frame, current_time, trajectory, clear_events):
    img = Image.fromarray(frame)
    draw = ImageDraw.Draw(img)
    # Hide everything drawn before the most recent clear event that has already
    # happened (clear_events are in milliseconds, current_time in seconds).
    already_cleared = [t for t in clear_events if t < current_time * 1000]
    last_cleared = max(already_cleared) if already_cleared else -1
    # Draw the trajectory segments visible at this time
    for i, item in enumerate(trajectory):
        start_time, draw_time, x, y = item
        prev_start_time, prev_time, prev_x, prev_y = trajectory[i - 1] if i > 0 else [-1, 0, 0, 0]
        # Coordinates are normalized by the frame width, origin at the frame center
        x = x * img.width + img.width / 2
        y = y * img.width + img.height / 2
        prev_x = prev_x * img.width + img.width / 2
        prev_y = prev_y * img.width + img.height / 2
        if prev_start_time != start_time:  # not part of the same stroke
            continue
        if last_cleared < draw_time <= current_time * 1000:
            draw.line((prev_x, prev_y, x, y), fill="red", width=3)
    return np.array(img)
def compose_video_with_trajectory(video, trajectory, clear_events):
    def process_frame(get_frame, t):
        frame = get_frame(t)
        return draw_trajectory(frame, t, trajectory, clear_events)

    return video.fl(process_frame, apply_to=['mask', 'video'])
def read_comments(comments_filename):
    trajectory = []
    clear_events = []
    with open(comments_filename, 'r') as f:
        comments = json.load(f)
    if isinstance(comments, dict):
        trajectory = comments["trajectory"]
        clear_events = comments["clear"]
        comments = comments["comments"]
    return comments, trajectory, clear_events
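# Expected *.comments.json shape, inferred from this file (a bare list of
# [start_ms, text] pairs also works; the dict form below is a sketch, not a spec):
# {
#   "comments":   [[start_ms, "text"], ...],
#   "trajectory": [[stroke_start_ms, draw_time_ms, x, y], ...],  # x/y normalized by frame width, origin at center
#   "clear":      [clear_time_ms, ...]
# }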
def apply_speed_change(clip, comment_text):
    # A comment of only '>' characters speeds the clip up by their count;
    # only '<' characters slow it down by the same factor.
    if re.match(r'^>+\n*$', comment_text):
        return clip.speedx(len(comment_text.strip()))
    elif re.match(r'^<+\n*$', comment_text):
        return clip.speedx(1 / len(comment_text.strip()))
    return clip
def parse_comment(comment, use_literal=True):
    # Comments may contain {literal|pronunciation} pairs; pick one side.
    def replacer(match):
        literal, pronoun = match.groups()
        return literal if use_literal else pronoun

    return re.sub(r"\{(.+?)\|(.+?)\}", replacer, comment)
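# Example: parse_comment("{OpenCV|オープンシーブイ}で検出", use_literal=False)
# returns "オープンシーブイで検出"; with use_literal=True it returns "OpenCVで検出".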
def process_video_speed_and_offsets(video, comments):
    # First pass: cut the video at every speed-change comment and compute, for
    # each comment, how much the preceding speed changes shift its timestamp.
    processed_clips = []
    current_speed = 1
    current_time = 0
    bracket_level = 0
    cumulative_adjustment = 0
    adjustments = []
    for comment in comments:
        start_ms, text = comment
        start_s = start_ms / 1000.0
        if text == "[":
            bracket_level += 1
            if bracket_level == 2:  # nested bracket detected, reset bracket level
                bracket_level = 0
            adjustments.append(cumulative_adjustment)
            continue
        elif text == "]":
            bracket_level = max(0, bracket_level - 1)
            adjustments.append(cumulative_adjustment)
            continue
        if bracket_level > 0:  # inside a bracket, skip this comment
            adjustments.append(cumulative_adjustment)
            continue
        new_speed = apply_speed_multiplier(text, current_speed)
        if new_speed != current_speed:  # speed change detected
            if current_time != start_s:
                clip = video.subclip(current_time, start_s).speedx(current_speed)
                print("%f-%f (x%f)" % (current_time, start_s, current_speed))
                processed_clips.append(clip)
                clip_duration = start_s - current_time
                adjustment = (clip_duration / current_speed - clip_duration) * 1000
                cumulative_adjustment += adjustment
            current_speed = new_speed
            current_time = start_s
        clip_duration = start_s - current_time
        adjustment = (clip_duration / current_speed - clip_duration) * 1000
        adjustments.append(cumulative_adjustment + adjustment)
    # Add the remaining part of the video with the last speed change applied
    processed_clips.append(video.subclip(current_time).speedx(current_speed))
    print("%f- (x%f)" % (current_time, current_speed))
    # Second pass: shift each comment by its accumulated adjustment
    adjusted_comments = []
    for i, comment in enumerate(comments):
        start_ms, text = comment
        adjustment = adjustments[i]
        adjusted_comments.append([start_ms + adjustment, text])
        print("%f-->%f: %s" % (start_ms / 1000, (start_ms + adjustment) / 1000, text))
    return concatenate_videoclips(processed_clips), adjusted_comments
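# Worked example of the adjustment formula: a 10 s span played at 2x lasts 5 s
# in the output, so every later comment shifts by (10 / 2 - 10) * 1000 = -5000 ms;
# at 0.5x the shift would be (10 / 0.5 - 10) * 1000 = +10000 ms.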
def apply_speed_multiplier(text, current_speed):
    # '>>>' means 3x faster, '<<' means half speed; anything else keeps the speed.
    if re.match(r'^>+\n*$', text):
        return len(text.strip())
    elif re.match(r'^<+\n*$', text):
        return 1 / len(text.strip())
    return current_speed
def create_text_image(text, width, height, font):
    image = Image.new('RGBA', (width, height), (0, 0, 0, 0))
    draw = ImageDraw.Draw(image)
    max_width = width
    wrapped_text = ""
    for line in text.split('\n'):
        line_width = draw.textbbox((0, 0), line, font=font)[2]
        if line_width <= max_width:
            wrapped_text += line + "\n"
        else:
            # Wrap on spaces; for unspaced (Japanese) text, wrap on MeCab tokens
            delimiter = ' '
            words = line.split(delimiter)
            if len(words) == 1:
                tagger = MeCab.Tagger("-Owakati")
                words = tagger.parse(line).split()
                delimiter = ''
            current_line = ""
            for word in words:
                word_width = draw.textbbox((0, 0), current_line + word + delimiter, font=font)[2]
                if word_width <= max_width:
                    current_line += word + delimiter
                else:
                    wrapped_text += current_line + '\n'
                    current_line = word + delimiter
            wrapped_text += current_line + '\n'
    lines = wrapped_text.split('\n')
    print(lines)
    total_text_height = sum(draw.textbbox((0, 0), line, font=font)[3] for line in lines)
    y_text = height - total_text_height
    for line in lines:
        text_size = draw.textbbox((0, 0), line, font=font)
        x = (width - text_size[2]) // 2
        y = y_text
        # Stamp the text in black along the horizontal, vertical, and diagonal
        # directions to form an outline
        black = (0, 0, 0, 255)
        for offset in range(-3, 4):
            draw.text((x + offset, y), line, font=font, fill=black)
            draw.text((x, y + offset), line, font=font, fill=black)
            draw.text((x + offset, y + offset), line, font=font, fill=black)
            draw.text((x + offset, y - offset), line, font=font, fill=black)
        # Then draw the white text on top
        draw.text((x, y), line, font=font, fill=(255, 255, 255, 255))
        y_text += text_size[3]
    return cv2.cvtColor(np.array(image), cv2.COLOR_RGBA2BGRA)
def overlay_text_comments(video_filename, comments):
    font = ImageFont.truetype(TTF_FONTFILE, 50)
    if isinstance(video_filename, str):
        video = VideoFileClip(video_filename, audio=False)  # drop the original audio
    elif isinstance(video_filename, VideoClip):
        video = video_filename
    else:
        return None
    video_size = video.size
    clips = [video]
    for i, (start_ms, text, duration_ms) in enumerate(comments):
        start_sec = start_ms / 1000.0  # convert milliseconds to seconds
        literal_text = parse_comment(text)  # the literal side is what gets drawn
        if i < len(comments) - 1:
            next_start_ms, *_ = comments[i + 1]
            # Show the text until the next comment starts, capped at its audio length
            duration = min((next_start_ms - start_ms) / 1000.0, duration_ms / 1000.0)
        else:
            duration = 10  # the last comment stays on screen for 10 seconds
        print("%d: duration=%f sec" % (i, duration))
        text_image = create_text_image(literal_text, video_size[0], video_size[1], font)
        txt_clip = ImageClip(text_image, duration=duration).set_start(start_sec)
        clips.append(txt_clip)
    return clips
def add_audio_comments(video_filename, audio_filename, output_filename):
    # Mux the (silent) overlay video with the generated comment audio track
    input_video = ffmpeg.input(video_filename)
    input_audio = ffmpeg.input(audio_filename)
    ffmpeg.concat(input_video, input_audio, v=1, a=1).output(output_filename).run(overwrite_output=True)
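# Roughly the same operation from the command line (a sketch; filenames are
# illustrative):
#   ffmpeg -i overlay.mp4 -i comments.wav \
#     -filter_complex "[0][1]concat=n=1:v=1:a=1[v][a]" -map "[v]" -map "[a]" -y out.mp4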
def preview_video(comments, video_filename, audio_comments_filename):
    # Overlay text comments on the video
    video_clips = overlay_text_comments(video_filename, comments)
    final_video = CompositeVideoClip(video_clips)
    # Attach the generated comment audio
    audio = AudioFileClip(audio_comments_filename)
    final_video = final_video.set_audio(audio)
    # Preview the result
    final_video.preview()


def generate_video(comments, video_filename, audio_comments_filename, final_filename):
    # Overlay text comments on the video
    video_clips = overlay_text_comments(video_filename, comments)
    final_video = CompositeVideoClip(video_clips)
    # Attach the generated comment audio
    audio = AudioFileClip(audio_comments_filename)
    final_video = final_video.set_audio(audio)
    # Render the final video
    final_video.write_videofile(final_filename, codec='libx264')
def generate_wav(filename, comments, audioSpeedScale, speaker=0):
    # Mix all synthesized comment audio into one track, padding with silence so
    # that each comment starts at its timestamp.
    mixdown_audio = AudioSegment.silent(duration=0)
    segmented_comments = []
    for comment in tqdm(sorted(comments, key=lambda x: x[0])):
        start_time, text = comment
        # Pad with silence up to this comment's start time
        silence_duration_ms = max(0, start_time - len(mixdown_audio))
        mixdown_audio += AudioSegment.silent(duration=silence_duration_ms)
        # Split the comment text on '---' and synthesize each segment separately
        for segment in text.split('---'):
            if not segment.strip():
                continue
            # Build the spoken form: alphabet -> katakana, then take the
            # pronunciation side of {literal|pronunciation} pairs
            kana_segment = alpha_to_kana(segment)
            pronoun_segment = parse_comment(kana_segment, use_literal=False)
            # Query the local VOICEVOX-style engine, then synthesize
            query_res = requests.post(
                "http://localhost:50021/audio_query",
                params={"text": pronoun_segment if pronoun_segment else segment, "speaker": speaker},
            )
            data = query_res.json()
            if "speedScale" in data:
                data["speedScale"] *= audioSpeedScale
            synthesis_res = requests.post(
                "http://localhost:50021/synthesis",
                params={"speaker": speaker},
                json=data,
            )
            audio_segment = AudioSegment.from_wav(io.BytesIO(synthesis_res.content))
            mixdown_audio += audio_segment
            # Record the on-screen (literal) text with its start time and duration
            segment = parse_comment(segment, use_literal=True)
            audio_duration_ms = len(audio_segment)
            segmented_comments.append([start_time, segment, audio_duration_ms])
            # The next segment starts right after this one's audio ends
            start_time += audio_duration_ms
    # Export the mixdown to a .wav file next to the video
    output_filename = filename + ".comments.wav"
    mixdown_audio.export(output_filename, format="wav")
    return segmented_comments, output_filename
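# Each returned segmented_comments entry is [start_ms, literal_text, duration_ms],
# the three-element shape that overlay_text_comments() unpacks above.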
def main():
    video_filename = sys.argv[1]
    comments_filename = video_filename + ".comments.json"
    comments, trajectory, clear_events = read_comments(comments_filename)
    audioSpeedScale = 1.0
    if len(sys.argv) > 3 and float(sys.argv[3]):  # zero or missing falls back to 1.0
        audioSpeedScale = float(sys.argv[3])
    if len(sys.argv) > 2 and sys.argv[2] == '--audio':
        # Mux a previously rendered overlay video with the generated audio track
        text_overlay_video_filename = video_filename[:-4] + "_text_overlay.mp4"
        audio_comments_filename = video_filename + ".comments.wav"
        output_filename = video_filename[:-4] + "_final.mp4"
        add_audio_comments(text_overlay_video_filename, audio_comments_filename, output_filename)
    else:
        video = VideoFileClip(video_filename, audio=False)
        video_with_trajectory = compose_video_with_trajectory(video, trajectory, clear_events)
        processed_video, updated_comments = process_video_speed_and_offsets(video_with_trajectory, comments)
        updated_comments, audio_comments_filename = generate_wav(video_filename, updated_comments, audioSpeedScale)
        if len(sys.argv) > 2 and sys.argv[2] == '--preview':
            preview_video(updated_comments, processed_video, audio_comments_filename)
        else:
            output_filename = video_filename[:-4] + "_final.mp4"
            generate_video(updated_comments, processed_video, audio_comments_filename, output_filename)


if __name__ == "__main__":
    main()
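# Usage sketch (filenames illustrative; synthesis requires a VOICEVOX-compatible
# engine listening on localhost:50021):
#   python generate_movie.py input.mp4                 # render input_final.mp4
#   python generate_movie.py input.mp4 --preview       # preview instead of rendering
#   python generate_movie.py input.mp4 --preview 1.25  # also scale TTS speed by 1.25
#   python generate_movie.py input.mp4 --audio         # mux input_text_overlay.mp4
#                                                      # with input.mp4.comments.wav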