executors.py 5.02 KB
Newer Older
1
import datetime
David Trudgian's avatar
David Trudgian committed
2
import logging
David Trudgian's avatar
David Trudgian committed
3
import os
4
import re
David Trudgian's avatar
David Trudgian committed
5
6
7
8
from abc import ABCMeta, abstractmethod
from subprocess import call

from pathos.multiprocessing import Pool
9

David Trudgian's avatar
David Trudgian committed
10
# One logger per module, named after the module itself.
logger = logging.getLogger(name=__name__)


# Built by calling ABCMeta directly (instead of Python 3's
# ``class X(metaclass=ABCMeta)`` syntax) so the module still imports under
# Python 2. A plain ``metaclass = ABCMeta`` class attribute is ignored by both
# interpreters, which left the @abstractmethod declarations unenforced.
_AbstractExecutorBase = ABCMeta('_AbstractExecutorBase', (object,), {})


class BaseExecutor(_AbstractExecutorBase):
    """Shared machinery for running a list of parameterised commands.

    On construction the executor optionally changes into
    ``params.vals['work_dir']``, creates a timestamped
    ``param_runner_YYYYmmdd-HHMMSS`` log directory and writes the header row
    of a tab-separated ``trace.txt`` inside it. Subclasses implement
    :meth:`run` (scheduling) and :meth:`run_cmd` (executing one command).
    :meth:`wrap_cmd` times a single command, captures its output to
    ``<idx>.out`` / ``<idx>.err`` and formats one trace row.

    Each command is a mapping from argument name to a dict with a ``'value'``
    entry (as read by :meth:`wrap_cmd`). The trace columns assume every
    command iterates its keys in the same order -- TODO confirm with the code
    that builds ``commands``.
    """

    # Class-level defaults; __init__ replaces them on each instance.
    commands = []
    logdir = ""
    params = None

    def __init__(self, commands, cwd, params):
        """Prepare the log directory and the trace file.

        :param commands: list of command mappings (see class docstring).
        :param cwd: directory in which to create the log directory; replaced
            by ``params.vals['work_dir']`` when that key is present.
        :param params: object whose ``vals`` mapping holds the run options.
        :raises IOError: if the log directory cannot be created.
        """
        logger.info("Initializing %s ", self.__class__.__name__)

        self.commands = commands
        self.params = params

        if 'work_dir' in params.vals:
            work_dir = params.vals['work_dir']
            logger.info("Changed to work directory: %s", work_dir)
            # Changes the working directory of the whole process.
            os.chdir(work_dir)
            cwd = work_dir

        # %S is seconds. The former %s is a non-portable glibc extension that
        # put the full Unix epoch timestamp into the directory name.
        stamp = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
        logdir = os.path.join(cwd, "param_runner_%s" % stamp)

        try:
            os.mkdir(logdir)
        except (IOError, OSError):
            # os.mkdir raises OSError, which Python 2 does not treat as an
            # IOError. Catch both and keep raising IOError for callers.
            raise IOError("Could not create log directory: %s" % logdir)

        logger.info(" - Trace file and output will be at: %s",
                    os.path.relpath(logdir))

        self.logdir = logdir
        self.start_trace()

    def _trace_path(self):
        """Return the path of this executor's ``trace.txt``."""
        return os.path.join(self.logdir, 'trace.txt')

    def _summaries(self):
        """Return the configured summary specs, or [] when none are set."""
        vals = self.params.vals
        return vals['summary'] if 'summary' in vals else []

    def start_trace(self):
        """Create (truncate) ``trace.txt`` and write its header row.

        Columns, tab separated: Index, one per argument name of the first
        command, Return Code, Duration, Start, End, then one per summary
        ``id``.
        """
        columns = ["Index"]
        if self.commands:
            columns.extend("%s" % arg for arg in self.commands[0])
        columns.extend(["Return Code", "Duration", "Start", "End"])
        columns.extend("%s" % summary['id'] for summary in self._summaries())

        # Closing the file (via ``with``) flushes the header to disk. The
        # former ``buffering=False`` is rejected for text files on Python 3,
        # and that handle was never closed.
        with open(self._trace_path(), 'w') as trace_file:
            trace_file.write("\t".join(columns) + "\n")

    def write_trace(self, msg):
        """Append ``msg``, an already formatted trace row, to ``trace.txt``."""
        with open(self._trace_path(), 'a') as trace_file:
            trace_file.write(msg)

    def wrap_cmd(self, cmd_idx):
        """Run command ``cmd_idx`` and return its formatted trace row.

        The command's stdout/stderr go to ``<cmd_idx>.out`` / ``<cmd_idx>.err``
        in the log directory. The returned row is tab separated and newline
        terminated. It contains the index, each argument value, the return
        code, the duration, the start and end times, and for every summary
        spec the first regex capture found in the stdout file ('' if none).
        """
        logger.debug(" - Task %d is running", cmd_idx)

        cmd = self.commands[cmd_idx]
        stdout_path = os.path.join(self.logdir, "%d.out" % cmd_idx)
        stderr_path = os.path.join(self.logdir, "%d.err" % cmd_idx)

        # ``with`` guarantees both files are closed even if run_cmd raises.
        with open(stderr_path, 'w') as stderr_file, \
                open(stdout_path, 'w') as stdout_file:
            start = datetime.datetime.now()
            ret = self.run_cmd(cmd, stderr_file, stdout_file)
            end = datetime.datetime.now()

        fields = ["%d" % cmd_idx]
        fields.extend(str(arg['value']) for arg in cmd.values())
        fields.extend(["%d" % ret, str(end - start), str(start), str(end)])
        # The stdout file is closed (flushed) by now, so the grep sees all of
        # the command's output.
        fields.extend(self.__pygrep_first(summary['regex'], stdout_path)
                      for summary in self._summaries())
        return "\t".join(fields) + "\n"

    @staticmethod
    def __pygrep_first(regex, text_file):
        """Return capture group 1 of the first line matching ``regex``.

        Lines of ``text_file`` are searched in order; '' is returned when no
        line matches. ``regex`` needs at least one capture group, otherwise
        ``group(1)`` raises IndexError.
        """
        pattern = re.compile(regex)
        with open(text_file) as handle:
            for line in handle:
                match = pattern.search(line)
                if match is not None:
                    return match.group(1)
        return ''

    @abstractmethod
    def run(self):
        """Execute every command in ``self.commands``."""

    @abstractmethod
    def run_cmd(self, cmd, stderr_file, stdout_file):
        """Run one command, writing to the given open files; return its exit code."""


