# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Module: preprocess_data
Reference: https://github.com/rizkiarm/LipNet
"""
# pylint: disable=too-many-locals, no-self-use, c-extension-no-member
import os
import fnmatch
import errno
import numpy as np
from scipy import ndimage
from scipy.misc import imresize
from skimage import io
import skvideo.io
import dlib


def mkdir_p(path):
    """
    Make a directory
    """
    try:
        os.makedirs(path)
    except OSError as exc:  # Python >2.5
        if exc.errno == errno.EEXIST and os.path.isdir(path):
            pass
        else:
            raise
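# Note: on Python 3.2+ the same "create if missing" behaviour is available as
# os.makedirs(path, exist_ok=True), which the __main__ block below uses for the
# target root directory.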


def find_files(directory, pattern):
    """
    Find files
    """
    for root, _, files in os.walk(directory):
        for basename in files:
            if fnmatch.fnmatch(basename, pattern):
                filename = os.path.join(root, basename)
                yield filename
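# Illustrative use (the directory matches the --src_path default below; the file
# name itself is hypothetical): find_files('../data/mp4s/s1', '*.mpg') lazily
# yields paths such as '../data/mp4s/s1/sample.mpg'.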


class Video(object):
    """
    Preprocess video frames into model-ready mouth-region data
    """
    def __init__(self, vtype='mouth', face_predictor_path=None):
        if vtype == 'face' and face_predictor_path is None:
            raise AttributeError('Face video needs to be accompanied by a face predictor')
        self.face_predictor_path = face_predictor_path
        self.vtype = vtype
        self.face = None
        self.mouth = None
        self.data = None
        self.length = None
    def from_frames(self, path):
        """
        Read frames from a directory of image files
        """
        frames_path = sorted([os.path.join(path, x) for x in os.listdir(path)])
        frames = [ndimage.imread(frame_path) for frame_path in frames_path]
        self.handle_type(frames)
        return self

    def from_video(self, path):
        """
        Read frames from a video file
        """
        frames = self.get_video_frames(path)
        self.handle_type(frames)
        return self

    def from_array(self, frames):
        """
        Read frames from an array
        """
        self.handle_type(frames)
        return self

    def handle_type(self, frames):
        """
        Dispatch preprocessing according to the video type
        """
        if self.vtype == 'mouth':
            self.process_frames_mouth(frames)
        elif self.vtype == 'face':
            self.process_frames_face(frames)
        else:
            raise Exception('Video type not found')
    def process_frames_face(self, frames):
        """
        Preprocess full-face frames: detect the face and crop the mouth region
        """
        detector = dlib.get_frontal_face_detector()
        predictor = dlib.shape_predictor(self.face_predictor_path)
        mouth_frames = self.get_frames_mouth(detector, predictor, frames)
        self.face = np.array(frames)
        self.mouth = np.array(mouth_frames)
        if mouth_frames[0] is not None:
            self.set_data(mouth_frames)

    def process_frames_mouth(self, frames):
        """
        Preprocess frames that already contain only the mouth region (no detection needed)
        """
        self.face = np.array(frames)
        self.mouth = np.array(frames)
        self.set_data(frames)
    def get_frames_mouth(self, detector, predictor, frames):
        """
        Get frames using mouth crop
        """
        mouth_width = 100
        mouth_height = 50
        horizontal_pad = 0.19
        normalize_ratio = None
        mouth_frames = []
        for frame in frames:
            dets = detector(frame, 1)
            shape = None
            for det in dets:
                shape = predictor(frame, det)
                i = -1
            if shape is None:  # Detector doesn't detect face, just return None
                return [None]
            mouth_points = []
            for part in shape.parts():
                i += 1
                if i < 48:  # Only take mouth region
                    continue
                mouth_points.append((part.x, part.y))
            np_mouth_points = np.array(mouth_points)
            mouth_centroid = np.mean(np_mouth_points[:, -2:], axis=0)
            if normalize_ratio is None:
                mouth_left = np.min(np_mouth_points[:, :-1]) * (1.0 - horizontal_pad)
                mouth_right = np.max(np_mouth_points[:, :-1]) * (1.0 + horizontal_pad)
                normalize_ratio = mouth_width / float(mouth_right - mouth_left)
            new_img_shape = (int(frame.shape[0] * normalize_ratio),
                             int(frame.shape[1] * normalize_ratio))
            resized_img = imresize(frame, new_img_shape)
            mouth_centroid_norm = mouth_centroid * normalize_ratio
            mouth_l = int(mouth_centroid_norm[0] - mouth_width / 2)
            mouth_r = int(mouth_centroid_norm[0] + mouth_width / 2)
            mouth_t = int(mouth_centroid_norm[1] - mouth_height / 2)
            mouth_b = int(mouth_centroid_norm[1] + mouth_height / 2)
            mouth_crop_image = resized_img[mouth_t:mouth_b, mouth_l:mouth_r]
            mouth_frames.append(mouth_crop_image)
        return mouth_frames
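    # Worked example of the normalisation above (numbers are illustrative only):
    # if the detected mouth landmarks span x = 120..180 px, then
    # mouth_left = 120 * 0.81 = 97.2, mouth_right = 180 * 1.19 = 214.2 and
    # normalize_ratio = 100 / 117.0 ~= 0.85. The ratio is computed once, on the
    # first detected frame, and reused; each frame is rescaled by it and a fixed
    # 100 x 50 window is cut around the rescaled mouth centroid, so every saved
    # mouth crop has the same size.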
    def get_video_frames(self, path):
        """
        Get video frames
        """
        videogen = skvideo.io.vreader(path)
        frames = np.array([frame for frame in videogen])
        return frames
    def set_data(self, frames):
        """
        Prepare the model input in C x T x H x W layout
        """
        data_frames = []
        for frame in frames:
            # frame: H x W x C
            frame = frame.swapaxes(0, 1)  # swap width and height to form format W x H x C
            if len(frame.shape) < 3:
                frame = np.array([frame]).swapaxes(0, 2).swapaxes(0, 1)  # Add grayscale channel
            data_frames.append(frame)
        frames_n = len(data_frames)
        data_frames = np.array(data_frames)  # T x W x H x C
        data_frames = np.rollaxis(data_frames, 3)  # C x T x W x H
        data_frames = data_frames.swapaxes(2, 3)  # C x T x H x W = NCDHW
        self.data = data_frames
        self.length = frames_n
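# Minimal usage sketch for the Video class (the predictor file name matches the
# constant used in preprocess() below; the video path is hypothetical):
#
#   video = Video(vtype='face',
#                 face_predictor_path='./shape_predictor_68_face_landmarks.dat')
#   video.from_video('../data/mp4s/s1/sample.mpg')
#   print(video.data.shape)   # C x T x H x W, one mouth crop per input frame
#   print(video.length)       # number of frames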


def preprocess(from_idx, to_idx, _params):
    """
    Preprocess: convert videos into sequences of mouth-crop images
    """
    source_exts = '*.mpg'
    src_path = _params['src_path']
    tgt_path = _params['tgt_path']
    face_predictor_path = './shape_predictor_68_face_landmarks.dat'
    succ = set()
    fail = set()
    for idx in range(from_idx, to_idx):
        s_id = 's' + str(idx) + '/'
        source_path = src_path + '/' + s_id
        target_path = tgt_path + '/' + s_id
        fail_cnt = 0
        for filepath in find_files(source_path, source_exts):
            print("Processing: {}".format(filepath))
            filepath_wo_ext = os.path.splitext(filepath)[0].split('/')[-2:]
            target_dir = os.path.join(tgt_path, '/'.join(filepath_wo_ext))
            if os.path.exists(target_dir):
                continue
            try:
                video = Video(vtype='face',
                              face_predictor_path=face_predictor_path).from_video(filepath)
                mkdir_p(target_dir)
                i = 0
                if video.mouth[0] is None:
                    continue
                for frame in video.mouth:
                    io.imsave(os.path.join(target_dir, "mouth_{0:03d}.png".format(i)), frame)
                    i += 1
            except ValueError as error:
                print(error)
                fail_cnt += 1
        if fail_cnt == 0:
            succ.add(idx)
        else:
            fail.add(idx)
    return (succ, fail)
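# Example of processing a single speaker directory by hand (paths follow the
# argparse defaults below and are illustrative only):
#
#   succ, fail = preprocess(1, 2, {'src_path': '../data/mp4s',
#                                  'tgt_path': '../data/datasets'})
#   # succ/fail are sets of speaker indices that converted cleanly / had failures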


if __name__ == '__main__':
    import argparse
    from multi import multi_p_run, put_worker

    PARSER = argparse.ArgumentParser()
    PARSER.add_argument('--src_path', type=str, default='../data/mp4s')
    PARSER.add_argument('--tgt_path', type=str, default='../data/datasets')
    PARSER.add_argument('--n_process', type=int, default=1)
    CONFIG = PARSER.parse_args()
    N_PROCESS = CONFIG.n_process
    PARAMS = {'src_path': CONFIG.src_path,
              'tgt_path': CONFIG.tgt_path}

    os.makedirs('{tgt_path}'.format(tgt_path=PARAMS['tgt_path']), exist_ok=True)

    if N_PROCESS == 1:
        RES = preprocess(0, 35, PARAMS)
    else:
        RES = multi_p_run(35, put_worker, preprocess, PARAMS, N_PROCESS)
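# Example invocation, assuming this file is saved as preprocess_data.py and the
# dlib landmark model sits next to it:
#   python preprocess_data.py --src_path ../data/mp4s --tgt_path ../data/datasets --n_process 4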