Commit ccdd5825 authored by PLian's avatar PLian
Browse files

Optimized schema, executors, and bin/param_runner

parent 267b92f1
......@@ -12,9 +12,12 @@
*.tgz
cache/
build/
builds/
dist/
tmp/
param_runner/lib/spearmint/lib/
param_runner/lib/spearmint.tgz
gitlab_runner_launcher.sh
# Output directories from any jobs run inside the codebase
param_runner_20*/
......
......@@ -31,18 +31,18 @@ setup-venv:
flake8:
stage: lint_1
script:
- flake8 --exclude=*/spearmint/*,*/test/* param_runner
- flake8 --exclude=*/spearmint/*,*/test/*,*/examples/* param_runner
pydocstyle:
stage: lint_2
script:
- pydocstyle --match-dir='(?!spearmint)(?!test).*' param_runner
- pydocstyle --match-dir='(?!spearmint)(?!test)(?!examples).*' param_runner
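(For reference, the chained negative lookaheads above admit only directory names that start with none of the excluded prefixes. A quick sanity-check sketch with Python's `re` module; `fullmatch` approximates pydocstyle's anchored `--match-dir` matching:)

```python
import re

# The --match-dir pattern used by the pydocstyle job above.
pattern = re.compile(r'(?!spearmint)(?!test)(?!examples).*')

for name in ['param_runner', 'spearmint', 'test', 'examples', 'optimizers']:
    # fullmatch approximates pydocstyle's anchored directory matching.
    print(name, bool(pattern.fullmatch(name)))
# -> param_runner True, spearmint False, test False, examples False, optimizers True
```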
bandit:
stage: lint_3
allow_failure: true
script:
- bandit -x spearmint,test -r param_runner
- bandit -x spearmint,test,examples -r param_runner
setup-py:
stage: build
......@@ -55,4 +55,3 @@ test:
coverage: '/\d+\%\s*$/'
script:
- python3 setup.py test
\ No newline at end of file
- param_runner test
graft param_runner/schema
graft param_runner/examples
graft param_runner/test
include param_runner/lib/spearmint.tgz
include param_runner/lib/requirements_sp.txt
......
......@@ -8,8 +8,8 @@
## Introduction
The BioHPC `param_runner` is a command line tool to run a command multiple
times, exploring a defined parameter space, summarizing results.
The BioHPC `param_runner` is a command line tool for hyperparameter optimization,
exploring a defined parameter space and summarizing the results.
This tool uses a simple YAML configuration file to define a parameter space to
exhaustively search, and runs tasks in parallel by distributing them over a set
......@@ -29,7 +29,7 @@ the output of the command.
1. Download the source code
git clone THIS_REPO
git clone https://git.biohpc.swmed.edu/s190450/param_runner.git
2. Create a Python 3.6 environment and activate it
......@@ -52,6 +52,11 @@ the output of the command.
param_runner test
6. Show example files
param_runner examples
## Uninstall Parameter Runner
......@@ -60,10 +65,32 @@ the output of the command.
Note: You can uninstall param_runner with pip, but you must delete Spearmint and its environment manually.
## Using the Parameter Runner on your own computer
1. Arrange your data.
For the spearmint executor, a Python script containing the model to be optimized (e.g. `branin.py`) and
a configuration file for Spearmint (e.g. `config.pb`) are required.
For the ray_tune executor, a Python script containing your Trainable class is required (e.g. `hyperband_examples.py`).
Please note that for param_runner to optimize your Trainable class, the `redis_address=os.environ["RAY_HEAD_IP"]`
and `resources_per_trial={'gpu': os.environ["NUM_GPUs"]}` options should be used in your `ray.init` and `tune.run`
calls, respectively. For example (see also the sketch after this list):
`ray.init(redis_address=os.environ["RAY_HEAD_IP"])`
`... ...`
`run(exp, scheduler=hyperband, resources_per_trial={'gpu': os.environ["NUM_GPUs"]})`
More details can be found by running `param_runner examples`, which lists all examples.
2. Create a parameter .yml file (see below, "Parameter File Format" section)
3. Check the parameter .yml file: `param_runner check myexperiment.yml`
4. Run the job on your own computer: `param_runner run myexperiment.yml`
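A minimal sketch of a script prepared for the ray_tune executor is shown below. It assumes the Ray API contemporary with this commit (`ray.init(redis_address=...)`, `tune.run(...)`, and the `_setup`/`_train` Trainable methods); `MyTrainable` and the stopping criterion are illustrative, not part of param_runner:

```python
import os

import ray
from ray import tune
from ray.tune import Trainable
from ray.tune.schedulers import HyperBandScheduler


class MyTrainable(Trainable):
    """Illustrative Trainable; replace with your own model."""

    def _setup(self, config):
        self.timestep = 0

    def _train(self):
        self.timestep += 1
        # Report whichever objective your scheduler should optimize.
        return {"episode_reward_mean": float(self.timestep)}


# param_runner exports RAY_HEAD_IP and NUM_GPUs for the job (see step 1 above).
ray.init(redis_address=os.environ["RAY_HEAD_IP"])

tune.run(
    MyTrainable,
    scheduler=HyperBandScheduler(),
    stop={"training_iteration": 10},
    # NUM_GPUs arrives as a string, so convert it before requesting resources.
    resources_per_trial={"gpu": int(os.environ["NUM_GPUs"])},
)
```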
## Using the Parameter Runner on the Nucleus Cluster
1. Arrange your data and programs on the cluster.
2. Create a parameter .yml file (see below)
1. Arrange your data (see above).
2. Create a parameter .yml file (see below, "Parameter File Format" section)
3. Check the parameter .yml file: `param_runner check myexperiment.yml`
4. Submit to the cluster: `param_runner submit myexperiment.yml`
......
#!/usr/bin/env python3
"""
param_runner - Run an application multiple times on the BioHPC cluster,
exploring a parameter space and summarizing results.
Usage:
param_runner check <paramfile> [--verbose]
param_runner submit <paramfile> [--verbose]
param_runner run <paramfile> [--verbose]
param_runner init spearmint [--verbose]
param_runner test [--verbose]
param_runner uninstall [--verbose]
param_runner -h | --help | --version
Options:
--verbose Show debug messages
"""
import logging
import os
import shutil
import sys
import subprocess
import argparse
import colorlog
from docopt import docopt
import pathlib
parentdir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, parentdir)
......@@ -36,23 +15,125 @@ sys.path.insert(0, parentdir)
import param_runner
from param_runner import __version__, executors, optimizers, param
handler = colorlog.StreamHandler()
handler.setFormatter(colorlog.ColoredFormatter(
'%(log_color)s%(levelname)-8s %(message)s'))
logger = colorlog.getLogger()
logger.addHandler(handler)
class Main(object):
def __init__(self):
parser = argparse.ArgumentParser(
usage='''param_runner <command> [<paramfile>]
check <paramfile> Check whether your YAML file is valid
run <paramfile> Run the job on the local computer
submit <paramfile> Run the job on BioHPC clusters
init spearmint Install Spearmint and a Python 2 environment (required by Spearmint)
test Run the test cases
examples Show example files
uninstall Uninstall param_runner
''')
parser.add_argument('command', help='Subcommand to run')
parser.add_argument('-V', '--version', action='version', version=__version__, help='Show version')
# parse_args defaults to [1:] for args, but you need to
# exclude the rest of the args too, or validation will fail
args = parser.parse_args(sys.argv[1:2])
if not hasattr(self, args.command):
print('Unrecognized command')
parser.print_help()
exit(1)
# use dispatch pattern to invoke method with same name
getattr(self, args.command)()
def check(self):
parser = argparse.ArgumentParser(usage='param_runner check [-h] [-v] <paramfile>',
description='Check whether your YAML file is valid')
# prefixing the argument with -- means it's optional
parser.add_argument('-v', '--verbose', action='store_true', help='Show debug information')
parser.add_argument('paramfile', type=str, default='', help='YAML file containing your configuration')
# now that we're inside a subcommand, ignore the first
# TWO argvs, i.e. the program (param_runner) and the subcommand (check)
args = parser.parse_args(sys.argv[2:])
if args.verbose:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
self.__prepare_param(args.paramfile)
def main():
arguments = docopt(__doc__, version='param_runner %s' % __version__)
def run(self):
parser = argparse.ArgumentParser(usage='param_runner run [-h] [-v] <paramfile>',
description='Run the job on the local computer')
parser.add_argument('-v', '--verbose', action='store_true', help='Show debug information')
parser.add_argument('paramfile', type=str, default='', help='YAML file containing your configuration')
args = parser.parse_args(sys.argv[2:])
if args.verbose:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
handler = colorlog.StreamHandler()
handler.setFormatter(colorlog.ColoredFormatter(
'%(log_color)s%(levelname)-8s %(message)s'))
p = self.__prepare_param(args.paramfile)
logger = colorlog.getLogger()
logger.addHandler(handler)
exe = executors.LocalExecutor(os.path.dirname(os.path.abspath(args.paramfile)), p)
opt = optimizers.get_optimizer(p, exe)
ret = opt.run()
if ret:
logger.info("Done.")
if arguments['--verbose']:
def submit(self):
parser = argparse.ArgumentParser(usage='param_runner submit [-h] [-v] <paramfile>',
description='This command will generate an sbatch file and try to submit it to the BioHPC cluster')
parser.add_argument('-v', '--verbose', action='store_true', help='Show debug information')
parser.add_argument('paramfile', type=str, default='', help='YAML file containing your configuration')
args = parser.parse_args(sys.argv[2:])
if args.verbose:
logger.setLevel(logging.DEBUG)
print('Running param_runner --verbose submit %s' % args.paramfile)
else:
logger.setLevel(logging.INFO)
print('Running param_runner submit %s' % args.paramfile)
p = self.__prepare_param(args.paramfile)
# prepare the sbatch file
try:
exe = executors.SlurmWrapperExecutor(os.path.dirname(os.path.abspath(args.paramfile)), p)
opt = optimizers.get_optimizer(p, exe)
ret = opt.run()
except Exception as e:
logger.error("Could not prepare the sbatch for submition.")
logger.error(e)
raise
# submit the sbatch file
if ret:
try:
out = subprocess.check_output([shutil.which('sbatch'), exe.sbatch_file])
logger.debug(out.decode())
except Exception as e:
logger.error("Could not submit the sbatch file %s." % exe.sbatch_file)
logger.error(e)
logger.info("Done.")
def init(self):
parser = argparse.ArgumentParser(usage='param_runner init [-h] [-v] spearmint',
description='Install the computing environment. Currently, only spearmint needs to be installed'
' before running.')
parser.add_argument('-v', '--verbose', action='store_true', help='Show debug information')
parser.add_argument('spearmint', type=str, default='spearmint',
help='Install Spearmint and a Python 2 environment (required by Spearmint)')
args = parser.parse_args(sys.argv[2:])
if args.verbose:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
if arguments['init'] and arguments['spearmint']:
sp_install_path = os.path.join(os.path.dirname(param_runner.__file__), 'lib')
logger.info("Will install Python 2.7 and Spearmint into {0}".format(sp_install_path))
os.chdir(sp_install_path)
......@@ -68,7 +149,16 @@ def main():
logger.error('Failed to init spearmint.')
logger.error(e)
if arguments['test']:
def test(self):
parser = argparse.ArgumentParser(usage='param_runner test [-h] [-v]',
description='Run the test cases')
parser.add_argument('-v', '--verbose', action='store_true', help='Show debug information')
args = parser.parse_args(sys.argv[2:])
if args.verbose:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
install_path = os.path.dirname(param_runner.__file__)
print("Testing the installation of param_runner with Pytest")
os.chdir(install_path)
......@@ -79,7 +169,16 @@ def main():
logger.error('Failed to test param_runner.')
logger.error(e)
if arguments['uninstall']:
def uninstall(self):
parser = argparse.ArgumentParser(usage='param_runner uninstall [-h] [-v]',
description='Uninstall param_runner')
parser.add_argument('-v', '--verbose', action='store_true', help='Show debug information')
args = parser.parse_args(sys.argv[2:])
if args.verbose:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
sp_install_path = os.path.join(os.path.dirname(param_runner.__file__), 'lib')
print("Will uninstall param_runner")
os.chdir(sp_install_path)
......@@ -96,10 +195,28 @@ def main():
logger.error('Failed to uninstall param_runner.')
logger.error(e)
if arguments['<paramfile>']:
param_file = arguments['<paramfile>']
def examples(self):
parser = argparse.ArgumentParser(usage='param_runner examples [-h] [-v]',
description='Show example files')
parser.add_argument('-v', '--verbose', action='store_true', help='Show debug information')
args = parser.parse_args(sys.argv[2:])
if args.verbose:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
examples_path = os.path.join(os.path.dirname(param_runner.__file__), 'examples')
os.chdir(examples_path)
# remove cache files
for p in pathlib.Path('.').rglob('*.py[co]'):
    p.unlink()
for p in pathlib.Path('.').rglob('__pycache__'):
    p.rmdir()
# list the example files tree
print("Example files for param_runner:\n")
print(examples_path)
for line in self.__tree(pathlib.Path.cwd()):
    print(line)
def __prepare_param(self, param_file):
print("param_runner - version %s" % __version__)
print("-------------------------------")
print("Parameter exploration runner for the BioHPC Cluster")
......@@ -108,44 +225,32 @@ def main():
print("")
try:
p = param.ParamFile(param_file)
p.load()
if arguments['run']:
exe = executors.LocalExecutor(os.path.dirname((os.path.abspath(param_file))), p)
opt = optimizers.get_optimizer(p, exe)
ret = opt.run()
if ret:
logger.info("Done.")
if arguments['submit']:
try:
# prepare the sbatch file
exe = executors.SlurmWrapperExecutor(os.path.dirname((os.path.abspath(param_file))), p)
opt = optimizers.get_optimizer(p, exe)
ret = opt.run()
except Exception as e:
logger.error("Could not prepare the sbatch for submition.")
logger.error(e)
raise
# submit the sbatch file
if ret:
try:
out = subprocess.check_output([shutil.which('sbatch'), exe.sbatch_file])
logger.debug(out.decode())
except Exception as e:
logger.error("Could not submit the sbatch file %s." % exe.sbatch_file)
logger.error(e)
logger.info("Done.")
return p
except Exception as e:
logger.info("Could not finish the task!\n")
logger.error("[%s] %s\n" % (e.__class__.__name__, e))
raise
def __tree(self, dir_path: pathlib.Path, prefix: str = ''):
# prefix components:
space = ' '
branch = '│ '
# pointers:
tee = '├── '
last = '└── '
contents = list(dir_path.iterdir())
# contents each get pointers that are ├── with a final └── :
pointers = [tee] * (len(contents) - 1) + [last]
for pointer, path in zip(pointers, contents):
yield prefix + pointer + path.name
if path.is_dir(): # extend the prefix and recurse:
extension = branch if pointer == tee else space
# i.e. space because last, └── , above so no more |
yield from self.__tree(path, prefix=prefix + extension)
if __name__ == '__main__':
main()
Main()
import numpy as np
import sys
import math
import time
# Spearmint example objective; this file runs under Spearmint's Python 2 environment.
def branin(x):
x[0] = x[0]*15
x[1] = (x[1]*15)-5
y = np.square(x[1] - (5.1/(4*np.square(math.pi)))*np.square(x[0]) + (5/math.pi)*x[0] - 6) + 10*(1-(1./(8*math.pi)))*np.cos(x[0]) + 10
result = y
print result
return result
# Write a function like this called 'main'
def main(job_id, params):
print 'Anything printed here will end up in the output directory for job #:', str(job_id)
print params
return branin(params['X'])
language: PYTHON
name: "branin"
variable {
name: "X"
type: FLOAT
size: 2
min: 0
max: 1
}
# Integer example
#
# variable {
# name: "Y"
# type: INT
# size: 5
# min: -5
# max: 5
# }
# Enumeration example
#
# variable {
# name: "Z"
# type: ENUM
# size: 3
# options: "foo"
# options: "bar"
# options: "baz"
# }
# "cpus_per_task" is required for every kind of jobs.
# To run the job on BioHPC cluster, "partition" and "time_limit" are also required.
# Spearmint can only be run on one node, therefore, the "nodes" will always be 1.
# Number of CPUs required by each task
cpus_per_task: 4
# Spearmint settings
optimizer: spearmint
spearmint_max_tasks: 5
spearmint_method: GPEIOptChooser
spearmint_config_file: config.pb
spearmint_function_file: branin.py
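As a quick sanity check of the objective outside Spearmint (the shipped `branin.py` targets Spearmint's Python 2 environment), here is a Python 3 sketch of the same scaled Branin function; the test point is the known global minimum, expressed in the unit coordinates the example uses:

```python
import math

import numpy as np


def branin(x):
    # Same scaling as branin.py: map the unit square onto Branin's domain.
    x0 = x[0] * 15
    x1 = (x[1] * 15) - 5
    return (np.square(x1 - (5.1 / (4 * np.square(math.pi))) * np.square(x0)
                      + (5 / math.pi) * x0 - 6)
            + 10 * (1 - 1 / (8 * math.pi)) * np.cos(x0) + 10)


# Branin's global minimum f = 0.397887... occurs at x0 = pi, x1 = 2.275.
print(branin([math.pi / 15, (2.275 + 5) / 15]))  # ~0.397887
```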
import numpy as np
import sys
import math
import time
# Spearmint example objective; this file runs under Spearmint's Python 2 environment.
def branin(x):
x[0] = x[0]*15
x[1] = (x[1]*15)-5
y = np.square(x[1] - (5.1/(4*np.square(math.pi)))*np.square(x[0]) + (5/math.pi)*x[0] - 6) + 10*(1-(1./(8*math.pi)))*np.cos(x[0]) + 10
result = y
print result
return result
# Write a function like this called 'main'
def main(job_id, params):
print 'Anything printed here will end up in the output directory for job #:', str(job_id)
print params
return branin(params['X'])
language: PYTHON
name: "branin"
variable {
name: "X"
type: FLOAT
size: 2
min: 0
max: 1
}
# Integer example
#
# variable {
# name: "Y"
# type: INT
# size: 5
# min: -5
# max: 5
# }
# Enumeration example
#
# variable {
# name: "Z"
# type: ENUM
# size: 3
# options: "foo"
# options: "bar"
# options: "baz"
# }
# "cpus_per_task" is required for every kind of jobs.
# To run the job on BioHPC cluster, "partition" and "time_limit" are also required.
# Spearmint can only be run on one node, therefore, the "nodes" will always be 1.
# Cluster partition to use
partition: 256GB
# Total number of nodes to use
nodes: 2
# Number of CPUs required by each task
cpus_per_task: 4
# Time limit
time_limit: 3-00:00:00
# Spearmint settings
optimizer: spearmint
spearmint_max_tasks: 5
spearmint_method: GPEIOptChooser
spearmint_config_file: config.pb
spearmint_function_file: branin.py
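(The Slurm-related keys above presumably translate to standard sbatch directives when param_runner generates the batch script; the sketch below is a hypothetical illustration of that mapping, not the actual SlurmWrapperExecutor output:)

```python
# Hypothetical mapping of the YAML keys above onto standard Slurm directives.
config = {
    "partition": "256GB",
    "nodes": 2,
    "cpus_per_task": 4,
    "time_limit": "3-00:00:00",
}

header = "\n".join([
    "#!/bin/bash",
    "#SBATCH --partition={partition}",
    "#SBATCH --nodes={nodes}",
    "#SBATCH --cpus-per-task={cpus_per_task}",
    "#SBATCH --time={time_limit}",
]).format(**config)

print(header)
```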
#!/usr/bin/env python
import argparse
import json
import os
import random
import numpy as np
import ray
from ray.tune import Trainable, run, Experiment, sample_from
from ray.tune.schedulers import HyperBandScheduler
class MyTrainableClass(Trainable):
"""Example agent whose learning curve is a random sigmoid.
The dummy hyperparameters "width" and "height" determine the slope and
maximum reward value reached.
"""
def _setup(self, config):
self.timestep = 0
def _train(self):
self.timestep += 1
v = np.tanh(float(self.timestep) / self.config.get("width", 1))
v *= self.config.get("height", 1)
# Here we use `episode_reward_mean`, but you can also report other
# objectives such as loss or accuracy.
return {"episode_reward_mean": v}
def _save(self, checkpoint_dir):
path = os.path.join(checkpoint_dir, "checkpoint")
with open(path, "w") as f:
f.write(json.dumps({"timestep": self.timestep}))
return path
def _restore(self,