class LocalExecutor(BaseExecutor):
    """Execute each command in turn on the local machine via the shell."""

    def run(self):
        """Run the commands sequentially, appending one trace row per command."""
        for position, _command in enumerate(self.commands):
            trace_row = self.wrap_cmd(position)
            self.write_trace(trace_row)

    def run_cmd(self, cmd, stderr_file, stdout_file):
        """Run ``cmd['__command']['value']`` through the shell.

        Output goes to the supplied open files; the child's exit status is
        returned.
        """
        shell_command = cmd['__command']['value']
        exit_status = call(shell_command, shell=True,
                           stderr=stderr_file, stdout=stdout_file)
        return exit_status


class SrunExecutor(BaseExecutor):
    """Run commands in parallel as ``srun`` steps inside a SLURM allocation."""

    def run(self):
        """Run every command through a process pool sized to the allocation.

        The pool size is ``(cpus_per_node // cpus_per_task) * nodes``, taken
        from SLURM_JOB_CPUS_PER_NODE (leading integer only), SLURM_NNODES and
        ``params.vals['cpus_per_task']``. A trace row is appended as each task
        finishes. A task whose wrapper raised is logged and produces no row.

        :raises EnvironmentError: when not running inside a SLURM allocation.
        :raises ValueError: when a single task needs more CPUs than a node has.
        """
        if 'SLURM_JOB_CPUS_PER_NODE' not in os.environ:
            raise EnvironmentError(
                'The srun executor can only be run inside a SLURM allocation.')

        logger.info(" - Running in SLURM JOB %s",
                    os.environ.get('SLURM_JOB_ID'))
        cpus_per_node = int(
            re.match(r'\d+', os.environ['SLURM_JOB_CPUS_PER_NODE']).group(0))
        logger.info(" - %d CPUs per node", cpus_per_node)
        nodes = int(os.environ.get('SLURM_NNODES'))
        logger.info(" - %d nodes", nodes)
        cpus_per_task = self.params.vals['cpus_per_task']

        # Floor division: "/" yields a float on Python 3, which Pool rejects.
        conc_tasks = (cpus_per_node // cpus_per_task) * nodes
        if conc_tasks < 1:
            # Pool(0) would fail with an opaque ValueError; explain why instead.
            raise ValueError(
                "cpus_per_task (%d) exceeds the %d CPUs available per node"
                % (cpus_per_task, cpus_per_node))
        logger.info(" - %d Concurrent tasks", conc_tasks)

        pool = Pool(conc_tasks)
        pending = []
        for cmd_idx, _command in enumerate(self.commands):
            logger.debug(" - Task %d is waiting in the pool", cmd_idx)
            pending.append((cmd_idx, pool.apply_async(
                self.wrap_cmd, [cmd_idx], callback=self.write_trace)))

        pool.close()
        pool.join()

        # apply_async keeps a worker's exception inside its result object and
        # skips the callback. Without .get() the failure was silently lost.
        for cmd_idx, result in pending:
            try:
                result.get()
            except Exception:
                logger.exception(" - Task %d failed", cmd_idx)

        logger.info(" - All tasks have completed")

    def run_cmd(self, cmd, stderr_file, stdout_file):
        """Launch ``cmd['__command']['value']`` as an exclusive one-task srun step.

        The command string is passed through the shell, so it must come from
        a trusted parameter file. Returns the exit status of ``srun``.
        """
        cpus_per_task = self.params.vals['cpus_per_task']
        srun_cmd = ("srun --exclusive -N1 -n1 --cpus-per-task=%d "
                    "--distribution=cyclic %s"
                    % (cpus_per_task, cmd['__command']['value']))
        return call(srun_cmd, shell=True, stderr=stderr_file,
                    stdout=stdout_file, env=os.environ)