#!/usr/bin/env python
# Python module to load images into postgres or greenplum db, for
# use with madlib deep_learning module.
# The format of the image tables created will have at least 3 columns:
# (id SERIAL, x REAL[], y). Each row is 1 image,
# with image data represented by x (a 3D array of type "real"), and
# y (category) as text or 3D array of numeric type(int[], real[], etc).
# id is just a unique identifier for each image, so they don't get
# mixed up during prediction. If images are being loaded from disk,
# there will be an additional img_name column containing the filename
# of the image, to help identify later.
# ImageLoader.ROWS_PER_FILE = 1000 by default; this is the number of rows per
# temporary file (or StringIO buffer) loaded at once.
# There are two ways of using this module. One is to load it with:
# import madlib_image_loader
# (make sure it is in a directory python knows about.
# Try adding the directory to PYTHONPATH if it can't find it.)
# and use the exposed classes and functions described below.
# The second way is to run it directly, passing all options on the
# command line. The second way only supports loading images
# from disk, whereas the first way can be used either to do that or
# to load them from a dataset already in an existing numpy array (such
# as the datasets that come prepackaged with keras).
# The module API is pretty simple, only involving two classes:
# ImageLoader
# DbCredentials
# two functions (in addition to the class constructors):
# ImageLoader.load_dataset_from_np
# ImageLoader.load_dataset_from_disk
# and one adjustable parameter (change if default is not working well):
# ImageLoader.ROWS_PER_FILE=1000
# Workflow
# 1. Create objects:
# db_creds = DbCredentials(db_name='madlib', user=None, password='',
# host='localhost', port=5432)
# iloader = ImageLoader(db_creds, num_workers, table_name=None)
# 2a. Perform parallel image loading from numpy arrays:
# iloader.load_dataset_from_np(data_x, data_y, table_name,
# append=False, label_datatype='TEXT')
# data_x contains image data in np.array format, and data_y is a 1D np.array
# of the image categories (labels).
# Default database credentials are: localhost port 5432, madlib db, no
# password. Calling the default constructor DbCredentials() will attempt
# to connect using these credentials, but any of them can be overriden.
# append=False attempts to create a new table, while append=True appends more
# images to an existing table.
# If the user passes a table_name while creating ImageLoader object, it will
# be used for all further calls to load_dataset_from_np. It can be
# changed by passing it as a parameter during the actual call to
# load_dataset_from_np, and if so future calls will load to that table
# name instead. This avoids needing to pass the table_name again every
# time, but also allows it to be changed at any time.
# label_datatype is used for defining the datatype for y(label) in the output
# table, where y is a numeric array. Default datatype for y is TEXT
# or,
# 2b. Perform parallel image loading from disk:
# load_dataset_from_disk(self, root_dir, table_name, num_labels='all',
# append=False, label_datatype='TEXT'):
# Calling this function instead will look in root_dir on the local disk of
# wherever this is being run. It will skip over any files in that
# directory, but will load images contained in each of its
# subdirectories. The images should be organized by category/class,
# where the name of each subdirectory is the label for the images
# contained within it.
# The table_name and append parameters are the same as described
# above. num_labels is an optional parameter which can be used to
# restrict the number of labels (image classes) loaded, even if more
# are found in root_dir. For example, for a large dataset you may
# have hundreds of labels, but only wish to use a subset of that
# containing a few dozen.
# label_datatype is used for defining the datatype for y(label) in the output
# table, where y is a numeric array. Default datatype for y is TEXT
# If you want to load an image dataset from disk, but don't feel like writing
# any python code to call the API, you can just run this file directly, passing
# these parameters on the command line.
# usage: [-h] [-r ROOT_DIR] [-n NUM_LABELS] [-d DB_NAME]
# [-a] [-w NUM_WORKERS] [-p PORT] [-U USERNAME]
# table_name
# positional arguments:
# table_name Name of table where images should be loaded
# optional arguments:
# -h, --help show this help message and exit
# -r ROOT_DIR, --root-dir ROOT_DIR
# Root directory of image directories (default: .)
# -n NUM_LABELS, --num-labels NUM_LABELS
# Number of image labels (categories) to load. (default:
# all)
# -d DB_NAME, --db-name DB_NAME
# Name of database where images should be loaded
# (default: madlib)
# -a, --append Name of database where images should be loaded
# (default: False)
# -l LABEL_DATATYPE, --label-datatype LABEL_DATATYPE
# SQL datatype of label column in output table for
# numeric arrays
# Example: INT, REAL, BIGINT (default: TEXT)
# -w NUM_WORKERS, --num-workers NUM_WORKERS
# Name of parallel workers. (default: 5)
# -p PORT, --port PORT database server port (default: 5432)
# -U USERNAME, --username USERNAME
# database user name (default: None)
# -t HOST, --host HOST database server host. (default: localhost)
# -P PASSWORD, --password PASSWORD
# database user password (default: None)
# -m, --no-temp-files no temporary files, construct all image tables in-
# memory (default: False)
import argparse
from cStringIO import StringIO
from multiprocessing import Pool, current_process
import os
import random
import signal
from shutil import rmtree
import string
import time
import traceback
import psycopg2 as db
import numpy as np
from PIL import Image
class SignalException(Exception):
def _worker_sig_handler(signum, frame):
if signum == signal.SIGINT:
msg = "Received SIGINT in worker."
elif signum == signal.SIGTERM:
msg = "Received SIGTERM in worker."
elif signum == signal.SIGSEGV:
msg = "Received SIGSEGV in worker."
msg = "Received unknown signal in worker"
raise SignalException(msg)
def _call_disk_worker(label):
global iloader
def _call_np_worker(data): # data = list of (x, y) or (x, y, num_images) tuples
try: # of length self.ROWS_PER_FILE
if iloader.no_temp_files:
except Exception as e:
if iloader.tmp_dir:
# For some reason, when an exception is raised in a worker, the
# stack trace doesn't get shown. So we have to print it ourselves
# (actual exception # msg will get printed by mother process.
print "\nError in {0} while loading images".format(iloader.pr_name)
print traceback.format_exc()
raise e
# dummy param needed so this can be called for
# each worker from
def _worker_cleanup(dummy):
if iloader.tmp_dir:
def init_worker(mother_pid, table_name, append, no_temp_files, db_creds,
from_disk, root_dir=None):
pr = current_process()
print("Initializing {0} [pid {1}]".format(,
iloader = ImageLoader(db_creds=db_creds)
iloader.mother_pid = mother_pid
iloader.table_name = table_name
iloader.no_temp_files = no_temp_files
iloader.root_dir = root_dir
iloader.from_disk = from_disk
signal.signal(signal.SIGINT, _worker_sig_handler)
signal.signal(signal.SIGSEGV, _worker_sig_handler)
if not no_temp_files:
except Exception as e:
if iloader.tmp_dir:
print "\nException in {0} init_worker:".format(
print traceback.format_exc()
raise e
class DbCredentials:
def __init__(self, db_name='madlib', user=None, password='',
host='localhost', port=5432):
if user:
self.user = user
self.user = os.environ["USER"]
self.db_name = db_name
self.password = password = host
self.port = port
class ImageLoader:
def __init__(self, db_creds=None, num_workers=None, table_name=None):
self.num_workers = num_workers
self.append = False
self.img_num = 0
self.db_creds = db_creds
self.db_conn = None
self.db_cur = None
self.tmp_dir = None
self.mother = False
self.pr_name = current_process().name
self.table_name = table_name
self.root_dir = None
self.pool = None
self.no_temp_files = False
global iloader # Singleton per process
iloader = self
def terminate_workers(self):
if iloader.pool:, [0] * self.num_workers)
self.pool = None
print("{} workers terminated.".format(self.num_workers))
def _random_string(self):
return ''.join([random.choice(string.ascii_letters + string.digits)\
for n in xrange(10)])
def mk_temp_dir(self):
self.tmp_dir = '/tmp/madlib_{0}'.format(self._random_string())
print("{0}: Created temporary directory {1}"\
.format(self.pr_name, self.tmp_dir))
def rm_temp_dir(self):
print("{0}: Removed temporary directory {1}"\
.format(self.pr_name, self.tmp_dir))
self.tmp_dir = None
def db_connect(self):
if self.db_cur:
db_name = self.db_creds.db_name
user = self.db_creds.user
host =
port = self.db_creds.port
password = self.db_creds.password
connection_string = "dbname={0} user={1} host={2} port={3} password={4}"\
.format(db_name, user, host, port, password)
self.db_conn = db.connect(connection_string)
self.db_cur = self.db_conn.cursor()
self.db_conn.autocommit = True
except db.DatabaseError as error:
raise error
print("{0}: Connected to {1} db.".
format(self.pr_name, self.db_creds.db_name))
def db_exec(self, query, args=None, echo=True):
if self.db_cur is not None:
if echo:
print "Executing: {0}".format(query)
self.db_cur.execute(query, args)
if echo:
print self.db_cur.statusmessage
raise RuntimeError("{0}: db_cur is None in db_exec"\
def db_close(self):
if self.db_cur is not None:
self.db_cur = None
if isinstance(self.db_conn, db.extensions.connection):
self.db_conn = None
def _gen_lines(self, data):
def f(x):
x = str(x.tolist())
return x.replace('[','{').replace(']','}')
for i, row in enumerate(data):
if len(row) == 3:
x, y, image_name = row
if not self.from_disk and y.ndim > 1:
y = f(y)
yield '{0}|{1}|{2}\n'.format(f(x), y, image_name)
elif len(row) == 2:
x, y = row
if not self.from_disk and y.ndim > 1:
y = f(y)
yield '{0}|{1}\n'.format(f(x), y)
raise RuntimeError("Cannot write invalid row to table:\n{0}"\
def _write_file(self, file_object, data):
lines = self._gen_lines(data)
# This is default value, can be overriden by user, by setting
# iloader.ROWS_PER_FILE after ImageLoader is created.
# Copies from open file-like object f into database
def _copy_into_db(self, f, data):
table_name = self.table_name
if self.from_disk:
self.db_cur.copy_from(f, table_name, sep='|', columns=['x','y',
self.db_cur.copy_from(f, table_name, sep='|', columns=['x','y'])
print("{0}: Loaded {1} images into {2}".format(self.pr_name, len(data),
# Use in-memory buffer as file-like object to load a block of data into db
# (no temp files written)
def _just_load(self, data):
f = StringIO()
self._write_file(f, data)
self._copy_into_db(f, data)
# Write out a temporary file and then load it into db as a table
def _write_tmp_file_and_load(self, data):
table_name = self.table_name
if not self.tmp_dir:
print("{0}: Can't find temporary directory... exiting."\
time.sleep(1) # allow some time for p.terminate() to be called
filename = os.path.join(self.tmp_dir, '{0}{1:04}.tmp'.format(
table_name, self.img_num))
self.img_num += 1
with file(filename, 'w') as f:
self._write_file(f, data)
print("{0}: Wrote {1} images to {2}".format(self.pr_name, len(data),
with file(filename, 'r') as f:
self._copy_into_db(f, data)
def _validate_input_and_create_table(self, data_x=[], data_y=[]):
if len(data_x) != len(data_y):
raise ValueError("Invalid dataset passed, number of labels in "
"data_y ({0}) does not match number of images "
"in data_x ({1})"\
.format(len(data_y), len(data_x)))
if self.append:
# Validate that table already exists
self.db_exec("SELECT count(*) FROM {0}".format(self.table_name),
except db.DatabaseError:
raise RuntimeError("append=True passed, but cannot append to "
"table {0} in db {1}. Either make sure the "
"table exists and you have access to it, or "
"use append=False (default) to auto-create it"
"during loading."
.format(self.table_name, self.db_creds.db_name))
print "Appending to table {0} in {1} db".format(self.table_name,
y_type = self.label_datatype
# Create new table
if self.from_disk:
sql = "CREATE TABLE {0} (id SERIAL, x REAL[], y {1},\
img_name TEXT)".format(self.table_name, y_type)
sql = "CREATE TABLE {0} (id SERIAL, x REAL[], y {1})"\
.format( self.table_name, y_type)
except db.DatabaseError as e:
raise RuntimeError("{0} while creating {1} in db {2}.\n"
"If the table already exists, you can use "
"append=True to append more images to it."
.format(e.message.strip(), self.table_name,
print "Created table {0} in {1} db".format(self.table_name,
def load_dataset_from_np(self, data_x, data_y, table_name=None,
append=False, label_datatype='TEXT'):
Loads a numpy array into db. For append=False, creates a new table and
loads the data. For append=True, appends data to existing table.
Throws an exception if append=False and table_name already exists,
or if append=True and table_name does not exist. Makes use of
worker processes initialized during ImageLoader object creation to
load in parallel.
@data_x independent variable data, a numpy array of images. Size of
first dimension is number of images. Rest of dimensions determined
by image resolution and number of channels.
@data_y dependent variable data (image classes), as an numpy array
@table_name Name of table in db to load data into
@append Whether to create a new table (False) or append to an existing
one (True). If unspecified, default is False
@label_datatype: If set will create table with the the column 'y' set
to the datatype specified. Default is set to TEXT
start_time = time.time()
self.mother = True
self.from_disk = False
self.append = append
self.label_datatype = label_datatype
if table_name:
self.table_name = table_name
if not self.table_name:
raise ValueError("Must specify table_name either in ImageLoader"
" constructor or in load_dataset_from_np params!")
# Flatten labels only for arrays with shape (n,1) o (1,n) since these
# shapes can be treated as individual labels
if data_y.ndim == 2 and (data_y.shape[0] == 1 or data_y.shape[1] == 1):
data_y = data_y.flatten()
self.label_datatype = self.label_datatype + '[]'
self._validate_input_and_create_table(data_x, data_y)
data = zip(data_x, data_y)
if not self.pool:
print("Spawning {0} workers...".format(self.num_workers))
self.pool = Pool(processes=self.num_workers,
datas = []
for n in range(0, len(data), self.ROWS_PER_FILE):
# Each element in datas is a list of self.ROWS_PER_FILE rows
# Shape of datas: ( number of files, rows per file, ( x-dim, y-dim ) )
# ( inside x can also be a numpy tensor with several dimensions, but y
# should just be a single scalar )
# multiprocessing library will call _call_np_worker() in some worker for
# each file, splitting the list of files up into roughly equal chunks
# for each worker to handle. For example, if there are 500 files and
# 5 workers, each will handle about 100 files, and _call_np_worker()
# will be called 100 times, each time with a different file full
# of images.
try:, datas)
except(Exception) as e:
raise e
end_time = time.time()
print("Done! Loaded {0} images in {1}s"\
.format(len(data), end_time - start_time))
def call_disk_worker(self, label):
dir_name = os.path.join(self.root_dir,label)
filenames = os.listdir(dir_name)
data = []
first_image =, label, filenames[0]))
for index, filename in enumerate(filenames):
image =, label, filename))
x = np.array(image)
if x.shape != np.array(first_image).shape:
raise Exception("Images {0} and {1} in label {2} have different "
"shapes {0}:{3} {1}:{4}. Make sure that all the "
"images are of the same shape."\
.format(filenames[0], filename, label,
first_image.shape, x.shape))
data.append((x, label, filename))
if (index % self.ROWS_PER_FILE) == (self.ROWS_PER_FILE - 1):
data = []
if len(data) > 0:
def load_dataset_from_disk(self, root_dir, table_name, num_labels='all',
append=False, label_datatype='TEXT'):
Load images from disk into a greenplum database table. All the images
should be of the same shape.
@root_dir: Location of the dir which contains all the labels and their
associated images. Can be relative or absolute. Each label needs to
have it's own dir and should contain only images inside it's own dir.
(Extra files in root dir will be ignored, only diretories matter.)
@table_name: Name of destination table in db
@num_labels: Num of labels to process/load into a table. By default all
the labels are loaded. @table_name: Name of the database table into
which images will be loaded.
@append: If set to true, do not create a new table but append to an
existing table.
@label_datatype: If set will create table with the the column 'y' set
to the datatype specified. Default is set to TEXT
start_time = time.time()
self.mother = True
self.append = append
self.no_temp_files = False
self.table_name = table_name
self.label_datatype = label_datatype
self.from_disk = True
self.root_dir = root_dir
subdirs = os.listdir(root_dir)
labels = []
# Prune files from directory listing, only use actual sub-directories
# This allows the user to keep a tar.gz file or other extraneous files
# in the root directory without causing any problems.
for subdir in subdirs:
if os.path.isdir(os.path.join(root_dir,subdir)):
print("{0} is not a directory, skipping".format(subdir))
if num_labels == 'all':
print('number of labels = {}'.format(len(labels)))
num_labels = len(labels)
print "Found {0} image labels in {1}".format(num_labels, root_dir)
num_labels = int(num_labels)
labels = labels[:num_labels]
print "Using first {0} image labels in {1}".format(num_labels,
if not self.pool:
print("Spawning {0} workers...".format(self.num_workers))
self.pool = Pool(processes=self.num_workers,
try:, labels)
except(Exception) as e:
raise e, [0] * self.num_workers)
end_time = time.time()
print("Done! Loaded {0} image categories in {1}s"\
.format(len(labels), end_time - start_time))
def main():
parser = argparse.ArgumentParser(description='Madlib Image Loader',
parser.add_argument('-r', '--root-dir', action='store',
dest='root_dir', default='.',
help='Root directory of image directories')
parser.add_argument('-n', '--num-labels', action='store',
dest='num_labels', default='all',
help='Number of image labels (categories) to load.')
parser.add_argument('-d', '--db-name', action='store',
dest='db_name', default='madlib',
help='Name of database where images should be loaded')
parser.add_argument('-a', '--append', action='store_true',
dest='append', default=False,
help='Insert into existing table or Create new table')
parser.add_argument('-l', '--label-datatype', action='store',
dest='label_datatype', default='TEXT',
help='SQL datatype(INT, REAL, BIGINT) of label column in output table')
parser.add_argument('-w', '--num-workers', action='store',
dest='num_workers', default=5,
help='Name of parallel workers.')
parser.add_argument('-p', '--port', action='store',
dest='port', default=5432,
help='database server port')
parser.add_argument('-U', '--username', action='store',
dest='username', default=None,
help='database user name')
parser.add_argument('-t', '--host', action='store',
dest='host', default='localhost',
help='database server host.')
parser.add_argument('-P', '--password', action='store',
dest='password', default=None,
help='database user password')
# This option is not working yet
# parser.add_argument('-m', '--no-temp-files', action='store_true',
# dest='no_temp_files', default=False,
# help="no temporary files, construct all image tables "
# " in-memory")
help='Name of table where images should be loaded')
args = parser.parse_args()
db_creds = DbCredentials(args.db_name, args.username, args.password,, args.port)
iloader = ImageLoader(db_creds, int(args.num_workers))
if __name__ == '__main__':