Commit 267b92f1 authored by PLian

Added support for Ray Tune

parent 2d0065e0
......@@ -9,10 +9,12 @@
.python/
.pytest_cache/
.venv/
*.tgz
cache/
build/
dist/
spearmint/lib/
param_runner/lib/spearmint/lib/
param_runner/lib/spearmint.tgz
# Output directories from any jobs run inside the codebase
param_runner_20*/
......
......@@ -3,11 +3,9 @@ cache:
key: "$CI_BUILD_REF_NAME"
paths:
- .venv/
- spearmint/lib/
- param_runner/lib/spearmint/lib/
before_script:
- sh provision/setup_venv.sh
- sh provision/setup_sp.sh
- export PATH=$(pwd)/.venv/bin/:$PATH
- export LD_LIBRARY_PATH=$(pwd)/.venv/lib/:$LD_LIBRARY_PATH
- echo $PATH
......@@ -23,6 +21,8 @@ stages:
setup-venv:
stage: deps
script:
- sh provision/setup_venv.sh
- sh param_runner/lib/setup_sp.sh
- pip3 install -r requirements_dev.txt
artifacts:
paths:
......@@ -31,28 +31,28 @@ setup-venv:
flake8:
stage: lint_1
script:
- flake8 param_runner
- flake8 --exclude=*/spearmint/*,*/test/* param_runner
pydocstyle:
stage: lint_2
script:
- pydocstyle param_runner
- pydocstyle --match-dir='(?!spearmint|test).*' param_runner
bandit:
stage: lint_3
allow_failure: true
script:
- bandit -x test -r param_runner
- bandit -x spearmint,test -r param_runner
setup-py:
stage: build
dependencies:
- setup-venv
script:
- python3 setup.py build
- python3 setup.py install
- pip3 install .
test:
coverage: '/\d+\%\s*$/'
script:
- python3 setup.py test
- param_runner test
graft param_runner/schema
graft param_runner/test
include param_runner/lib/spearmint.tgz
include param_runner/lib/requirements_sp.txt
include param_runner/lib/setup_sp.sh
recursive-exclude param_runner/lib/spearmint/lib *
......@@ -24,6 +24,42 @@ for summary in tabular format by defining regular expressions matching against
the output of the command.
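For example (an illustrative pattern only, not the repository's paramfile schema), a regular expression that captures the single floating-point value printed by the bundled rosenbrock.py example could look like this:

```python
import re

# Hypothetical summary pattern; the paramfile declares how such expressions
# are attached to a run. rosenbrock.py prints one float on stdout.
score_re = re.compile(r"(?P<score>[-+]?\d+(?:\.\d+)?(?:[eE][-+]?\d+)?)")

match = score_re.search("104.7215")
if match:
    print(float(match.group("score")))  # 104.7215
```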
## Install Parameter Runner
1. Download the source code
git clone THIS_REPO
2. Create a Python 3.6 environment and activate it
conda create --name py36 python=3.6
source activate py36
3. Install with pip
pip install .
4. Install Spearmint and its Python 2 environment
param_runner init spearmint
5. Test your installation
param_runner test
## Uninstall Parameter Runner
param_runner uninstall
Note: You can also uninstall param_runner with pip, but you will then have to delete Spearmint and its environment manually.
## Using the Parameter Runner on the Nucleus Cluster
1. Arrange your data and programs on the cluster.
......
#!/usr/bin/env python2
#!/usr/bin/env python3
"""
......@@ -8,8 +8,10 @@ param_runner - Run an application multiple times on the BioHPC cluster,
Usage:
param_runner check <paramfile> [--verbose]
param_runner submit <paramfile> [--verbose]
param_runner srun <paramfile> [--verbose]
param_runner run <paramfile> [--verbose]
param_runner init spearmint [--verbose]
param_runner test [--verbose]
param_runner uninstall [--verbose]
param_runner -h | --help | --version
......@@ -20,6 +22,7 @@ Options:
import logging
import os
import shutil
import sys
import subprocess
......@@ -28,8 +31,9 @@ from docopt import docopt
parentdir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0,parentdir)
sys.path.insert(0, parentdir)
import param_runner
from param_runner import __version__, executors, optimizers, param
......@@ -48,16 +52,60 @@ def main():
else:
logger.setLevel(logging.INFO)
if arguments['init'] and arguments['spearmint']:
sp_install_path = os.path.join(os.path.dirname(param_runner.__file__), 'lib')
logger.info("Will install Python 2.7 and Spearmint into {0}".format(sp_install_path))
os.chdir(sp_install_path)
try:
shutil.rmtree(os.path.join(sp_install_path, 'spearmint', 'lib'))
except OSError:
pass
try:
shutil.unpack_archive('spearmint.tgz', extract_dir=os.path.dirname(os.path.dirname(param_runner.__file__)))
subprocess.check_call(['/bin/bash', 'setup_sp.sh', '--prefix={0}'.format(sp_install_path)])
except Exception as e:
logger.error('Failed to init spearmint.')
logger.error(e)
if arguments['test']:
install_path = os.path.dirname(param_runner.__file__)
print("Testing the installation of param_runner with Pytest")
os.chdir(install_path)
try:
subprocess.check_call([shutil.which('pytest'), 'test'])
except Exception as e:
logger.error('Failed to test param_runner.')
logger.error(e)
if arguments['uninstall']:
sp_install_path = os.path.join(os.path.dirname(param_runner.__file__), 'lib')
print("Will uninstall param_runner")
os.chdir(sp_install_path)
try:
shutil.rmtree(os.path.join(sp_install_path, 'spearmint'))
except Exception as e:
print(e)
try:
subprocess.check_call([shutil.which('pip'), 'uninstall', '-y', 'param_runner'])
except Exception as e:
logger.error('Failed to uninstall param_runner.')
logger.error(e)
if arguments['<paramfile>']:
param_file = arguments['<paramfile>']
print "param_runner - version %s" % __version__
print "-------------------------------"
print "Parameter exploration runner for the BioHPC Cluster"
print "D. C. Trudgian, UT Southwestern BioHPC"
print "biohpc-help@utsouthwestern.edu"
print
print("param_runner - version %s" % __version__)
print("-------------------------------")
print("Parameter exploration runner for the BioHPC Cluster")
print("P. Lian, UT Southwestern BioHPC")
print("biohpc-help@utsouthwestern.edu")
print("")
try:
......@@ -67,67 +115,35 @@ def main():
if arguments['run']:
exe = executors.LocalExecutor(os.path.dirname((os.path.abspath(param_file))), p)
opt = optimizers.get_optimizer(p, exe)
opt.run()
ret = opt.run()
if arguments['srun']:
exe = executors.SrunExecutor(os.path.dirname((os.path.abspath(param_file))), p)
opt = optimizers.get_optimizer(p, exe)
opt.run()
if ret:
logger.info("Done.")
if arguments['submit']:
# We submit using sbatch
python_exe = sys.executable
this_script = os.path.abspath(__file__)
logger.debug("Python is %s" % python_exe)
logger.debug("Script location is %s" % this_script)
sbatch_args = '-o param_runner_%j.out '
sbatch_args = sbatch_args + "-N %d " % p.vals['nodes']
sbatch_args = sbatch_args + "-p %s " % p.vals['partition']
sbatch_args = sbatch_args + "-t %s " % p.vals['time_limit']
if 'gpus' in p.vals:
sbatch_args = sbatch_args + "--gres=gpu:%d" % p.vals['gpus']
if 'gres' in p.vals:
sbatch_args = sbatch_args + "--gres=%s" % p.vals['gres']
sbatch_cmd = "sbatch %s" % sbatch_args
batch_script = "#!/bin/bash\n"
if 'modules' in p.vals.keys():
for mod_name in p.vals['modules']:
batch_script += "module add %s\n" % mod_name
batch_script += python_exe
batch_script += ' -u '
batch_script += this_script
batch_script += ' srun '
batch_script += os.path.abspath(param_file)
batch_script += ' --verbose 2>&1\n'
logger.debug(sbatch_cmd)
logger.debug(batch_script)
sbatch_proc = subprocess.Popen(sbatch_cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, env=os.environ.copy())
stdout, stderr = sbatch_proc.communicate(batch_script)
if stdout:
logger.info(stdout)
if stderr:
logger.error(stderr)
logger.info("Done.")
try:
# prepare the sbatch file
exe = executors.SlurmWrapperExecutor(os.path.dirname((os.path.abspath(param_file))), p)
opt = optimizers.get_optimizer(p, exe)
ret = opt.run()
except Exception as e:
logger.error("Could not prepare the sbatch for submition.")
logger.error(e)
raise
# submit the sbatch file
if ret:
try:
out = subprocess.check_output([shutil.which('sbatch'), exe.sbatch_file])
logger.debug(out.decode())
except Exception as e:
logger.error("Could not submit the sbatch file %s." % exe.sbatch_file)
logger.error(e)
logger.info("Done.")
except Exception as e:
logger.error("[%s] %s" % (e.__class__.__name__, e))
print "\n"
logger.info("Could not finish the task!\n")
logger.error("[%s] %s\n" % (e.__class__.__name__, e))
raise
......
#!/usr/bin/env python2
#!/usr/bin/env python3
"""
rosenbrock - compute the value of the rosenbrock function for a given X and Y
......@@ -28,7 +28,7 @@ def rosenbrock(x, y):
def main():
arguments = docopt(__doc__)
time.sleep(random.random() * 10)
print rosenbrock(float(arguments['--x']), float(arguments['--y']))
print(rosenbrock(float(arguments['--x']), float(arguments['--y'])))
if __name__ == '__main__':
......
"""Executor classes are responsible for running tasks that are provided by an Optimizer."""
from .local import LocalExecutor # noqa
from .slurm import SrunExecutor # noqa
from .slurm import SlurmWrapperExecutor # noqa
......@@ -42,6 +42,8 @@ class BaseExecutor(object):
"""Initialize the Executor, work and log directories."""
logger.info("Initializing %s " % self.__class__.__name__)
self.executor = 'base'
self.params = params
if 'work_dir' in params.vals and os.path.isdir(params.vals['work_dir']):
......@@ -84,9 +86,8 @@ class BaseExecutor(object):
trace_file.write("Index\tStatus\t")
if 'optimizer' in self.params.vals.keys() and self.params.vals["optimizer"] == "spearmint":
if 'optimizer' in self.params.vals.keys() and self.params.vals["optimizer"] in ["spearmint", "ray_tune"]:
trace_file.write("Score\t")
pass
else:
for param in self.params.vals['parameters']:
trace_file.write("%s\t" % param['id'])
......@@ -122,7 +123,7 @@ class BaseExecutor(object):
summary_dict["_status"]
)
if 'optimizer' in self.params.vals and self.params.vals["optimizer"] == "spearmint":
if 'optimizer' in self.params.vals and self.params.vals["optimizer"] in ["spearmint", "ray_tune"]:
summary_line += "%f\t" % summary_dict['_score']
summary_line += "%s\t" % summary_dict["_cmd"]
else:
......@@ -222,17 +223,43 @@ class BaseExecutor(object):
self._write_trace(summary_dict)
raise ValueError("score %s is not in the list of summary variables" % self.params["score"])
# spearmint score
if 'optimizer' in self.params.vals.keys() and self.params.vals["optimizer"] == "spearmint":
logger.debug('Spearmint optimizor is used')
# spearmint/ray_tune score
if 'optimizer' in self.params.vals.keys() and self.params.vals["optimizer"] in ["spearmint"]:
logger.debug('{0} optimizer is used'.format(self.params.vals['optimizer']))
try:
fcon = open(stderr_path).readlines()
best_value = float([i.split()[2] for i in fcon if i.startswith('Current best:')][-1])
if self.executor in ['local']:
best_value = float([i.split()[2] for i in fcon if i.startswith('Current best:')][-1])
else:
best_value = 1e99
status = "COMPLETE"
except Exception as e:
best_value = 1e99
status = "ERROR"
logger.info("Can't get the best_value, the error is: ", e)
summary_dict["_score"] = best_value
summary_dict["_status"] = status
try:
self._write_trace(summary_dict)
except Exception as e:
logger.debug("The trace error is:", e)
logger.debug('The best value and status are: %f, %s' % (best_value, status))
elif 'optimizer' in self.params.vals.keys() and self.params.vals["optimizer"] in ["ray_tune"]:
logger.debug('{0} optimizer is used'.format(self.params.vals['optimizer']))
try:
fcon = open(stdout_path).readlines()
if self.executor in ['local']:
best_value = sorted([float(i.strip().split()[-2]) for i in fcon if i.find('| TERMINATED |') > 0])[0]
else:
best_value = 1e99
status = "COMPLETE"
except Exception as e:
best_value = 1e99
status = "ERROR"
logger.info("Can't get the best_value for spearmint, the error is: ", e)
logger.info("Can't get the best_value, the error is: ", e)
summary_dict["_score"] = best_value
summary_dict["_status"] = status
......
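For context on the new ray_tune branch above: a minimal sketch of the score extraction it performs, assuming (as the diff does) that the captured stdout contains Ray Tune's trial table and that the score is the second-to-last whitespace-separated column of each '| TERMINATED |' row:

```python
# Minimal sketch of the ray_tune score extraction, mirroring the executor
# logic above; 1e99 is the sentinel value used when no score is recoverable.
def best_ray_tune_score(stdout_path):
    with open(stdout_path) as fh:
        scores = [float(line.strip().split()[-2])
                  for line in fh if '| TERMINATED |' in line]
    return min(scores) if scores else 1e99
```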
"""Executors for local (single node) task execution."""
import os
import shutil
import logging
import multiprocessing
from subprocess import call, Popen
from subprocess import call
from subprocess import Popen
from .base import BaseExecutor
......@@ -30,9 +32,18 @@ class LocalExecutor(BaseExecutor):
raise ValueError("Cannot run any tasks - cpus_per_task (%d) is larger than CPUs on this machine."
% cpus_per_task)
if params.vals['nodes'] > 1:
raise ValueError("Cannot run any tasks - nodes (%d) are larger than 1 on this machine."
% params.vals['nodes'])
logger.info(" - Will run %d concurrent local tasks" % self.conc_tasks)
super().__init__(cwd, params)
self.executor = 'local'
logger.info(" - Initializing local wrapper for ray_tune tasks")
self.params.sbatchs = self.__compute_sbatchs()
def run(self, task_index, cmd):
"""Run provided commands, in parallel up to conc_tasks."""
logger.debug("LocalExecutor is handling command: %s", cmd)
......@@ -40,15 +51,142 @@ class LocalExecutor(BaseExecutor):
def _run_cmd(self, cmd, stderr_file, stdout_file):
"""Local execution of a command under a simple shell call."""
if type(cmd) == str and os.path.basename(cmd.split()[0]) == 'spearmint':
if 'optimizer' in self.params.vals.keys() and self.params.vals["optimizer"] == "spearmint":
# check if input files are all here
logging.info(" - checking required files for this task")
required_files = [self.params.vals['spearmint_config_file'], self.params.vals['spearmint_function_file']]
for rf in required_files:
if not os.path.exists(rf):
logger.error("The function file '{0}' is missing.".format(rf))
raise FileNotFoundError
# run the command
logger.debug("Running: %s\nCWD: %s" % (cmd, os.path.abspath('.')))
p = Popen(cmd.split(), stderr=stderr_file, stdout=stdout_file)
logger.debug("The PID of the cmd is: ", p.pid)
logger.debug("The PID of the cmd is: ")
logger.debug(p.pid)
p.wait()
ret = p.returncode
logger.debug("The return code of the cmd is: ", ret)
logger.debug("The return code of the cmd is: ")
logger.debug(ret)
p.kill()
elif 'optimizer' in self.params.vals.keys() and self.params.vals["optimizer"] == "ray_tune":
sbatch_file = 'param_runner_{0}.sh'.format(self.params.vals['optimizer'])
# check if input files are all here
logging.info(" - checking required files for task, %s", sbatch_file)
required_files = [self.params.vals['rt_function_file']]
for rf in required_files:
if not os.path.exists(rf):
logger.error("The function file '{0}' is missing.".format(rf))
raise FileNotFoundError(rf)
# writing out batch file and generate the command
logging.info(" - writing out batch file for task, %s", sbatch_file)
with open(sbatch_file, 'w') as fh:
fh.write(self.params.sbatchs[0])
cmd = "{0} {1}".format(shutil.which('bash'), sbatch_file)
# run the command
logger.debug("Running: %s\nCWD: %s" % (cmd, os.path.abspath('.')))
p = Popen(cmd.split(), stderr=stderr_file, stdout=stdout_file)
logger.debug("The PID of the cmd is: ")
logger.debug(p.pid)
p.wait()
ret = p.returncode
logger.debug("The return code of the cmd is: ")
logger.debug(ret)
p.kill()
else:
logger.debug("Running: %s" % cmd['__command']['value'])
ret = call(cmd['__command']['value'], shell=True, stderr=stderr_file, stdout=stdout_file)
ret = call(cmd['__command']['value'].split(), shell=False, stderr=stderr_file, stdout=stdout_file)
return ret
def __compute_sbatchs(self):
"""Compute the sbatch scripts to be submitted to the cluster.
Returns:
a list of sbatch script strings
"""
logger.info("Generating SBATCH ...")
logger.debug(self.params)
if 'optimizer' in self.params.vals.keys() and self.params.vals["optimizer"] == "spearmint":
arg_combns = self.__compute_sbatchs_optimizer(optimizer='spearmint')
elif 'optimizer' in self.params.vals.keys() and self.params.vals["optimizer"] == "ray_tune":
arg_combns = self.__compute_sbatchs_optimizer(optimizer='ray_tune')
else:
arg_combns = ['']
logger.warning("compute batchs have not been implemented yet")
return arg_combns
def __compute_sbatchs_optimizer(self, optimizer="spearmint"):
arg_combns = "#!/bin/bash\n"
if optimizer == "spearmint":
arg_combns += "{0}\n".format(self.params.commands[0])
if optimizer == "ray_tune":
s = """
Input="{0}"
#export PARAM_RUNNER_HOME={1}
#export PYTHONHOME=$PARAM_RUNNER_HOME/.venv
#export PATH=$PARAM_RUNNER_HOME/.venv/bin/:$PATH
#export LD_LIBRARY_PATH=$PARAM_RUNNER_HOME/.venv/lib/:$LD_LIBRARY_PATH
# Localdir will be used by $Input
export Localdir="/tmp/param_runner_{2}/$$"
Outdir="$(pwd)/$(echo $Input | awk -F. '{{print $1}}')"
# Non-empty proxy settings will cause connection issues!
unset http_proxy
unset https_proxy
# get hostname and num of cpus
head_node=$(hostname)
NUM_CPUs=$(grep "^processor" /proc/cpuinfo | wc -l)
# get the num of GPUs, and set the --num-gpus flag for ray
if [[ -z $CUDA_VISIBLE_DEVICES ]]; then
NUM_GPUs=0
RAY_GPUs=""
CUDA_VISIBLE_DEVICES=''
elif [ "$CUDA_VISIBLE_DEVICES" = 'NoDevFiles' ] || [ "$CUDA_VISIBLE_DEVICES" = '' ]; then
NUM_GPUs=0
RAY_GPUs=""
CUDA_VISIBLE_DEVICES=''
else
NUM_GPUs=$(echo $CUDA_VISIBLE_DEVICES | sed "s/,/\\n/g" | wc -l )
RAY_GPUs="--num-gpus=$NUM_GPUs"
fi
# NUM_GPUs will be used by $Input
export NUM_GPUs
echo "ray start --head --num-cpus=$NUM_CPUs $RAY_GPUs 2>&1"
ray_start_out="$(ray start --head --num-cpus=$NUM_CPUs $RAY_GPUs 2>&1)"
# RAY_HEAD_IP will be used by $Input
export RAY_HEAD_IP="$(echo $ray_start_out | awk -F "redis_address=\\"" '{{print $2}}' | awk -F \\" '{{print $1}}')"
# start the calculation
python3 $Input
# stop ray
ray stop
# collect the results
mkdir -p $Outdir
cp -r $Localdir/* $Outdir/
rm -fr $Localdir
""".format(self.params.vals['rt_function_file'], self.params.mypath, optimizer)
arg_combns += s
return [arg_combns]
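The template above exports Localdir, NUM_GPUs and RAY_HEAD_IP for the user's rt_function_file to consume. A hypothetical minimal function file, written against the older Ray API implied by the redis_address parsing in the script (the objective, search space, and metric name are illustrative assumptions, not part of this repository):

```python
"""Hypothetical rt_function_file; assumes RAY_HEAD_IP, NUM_GPUs and Localdir
are exported by the generated batch script, as in the template above."""
import os

import ray
from ray import tune


def objective(config, reporter):
    # Illustrative objective: the Rosenbrock function used by the bundled
    # rosenbrock.py example.
    x, y = config["x"], config["y"]
    reporter(score=(1.0 - x) ** 2 + 100.0 * (y - x ** 2) ** 2)


if __name__ == "__main__":
    # Attach to the Ray head node started by the wrapper script.
    ray.init(redis_address=os.environ["RAY_HEAD_IP"])
    tune.run(
        objective,
        config={
            "x": tune.grid_search([-1.0, 0.0, 1.0]),
            "y": tune.grid_search([-1.0, 0.0, 1.0]),
        },
        resources_per_trial={"gpu": int(os.environ.get("NUM_GPUs", 0))},
        local_dir=os.environ["Localdir"],
    )
```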
......@@ -3,6 +3,7 @@
import logging
import os
import re
import sys
from subprocess import call
from .base import BaseExecutor
......@@ -73,7 +74,9 @@ class SrunExecutor(BaseExecutor):
logger.debug(" - srun arguments: %s", self.srun_args)
super(SrunExecutor, self).__init__(cwd, params)
super().__init__(cwd, params)
self.executor = 'srun'
def run(self, task_index, cmd):
"""Run provided commands, in parallel up to conc_tasks."""
......@@ -85,7 +88,185 @@ class SrunExecutor(BaseExecutor):
srun_cmd = "srun %s %s" % (self.srun_args, cmd['__command']['value'])