Source code for rapoc.models.model

import warnings

import astropy.constants as const
import astropy.units as u
import matplotlib.pyplot as plt
import numpy as np
import os

import rapoc.models
from rapoc.models.utils.validation import validate_inputs, validate_output


[docs]class Model: """ Base model class. Attributes ---------- input_data: str or dict data file name or data dictionary model_name: str model name band: tuple investigated band. Is initialised only after the use of :func:`~self.map` selected_grid: astropy.units.Quantity investigated grid. Is initialised only after the use of :func:`~self.map` mol: str molecule name mol_mass: astropy.units.Quantity molecular mass pressure_grid: astropy.units.Quantity data pressure grid in `si` units temperature_grid: astropy.units.Quantity data temperature grid in `si` units wavenumber_grid: astropy.units.Quantity data wavenumber grid opacities: astropy.units.Quantity data opacities grid in `si` units frequency_grid: astropy.units.Quantity data frequency grid wavelength_grid: astropy.units.Quantity data wavelength grid """ def __init__(self, input_data): """ Parameters ---------- input_data: str or dict data file name or input_data dictionary. If the input is a `str`, the correspondent loader is used (:class:`~rapoc.loaders.ExoMolFileLoader`) if the file format is supported. If is `dict`, then :class:`~rapoc.loaders.DictLoader` is used. Raises ------- IOError: if data format is not supported Examples --------- For the sake of semplicity let's assume we use an ExoMol file as input >>> from rapoc.models import Model >>> exomolFile = 'example.TauREx.h5' >>> model = Model(input_data=exomolFile) """ self.input_data = input_data self.model_name = 'model' self.selected_grid = None loaded = self._loader_selector() self.mol, self.mol_mass, self.pressure_grid, self.temperature_grid, self.wavenumber_grid, self.opacities, self.exceptions = loaded.read_content() self.frequency_grid = (self.wavenumber_grid * const.c).si self.wavelength_grid = 1 / self.wavenumber_grid.to(1 / u.um) self._map = None self.band = None
[docs] def opacity_model(self, opacities, nu, T_input): """ Computes the mean opacity in the investigated range. Parameters ---------- opacities: np.array opacity array. Has the same dimension of the frequency grid `nu` nu: np.array frequency grid T_input: float temperature Returns ------- float mean opacity computed from the model """ raise NotImplementedError
[docs] def extract_opacities(self, T_input, band, P_input=None, units='si'): """ Returns the input_data opacities for given pressure, temperature and band. Parameters ---------- T_input: float or np.array temperature to use for the estimation. This should be a `astropy.units.Quantity` with specified units, if not `K` are assumed as units. Can either be a single value or an array. band: tuple this should be a tuple of `astropy.units.Quantity` indicating the edges of the band to use in the estimation. The units used reveal the intention to work in wavelengths, wavenumbers or frequencies. P_input: float or np.array pressure to use for the estimation. This should be a `astropy.units.Quantity` with specified units, if not `Pa` are assumed as units. Can either be a single value or an array. It can be None in the case of data not depending on pressure, as Rayleigh scattering. Default is None. units: str or astropy.units, optional indicates the output units system. If `si` the opacity is returned as :math:`m^2/kg`, if `cgs` is returned as :math:`cm^2/g`. Instead of a string, you can also indicate the units using astropy.units. Units is `si` by default. Returns ------- astropy.units.Quantity returns the estimated opacity for the pressures and temperatures indicated. If only one pressure and temperature are indicated, it returns a single Quantity. If `n` pressures and `m` temperatures are indicated, it return a Quantity array of dimensions (n,m) list list of wavenumber (or wavelength or frequency) index relative to the investigated band """ P_input, T_input = validate_inputs(P_input, T_input, self.pressure_grid, self.temperature_grid, self.exceptions) if self.exceptions in ['rayleigh']: P_input = np.array([1])*u.Pa idw = self._wl_range_parser(band) _extract = np.zeros([P_input.size, T_input.size, idw.size]) * u.m ** 2 / u.kg for i, p in enumerate(P_input): for j, t in enumerate(T_input): idp = np.argmin(abs(self.pressure_grid - p.to(u.Pa))) idt = np.argmin(abs(self.temperature_grid - t.to(u.K))) _extract[i, j] = self.opacities[idp][idt][idw] return validate_output(_extract, units), idw
[docs] def estimate(self, T_input, band, P_input=None, mode='closest', units='si'): """ It estimates the model opacity in the indicated pressure and temperature grid for the indicated band, using the :func:`~rapoc.models.model.Model.opacity_model`. Parameters ---------- T_input: float or np.array temperature to use for the estimation. This should be a `astropy.units.Quantity` with specified units, if not `K` are assumed as units. Can either be a single value or an array. band: tuple this should be a tuple of `astropy.units.Quantity` indicating the edges of the band to use in the estimation. The units used reveal the intention to work in wavelengths, wavenumbers or frequencies. P_input: float or np.array pressure to use for the estimation. This should be a `astropy.units.Quantity` with specified units, if not `Pa` are assumed as units. Can either be a single value or an array. It can be None in the case of data not depending on pressure, as Rayleigh scattering. Default is None. mode: str, optional indicates the estimation mode desired. If `closest` the output will be the opacity computed from the data at the nearest pressure on the data grid to the indicated one, and at the nearest temperature on the data grid to the indicated one. If {'linear', 'loglinear'} it's used the :func:`scipy.interpolate.griddata` with the indicated `kind`. To interpolate a map of opacities over the data grid is computed first using :func:`~rapoc.models.model.Model.map`. Mode is `closest` by default. units: str or astropy.units, optional indicates the output units system. If `si` the opacity is returned as :math:`m^2/kg`, if `cgs` is returned as :math:`cm^2/g`. Instead of a string, you can also indicate the units using astropy.units. Units is `si` by default. Returns ------- astropy.units.Quantity returns the estimated opacity for the pressures and temperatures indicated. If only one pressure and temperature are indicated, it returns a single Quantity. If `n` pressures and `m` temperatures are indicated, it return a Quantity array of dimensions (n,m) Raises ------ NotImplementedError: if the indicated mode is not supported astropy.units.UnitConversionError: if the input units cannot be converted to `si` AttributeError: if the band cannot be interpreted as wavelength, wavenumber or frequency """ P_input, T_input = validate_inputs(P_input, T_input, self.pressure_grid, self.temperature_grid, self.exceptions) if self.exceptions is None: if mode == 'closest': _estimate = np.zeros([P_input.size, T_input.size]) * u.m ** 2 / u.kg idw = self._wl_range_parser(band) frequency = self.frequency_grid[idw] for i, p in enumerate(P_input): for j, t in enumerate(T_input): idp = np.argmin(abs(self.pressure_grid - p.to(u.Pa))) idt = np.argmin(abs(self.temperature_grid - t.to(u.K))) _estimate[i, j] = self.opacity_model(self.opacities[idp][idt][idw], frequency, self.temperature_grid[idt]) elif mode in ['linear', 'loglinear']: from scipy import interpolate _map = self.map(band).to(u.m ** 2 / u.kg).value points, map_values = [], [] for i in range(self.pressure_grid.size): for j in range(self.temperature_grid.size): if mode == 'linear': points.append((self.pressure_grid[i].value, self.temperature_grid[j].value)) else: points.append((np.log10(self.pressure_grid[i].value), self.temperature_grid[j].value)) map_values.append(_map[i, j]) _estimate = np.zeros([P_input.size, T_input.size]) * u.m ** 2 / u.kg for i, p in enumerate(P_input.value): for j, t in enumerate(T_input.value): if mode == 'linear': _estimate[i, j] = interpolate.griddata(points, np.array(map_values), (p, t), method=mode, rescale=True) * u.m ** 2 / u.kg else: _estimate[i, j] = interpolate.griddata(points, np.array(map_values), (np.log10(p), t), method='linear', rescale=True) * u.m ** 2 / u.kg else: raise NotImplementedError('The indicated mode is not supported.') return validate_output(_estimate, units) # handling rayleigh elif self.exceptions in ['rayleigh']: _estimate = np.zeros([1, T_input.size]) * u.m ** 2 / u.kg idw = self._wl_range_parser(band) frequency = self.frequency_grid[idw] for j, t in enumerate(T_input): _estimate[0, j] = self.opacity_model(self.opacities[0][0][idw], frequency, t) return validate_output(_estimate, units)
[docs] def estimate_plot(self, T_input, band, P_input=None, mode='closest', force_map=False, fig=None, ax=None, yscale='log', xscale='linear', grid='wavelength'): """ Produces a plot of the estimates (produced with :func:`~rapoc.models.model.Model.estimate`), comparing the raw data with the result. If a single estimate is to be plotted, this method produces a plot of raw opacities vs wavelength from the opacities (in grey) and the mean opacity estimated. If multiple estimates are be plotted, it produces a 3D plot, with the surface of mean opacity vs temperature and pressure from the data grid (using :func:`~rapoc.models.model.Model.map_plot`) and the interpolated data superimposed. Parameters ---------- T_input: float or np.array temperature to use for the estimation. This should be a `astropy.units.Quantity` with specified units, if not `K` are assumed as units. Can either be a single value or an array. band: tuple this should be a tuple of `astropy.units.Quantity` indicating the edges of the band to use in the estimation. The units used reveal the intention to work in wavelengths, wavenumbers or frequencies. P_input: float or np.array pressure to use for the estimation. This should be a `astropy.units.Quantity` with specified units, if not `Pa` are assumed as units. Can either be a single value or an array. It can be None in the case of data not depending on pressure, as Rayleigh scattering. Default is None. mode: str, optional indicates the estimation mode desired. If `closest` the output will be the opacity computed from the data at the nearest pressure on the data grid to the indicated one, and at the nearest temperature on the data grid to the indicated one. If {'linear', 'loglinear'} it's used the :func:`scipy.interpolate.griddata` with the indicated `kind`. Mode is `closest` by default. force_map: bool If True a 3D map plot is generate even for a single estimate. Default is `False`. fig: matplotlib.pyplot.figure, optional indicates the figure to use. If `None` a new figure is produced. Default is `None` ax: matplotlib.pyplot.axis, optional indicates the axis of the figure to use. If `None` a new axis is produced. Default is `None` yscale: str y-axis scale to use. Default is `log` xscale: str x-axis scale to use. Default is `linear` grid: str x-axis grid format. {`wavelength`, `frequency`, `wavenumber`} are supported. Default is `wavelength`. Returns ------- matplotlib.pyplot.figure, optional figure containing the plot matplotlib.pyplot.axis, optional axis containing the plot Raises ------ KeyError: if the indicated grid is not supported """ r = self.estimate(P_input=P_input, T_input=T_input, band=band, mode=mode) P_input, T_input = validate_inputs(P_input, T_input, self.pressure_grid, self.temperature_grid, self.exceptions) if r.size == 1 and not force_map: idw = self._wl_range_parser(band) if grid == 'wavelength': grid_plot = self.wavelength_grid[idw] elif grid == 'frequency': grid_plot = self.frequency_grid[idw] elif grid == 'wavenumber': grid_plot = self.wavenumber_grid[idw] else: raise KeyError('not supported grid') if (fig is None) and (ax is None): fig, ax = plt.subplots(1, 1) ax.set_xlabel(r'{} [${}$]'.format(grid, grid_plot.unit)) ax.set_ylabel(r'${}$'.format(self.opacities.unit)) ax.set_title(self.model_name) idp = np.argmin(abs(self.pressure_grid - P_input.to(u.Pa))) idt = np.argmin(abs(self.temperature_grid - T_input.to(u.K))) ax.plot(grid_plot, self.opacities[idp][idt][idw], label='opacities', c='grey', alpha=0.2) ax.axhline(r.value, lw=1, label=self.model_name) ax.set_yscale(yscale) ax.set_xscale(xscale) return fig, ax else: fig, ax = self.map_plot(band, fig=fig, ax=ax) T_input, P_input = np.meshgrid(T_input, P_input) ax.scatter(T_input, np.log10(P_input.to(u.Pa).value), r, ) return fig, ax
[docs] def map(self, band): """ Returns the mean opacity map for the indicated model. Parameters ---------- band: tuple this should be a tuple of `astropy.units.Quantity` indicating the edges of the band to use in the estimation. The units used reveal the intention to work in wavelengths, wavenumbers or frequencies. Returns ------- np.array mean opacity map in `si` units Notes ----- If the map has been already built for the indicated band, then the method returns the previous map. This speeds up the use of the code in external functions. """ if self.band is not None: band_check = any(x != y for x, y in zip(band, self.band)) else: band_check = True if band_check: warnings.warn('Computing map.', UserWarning) self.band = band idw = self._wl_range_parser(band) frequency = self.frequency_grid[idw] _map = np.zeros([self.pressure_grid.size, self.temperature_grid.size]) * u.m ** 2 / u.kg for i in range(self.pressure_grid.size): for j in range(self.temperature_grid.size): _map[i, j] = self.opacity_model(self.opacities[i][j][idw], frequency, self.temperature_grid[j]) _map = validate_output(_map) self._map = _map return self._map
[docs] def map_plot(self, band, fig=None, ax=None): """ Produces a 3D-plot of the model mean opacity map built with :func:`~rapoc.models.model.Model.map`. Parameters ---------- band: tuple this should be a tuple of `astropy.units.Quantity` indicating the edges of the band to use in the estimation. The units used reveal the intention to work in wavelengths, wavenumbers or frequencies. fig: matplotlib.pyplot.figure, optional indicates the figure to use. If `None` a new figure is produced. Default is `None` ax: matplotlib.pyplot.axis, optional indicates the axis of the figure to use. If `None` a new axis is produced. Default is `None` Returns ------- matplotlib.pyplot.figure, optional figure containing the plot matplotlib.pyplot.axis, optinal axis containing the plot """ from matplotlib import cm t, p = np.meshgrid(self.temperature_grid, self.pressure_grid) if (fig is None) and (ax is None): fig = plt.figure() ax = fig.add_subplot(111, projection='3d') ax.set_ylabel('log(Pressure [Pa])') ax.set_xlabel('Temperature [K]') ax.set_zlabel(r'${}$'.format(self.opacities.unit)) ax.set_title(self.model_name) if not hasattr(ax, 'plot_surface'): raise AttributeError('input axes should have 3D projection. ' 'Please refer to https://matplotlib.org/3.1.1/gallery/mplot3d/subplot3d.html') ax.plot_surface(t, np.log10(p.value), self.map(band).to(u.m ** 2 / u.kg), cmap=cm.coolwarm, alpha=0.7) return fig, ax
def _loader_selector(self): if isinstance(self.input_data, str): if 'TauREx.h5' in self.input_data: from rapoc.loaders import ExoMolFileLoader return ExoMolFileLoader(filename=self.input_data) elif [f for f in os.listdir(self.input_data) if f.endswith('.bin')]: from rapoc.loaders import DACEFileLoader return DACEFileLoader(filename=self.input_data) else: raise IOError('file extension not supported') elif isinstance(self.input_data, dict): from rapoc.loaders import DictLoader return DictLoader(input_data=self.input_data) elif isinstance(self.input_data, rapoc.models.Rayleigh): return self.input_data else: raise IOError('Data format not supported') def _wl_range_parser(self, investigated_range): try: investigated_range[0].to(u.um) lookup = self.wavelength_grid except u.UnitConversionError: try: investigated_range[0].to(1 / u.um) lookup = self.wavenumber_grid except u.UnitConversionError: try: investigated_range[0].to(u.Hz) lookup = self.frequency_grid except u.UnitConversionError: raise AttributeError('wrong range format') if investigated_range[0] > investigated_range[1]: investigated_range = reversed(investigated_range) idx = np.where(np.logical_and(lookup <= investigated_range[1], lookup >= investigated_range[0]))[0] self.selected_grid = lookup[idx] return idx