Source code for petgem.common
#!/usr/bin/env python3
# Author: Octavio Castillo Reyes
# Contact: octavio.castillo@bsc.es
"""Define common operations for **PETGEM**."""
# ---------------------------------------------------------------
# Load python modules
# ---------------------------------------------------------------
import time
import yaml
import sys
import os
import numpy as np
from functools import wraps
from colorama import Fore
from singleton_decorator import singleton
# ---------------------------------------------------------------
# Load petgem modules (BSC)
# ---------------------------------------------------------------
from .parallel import MPIEnvironment
# ###############################################################
# ################ CLASSES DEFINITION ##################
# ###############################################################
# ---------------------------------------------------------------
# Class Print definition
# ---------------------------------------------------------------
[docs]class Print(object):
"""
This class provides methods for pretty print.
:param object str: string to be printed.
:return: None.
:rtype: None.
"""
# Options for Gauss points computation (switch case)
_options = {
1: Fore.BLACK,
2: Fore.BLUE,
3: Fore.CYAN,
4: Fore.GREEN,
5: Fore.LIGHTBLACK_EX,
6: Fore.LIGHTBLUE_EX,
7: Fore.LIGHTCYAN_EX,
8: Fore.LIGHTGREEN_EX,
9: Fore.LIGHTMAGENTA_EX,
10: Fore.LIGHTRED_EX,
11: Fore.LIGHTWHITE_EX,
12: Fore.LIGHTYELLOW_EX,
13: Fore.MAGENTA,
14: Fore.RED,
15: Fore.WHITE,
16: Fore.YELLOW
}
# Constructor
def __init__(self, text, color_code=None):
"""Constructor."""
self._log(text, color_code)
# Logging method
def _log(self, text, color_code=None):
"""
Configure and prints a text.
:param str text: text to be printed.
:param int color_code: text color code.
:return: None.
"""
# Verify if color_code is None, then use black color
if color_code is None:
color_code = int(16)
set_color = self._options[color_code]
print(set_color + text)
sys.stdout.flush()
return
[docs] @classmethod
def header(self):
"""Print the header.
:param: None.
:return: None.
:rtype: None.
"""
# Specific color code for printing the header
color_code = 5
if( MPIEnvironment().rank == 0 ):
self._log(self, '%'*75, color_code)
self._log(self, '%%%' + ' '*69 + '%%%', color_code)
self._log(self, '%%%'+ 'PETGEM'.center(69) + '%%%', color_code)
self._log(self, '%%%'+ 'Parallel Edge-based Tool for Electromagnetic Modelling'.center(69) + '%%%', color_code)
self._log(self, '%%%' + ' '*69 + '%%%', color_code)
self._log(self, '%'*75, color_code)
self._log(self, '%%%' + ' '*69 + '%%%', color_code)
self._log(self, '%%% (c) Octavio Castillo-Reyes' +
' '*40 + '%%%', color_code)
self._log(self, '%%% Barcelona Supercomputing Center (BSC-CNS), 2021' +
' '*19 + '%%%', color_code)
self._log(self, '%%%' + ' '*69 + '%%%', color_code)
self._log(self, '%'*75, color_code)
return
[docs] @classmethod
def master(self, text, color_code=None):
"""
If the caller is the master process, this method prints a message.
:param: None.
:return: None.
:rtype: None.
"""
if( MPIEnvironment().rank == 0 ):
self._log(self, text, color_code)
return
# ---------------------------------------------------------------
# Class InputParameters definition
# ---------------------------------------------------------------
[docs]class InputParameters(object):
"""Method to import a yaml parameter file.
:param dict object: user params yaml file.
:return: user parameters as object view.
:rtype: object.
"""
def __init__(self, params, parEnv):
"""Class constructor.
:param str params: yaml parameters file.
:param object parEnv: parallel environment object.
:return: InputParameters object.
:rtype: object
"""
# ---------------------------------------------------------------
# Read the input parameters file
# ---------------------------------------------------------------
with open(params, 'r') as f:
# The FullLoader parameter handles the conversion from YAML
# scalar values to the Python dictionary format
inputs = yaml.safe_load(f)
# Get set of parameters
self.model = inputs['model']
self.run = inputs['run']
self.output = inputs['output']
# ---------------------------------------------------------------
# Check modeling mode
# ---------------------------------------------------------------
if not ('mode' in self.model.keys()):
Print.master(' Modeling mode not provided. Please, verify the parameter file consistency.')
exit(-1)
else:
if not ((self.model.get('mode') == 'csem') or (self.model.get('mode') == 'mt')):
Print.master(' Modeling mode not supported.')
exit(-1)
# ---------------------------------------------------------------
# Check parameters consistency for csem mode
# ---------------------------------------------------------------
if (self.model.get('mode') == 'csem'):
if not ('csem' in self.model.keys()):
Print.master(' csem parameters not provided. Please, verify the parameter file consistency.')
exit(-1)
else:
# Check consistency of csem params
conductivity_from_file, num_polarizations = self.__verify_CSEM_params__(self.model)
# ---------------------------------------------------------------
# Check parameters consistency for mt mode
# ---------------------------------------------------------------
elif (self.model.get('mode') == 'mt'):
if not ('mt' in self.model.keys()):
Print.master(' mt parameters not provided. Please, verify the parameter file consistency.')
exit(-1)
else:
# Check consistency of mt params
conductivity_from_file, num_polarizations = self.__verify_MT_params__(self.model)
# Update number of models, interpolation strategy and
# polarization modes
self.run.update({'conductivity_from_file': conductivity_from_file})
self.run.update({'num_polarizations': num_polarizations})
# ---------------------------------------------------------------
# Check consistency of common parameters
# ---------------------------------------------------------------
# Mesh
if not ('mesh' in self.model.keys()):
Print.master(' mesh parameter not provided. Please, verify the parameter file consistency.')
exit(-1)
# Receivers
if not ('receivers' in self.model.keys()):
Print.master(' receivers parameter not provided. Please, verify the parameter file consistency.')
exit(-1)
# Basis order
if not ('nord' in self.run.keys()):
Print.master(' nord parameter not provided. Please, verify the parameter file consistency.')
exit(-1)
else:
if ((self.run.get('nord') < 1) or (self.run.get('nord') > 6)):
Print.master(' Vector finite element basis order not supported. Please, select a valid order (1,2,3,4,5,6).')
exit(-1)
# Cuda support
if not ('cuda' in self.run.keys()):
self.run.update({'cuda': False})
else:
if not ((self.run.get('cuda') is False) or (self.run.get('cuda') is True)):
Print.master(' cuda option not supported. Please, select a valid order (True/False).')
exit(-1)
# Output
if not ('vtk' in self.output.keys()):
self.output.update({'vtk': False})
if not ('directory' in self.output.keys()):
Print.master(' output directory parameter not provided. Please, verify the parameter file consistency.')
exit(-1)
else:
if(parEnv.rank == 0):
if not os.path.exists(self.output.get('directory')):
os.mkdir(self.output.get('directory'))
# If not scratch directory, use output directory
if not ('directory_scratch' in self.output.keys()):
self.output.update({'directory_scratch': self.output.get('directory')})
self.output.update({'remove_scratch': False})
else:
if(parEnv.rank == 0):
if not os.path.exists(self.output.get('directory_scratch')):
os.mkdir(self.output.get('directory_scratch'))
self.output.update({'remove_scratch': True})
return
def __verify_CSEM_params__(self, data):
"""Verify consistency of CSEM parameters
:param dict data: csem dictionary
:return: input conductivity model from file or array.
:rtype: bool
"""
# Get csem parameters
csem_params = data.get('csem')
# One "polarization mode" per csem model
num_polarizations = np.int(1)
# Check consistency for csem modeling
# Check sigma consistency
if not ('sigma' in csem_params.keys()):
Print.master(' csem parameters not provided. Please, verify the parameter file consistency.')
exit(-1)
else:
# Get sigma parameters
i_sigma = csem_params.get('sigma')
# Conductivity file
if ('file' in i_sigma.keys()):
# No vectors conductivity
if (('horizontal' in i_sigma.keys()) or ('vertical' in i_sigma.keys())):
Print.master(' sigma parameters invalid. Please, verify the parameter file consistency.')
exit(-1)
else:
conductivity_from_file = True
# Vector conductivity
elif (('horizontal' in i_sigma.keys()) and ('vertical' in i_sigma.keys())):
# No file conductivity
if ('file' in i_sigma.keys()):
Print.master(' sigma parameters invalid. Please, verify the parameter file consistency.')
exit(-1)
else:
conductivity_from_file = False
else:
Print.master(' sigma parameters invalid. Please, verify the parameter file consistency.')
exit(-1)
# Check source consistency
if not ('source' in csem_params.keys()):
Print.master(' source parameters not provided. Please, verify the parameter file consistency.')
exit(-1)
else:
# Get source parameters
i_source = csem_params.get('source')
# Check number of source parameters
if not (len(i_source) == 6):
Print.master(' number of source parameters is not consistent. Please, verify the parameter file consistency.')
exit(-1)
else:
base_params = ['frequency', 'position', 'azimuth', 'dip', 'current', 'length']
for i in np.arange(6):
if not (base_params[i] in i_source.keys()):
m = ' ' + base_params[i] + ' parameter not provided. Please, verify the parameter file consistency.'
Print.master(m)
exit(-1)
return conductivity_from_file, num_polarizations
def __verify_MT_params__(self, data):
"""Verify consistency of MT parameters
:param dict data: mt dictionary
:return: input conductivity model from file or array.
:rtype: bool
"""
# Get mt parameters
mt_params = data.get('mt')
# Check consistency for all mt models
# Get model parameters
# Check sigma consistency
if not ('sigma' in mt_params.keys()):
Print.master(' mt parameters not provided. Please, verify the parameter file consistency.')
exit(-1)
else:
# Get sigma parameters
i_sigma = mt_params.get('sigma')
# Conductivity file
if ('file' in i_sigma.keys()):
# No vectors conductivity
if (('horizontal' in i_sigma.keys()) or ('vertical' in i_sigma.keys())):
Print.master(' sigma parameters invalid. Please, verify the parameter file consistency.')
exit(-1)
else:
conductivity_from_file = True
# Vector conductivity
elif (('horizontal' in i_sigma.keys()) and ('vertical' in i_sigma.keys())):
# No file conductivity
if ('file' in i_sigma.keys()):
Print.master(' sigma parameters invalid. Please, verify the parameter file consistency.')
exit(-1)
else:
conductivity_from_file = False
else:
Print.master(' sigma parameters invalid. Please, verify the parameter file consistency.')
exit(-1)
# Check frequency and polarization parameters
base_params = ['frequency', 'polarization']
for i in np.arange(2):
if not (base_params[i] in mt_params.keys()):
m = ' ' + base_params[i] + ' parameter not provided for model. Please, verify the parameter file consistency.'
Print.master(m)
exit(-1)
# Determine number of polarization modes
num_polarizations = len(mt_params.get('polarization'))
return conductivity_from_file, num_polarizations
# ---------------------------------------------------------------
# Class Timer definition
# ---------------------------------------------------------------
[docs]class Timer():
"""
Definition of timer class.
"""
# Attributes
elapsed = None
# Initializing
def __init__(self, elapsed = 0):
"""Constructor."""
self._start = 0
self.elapsed = elapsed
# Start timer
# Stop timer
[docs] def stop(self):
"""Stop timer."""
if(self._start > 0):
self.elapsed += time.time() - self._start
self._start = 0
# Reset timer
# ---------------------------------------------------------------
# Class Timers definition
# ---------------------------------------------------------------
@singleton
class Timers():
"""
Defintion of timers class.
"""
# Initializing
def __init__(self, opath = None):
"""Constructor."""
# Create private dict for storing the timers
self._elems = {}
# Set the output path for writting the times report
self._opath = opath
# Obtain the process rank once
self.rank = MPIEnvironment().rank
# Prepare the output file
self._out = None
self._header = False
if MPIEnvironment().rank == 0 and self._opath:
# Save times for parallel scalability test
out_filename = self._opath + '/times.txt'
if os.path.isfile(out_filename):
self._out = open(out_filename, 'a')
else:
self._out = open(out_filename, 'w')
self._header = True
# Get an specific timer
def __getitem__(self, key):
"""Get item name for timer."""
return self._elems.setdefault(key, Timer())
# Set a specific
def __setitem__(self, key, value):
"""Set item name for timer."""
self._elems[key] = Timer(value)
# Write report
def _write(self):
"""Write timer."""
# Obtain the MPI environment
parEnv = MPIEnvironment()
# Create empty lists
block = []
time = []
for key, value in self._elems.items():
total = parEnv.comm.allreduce(value.elapsed, op=parEnv.MPI.SUM) / parEnv.num_proc
block.append(key)
time.append(total)
if( parEnv.rank == 0):
if( self._header):
self._out.write(" | # proc |" + "".join('%20s | ' % t for t in block) + "\n")
self._out.write(" | %-6s |" % parEnv.num_proc + "".join('%20.3f | ' % t for t in time) + "\n")
self._out.close()
# Deleting
def __del__(self):
"""Delete timers."""
rank = MPIEnvironment().rank
# For each stored element
for key, value in self._elems.items():
msg = ' [Process {:2d}] | {:40s} | {:4.3f} seconds'.format(rank, key, value.elapsed)
Print(msg, color_code=4)
self._write()
# ###############################################################
# ################ DECORATORS DEFINITION #################
# ###############################################################
# ---------------------------------------------------------------
# Decorators for code instrumentation
# ---------------------------------------------------------------
[docs]def measure_time(f = None, group = None, split = False):
""""Implement method to measure execution time.
Args:
f: the decorated function
group: the group name
split: decides if all blocks in a group contribute to the same timer
Returns:
a function wrap
"""
def inner_function(function):
@wraps(function)
def wrapper(*args, **kwargs):
# No group was specified
if not group:
name = function.__name__
# Each method is measured separately
elif split:
name = group + "." + function.__name__
# Each method contribute to the group time measure
else:
name = group
# Timer start
Timers()[name].start()
# Execute the decorated method
ret = function(*args, **kwargs)
# Timer stop
Timers()[name].stop()
return ret
return wrapper
# For the decorate a method without providing arguments
if f:
return inner_function(f)
return inner_function
[docs]def measure_all_class_methods(Cls):
""""Implement a decorator to measure execution time for each method.
Args:
f: the decorated function
Returns:
a function wrap
"""
class DecoratedClass(object):
def __init__(self,*args,**kwargs):
self.oInstance = Cls(*args,**kwargs)
self.className = self.oInstance.__class__.__name__
def __getattribute__(self,s):
try:
x = super(DecoratedClass, self).__getattribute__(s)
except AttributeError:
pass
else:
return x
x = self.oInstance.__getattribute__(s)
if type(x) == type(self.__init__): # it is an instance method
return measure_time(x, group=self.className, split=True)
else:
return x
return DecoratedClass
# ###############################################################
# ################ FUNCTIONS DEFINITION #################
# ###############################################################
# TODO
# ###############################################################
# ################ MAIN #################
# ###############################################################
if __name__ == '__main__':
# ---------------------------------------------------------------
# Run unitary test
# ---------------------------------------------------------------
unitary_test()