Commit 2d0065e0 authored by PLian's avatar PLian
Browse files

Changed to Python3.6, added the support of Spearmint, and introduced Ray Tune

parent 8731acec
......@@ -3,13 +3,14 @@ cache:
key: "$CI_BUILD_REF_NAME"
paths:
- .venv/
- spearmint/lib/
before_script:
- sh provision/setup_venv.sh
- sh provision/setup_sp.sh
- ls -a .venv/bin/
- ls -a .
- source activate .venv/
- export PATH=$(pwd)/.venv/bin/:$PATH
- export LD_LIBRARY_PATH=$(pwd)/.venv/lib/:$LD_LIBRARY_PATH
- echo $PATH
stages:
- deps
......@@ -22,7 +23,7 @@ stages:
setup-venv:
stage: deps
script:
- pip install -r requirements_dev.txt
- pip3 install -r requirements_dev.txt
artifacts:
paths:
- .venv/
......@@ -48,10 +49,10 @@ setup-py:
dependencies:
- setup-venv
script:
- python setup.py build
- python setup.py install
- python3 setup.py build
- python3 setup.py install
test:
coverage: '/\d+\%\s*$/'
script:
- python setup.py test
- python3 setup.py test
......@@ -44,11 +44,15 @@ class BaseExecutor(object):
self.params = params
if 'work_dir' in params.vals:
logger.info("Changed to work directory: %s" % params.vals['work_dir'])
os.chdir(params.vals['work_dir'])
if 'work_dir' in params.vals and os.path.isdir(params.vals['work_dir']):
cwd = params.vals['work_dir']
if not os.path.isdir(cwd):
cwd = '.'
logger.info("Changed to work directory: %s" % cwd)
os.chdir(cwd)
logdir = os.path.join(cwd,
"param_runner_%s" % datetime.datetime.now().strftime(
'%Y%m%d-%H%M%S'))
......@@ -80,8 +84,12 @@ class BaseExecutor(object):
trace_file.write("Index\tStatus\t")
for param in self.params.vals['parameters']:
trace_file.write("%s\t" % param['id'])
if 'optimizer' in self.params.vals.keys() and self.params.vals["optimizer"] == "spearmint":
trace_file.write("Score\t")
pass
else:
for param in self.params.vals['parameters']:
trace_file.write("%s\t" % param['id'])
trace_file.write("Command\tReturn Code\tDuration\tStart\tEnd")
......@@ -94,7 +102,7 @@ class BaseExecutor(object):
def write_trace(self, summary_dict):
"""Write a single line to the trace file, summarizing a task."""
print('Start to writing trace ...')
print(summary_dict)
logger.debug(summary_dict)
try:
self._write_trace(summary_dict)
......@@ -114,8 +122,12 @@ class BaseExecutor(object):
summary_dict["_status"]
)
for arg in list(summary_dict["_cmd"].values()):
summary_line += "%s\t" % str(arg['value'])
if 'optimizer' in self.params.vals and self.params.vals["optimizer"] == "spearmint":
summary_line += "%f\t" % summary_dict['_score']
summary_line += "%s\t" % summary_dict["_cmd"]
else:
for arg in list(summary_dict["_cmd"].values()):
summary_line += "%s\t" % str(arg['value'])
summary_line += "%d\t%s\t%s\t%s" % (
summary_dict["_return_code"],
......@@ -187,13 +199,6 @@ class BaseExecutor(object):
else:
summary_dict[summary_val['id']] = val
logger.debug('The summary_dict is:')
logger.debug(summary_dict)
logger.debug('')
logger.debug('The self.params.vals are:')
logger.debug(self.params.vals)
logger.debug('')
#
# Since a default score is now set to 'Accuracy' in param.py, this check is no longer
# strictly necessary. Keep it until a better way to handle the score is found.
......@@ -207,16 +212,44 @@ class BaseExecutor(object):
logger.debug(" - Score for task %d: %0.4f", summary_dict["_task_index"], summary_dict["_score"])
self._write_trace(summary_dict)
except ValueError:
summary_dict["_score"] = "ERROR"
summary_dict["_score"] = 1e99
summary_dict["_status"] = "ERROR"
self._write_trace(summary_dict)
raise ValueError("Could not extract a numerical score from output '%s'" % score_val)
else:
summary_dict["_score"] = "ERROR"
summary_dict["_score"] = 1e99
summary_dict["_status"] = "ERROR"
self._write_trace(summary_dict)
raise ValueError("score %s is not in the list of summary variables" % self.params["score"])
# spearmint score
if 'optimizer' in self.params.vals.keys() and self.params.vals["optimizer"] == "spearmint":
logger.debug('Spearmint optimizor is used')
try:
fcon = open(stderr_path).readlines()
best_value = float([i.split()[2] for i in fcon if i.startswith('Current best:')][-1])
status = "COMPLETE"
except Exception as e:
best_value = 1e99
status = "ERROR"
logger.info("Can't get the best_value for spearmint, the error is: ", e)
summary_dict["_score"] = best_value
summary_dict["_status"] = status
try:
self._write_trace(summary_dict)
except Exception as e:
logger.debug("The trace error is:", e)
logger.debug('The best value and status are: %f, %s' % (best_value, status))
logger.debug('The summary_dict is:')
logger.debug(summary_dict)
logger.debug('')
logger.debug('The self.params.vals are:')
logger.debug(self.params.vals)
logger.debug('')
return summary_dict
@staticmethod
......
"""Executors for local (single node) task execution."""
import os
import logging
import multiprocessing
from subprocess import call
from subprocess import call, Popen
from .base import BaseExecutor
......@@ -30,7 +31,7 @@ class LocalExecutor(BaseExecutor):
% cpus_per_task)
logger.info(" - Will run %d concurrent local tasks" % self.conc_tasks)
super(LocalExecutor, self).__init__(cwd, params)
super().__init__(cwd, params)
def run(self, task_index, cmd):
"""Run provided commands, in parallel up to conc_tasks."""
......@@ -39,7 +40,15 @@ class LocalExecutor(BaseExecutor):
def _run_cmd(self, cmd, stderr_file, stdout_file):
"""Local execution of a command under a simple shell call."""
logger.debug("Running: %s" % cmd['__command']['value'])
ret = call(cmd['__command']['value'], shell=True, stderr=stderr_file,
stdout=stdout_file)
if type(cmd) == str and os.path.basename(cmd.split()[0]) == 'spearmint':
logger.debug("Running: %s\nCWD: %s" % (cmd, os.path.abspath('.')))
p = Popen(cmd.split(), stderr=stderr_file, stdout=stdout_file)
logger.debug("The PID of the cmd is: ", p.pid)
p.wait()
ret = p.returncode
logger.debug("The return code of the cmd is: ", ret)
p.kill()
else:
logger.debug("Running: %s" % cmd['__command']['value'])
ret = call(cmd['__command']['value'], shell=True, stderr=stderr_file, stdout=stdout_file)
return ret
"""Optimizer classes implement an algorithm exploring parameter ranges."""
from .basic import GridSearchOptimizer
# from sp import SpearmintOptimizer
from .sp import SpearmintOptimizer
from .harmonica import HarmonicaOptimizer
......@@ -13,8 +13,8 @@ def get_optimizer(params, executor):
if optimizer == "gridsearch":
return GridSearchOptimizer(params, executor)
# if optimizer == "spearmint":
# return SpearmintOptimizer(params, executor)
if optimizer == "spearmint":
return SpearmintOptimizer(params, executor)
if optimizer == "harmonica":
return HarmonicaOptimizer(params, executor)
......
"""Implementation of the Spearmint optimization algorithm."""
import logging
import time
import traceback
import os
from .base import BaseOptimizer
from spearmint.chooser.GPEIOptChooser import GPEIOptChooser
from spearmint.ExperimentGrid import ExperimentGrid, load_experiment, Job
from spearmint.helpers import grid_for
from spearmint.Locker import Locker
logger = logging.getLogger(__name__)
......@@ -24,194 +17,41 @@ class SpearmintOptimizer(BaseOptimizer):
'job_id': None,
'score': 1e99
}
grid_size = 1000
max_tasks = 1000
def run(self):
"""Perform a Gaussian Process optimization using Spearmint."""
self.grid_size = self.params.vals.get('spearmint_grid_size', self.grid_size)
logger.info(" - Spearmint grid size is: %d", self.grid_size)
self.max_tasks = self.params.vals.get('spearmint_max_tasks', self.max_tasks)
logger.info(" - Will run maximum of %d tasks", self.max_tasks)
"""Pass all tasks to the executor immediately."""
tasks_total = len(self.params.commands)
logger.info(" - Will run %d tasks" % tasks_total)
pb_file = os.path.join(self.executor.logdir, "spearmint_config.pb")
self.write_config(pb_file)
for task in self.params.commands:
self.run_job(self.task_index, task)
self.task_index = self.task_index + 1
chooser = GPEIOptChooser(self.executor.logdir)
pool = self.executor.get_pool()
logger.info(" - Optimization completed after %d tasks run" % tasks_total)
while self.attempt_dispatch(pb_file, pool, self.executor.logdir, chooser):
# This is polling frequency. A higher frequency means that the algorithm
# picks up results more quickly after they finish, but also significantly
# increases overhead.
time.sleep(1)
pool.close()
pool.terminate()
pool.join()
logger.info(" - Optimization completed after %d tasks run", self.task_index + 1)
logger.info(" - Best score: %f\ttask %d", self.best_task['score'],
self.best_task['task_index'])
def write_config(self, pb_file):
"""Write a protobuf config file of parameter ranges for the spearmint code to use."""
config_pb = "language: PYTHON\n"
config_pb = config_pb + "name: \"param_runner_experiment\"\n"
for param in self.params.vals['parameters']:
config_pb = config_pb + "variable {\n"
config_pb = config_pb + " name: \"%s\"\n" % param['id']
if param['type'] == "real_range":
config_pb = config_pb + " size: 1\n"
config_pb = config_pb + " type: FLOAT\n"
config_pb = config_pb + " min: %s\n" % param['min']
config_pb = config_pb + " max: %s\n" % param['max']
if param['type'] == "int_range":
config_pb = config_pb + " size: 1\n"
config_pb = config_pb + " type: INT\n"
config_pb = config_pb + " min: %s\n" % param['min']
config_pb = config_pb + " max: %s\n" % param['max']
if param['type'] == "choice":
config_pb = config_pb + " size: %d\n" % len(param.values)
config_pb = config_pb + " type: ENUM\n"
for val in param.values:
config_pb = config_pb + " options:\"%s\"\n" % val
config_pb = config_pb + "}\n"
logger.debug(" - config.pb for spearmint\n%s", config_pb)
logger.info(" - Writing spearmint config file")
def run_job(self, task_index, cmd):
"""Run a single task within the grid search optimization."""
try:
with open(pb_file, "w") as f:
f.write(config_pb)
except Exception as e:
logger.fatal("Could not write Spearmint config file %s: %s", pb_file, e)
def attempt_dispatch(self, expt_config, pool, expt_dir, chooser):
"""Identify the next candidate in the optimization, and attempt to run it."""
expt = load_experiment(expt_config)
# Build the experiment grid.
expt_grid = ExperimentGrid(expt_dir,
expt.variable,
self.grid_size)
# Gets you everything - NaN for unknown values & durations.
grid, values, durations = expt_grid.get_grid()
# Returns lists of indices.
candidates = expt_grid.get_candidates()
pending = expt_grid.get_pending()
complete = expt_grid.get_complete()
n_candidates = candidates.shape[0]
n_pending = pending.shape[0]
n_complete = complete.shape[0]
logger.debug(" - Spearmint: %d candidates %d pending %d complete" %
(n_candidates, n_pending, n_complete))
# Track the time series of optimization.
if n_complete >= self.max_tasks:
logger.info(" - Spearmint: Maximum number of finished jobs (%d) reached. "
"Exiting" % 10)
return False
if n_candidates == 0:
logger.error(" - Spearmint: There are no candidates left. Exiting.")
return False
if n_pending >= self.executor.conc_tasks:
logger.debug(" - Maximum number of job (%d) pending / in progress.", self.executor.conc_tasks)
return True
# Ask the chooser to pick the next candidate
logger.debug(" - Spearmint choosing next candidate... ")
job_id = chooser.next(grid, values, durations, candidates, pending, complete)
# If the job_id is a tuple, then the chooser picked a new job.
# We have to add this to our grid
if isinstance(job_id, tuple):
(job_id, candidate) = job_id
job_id = expt_grid.add_to_grid(candidate)
logger.debug(" - Spearmint: Selected candidate %d from the experiment grid." % (job_id))
# Convert this back into an interpretable job and add metadata.
job = Job()
job.id = job_id
job.expt_dir = expt_dir
job.name = expt.name
job.language = expt.language
job.status = 'submitted'
job.submit_t = int(time.time())
job.param.extend(expt_grid.get_params(job_id))
locker = Locker()
locker.unlock(grid_for(job))
sp_params = expt_grid.get_params(job_id)
pool.apply_async(self.run_job, [sp_params, job_id, self.task_index], callback=self.score_check)
expt_grid.set_submitted(job_id, job_id)
self.task_index = self.task_index + 1
return True
def run_job(self, sp_params, job_id, task_index):
"""Run a single task through the executor."""
try:
logger.info(" - Starting task %d (spearmint job %d)", task_index, job_id)
args = self.params.arg_template()
for p in sp_params:
if args[p.name]['type'] == 'int_range':
args[p.name]['value'] = p.int_val[0]
elif args[p.name]['type'] == 'real_range':
args[p.name]['value'] = p.dbl_val[0]
elif args[p.name]['type'] == 'str_val':
args[p.name]['value'] = p.dbl_val[0]
else:
logger.fatal("Unknown parameter type: %s", args[p.name]['type'])
return None
args["__command"] = {"value": self.params.cmd_from_args(args)}
summary = self.executor.run(task_index, args)
if summary is None:
logger.error("Could not run job %d: no output received - marking failed", job_id)
ExperimentGrid.job_broken(self.executor.logdir, job_id)
return None
ExperimentGrid.job_complete(self.executor.logdir, job_id, summary["_score"], 0)
logger.info(" - Finished task %d (spearmint job %d) score: %f", task_index, job_id, summary["_score"])
logger.info(" - Starting task %d", task_index)
summary = self.executor.run(task_index, cmd)
logger.info(" - Finished task %d score: %f", task_index, summary["_score"])
return {
'task_index': task_index,
'job_id': job_id,
'score': summary["_score"]
}
except Exception as e:
logger.error("Could not run job %d: %s", job_id, e)
logger.error(traceback.print_exc())
logger.error("Could not run task %d: %s", task_index, e)
return None
def score_check(self, task_result):
"""Check if the score returned by a task is a new minimum."""
if task_result and task_result['score'] < self.best_task['score']:
self.best_task = task_result
logger.info(" - New best score: %f\ttask %d\tspearmint job %d", task_result['score'],
task_result['task_index'], task_result['job_id'])
logger.info(" - New best score: %f\ttask %d", task_result['score'],
task_result['task_index'])
......@@ -379,7 +379,7 @@ class ParamFile(object):
]
# base command string
cmd_srt = "%s %s --driver=%s --method=%s " % \
cmd_srt = "%s ./%s --driver=%s --method=%s " % \
(self.spearmint, self.vals['spearmint_config_file'], self.vals['spearmint_driver'],
self.vals['spearmint_method'])
# options
......
......@@ -28,16 +28,16 @@ mkdir -p "$TMP_DIR"
echo "Installing OpenSSL 1.1.1d"
cd "$TMP_DIR"
if [ ! -f "openssl-1.1.1d.tar.gz" ]; then
wget --quiet https://www.openssl.org/source/openssl-1.1.1d.tar.gz
fi
if [ ! -f "$SSL_DIR/bin/openssl" ]; then
if [ ! -f "openssl-1.1.1d.tar.gz" ]; then
wget --quiet https://www.openssl.org/source/openssl-1.1.1d.tar.gz
fi
tar -zxf openssl-1.1.1d.tar.gz
cd openssl-1.1.1d
./config --prefix="$SSL_DIR" --openssldir="$SSL_DIR/ssl"
make -j --quiet
make install > /dev/null 2>&1 &
make install > /dev/null 2>&1
fi
# used by python installation and pip
......@@ -48,11 +48,11 @@ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$SSL_DIR/lib
echo "Installing Python 2.7.15"
cd "$TMP_DIR"
if [ ! -f "Python-2.7.15.tgz" ]; then
wget --quiet http://www.python.org/ftp/python/2.7.15/Python-2.7.15.tgz
fi
if [ ! -x "$PYTHON_DIR/bin/python" ]; then
if [ ! -f "Python-2.7.15.tgz" ]; then
wget --quiet http://www.python.org/ftp/python/2.7.15/Python-2.7.15.tgz
fi
tar -zxf Python-2.7.15.tgz
cd Python-2.7.15
......@@ -68,16 +68,15 @@ _ssl _ssl.c \\
make install > /dev/null 2>&1
fi
# Install pip
echo "Installing pip"
cd "$TMP_DIR"
if [ ! -f "get-pip.py" ]; then
wget --quiet https://bootstrap.pypa.io/get-pip.py
fi
if [ ! -x "$PYTHON_DIR/bin/pip" ]; then
if [ ! -f "get-pip.py" ]; then
wget --quiet https://bootstrap.pypa.io/get-pip.py
fi
"$PYTHON_DIR/bin/python" get-pip.py --no-cache-dir
fi
......@@ -87,7 +86,7 @@ cd "$BASE_DIR"
"$PYTHON_DIR/bin/pip" install --no-cache-dir -r requirements_sp.txt
# clean up
#rm -fr "$TMP_DIR"
rm -fr "$TMP_DIR"
cd "$BASE_DIR"
......
......@@ -17,52 +17,93 @@ done
DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
BASE_DIR="$DIR/.."
VENV_DIR="$DIR/../../venv"
PYTHON_DIR="$BASE_DIR/.venv"
SSL_DIR="$BASE_DIR/.venv"
TMP_DIR="$PYTHON_DIR/src"
mkdir -p "$PYTHON_DIR"
mkdir -p "$SSL_DIR"
mkdir -p "$TMP_DIR"
# Check if conda is available
if ! conda -V > /dev/null 2>&1; then
module load python/3.7.x-anaconda
fi
VENV_EXISTS=0
# Install OpenSSL from source
echo "Installing OpenSSL 1.1.1d"
cd "$TMP_DIR"
# Check if $VENV_DIR exists
if [ -d "$VENV_DIR" ]; then
echo "The virtualenv is exist. Skip the venv installation."
echo $VENV_DIR
if [ ! -f "$SSL_DIR/bin/openssl" ]; then
if [ ! -f "openssl-1.1.1d.tar.gz" ]; then
wget --quiet https://www.openssl.org/source/openssl-1.1.1d.tar.gz
fi
VENV_EXISTS=1
tar -zxf openssl-1.1.1d.tar.gz
cd openssl-1.1.1d
./config --prefix="$SSL_DIR" --openssldir="$SSL_DIR/ssl"
make -j --quiet
make install > /dev/null 2>&1
fi
# Setting up the venv
if [ "$VENV_EXISTS" = "0" ]; then
# Create venv with conda
echo "Creating virtual env"
conda create -y -q --prefix $VENV_DIR python=3.7
# used by python installation and pip
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$SSL_DIR/lib
# Install SQLite from source
echo "Installing SQLite 3.31.1"
cd "$TMP_DIR"
if [ ! -x "$PYTHON_DIR/bin/sqlite3" ]; then
if [ ! -f "sqlite-autoconf-3310100.tar.gz" ]; then
wget --quiet https://www.sqlite.org/2020/sqlite-autoconf-3310100.tar.gz
fi
tar -zxf sqlite-autoconf-3310100.tar.gz
cd sqlite-autoconf-3310100
./configure --prefix="$PYTHON_DIR" --quiet
make -j --quiet
make install > /dev/null 2>&1
fi
# Activate the virtual env
echo "Activating virtual env"
source activate $VENV_DIR
# Install Python 3.6.10 from source with ssl supporting
echo "Installing Python 3.6.10"
cd "$TMP_DIR"
# Install basic requirements first
cd "$BASE_DIR"
pip install --no-cache-dir -r requirements.txt
if [ ! -x "$PYTHON_DIR/bin/python" ]; then
if [ ! -f "Python-3.6.10.tgz" ]; then
wget --quiet http://www.python.org/ftp/python/3.6.10/Python-3.6.10.tgz
fi
cd "$BASE_DIR"
tar -zxf Python-3.6.10.tgz
cd Python-3.6.10
echo
echo "Complete"
echo
else
echo "The VENV_EXIST value is $VENV_EXISTS"
./configure --prefix="$PYTHON_DIR" --quiet
make -j --quiet
make install > /dev/null 2>&1
fi