Module regpy.operators.parallel_operators
Functions
def check_running(conns, conn_m)
-
Expand source code
def check_running(conns,conn_m): r""" Function that runs in seperate watcher process and checks if main process is alive. Terminates subprocesses after 10 seconds if main process is killed. Parameters ---------- conns : list of mp.connection.Connection connections to subprocesses of main process conn : mp.connection.Connection connection object used to receive command from main process to shut down this process if subprocesses are closed normally """ parent_id=os.getppid() terminated=False while(os.getppid()==parent_id and not terminated): if(conn_m.poll(10)): terminated=True if(not terminated): time.sleep(10) for conn in conns: if(conn.poll()): conn.recv() conn.send(['break']) print("Closed remaining background processes.")
Function that runs in seperate watcher process and checks if main process is alive. Terminates subprocesses after 10 seconds if main process is killed.
Parameters
conns
:list
ofmp.connection.Connection
- connections to subprocesses of main process
conn
:mp.connection.Connection
- connection object used to receive command from main process to shut down this process if subprocesses are closed normally
Classes
class ExitCode (*args, **kwds)
-
Expand source code
class ExitCode(Enum): SUCCESS=1 ERROR=2 TIMEOUT=3
Ancestors
- enum.Enum
Class variables
var SUCCESS
-
The type of the None singleton.
var ERROR
-
The type of the None singleton.
var TIMEOUT
-
The type of the None singleton.
class OperatorAsWorker (name, conn, F)
-
Expand source code
class OperatorAsWorker(mp.Process): r""" Process that represents an operator and can be used to do operator evaluations in parallel. Parameters ---------- name : string name of the process conn : mp.connection.Connection connection object to receive commands and send the results back to master F : operators.Operator the regpy operator """ log = classlogger def __init__(self, name, conn,F): super(OperatorAsWorker, self).__init__() self.F = F """the operator""" self.name = name """name of the process""" self.conn = conn """connection to master""" def run(self): """Starts the process. While running the process may receive the commands: 'eval_nodiff': evaluates the operator with differentiate=False 'eval_diff': evaluates the operator with differentiate=True 'deriv': returns linearize 'eval_nodiff': returns adjoint 'break': ends process Raises: TypeError: Error is raised if unknown command is received """ terminate=False while not terminate: res=None exit_code=ExitCode.ERROR try: command = self.conn.recv() self.log.debug(self.name+ ' executing '+command[0]) if command[0] == 'eval_nodiff': res=self.F(command[1]) exit_code=ExitCode.SUCCESS elif command[0] == 'eval_diff': res, self.deriv = self.F.linearize(command[1]) exit_code=ExitCode.SUCCESS elif command[0] == 'deriv': res = self.deriv(command[1]) exit_code=ExitCode.SUCCESS elif command[0] == 'adjoint': if self.F.linear: res = self.F.adjoint(command[1]) else: res = self.deriv.adjoint(command[1]) exit_code=ExitCode.SUCCESS elif command[0] == 'break': terminate=True else: raise TypeError(self.name+': unknown command ',command[0]) except TypeError: exit_code=ExitCode.ERROR res=TypeError(f"Error in subprocess: {self.name}: unknown command",command[0]) except: exit_code=ExitCode.ERROR res=RuntimeError(f"Error in subprocess: An error occured during the computation of {command[0]}") if(not terminate): self.conn.send([exit_code,res]) return 0
Process that represents an operator and can be used to do operator evaluations in parallel.
Parameters
name
:string
- name of the process
conn
:mp.connection.Connection
- connection object to receive commands and send the results back to master
F
:operators.Operator
- the regpy operator
Ancestors
- multiprocessing.context.Process
- multiprocessing.process.BaseProcess
Instance variables
prop log
-
Expand source code
@property def classlogger(self): """The [`logging.Logger`][1] instance. Every subclass has a separate instance, named by its fully qualified name. Subclasses should use it instead of `print` for any kind of status information to allow users to control output formatting, verbosity and persistence. [1]: https://docs.python.org/3/library/logging.html#logging.Logger """ return getattr(self, '_log', None) or getLogger(type(self).__qualname__)
The
logging.Logger
instance. Every subclass has a separate instance, named by its fully qualified name. Subclasses should use it instead ofprint
for any kind of status information to allow users to control output formatting, verbosity and persistence. var F
-
the operator
var name
-
Expand source code
@property def name(self): return self._name
name of the process
var conn
-
connection to master
Methods
def run(self)
-
Expand source code
def run(self): """Starts the process. While running the process may receive the commands: 'eval_nodiff': evaluates the operator with differentiate=False 'eval_diff': evaluates the operator with differentiate=True 'deriv': returns linearize 'eval_nodiff': returns adjoint 'break': ends process Raises: TypeError: Error is raised if unknown command is received """ terminate=False while not terminate: res=None exit_code=ExitCode.ERROR try: command = self.conn.recv() self.log.debug(self.name+ ' executing '+command[0]) if command[0] == 'eval_nodiff': res=self.F(command[1]) exit_code=ExitCode.SUCCESS elif command[0] == 'eval_diff': res, self.deriv = self.F.linearize(command[1]) exit_code=ExitCode.SUCCESS elif command[0] == 'deriv': res = self.deriv(command[1]) exit_code=ExitCode.SUCCESS elif command[0] == 'adjoint': if self.F.linear: res = self.F.adjoint(command[1]) else: res = self.deriv.adjoint(command[1]) exit_code=ExitCode.SUCCESS elif command[0] == 'break': terminate=True else: raise TypeError(self.name+': unknown command ',command[0]) except TypeError: exit_code=ExitCode.ERROR res=TypeError(f"Error in subprocess: {self.name}: unknown command",command[0]) except: exit_code=ExitCode.ERROR res=RuntimeError(f"Error in subprocess: An error occured during the computation of {command[0]}") if(not terminate): self.conn.send([exit_code,res]) return 0
Starts the process. While running the process may receive the commands: 'eval_nodiff': evaluates the operator with differentiate=False 'eval_diff': evaluates the operator with differentiate=True 'deriv': returns linearize 'eval_nodiff': returns adjoint 'break': ends process
Raises
TypeError
- Error is raised if unknown command is received
class ParallelInterface (conns, end_command='break')
-
Expand source code
class ParallelInterface: r""" Interface for parallel processing Parameters ---------- conns : list of mp.connection.Connection List of connections used to send commands to worker processes and receive results. end_command : string, optional Command that terminates sub processes. Defaults to "break" """ MAX_SUBPROCESSES=128 """maximal number of subprocesses until warning is raised""" parallel_instances=[WeakValueDictionary()] """list of dictionaries containig weak references to subprocesses. Used internally for terminating subprocesses.""" _min_id_inst=0 _id_manager=0 def total_subprocess_count(): r""" Calculates the total number of running processes. """ tot_sum=0 for p_inst in ParallelInterface.parallel_instances: tot_sum+=sum([instance.subprocess_count for instance in p_inst.values() if instance.running]) return tot_sum def warn_subprocess_count(): r""" Produces a warning if the total number of running processes is higher than MAX_SUBPROCESSES. """ sp_count=ParallelInterface.total_subprocess_count() if(sp_count> ParallelInterface.MAX_SUBPROCESSES): warn(f"Warning: There are already {sp_count} subprocesses running.",stacklevel=2) def terminate_managed_instances(manager_id): r""" Terminate all instances of ParallelInterface associated with manager_id or a higher id Parameters ---------- manager_id : int id of ParallelExecutionManager """ for i in range(manager_id,len(ParallelInterface.parallel_instances)): for instance in ParallelInterface.parallel_instances[i].values(): instance.terminate_all() if(manager_id>0): ParallelInterface._id_manager=manager_id-1 ParallelInterface.parallel_instances=ParallelInterface.parallel_instances[:manager_id] else: ParallelInterface._id_manager=0 ParallelInterface.parallel_instances=[WeakValueDictionary()] def terminate_all_instances(): r""" Terminates all instances of ParallelInterface. """ ParallelInterface.terminate_all_managed_instances(0) def add_manager(): r""" Adds a new manager section and returns the correcponding manager id. """ ParallelInterface.parallel_instances.append(WeakValueDictionary()) ParallelInterface._id_manager+=1 return ParallelInterface._id_manager def __init__(self,conns,end_command="break"): self.conns=conns """Connection to subprocesses""" self.subprocess_count=len(conns) """Number of subprocesses""" self.end_command=end_command """Command used to end sub processes""" #Add current instance to weak dictionary at current manager id ParallelInterface.parallel_instances[ParallelInterface._id_manager][ParallelInterface._min_id_inst]=self ParallelInterface._min_id_inst+=1 self.running=True """Flag which indicates if subprocsses of this object are still running""" ParallelInterface.warn_subprocess_count() #Setup watcher process conn_m, conn_w = mp.Pipe() self.conn_watcher=conn_m process = mp.Process(target=check_running, args=(conns,conn_w)) process.start() def terminate_all(self): r"""Terminate all running subprocesses. """ if(self.running): for conn in self.conns: if(conn.poll()): rec_d=conn.recv() conn.send([self.end_command]) self.subprocess_count=0 self.conn_watcher.send(['break']) self.running=False def handle_errors(self,rec_d): r"""Handles received errors by displaying massage and stopping subprocesses. Parameters ---------- rec_d : list list where the first entry is an ExitCode that indicates whether an error occured in the subprocess """ if(rec_d[0]==ExitCode.ERROR): self.terminate_all() raise rec_d[1] elif(rec_d[0]==ExitCode.TIMEOUT): self.terminate_all() raise TimeoutError("Subprocess timed out!") def compute_all(self,command,arg_same=None,args_specific=[]): r"""Sends command and argument to all subprocesses and returns results. Parameters ---------- command : string command describing task for subprocess arg_same : numpy.ndarray, optional argument send to all subprocesses. Defaults to None. args_specific : list, optional List of arguments where args_specific[j] is send to subprocess j. Defaulst to []. """ if(not self.running): raise RuntimeError(f"Computation of {command} is impossible, because process {self} was already terminated.") same_info=[command] if(arg_same is not None): same_info.append(arg_same) for conn in self.conns: conn.send(same_info) elif(len(args_specific)==len(self.conns)): for i,conn in enumerate(self.conns): conn.send(same_info+[args_specific[i]]) else: raise ValueError(f"Invalid number of arguments for parallel operators!") rec_data=[conn.recv() for conn in self.conns] for rec_d in rec_data: self.handle_errors(rec_d) return (rec_d[1] for rec_d in rec_data) def __del__(self): r""" Terminates all subprocesses when object is garbage collected. """ self.terminate_all()
Interface for parallel processing
Parameters
conns
:list
ofmp.connection.Connection
- List of connections used to send commands to worker processes and receive results.
end_command
:string
, optional- Command that terminates sub processes. Defaults to "break"
Subclasses
Class variables
var MAX_SUBPROCESSES
-
maximal number of subprocesses until warning is raised
var parallel_instances
-
list of dictionaries containig weak references to subprocesses. Used internally for terminating subprocesses.
Instance variables
var conns
-
Connection to subprocesses
var subprocess_count
-
Number of subprocesses
var end_command
-
Command used to end sub processes
var running
-
Flag which indicates if subprocsses of this object are still running
Methods
def total_subprocess_count()
-
Expand source code
def total_subprocess_count(): r""" Calculates the total number of running processes. """ tot_sum=0 for p_inst in ParallelInterface.parallel_instances: tot_sum+=sum([instance.subprocess_count for instance in p_inst.values() if instance.running]) return tot_sum
Calculates the total number of running processes.
def warn_subprocess_count()
-
Expand source code
def warn_subprocess_count(): r""" Produces a warning if the total number of running processes is higher than MAX_SUBPROCESSES. """ sp_count=ParallelInterface.total_subprocess_count() if(sp_count> ParallelInterface.MAX_SUBPROCESSES): warn(f"Warning: There are already {sp_count} subprocesses running.",stacklevel=2)
Produces a warning if the total number of running processes is higher than MAX_SUBPROCESSES.
def terminate_managed_instances(manager_id)
-
Expand source code
def terminate_managed_instances(manager_id): r""" Terminate all instances of ParallelInterface associated with manager_id or a higher id Parameters ---------- manager_id : int id of ParallelExecutionManager """ for i in range(manager_id,len(ParallelInterface.parallel_instances)): for instance in ParallelInterface.parallel_instances[i].values(): instance.terminate_all() if(manager_id>0): ParallelInterface._id_manager=manager_id-1 ParallelInterface.parallel_instances=ParallelInterface.parallel_instances[:manager_id] else: ParallelInterface._id_manager=0 ParallelInterface.parallel_instances=[WeakValueDictionary()]
Terminate all instances of ParallelInterface associated with manager_id or a higher id Parameters
manager_id
:int
- id of ParallelExecutionManager
def terminate_all_instances()
-
Expand source code
def terminate_all_instances(): r""" Terminates all instances of ParallelInterface. """ ParallelInterface.terminate_all_managed_instances(0)
Terminates all instances of ParallelInterface.
def add_manager()
-
Expand source code
def add_manager(): r""" Adds a new manager section and returns the correcponding manager id. """ ParallelInterface.parallel_instances.append(WeakValueDictionary()) ParallelInterface._id_manager+=1 return ParallelInterface._id_manager
Adds a new manager section and returns the correcponding manager id.
def terminate_all(self)
-
Expand source code
def terminate_all(self): r"""Terminate all running subprocesses. """ if(self.running): for conn in self.conns: if(conn.poll()): rec_d=conn.recv() conn.send([self.end_command]) self.subprocess_count=0 self.conn_watcher.send(['break']) self.running=False
Terminate all running subprocesses.
def handle_errors(self, rec_d)
-
Expand source code
def handle_errors(self,rec_d): r"""Handles received errors by displaying massage and stopping subprocesses. Parameters ---------- rec_d : list list where the first entry is an ExitCode that indicates whether an error occured in the subprocess """ if(rec_d[0]==ExitCode.ERROR): self.terminate_all() raise rec_d[1] elif(rec_d[0]==ExitCode.TIMEOUT): self.terminate_all() raise TimeoutError("Subprocess timed out!")
Handles received errors by displaying massage and stopping subprocesses.
Parameters
rec_d
:list
- list where the first entry is an ExitCode that indicates whether an error occured in the subprocess
def compute_all(self, command, arg_same=None, args_specific=[])
-
Expand source code
def compute_all(self,command,arg_same=None,args_specific=[]): r"""Sends command and argument to all subprocesses and returns results. Parameters ---------- command : string command describing task for subprocess arg_same : numpy.ndarray, optional argument send to all subprocesses. Defaults to None. args_specific : list, optional List of arguments where args_specific[j] is send to subprocess j. Defaulst to []. """ if(not self.running): raise RuntimeError(f"Computation of {command} is impossible, because process {self} was already terminated.") same_info=[command] if(arg_same is not None): same_info.append(arg_same) for conn in self.conns: conn.send(same_info) elif(len(args_specific)==len(self.conns)): for i,conn in enumerate(self.conns): conn.send(same_info+[args_specific[i]]) else: raise ValueError(f"Invalid number of arguments for parallel operators!") rec_data=[conn.recv() for conn in self.conns] for rec_d in rec_data: self.handle_errors(rec_d) return (rec_d[1] for rec_d in rec_data)
Sends command and argument to all subprocesses and returns results.
Parameters
command
:string
- command describing task for subprocess
arg_same
:numpy.ndarray
, optional- argument send to all subprocesses. Defaults to None.
args_specific
:list
, optional- List of arguments where args_specific[j] is send to subprocess j. Defaulst to [].
class ParallelVectorOfOperators (ops, domain=None, codomain=None)
-
Expand source code
class ParallelVectorOfOperators(Operator,ParallelInterface): """Vector of operators in which all components are evaluated in parallel. The functionality is identical to the sequential analog VectorOfOperators: For T_i : X -> Y_i we define T := VectorOfOperators(T_i) : X -> DirectSum(Y_i) by `T(x)_i := T_i(x)`. Parameters ---------- *ops : tuple of Operator codomain : vecsps.VectorSpace or callable, optional Either the underlying vector space or a factory function that will be called with all summands' vector spaces passed as arguments and should return a vecsps.DirectSum instance. The resulting vector space should be iterable, yielding the individual summands. Default: vecsps.DirectSum. """ def __init__(self, ops, domain=None, codomain=None): assert all([isinstance(op, Operator) for op in ops]) assert ops if domain is None: self.domain = ops[0].domain else: self.domain = domain assert all(op.domain == self.domain for op in ops) if codomain is None: codomain = vecsps.DirectSum if isinstance(codomain, vecsps.VectorSpace): pass elif callable(codomain): codomain = codomain(*(op.codomain for op in ops)) else: raise TypeError('codomain={} is neither a VectorSpace nor callable'.format(codomain)) assert all(op.codomain == c for op, c in zip(ops, codomain)) conns = [] it = 0 for op in ops: conn_m, conn_w = mp.Pipe() conns.append(conn_m) G = OperatorAsWorker(type(op).__name__+' as worker '+str(it),conn_w,op) G.start() it += 1 Operator.__init__(self,domain=self.domain, codomain=codomain, linear=all(op.linear for op in ops)) ParallelInterface.__init__(self,conns) def _eval(self, x, differentiate=False): if differentiate: return self.codomain.join(*self.compute_all('eval_diff',x)) else: return self.codomain.join(*self.compute_all('eval_nodiff',x)) def _derivative(self, x): return self.codomain.join(*self.compute_all('deriv',x)) def _adjoint(self, y): elms = self.codomain.split(y) return sum(self.compute_all('adjoint',args_specific=elms))
Vector of operators in which all components are evaluated in parallel. The functionality is identical to the sequential analog VectorOfOperators: For
T_i : X -> Y_i
we define
T := VectorOfOperators(T_i) : X -> DirectSum(Y_i)
by
T(x)_i := T_i(x)
.Parameters
*ops
:tuple
ofOperator
codomain
:vecsps.VectorSpace
orcallable
, optional- Either the underlying vector space or a factory function that will be called with all summands' vector spaces passed as arguments and should return a vecsps.DirectSum instance. The resulting vector space should be iterable, yielding the individual summands. Default: vecsps.DirectSum.
Ancestors
Inherited members
class DistributedVectorOfOperators (ops, domain, distribution_mat, codomain=None)
-
Expand source code
class DistributedVectorOfOperators(Operator,ParallelInterface): r"""Vector of operators in which all components are evaluated in parallel and the input is assumed to be from direct sum of spaces that is then distributed to the operators that need it \[T_i : X_{i_1}\times X_{i_2}... -> Y_i\] we define T := VectorOfOperators(T_i) : DirectSum(X_j) -> DirectSum(Y_i) by `T(x)_i := T_i(x_i1,x_i2,...)`. Parameters ---------- *ops : tuple of Operator domain : vecsps.VectorSpace The domain of the operator. It should usually be a direct sum of vector spaces distribution_mat : numpy.ndarray of bools The matrix that indicates which parts of the arguments are passed to which operator. If the entry M_i,j is True the j-th component of the argument is passed to the i-th operator. codomain : vecsps.VectorSpace or callable, optional Either the underlying vector space or a factory function that will be called with all summands' vector spaces passed as arguments and should return a vecsps.DirectSum instance. The resulting vector space should be iterable, yielding the individual summands. Default: vecsps.DirectSum. """ def __init__(self, ops, domain,distribution_mat, codomain=None): assert all([isinstance(op, Operator) for op in ops]) assert ops self.domain = domain if codomain is None: codomain = vecsps.DirectSum if isinstance(codomain, vecsps.VectorSpace): pass elif callable(codomain): codomain = codomain(*(op.codomain for op in ops)) else: raise TypeError('codomain={} is neither a VectorSpace nor callable'.format(codomain)) assert all(op.codomain == c for op, c in zip(ops, codomain)) self.distribution_mat=distribution_mat self.distribution_lists=[[j for j in range(distribution_mat.shape[1]) if distribution_mat[i,j]] for i in range(distribution_mat.shape[0])] conns = [] it = 0 for op in ops: conn_m, conn_w = mp.Pipe() conns.append(conn_m) G = OperatorAsWorker(type(op).__name__+' as worker '+str(it),conn_w,op) G.start() it += 1 self.ops=ops Operator.__init__(self,domain=self.domain, codomain=codomain, linear=all(op.linear for op in ops)) ParallelInterface.__init__(self,conns) def _distribute(self,x): elms=self.domain.split(x) x_ops=[] for i,op in enumerate(self.ops): if(len(self.distribution_lists[i])==1): x_ops.append(elms[self.distribution_lists[i][0]]) else: x_ops.append(op.domain.join(*[elms[j] for j in self.distribution_lists[i]])) return x_ops def _collect(self,x_res): elms=list(self.domain.split(self.domain.zeros())) x_split=[op.domain.split(x_res[i]) if isinstance(op.domain, DirectSum) else x_res[i] for i,op in enumerate(self.ops)] for i, op in enumerate(self.ops): for k,j in enumerate(self.distribution_lists[i]): elms[j]=elms[j]+x_split[i][k] return self.domain.join(*elms) def _eval(self, x, differentiate=False): x_ops=self._distribute(x) if differentiate: return self.codomain.join(*self.compute_all('eval_diff',args_specific=x_ops)) else: return self.codomain.join(*self.compute_all('eval_nodiff',args_specific=x_ops)) def _derivative(self, x): x_ops=self._distribute(x) return self.codomain.join(*self.compute_all('deriv',args_specific=x_ops)) def _adjoint(self, y): elms = self.codomain.split(y) return self._collect(list(self.compute_all('adjoint',args_specific=elms)))
Vector of operators in which all components are evaluated in parallel and the input is assumed to be from direct sum of spaces that is then distributed to the operators that need it T_i : X_{i_1}\times X_{i_2}... -> Y_i
we define
T := VectorOfOperators(T_i) : DirectSum(X_j) -> DirectSum(Y_i)
by
T(x)_i := T_i(x_i1,x_i2,...)
.Parameters
*ops
:tuple
ofOperator
domain
:vecsps.VectorSpace
- The domain of the operator. It should usually be a direct sum of vector spaces
distribution_mat
:numpy.ndarray
ofbools
- The matrix that indicates which parts of the arguments are passed to which operator. If the entry M_i,j is True the j-th component of the argument is passed to the i-th operator.
codomain
:vecsps.VectorSpace
orcallable
, optional- Either the underlying vector space or a factory function that will be called with all summands' vector spaces passed as arguments and should return a vecsps.DirectSum instance. The resulting vector space should be iterable, yielding the individual summands. Default: vecsps.DirectSum.
Ancestors
Inherited members
class ParallelExecutionManager
-
Expand source code
class ParallelExecutionManager: r""" Context manager used to manage objects that use the ParallelInterface. Before such objects are created a ParallelExecutionManager should be entered using `with ParallelExecutionManager()` to guarantee that subprocesses are closed correctly. """ def __init__(self): pass def __enter__(self): r""" Gets manager id from ParallelInterface. """ ParallelInterface.warn_subprocess_count() self.manager_id=ParallelInterface.add_manager() return self def __exit__(self,type, value, traceback): r""" Terminates all managed instances of ParallelInterface. """ ParallelInterface.terminate_managed_instances(self.manager_id)
Context manager used to manage objects that use the ParallelInterface. Before such objects are created a ParallelExecutionManager should be entered using
with ParallelExecutionManager
to guarantee that subprocesses are closed correctly.