Source code for galsim.sed

# Copyright (c) 2012-2023 by the GalSim developers team on GitHub
# https://github.com/GalSim-developers
#
# This file is part of GalSim: The modular galaxy image simulation toolkit.
# https://github.com/GalSim-developers/GalSim
#
# GalSim is free software: redistribution and use in source and binary forms,
# with or without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
#    list of conditions, and the disclaimer given in the accompanying LICENSE
#    file.
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions, and the disclaimer given in the documentation
#    and/or other materials provided with the distribution.
#

__all__ = [ 'SED', 'EmissionLine' ]

import numpy as np
from astropy import units
from astropy import constants
from numbers import Real

from .table import LookupTable, _LookupTable
from ._utilities import WeakMethod, lazy_property, basestring
from .errors import GalSimError, GalSimValueError, GalSimRangeError, GalSimSEDError
from .errors import GalSimIncompatibleValuesError
from .random import DistDeviate
from . import utilities
from . import integ
from . import dcr
from . import gsobject

[docs]class SED: """Object to represent the spectral energy distributions of stars and galaxies. SEDs are callable, usually returning the flux density in photons/nm/cm^2/s as a function of wavelength, though SEDs are also used by GalSim to track dimensionless wavelength-dependent normalizations, and may thus also return dimensionless values. By default, the above wavelength used by __call__ is nanometers, but it's possible to use other units via the astropy.units module (at least, if the SED keyword argument ``fast=False``, see below). For instance,:: >>> sed = galsim.SED(...) >>> from astropy import units as u >>> assert sed(500) == sed(5000 * u.AA) # 500 nm == 5000 Angstroms The python type of the return value depends on the type of the input wavelength(s). A scalar input wavelength yields a scalar flux density, a tuple yields a tuple, a list yields a list, and a numpy.ndarray yields a numpy.ndarray. A scalar astropy.units.Quantity yields a python scalar, and a vector astropy.units.Quantity yields a numpy.ndarray. SEDs are immutable; all transformative SED methods return *new* SEDs, and leave their originating SEDs unaltered. SEDs have ``blue_limit`` and ``red_limit`` attributes, which indicate the range over which the SED is defined. An exception will be raised if the flux density or normalization is requested outside of this range. Note that ``blue_limit`` and ``red_limit`` are always in nanometers and in the observed frame when ``redshift != 0``. SEDs may be multiplied by scalars or scalar functions of wavelength. In particular, an SED multiplied by a `Bandpass` will yield the appropriately filtered SED. Two SEDs may be multiplied together if at least one of them represents a dimensionless normalization. SEDs may be added together if they are at the same redshift. The resulting SED will only be defined on the wavelength region where both of the operand SEDs are defined. ``blue_limit`` and ``red_limit`` will be reset accordingly. The input parameter, ``spec``, may be one of several possible forms: 1. a regular python function (or an object that acts like a function) 2. a `LookupTable` 3. a file from which a `LookupTable` can be read in 4. a string which can be evaluated to a function of ``wave`` via ``eval('lambda wave:'+spec)``, e.g.:: spec = '0.8 + 0.2 * (wave-800)' 5. a python scalar (only possible for dimensionless SEDs) The argument of ``spec`` should be the wavelength in units specified by ``wave_type``, which should be an instance of ``astropy.units.Unit`` of equivalency class ``astropy.units.spectral``, or one of the case-insensitive aliases 'nm', 'nanometer', 'nanometers', 'A', 'Ang', 'Angstrom', or 'Angstroms'. Note that ``astropy.units.spectral`` includes not only units with dimensions of length, but also frequency, energy, or wavenumber. The return value of ``spec`` should be a spectral density with units specified by ``flux_type``, which should be an instance of ``astropy.units.Unit`` of equivalency class ``astropy.units.spectral_density``, or one of the case-insensitive aliases: 1. 'flambda': erg/wave_type/cm^2/s, where wave_type is as above. 2. 'fnu': erg/Hz/cm^2/s 3. 'fphotons': photons/wave_type/cm^2/s, where wave_type is as above. 4. '1': dimensionless Note that the ``astropy.units.spectral_density`` class includes units with dimensions of [energy/time/area/unit-wavelength], [energy/time/area/unit-frequency], [photons/time/area/unit-wavelength], and so on. Finally, the optional ``fast`` keyword option is used to specify when unit and dimension changes are executed, particularly for SEDs specified by a `LookupTable`. If ``fast=True``, the default, then the input units/dimensions may be converted to an internal working unit before interpolation in wavelength is performed. Alternatively, ``fast=False`` implies that interpolation should take place in the native units of the input ``spec``, and subsequently flux density converted to photons/cm^2/s/nm afterwards. Generally, the former option is faster, but may be less accurate since interpolation and dimensionality conversion do not commute. One consequence of using ``fast=True`` is that __call__ can not accept an ``astropy.units.Quantity`` in this case. Parameters: spec: Function defining the z=0 spectrum at each wavelength. See above for valid options for this parameter. wave_type: String or astropy.unit specifying units for wavelength input to ``spec``. flux_type: String or astropy.unit specifying what type of spectral density or dimensionless normalization ``spec`` represents. See above for valid options for this parameter. redshift: Optionally shift the spectrum to the given redshift. [default: 0] fast: Convert units on initialization instead of on __call__. [default: True] interpolant: If reading from a file, what interpolant to use. [default: 'linear'] """ # We'll use these multiple times below, and they are ridiculously slow to construct, # so just make them once at the class level. _fphotons_base = units.astrophys.photon/(units.s * units.cm**2) _flambda_base = units.erg/(units.s * units.cm**2) _fphotons = _fphotons_base / units.nm _flambda = _flambda_base / units.nm _fnu = units.erg / (units.s * units.Hz * units.cm**2) _spec_nm = units.spectral_density(1*units.nm) _c = constants.c.to('nm/s').value _h = constants.h.to('erg s').value _dimensionless = units.dimensionless_unscaled _bolo_max_wave = 1.e30 # What we use as "infinity" for bolometric flux calculations. def __init__(self, spec, wave_type, flux_type, redshift=0., fast=True, interpolant='linear', _blue_limit=0.0, _red_limit=np.inf, _wave_list=None): self._flux_type = flux_type # Need to save the original for repr self.interpolant = interpolant # Parse the various options for wave_type self.wave_type, self.wave_factor = self._parse_wave_type(wave_type) # Parse the various options for flux_type self.flux_factor = None if isinstance(flux_type, str): if flux_type.lower() == 'flambda': self.flux_type = 'flambda' self.spectral = True self.flux_factor = 1. / (SED._h * SED._c) elif flux_type.lower() == 'fphotons': self.spectral = True if self.wave_factor is not None: self.flux_type = 'fphotons' self.flux_factor = self.wave_factor else: self.flux_type = SED._fphotons elif flux_type.lower() == 'fnu': self.spectral = True if self.wave_factor is not None: self.flux_type = 'fnu' self.flux_factor = self.wave_factor / SED._h else: self.flux_type = SED._fnu elif flux_type == '1': self.flux_type = '1' self.spectral = False else: raise GalSimValueError("Unknown flux_type", flux_type, ('flambda', 'fnu', 'fphotons', '1')) else: self.flux_type = flux_type self.spectral = self.check_spectral() if not self.spectral and not self.check_dimensionless(): raise GalSimValueError( "Flux_type must be equivalent to a spectral density or dimensionless.", flux_type) try: if self.wave_factor and self.spectral: self.flux_factor = (1*self.flux_type).to(SED._fphotons).value self.flux_type = 'fphotons' except units.UnitConversionError: try: self.flux_factor = (1*self.flux_type).to(SED._flambda).value self.flux_factor /= SED._h * SED._c * self.wave_factor self.flux_type = 'flambda' except units.UnitConversionError: try: self.flux_factor = (1*self.flux_type).to(SED._fnu).value self.flux_factor *= self.wave_factor / SED._h self.flux_type = 'fnu' except units.UnitConversionError: self.wave_type = units.Unit(self.wave_type) self.redshift = redshift self.fast = fast # Convert string input into a real function (possibly a LookupTable) self._orig_spec = spec # Save this for pickling self._initialize_spec() # Setup the wave_list, red_limit, blue_limit if _wave_list is not None: self.wave_list = _wave_list self.blue_limit = float(_blue_limit) self.red_limit = float(_red_limit) elif isinstance(self._spec, LookupTable): self.wave_list = np.array(self._spec.getArgs()) if self.wave_factor: self.wave_list *= (1.0 + self.redshift) / self.wave_factor else: self.wave_list = (self.wave_list*self.wave_type).to(units.nm, units.spectral()).value self.wave_list *= (1.0 + self.redshift) self.blue_limit = float(np.min(self.wave_list)) self.red_limit = float(np.max(self.wave_list)) else: self.blue_limit = 0.0 self.red_limit = np.inf self.wave_list = np.array([], dtype=float) # Define the appropriate functions to call self._setup_funcs() @staticmethod def _parse_wave_type(wave_type): # Parse the various options for wave_type. # Returns wave_type, wave_factor if isinstance(wave_type, str): if wave_type.lower() in ('nm', 'nanometer', 'nanometers'): return 'nm', 1. elif wave_type.lower() in ('a', 'ang', 'angstrom', 'angstroms'): return 'Angstrom', 10. else: raise GalSimValueError("Unknown wave_type", wave_type, ('nm', 'Angstrom')) else: try: wave_factor = (1*units.nm).to(wave_type).value if wave_factor == 1.: return 'nm', 1. elif abs(wave_factor-10.) < 2.e-15: # This doesn't come out exactly 10. return 'Angstrom', 10. else: return units.Unit(wave_type), wave_factor except units.UnitConversionError: return units.Unit(wave_type), None def _setup_funcs(self): # Set up the various functions we use to do the right calculation based on which # wave type and/or flux type we have for _spec. # The astropy unit functions are horribly slow, so we want to avoid them as much as # possible. If the wave_type and flux_type are one of the simpler (and most common) # types, then we have custom functions that do the necessary conversions directly. if self.wave_factor == 1: self._get_native_waves = WeakMethod(self._get_native_waves_trivial) elif self.wave_factor: self._get_native_waves = WeakMethod(self._get_native_waves_fast) else: self._get_native_waves = WeakMethod(self._get_native_waves_slow) if self.redshift == 0.: self._get_rest_native_waves = self._get_native_waves elif self.wave_factor: self._get_rest_native_waves = WeakMethod(self._get_rest_native_waves_fast) else: self._get_rest_native_waves = WeakMethod(self._get_rest_native_waves_slow) if self.flux_type == 'fphotons': #assert self.flux_factor is not None self._flux_to_photons = WeakMethod(self._flux_to_photons_fphot) elif self.flux_type == 'flambda': #assert self.flux_factor is not None self._flux_to_photons = WeakMethod(self._flux_to_photons_flam) elif self.flux_type == 'fnu': #assert self.flux_factor is not None self._flux_to_photons = WeakMethod(self._flux_to_photons_fnu) else: self._flux_to_photons = WeakMethod(self._flux_to_photons_slow) if self.fast: self._call = WeakMethod(self._call_fast) else: self._call = WeakMethod(self._call_slow) # Here are the definitions for the various functions we can use depending on the wave_type # and flux_type (cf. _setup_funcs). def _get_native_waves_trivial(self, wave): return wave def _get_native_waves_fast(self, wave): return np.asarray(wave) * self.wave_factor def _get_native_waves_slow(self, wave): return (wave * units.nm).to(self.wave_type, units.spectral()).value def _get_rest_native_waves_fast(self, wave): return np.asarray(wave) * (self.wave_factor / (1.0+self.redshift)) def _get_rest_native_waves_slow(self, wave): return (wave / (1.0+self.redshift) * units.nm).to(self.wave_type, units.spectral()).value def _flux_to_photons_fphot(self, flux_native, wave_native): return flux_native * self.flux_factor def _flux_to_photons_flam(self, flux_native, wave_native): return flux_native * wave_native * self.flux_factor def _flux_to_photons_fnu(self, flux_native, wave_native): return flux_native / wave_native * self.flux_factor def _flux_to_photons_slow(self, flux_native, wave_native): return (flux_native * self.flux_type).to( SED._fphotons, units.spectral_density(wave_native * self.wave_type)).value def _initialize_spec(self): # Turn the input _orig_spec into a real function _spec. # The function cannot be pickled, so will need to do this in getstate as well as init. self._const = False if isinstance(self._orig_spec, (int, float)): if not self.dimensionless: raise GalSimSEDError("Attempt to set spectral SED using float or integer.", self) self._const = True self._spec = lambda w: float(self._orig_spec) elif isinstance(self._orig_spec, basestring): isfile, filename = utilities.check_share_file(self._orig_spec, 'SEDs') if isfile: self._spec = LookupTable.from_file(filename, interpolant=self.interpolant) else: # If a constant function is input as a string (e.g. '1'), then we want to # make sure it is flagged as a const SED. try: float(self._orig_spec) except ValueError: pass else: self._const = True self._spec = lambda w: float(self._orig_spec) return # Don't catch ArithmeticErrors when testing to see if the the result of `eval()` # is valid since `spec = '1./(wave-700)'` will generate a ZeroDivisionError (which # is a subclass of ArithmeticError) despite being a valid spectrum specification, # while `spec = 'blah'` where `blah` is undefined generates a NameError and is not # a valid spectrum specification. # Are there any other types of errors we should trap here? try: self._spec = utilities.math_eval('lambda wave : ' + self._orig_spec) test_value = self._spec(700.0) except ArithmeticError: test_value = 0 except Exception as e: raise GalSimValueError( "String spec must either be a valid filename or something that " "can eval to a function of wave.\n" "Caught error: {0}".format(e), self._orig_spec) if not isinstance(test_value, Real): raise GalSimValueError("The given SED function did not return a valid number " "at test wavelength %s: got %s"%(700.0, test_value), self._orig_spec) else: self._spec = self._orig_spec
[docs] def check_spectral(self): """Return boolean indicating if SED has units compatible with a spectral density.""" return self.flux_type.is_equivalent(SED._fphotons, SED._spec_nm)
[docs] def check_dimensionless(self): """Return boolean indicating if SED is dimensionless.""" if self.flux_type.is_equivalent(SED._dimensionless): self._flux_type = '1' # The astropy.units.dimensionless_unscaled object isn't properly reprable. # So switch to using '1' in these cases. return True else: return False
@property def dimensionless(self): # for convenience """Whether the object is dimensionless (rather than spectral). """ return not self.spectral def _rest_nm_to_photons(self, wave): wave_native = self._get_native_waves(wave) flux_native = self._spec(wave_native) return self._flux_to_photons(flux_native, wave_native) def _rest_nm_to_dimensionless(self, wave): return self._spec(self._get_native_waves(wave)) def _check_bounds(self, wave): if hasattr(wave, '__iter__'): wmin = np.min(wave) wmax = np.max(wave) else: wmin = wmax = wave extrapolation_slop = 1.e-6 # allow a small amount of extrapolation if wmin < self.blue_limit - extrapolation_slop: raise GalSimRangeError("Requested wavelength is bluer than blue_limit.", wave, self.blue_limit, self.red_limit) if wmax > self.red_limit + extrapolation_slop: raise GalSimRangeError("Requested wavelength is redder than red_limit.", wave, self.blue_limit, self.red_limit) @lazy_property def _fast_spec(self): # Create a fast version of self._spec by constructing a LookupTable on self.wave_list if self.wave_factor == 1. and self.flux_factor == 1.: return self._spec else: if len(self.wave_list) == 0: if self.spectral: return WeakMethod(self._rest_nm_to_photons) else: return WeakMethod(self._rest_nm_to_dimensionless) else: x = self.wave_list / (1.0 + self.redshift) if self.spectral: f = self._rest_nm_to_photons(x) else: f = self._rest_nm_to_dimensionless(x) interp = self._spec.interpolant if isinstance(self._spec, LookupTable) else 'linear' return _LookupTable(x, f, interpolant=interp) def _call_fast(self, wave): """Return either flux in photons / sec / cm^2 / nm, or dimensionless normalization. Assumes that self._spec has already been transformed to accept correct wavelength units and yield correct flux units. Parameters: wave: Wavelength in nanometers. Returns: Flux or normalization. """ self._check_bounds(wave) return self._fast_spec(np.asarray(wave) / (1.0 + self.redshift)) def _call_slow(self, wave): """Return flux in photons / sec / cm^2 / nm or dimensionless normalization. Uses self._spec that has not been pre-transformed for desired units, instead does all unit conversions inside this method. Parameters: wave: Wavelength. If not an astropy.units.Quantity, then assumed units are nanometers. Returns: Flux. """ wave_in = wave # Convert wave to nanometers if needed. if isinstance(wave, units.Quantity): wave = wave.to(units.nm, units.spectral()).value wave = np.asarray(wave) self._check_bounds(wave) # Figure out rest-frame wave_type wavelength array for query to self._spec. rest_wave_native = self._get_rest_native_waves(wave) out = self._spec(rest_wave_native) # Manipulate output units if self.spectral: out = self._flux_to_photons(out, rest_wave_native) return out
[docs] def __call__(self, wave): """Return photon flux density or dimensionless normalization at wavelength ``wave``. Note that outside of the wavelength range defined by the ``blue_limit`` and ``red_limit`` attributes, the SED is considered undefined, and this method will raise an exception if a wavelength outside the defined range is passed as an argument. Parameters: wave: Wavelength in nanometers at which to evaluate the SED. May be a scalar, a numpy.array, or an astropy.units.Quantity Returns: photon flux density in units of photons/nm/cm^2/s if self.spectral, or dimensionless normalization if self.dimensionless. """ ret = self._call(wave) if self._const: ret = np.full_like(wave, ret, dtype=float) return ret
[docs] def _mul_sed(self, other): """Equivalent to self * other when other is an SED, but no sanity checks.""" # There should only be one SED with a non-trivial redshift, so adding them # should always give us the right net redshift to use. redshift = self.redshift + other.redshift fast = self.fast and other.fast wave_list, blue_limit, red_limit = utilities.combine_wave_list(self, other) if fast: if (isinstance(self._fast_spec, LookupTable) and not self._fast_spec.x_log and not self._fast_spec.f_log): x = wave_list / (1.0 + self.redshift) # Add in 500 uniformly spaced values to help improve accuracy. x = utilities.merge_sorted([x, np.linspace(x[0], x[-1], 500)]) zfactor2 = (1.+redshift) / (1.+other.redshift) f = self._fast_spec(x) * other._fast_spec(x*zfactor2) spec = _LookupTable(x, f, self._fast_spec.interpolant) elif (isinstance(other._fast_spec, LookupTable) and not other._fast_spec.x_log and not other._fast_spec.f_log): x = wave_list / (1.0 + other.redshift) x = utilities.merge_sorted([x, np.linspace(x[0], x[-1], 500)]) zfactor1 = (1.+redshift) / (1.+other.redshift) f = self._fast_spec(x*zfactor1) * other._fast_spec(x) spec = _LookupTable(x, f, other._fast_spec.interpolant) else: zfactor1 = (1.+redshift) / (1.+self.redshift) zfactor2 = (1.+redshift) / (1.+other.redshift) spec = lambda w: self._fast_spec(w * zfactor1) * other._fast_spec(w * zfactor2) else: spec = lambda w: self(w * (1.+redshift)) * other(w * (1.+redshift)) spectral = self.spectral or other.spectral flux_type = 'fphotons' if spectral else '1' return SED(spec, 'nm', flux_type, redshift=redshift, fast=fast, _blue_limit=blue_limit, _red_limit=red_limit, _wave_list=wave_list)
[docs] def _mul_bandpass(self, other): """Equivalent to self * other when other is a Bandpass""" wave_list, blue_limit, red_limit = utilities.combine_wave_list(self, other) zfactor = (1.0+self.redshift) / other.wave_factor if self.fast: if (isinstance(self._fast_spec, LookupTable) and not self._fast_spec.x_log and not self._fast_spec.f_log): x = wave_list / (1.0 + self.redshift) # Add in 500 uniformly spaced values to help improve accuracy. x = utilities.merge_sorted([x, np.linspace(x[0], x[-1], 500)]) f = self._fast_spec(x) * other._tp(x*zfactor) spec = _LookupTable(x, f, self._fast_spec.interpolant) else: spec = lambda w: self._fast_spec(w) * other._tp(w*zfactor) else: spec = lambda w: self(w*(1.0+self.redshift)) * other._tp(w*zfactor) return SED(spec, 'nm', 'fphotons', redshift=self.redshift, fast=self.fast, _blue_limit=blue_limit, _red_limit=red_limit, _wave_list=wave_list)
[docs] def _mul_scalar(self, other, spectral): """Equivalent to self * other when other is a scalar""" # If other is a scalar and self._spec a LookupTable, then remake that LookupTable. if isinstance(self._spec, LookupTable): wave_type = self.wave_type flux_type = self._flux_type x = self._spec.getArgs() f = np.array(self._spec.getVals()) * other spec = _LookupTable(x, f, x_log=self._spec.x_log, f_log=self._spec.f_log, interpolant=self._spec.interpolant) elif self._const and not spectral: spec = self._spec(42.0) * other wave_type = 'nm' flux_type = '1' else: wave_type = 'nm' flux_type = 'fphotons' if spectral else '1' if self.fast: spec = lambda w: self._fast_spec(w) * other else: spec = lambda w: self(w*(1.0+self.redshift)) * other return SED(spec, wave_type, flux_type, redshift=self.redshift, fast=self.fast, _blue_limit=self.blue_limit, _red_limit=self.red_limit, _wave_list=self.wave_list)
[docs] def __mul__(self, other): """Multiply the SED by something. There are several possibilities: 1. SED * SED -> SED (at least one must be dimensionless) 2. SED * GSObject -> ChromaticObject 3. SED * Bandpass -> SED (treating throughput similarly to dimensionless SED) 4. SED * callable function -> SED (treating function as dimensionless SED) 5. SED * scalar -> SED """ # Watch out for 5 types of `other`: # 1. SED: Check that not both spectral densities. # 2. GSObject: return a ChromaticObject(). # 3. Bandpass: return an SED, but carefully propagate blue/red limit and wave_list. # 4. Callable: return an SED # 5. Scalar: return an SED # # Additionally, check for shortcuts when self._const # Product of two SEDs if isinstance(other, SED): if self.spectral and other.spectral: raise GalSimIncompatibleValuesError( "Cannot multiply two spectral densities together.", self_sed=self, other=other) if other._const: # const, so can eval anywhere. return self._mul_scalar(other._spec(42.0), self.spectral or other.spectral) elif self._const: return other._mul_scalar(self._spec(42.0), self.spectral or other.spectral) else: return self._mul_sed(other) # Product of SED and achromatic GSObject is a SimpleChromaticTransformation. elif isinstance(other, gsobject.GSObject): return other * self # Product of SED and Bandpass is (filtered) SED. The `redshift` attribute is retained. elif isinstance(other, Bandpass): return self._mul_bandpass(other) # Product of SED with generic callable is also a (filtered) SED, with retained `redshift`. elif hasattr(other, '__call__'): if self.fast: spec = lambda w: self._fast_spec(w) * other(w*(1.0+self.redshift)) else: spec = lambda w: self(w*(1.0+self.redshift)) * other(w*(1.0+self.redshift)) flux_type = 'fphotons' if self.spectral else '1' return SED(spec, 'nm', flux_type, redshift=self.redshift, fast=self.fast, _blue_limit=self.blue_limit, _red_limit=self.red_limit, _wave_list=self.wave_list) elif isinstance(other, (int, float)): return self._mul_scalar(other, self.spectral) else: raise TypeError("Cannot multiply an SED by %s"%(other))
def __rmul__(self, other): return self*other def __div__(self, other): # Enable division by scalars or dimensionless callables (including dimensionless SEDs.) if isinstance(other, SED) and other.spectral: raise GalSimSEDError("Cannot divide by spectral SED.", other) if hasattr(other, '__call__'): spec = lambda w: self(w * (1.0 + self.redshift)) / other(w * (1.0 + self.redshift)) elif isinstance(self._spec, LookupTable): # If other is not a function, then there is no loss of accuracy by applying the # factor directly to the LookupTable, if that's what we are using. # Make sure to keep the same properties about the table, flux_type, wave_type. x = self._spec.getArgs() f = [ val / other for val in self._spec.getVals() ] spec = _LookupTable(x, f, x_log=self._spec.x_log, f_log=self._spec.f_log, interpolant=self._spec.interpolant) else: spec = lambda w: self(w * (1.0 + self.redshift)) / other return SED(spec, flux_type=self.flux_type, wave_type=self.wave_type, redshift=self.redshift, fast=self.fast, _wave_list=self.wave_list, _blue_limit=self.blue_limit, _red_limit=self.red_limit) __truediv__ = __div__ def __add__(self, other): # Add together two SEDs, with the following caveats: # 1) The SEDs must have the same redshift. # 2) The resulting SED will be defined on the wavelength range set by the overlap of the # wavelength ranges of the two SED operands. # 3) The new `wave_list` will be the union of the operand `wave_list`s in the intersecting # region, even if one or both of the `wave_list`s are empty. # These conditions ensure that SED addition is commutative. if self.redshift != other.redshift: raise GalSimIncompatibleValuesError( "Can only add SEDs with same redshift.", self_sed=self, other=other) if self.dimensionless and other.dimensionless: flux_type = '1' elif self.spectral and other.spectral: flux_type = 'fphotons' else: raise GalSimIncompatibleValuesError( "Cannot add SEDs with incompatible dimensions.", self_sed=self, other=other) wave_list, blue_limit, red_limit = utilities.combine_wave_list(self, other) # If both SEDs are `fast`, and both `_fast_spec`s are LookupTables, then make a new # LookupTable instead and preserve picklability. # First need to make sure self._fast_spec and other._fast_spec are initialized. Can do this # by evaluating them at a good wavelength. blue_limit should work. self(blue_limit) other(blue_limit) if (self.fast and other.fast and isinstance(self._fast_spec, LookupTable) and isinstance(other._fast_spec, LookupTable) and not self._fast_spec.x_log and not other._fast_spec.x_log and not self._fast_spec.f_log and not other._fast_spec.f_log and self._fast_spec.interpolant == 'linear' and other._fast_spec.interpolant == 'linear'): x = wave_list / (1.0 + self.redshift) f = self._fast_spec(x) + other._fast_spec(x) # Note: adding splines doesn't quite work at full precision, so only do this for # linear interpolants. spec = _LookupTable(x, f, interpolant='linear') else: spec = lambda w: self(w*(1.0+self.redshift)) + other(w*(1.0+self.redshift)) return SED(spec, wave_type='nm', flux_type=flux_type, redshift=self.redshift, fast=self.fast, _wave_list=wave_list, _blue_limit=blue_limit, _red_limit=red_limit) def __sub__(self, other): # Subtract two SEDs, with the same caveats as adding two SEDs. return self.__add__(-1.0 * other)
[docs] def withFluxDensity(self, target_flux_density, wavelength): """Return a new `SED` with flux density set to ``target_flux_density`` at wavelength ``wavelength``. See `ChromaticObject` docstring for information about how `SED` normalization affects `ChromaticObject` normalization. Parameters: target_flux_density: The target normalization in photons/nm/cm^2/s. wavelength: The wavelength, in nm, at which the flux density will be set. Returns: the new normalized SED. """ if self.dimensionless: raise GalSimSEDError("Cannot set flux density of dimensionless SED.", self) if isinstance(wavelength, units.Quantity): wavelength_nm = wavelength.to(units.nm, units.spectral()) current_flux_density = self._call(wavelength_nm.value) else: wavelength_nm = wavelength * units.nm current_flux_density = self._call(wavelength) if isinstance(target_flux_density, units.Quantity): target_flux_density = target_flux_density.to( SED._fphotons, units.spectral_density(wavelength_nm)).value factor = target_flux_density / current_flux_density return self * factor
[docs] def withFlux(self, target_flux, bandpass): """Return a new `SED` with flux through the `Bandpass` ``bandpass`` set to ``target_flux``. See `ChromaticObject` docstring for information about how `SED` normalization affects `ChromaticObject` normalization. Parameters: target_flux: The desired flux normalization of the SED. bandpass: A `Bandpass` object defining a filter bandpass. Returns: the new normalized `SED`. """ current_flux = self.calculateFlux(bandpass) norm = target_flux/current_flux return self * norm
[docs] def withMagnitude(self, target_magnitude, bandpass): """Return a new `SED` with magnitude through the `Bandpass` ``bandpass`` set to ``target_magnitude``. Note that this requires ``bandpass`` to have been assigned a zeropoint using `Bandpass.withZeropoint`. See `ChromaticObject` docstring for information about how `SED` normalization affects `ChromaticObject` normalization. Parameters: target_magnitude: The desired magnitude of the `SED`. bandpass: A `Bandpass` object defining a filter bandpass. Returns: the new normalized `SED`. """ if bandpass.zeropoint is None: raise GalSimError("Cannot call SED.withMagnitude on this bandpass, because it does " "not have a zeropoint. See Bandpass.withZeropoint()") current_magnitude = self.calculateMagnitude(bandpass) norm = 10**(-0.4*(target_magnitude - current_magnitude)) return self * norm
[docs] def atRedshift(self, redshift): """Return a new `SED` with redshifted wavelengths. Parameters: redshift: The redshift for the returned `SED` Returns: the redshifted `SED`. """ if redshift == self.redshift: return self if redshift <= -1: raise GalSimRangeError("Invalid redshift", redshift, -1.) zfactor = (1.0 + redshift) / (1.0 + self.redshift) wave_list = self.wave_list * zfactor blue_limit = self.blue_limit * zfactor red_limit = self.red_limit * zfactor return SED(self._spec, self.wave_type, self.flux_type, redshift, self.fast, _wave_list=wave_list, _blue_limit=blue_limit, _red_limit=red_limit)
[docs] def calculateFlux(self, bandpass): """Return the flux (photons/cm^2/s) of the `SED` through the `Bandpass` bandpass. Parameters: bandpass: A `Bandpass` object representing a filter, or None to compute the bolometric flux. For the bolometric flux the integration limits will be set to (0, infinity), which implies that the `SED` needs to be evaluable over this entire range. Returns: the flux through the bandpass. """ if self.dimensionless: raise GalSimSEDError("Cannot calculate flux of dimensionless SED.", self) if bandpass is None: # compute bolometric flux bandpass = Bandpass(lambda w: 1., 'nm', blue_limit=0., red_limit=SED._bolo_max_wave) if len(bandpass.wave_list) > 0 or len(self.wave_list) > 0: slop = 1e-6 # nm if (self.blue_limit > bandpass.blue_limit + slop or self.red_limit < bandpass.red_limit - slop): raise GalSimRangeError("Bandpass is not completely within defined wavelength " "range for this SED.", (bandpass.blue_limit, bandpass.red_limit), self.blue_limit, self.red_limit) wmin = max(self.blue_limit, bandpass.blue_limit) wmax = min(self.red_limit, bandpass.red_limit) if self.fast and isinstance(self._fast_spec, LookupTable): wf = 1./(1.+self.redshift) / bandpass.wave_factor ff = 1./bandpass.wave_factor wmin *= bandpass.wave_factor wmax *= bandpass.wave_factor return self._fast_spec.integrate_product(bandpass._tp, wmin, wmax, wf) * ff else: w, _, _ = utilities.combine_wave_list(self, bandpass) if not self.fast and self.flux_type != 'fphotons': # When not fast, the SED definition is not linear between the wave_list # points, so this can be slightly inaccurate if the waves are too far apart. # Add in 100 uniformly spaced points to achieve relative accurace ~few e-6. w = utilities.merge_sorted([w, np.linspace(w[0], w[-1], 100)]) interpolant = (bandpass._tp.interpolant if hasattr(bandpass._tp, 'interpolant') else 'linear') return _LookupTable(w, bandpass(w), interpolant).integrate_product(self) else: return integ.int1d(lambda w: bandpass(w)*self(w), bandpass.blue_limit, bandpass.red_limit)
[docs] def calculateMagnitude(self, bandpass): """Return the `SED` magnitude through a `Bandpass` ``bandpass``. Note that this requires ``bandpass`` to have been assigned a zeropoint using `Bandpass.withZeropoint`. Parameters: bandpass: A `Bandpass` object representing a filter, or None to compute the bolometric magnitude. For the bolometric magnitude the integration limits will be set to (0, infinity), which implies that the `SED` needs to be evaluable over this entire range. Returns: the bandpass magnitude. """ if self.dimensionless: raise GalSimSEDError("Cannot calculate magnitude of dimensionless SED.", self) if bandpass.zeropoint is None: raise GalSimError("Cannot do this calculation for a bandpass without an assigned " "zeropoint") flux = self.calculateFlux(bandpass) return -2.5 * np.log10(flux) + bandpass.zeropoint
[docs] def thin(self, rel_err=1.e-4, trim_zeros=True, preserve_range=True, fast_search=True): """Remove some tabulated values while keeping the integral over the set of tabulated values still accurate to ``rel_err``. This is only relevant if the `SED` was initialized with a `LookupTable` or from a file (which internally creates a `LookupTable`). Parameters: rel_err: The relative error allowed in the integral over the `SED` [default: 1.e-4] trim_zeros: Remove redundant leading and trailing points where f=0? (The last leading point with f=0 and the first trailing point with f=0 will be retained). Note that if both trim_leading_zeros and preserve_range are True, then the only the range of ``x`` *after* zero trimming is preserved. [default: True] preserve_range: Should the original range (``blue_limit`` and ``red_limit``) of the `SED` be preserved? (True) Or should the ends be trimmed to include only the region where the integral is significant? (False) [default: True] fast_search: If set to True, then the underlying algorithm will use a relatively fast O(N) algorithm to select points to include in the thinned approximation. If set to False, then a slower O(N^2) algorithm will be used. We have found that the slower algorithm tends to yield a thinned representation that retains fewer samples while still meeting the relative error requirement. [default: True] Returns: the thinned `SED`. """ if len(self.wave_list) > 0: rest_wave_native = self._get_rest_native_waves(self.wave_list) spec_native = self._spec(rest_wave_native) # Note that this is thinning in native units, not nm and photons/nm. interpolant = (self.interpolant if not isinstance(self._spec, LookupTable) else self._spec.interpolant) newx, newf = utilities.thin_tabulated_values( rest_wave_native, spec_native, rel_err=rel_err, trim_zeros=trim_zeros, preserve_range=preserve_range, fast_search=fast_search, interpolant=interpolant) newspec = _LookupTable(newx, newf, interpolant=interpolant) return SED(newspec, self.wave_type, self.flux_type, redshift=self.redshift, fast=self.fast) else: return self
[docs] def calculateDCRMomentShifts(self, bandpass, **kwargs): """Calculates shifts in first and second moments of PSF due to differential chromatic refraction (DCR). I.e., equations (1) and (2) from Plazas and Bernstein (2012): http://arxiv.org/abs/1204.1346). Parameters: bandpass: `Bandpass` through which object is being imaged. zenith_angle: `Angle` from object to zenith parallactic_angle: Parallactic angle, i.e. the position angle of the zenith, measured from North through East. [default: 0] obj_coord: Celestial coordinates of the object being drawn as a `CelestialCoord`. [default: None] zenith_coord: Celestial coordinates of the zenith as a `CelestialCoord`. [default: None] HA: Hour angle of the object as an `Angle`. [default: None] latitude: Latitude of the observer as an `Angle`. [default: None] pressure: Air pressure in kiloPascals. [default: 69.328 kPa] temperature: Temperature in Kelvins. [default: 293.15 K] H2O_pressure: Water vapor pressure in kiloPascals. [default: 1.067 kPa] Returns: a tuple: - The first element is the vector of DCR first moment shifts - The second element is the 2x2 matrix of DCR second (central) moment shifts. """ if self.dimensionless: raise GalSimSEDError("Cannot calculate DCR shifts of dimensionless SED.", self) zenith_angle, parallactic_angle, kwargs = dcr.parse_dcr_angles(**kwargs) # Any remaining kwargs will get forwarded to galsim.dcr.get_refraction # Check that they're valid for kw in kwargs: if kw not in ('temperature', 'pressure', 'H2O_pressure'): raise (TypeError("Got unexpected keyword in calculateDCRMomentShifts: {0}" .format(kw))) # Now actually start calculating things. flux = self.calculateFlux(bandpass) if len(self.wave_list) > 0 or len(bandpass.wave_list) > 0: w, _, _ = utilities.combine_wave_list(self, bandpass) interpolant = (bandpass._tp.interpolant if hasattr(bandpass._tp, 'interpolant') else 'linear') bp = _LookupTable(w, bandpass(w), interpolant) R = lambda w: dcr.get_refraction(w, zenith_angle, **kwargs) Rbar = bp.integrate_product(lambda w: self(w) * R(w)) / flux V = bp.integrate_product(lambda w: self(w) * (R(w)-Rbar)**2) / flux else: weight = lambda w: bandpass(w) * self(w) Rbar_kernel = lambda w: dcr.get_refraction(w, zenith_angle, **kwargs) Rbar = integ.int1d(lambda w: weight(w) * Rbar_kernel(w), bandpass.blue_limit, bandpass.red_limit) Rbar /= flux V_kernel = lambda w: (dcr.get_refraction(w, zenith_angle, **kwargs) - Rbar)**2 V = integ.int1d(lambda w: weight(w) * V_kernel(w), bandpass.blue_limit, bandpass.red_limit) V /= flux # Rbar and V are computed above assuming that the parallactic angle is 0. Hence we # need to rotate our frame by the parallactic angle to get the desired output. sinp, cosp = parallactic_angle.sincos() rot = np.array([[cosp, -sinp], [sinp, cosp]]) Rbar = Rbar * rot.dot(np.array([0,1])) V = rot.dot(np.array([[0, 0], [0, V]])).dot(rot.T) return Rbar, V
[docs] def calculateSeeingMomentRatio(self, bandpass, alpha=-0.2, base_wavelength=500): """Calculates the relative size of a PSF compared to the monochromatic PSF size at wavelength ``base_wavelength``. Parameters: bandpass: `Bandpass` through which object is being imaged. alpha: Power law index for wavelength-dependent seeing. [default: -0.2, the prediction for Kolmogorov turbulence] base_wavelength: Reference wavelength in nm from which to compute the relative PSF size. [default: 500] Returns: the ratio of the PSF second moments to the second moments of the reference PSF. """ if self.dimensionless: raise GalSimSEDError("Cannot calculate seeing moment ratio of dimensionless SED.", self) flux = self.calculateFlux(bandpass) if len(self.wave_list) > 0 or len(bandpass.wave_list) > 0: # With three things multiplied together, we can't rely on integrate_product # being completely accurate if the waves are spaced too far apart, especially with # a power law being one of the factors. # So make sure to include a uniform density of points along with the native sed and # bandpass points. The error goes like dx**3, so 100 points should give relative # errors of order ~few e-6. w, _, _ = utilities.combine_wave_list([self, bandpass]) w = utilities.merge_sorted([w, np.linspace(w[0], w[-1], 100)]) interpolant = (bandpass._tp.interpolant if hasattr(bandpass._tp, 'interpolant') else 'linear') bp = _LookupTable(w, bandpass(w), interpolant) return bp.integrate_product(lambda w: self(w) * (w/base_wavelength)**(2*alpha)) / flux else: weight = lambda w: bandpass(w) * self(w) kernel = lambda w: (w/base_wavelength)**(2*alpha) return integ.int1d(lambda w: weight(w) * kernel(w), bandpass.blue_limit, bandpass.red_limit) / flux
@lazy_property def _cache_deviate(self): return dict()
[docs] def sampleWavelength(self, nphotons, bandpass, rng=None, npoints=None): """Sample a number of random wavelength values from the `SED`, possibly as observed through a `Bandpass` bandpass. Parameters: nphotons: Number of samples (photons) to randomly draw. bandpass: A `Bandpass` object representing a filter, or None to sample over the full `SED` wavelength range. rng: If provided, a random number generator that is any kind of `BaseDeviate` object. If ``rng`` is None, one will be automatically created from the system. [default: None] npoints: Number of points `DistDeviate` should use for its internal interpolation tables. [default: None, which uses the `DistDeviate` default] """ nphotons=int(nphotons) key = (bandpass,npoints) if key in self._cache_deviate: dev = self._cache_deviate[key] else: if bandpass is None: sed = self else: sed = self._mul_bandpass(bandpass) xmin = sed.blue_limit / (1.+self.redshift) xmax = sed.red_limit / (1.+self.redshift) dev = DistDeviate(function=sed._fast_spec, x_min=xmin, x_max=xmax, npoints=npoints, clip_neg=True) self._cache_deviate[key] = dev # Reset the deviate explicitly if rng is not None: dev.reset(rng) ret = np.empty(nphotons) dev.generate(ret) if self.redshift != 0: ret *= (1. + self.redshift) # Rarely, with the redshift round trip, this can produce wavelengths < blue_limit. # If this happens, set those values equal to blue_limit. # I'm not sure if the red limit overrun can happen (we didn't see any in the use case # that noticed the blue overruns), but it seems prudent to also correct any of these # that may occur too. Plus it's not noticeably slower using clip to do both at once. if bandpass is not None: np.clip(ret, bandpass.blue_limit, bandpass.red_limit, out=ret) return ret
def __eq__(self, other): return (self is other or (isinstance(other, SED) and self._orig_spec == other._orig_spec and self.fast == other.fast and self.wave_type == other.wave_type and self.flux_type == other.flux_type and self.redshift == other.redshift and self.red_limit == other.red_limit and self.blue_limit == other.blue_limit and np.array_equal(self.wave_list,other.wave_list))) def __ne__(self, other): return not self.__eq__(other) def __hash__(self): # Cache this in case self._orig_spec or self.wave_list is long. if not hasattr(self, '_hash'): self._hash = hash(("galsim.SED", self._orig_spec, self.wave_type, self.flux_type, self.redshift, self.fast, self.blue_limit, self.red_limit, tuple(self.wave_list))) return self._hash def __repr__(self): outstr = ('galsim.SED(%r, wave_type=%r, flux_type=%r, redshift=%r, fast=%r, ' 'interpolant=%r, _wave_list=%r, _blue_limit=%r, _red_limit=%s)')%( self._orig_spec, self.wave_type, self._flux_type, self.redshift, self.fast, self.interpolant, self.wave_list, self.blue_limit, "float('inf')" if self.red_limit == np.inf else repr(self.red_limit)) return outstr def __str__(self): orig_spec = repr(self._orig_spec) if len(orig_spec) > 80: orig_spec = str(self._orig_spec) return 'galsim.SED(%s, redshift=%s)'%(orig_spec, self.redshift) def __getstate__(self): d = self.__dict__.copy() if not isinstance(d['_spec'], LookupTable): del d['_spec'] d.pop('_fast_spec',None) del d['_call'] del d['_get_native_waves'] del d['_get_rest_native_waves'] del d['_flux_to_photons'] return d def __setstate__(self, d): self.__dict__ = d if '_spec' not in d: self._initialize_spec() self._setup_funcs()
[docs]class EmissionLine(SED): """Emission line SED. Creates a simple triangle-shaped emission line with a given wavelength and FWHM. Parameters: wavelength: The wavelength of the line. fwhm: The full-width-half-max of the line. [default: 1.0] flux: The integrated flux of the line. [default: 1.0] wave_type: The units of ``wavelength`` and ``fwhm`` above. See SED constructor for options. [default: 'nm'] flux_type: The units of flux used for this SED. See SED constructor for options. [default: 'fphotons'] max_wave The maximum wavelength to use in the LookupTable for the SED. [default: {}; must be > wavelength+fwhm] **kwargs: Any additional keyword arguments to pass to the `SED` constructor. """.format(SED._bolo_max_wave) def __init__( self, wavelength, fwhm=1.0, flux=1.0, wave_type='nm', flux_type='fphotons', max_wave=SED._bolo_max_wave, **kwargs ): self.wavelength = wavelength self.fwhm = fwhm self.flux = flux _, wave_factor = SED._parse_wave_type(wave_type) if wave_factor is None: raise GalSimValueError("wave_type must be a distance unit", wave_type) assert max_wave > wavelength + fwhm w = wavelength # Some operations can turn a 0 into 1.e-15, which can lead to large errors # when integrating from w+fwhm to max_wave. So add a second set of 0's at # w +- 2*fwhm to make sure it's exactly 0 for most of the range. spec = LookupTable( [0.0, w-2*fwhm, w-fwhm, w, w+fwhm, w+2*fwhm, max_wave*wave_factor], [0, 0, 0, flux/fwhm, 0, 0, 0], interpolant='linear' ) super().__init__( spec, wave_type=wave_type, flux_type=flux_type, **kwargs )
[docs] def atRedshift(self, redshift): """Return a new `EmissionLine` with redshifted wavelengths. Parameters: redshift: The redshift for the returned `EmissionLine` Returns: the redshifted `EmissionLine`. """ if redshift == self.redshift: return self if redshift <= -1: raise GalSimRangeError("Invalid redshift", redshift, -1.) return EmissionLine( self.wavelength, self.fwhm, flux=self.flux, wave_type=self.wave_type, flux_type=self.flux_type, redshift=redshift, fast=self.fast )
def __mul__(self, other): if isinstance(other, (int, float)): flux = self.flux * other return EmissionLine( self.wavelength, self.fwhm, flux=flux, wave_type=self.wave_type, flux_type=self.flux_type, redshift=self.redshift, fast=self.fast ) else: return super().__mul__(other) def __div__(self, other): if isinstance(other, (int, float)): flux = self.flux / other return EmissionLine( self.wavelength, self.fwhm, flux=flux, wave_type=self.wave_type, flux_type=self.flux_type, redshift=self.redshift, fast=self.fast ) else: return super().__div__(other) __truediv__ = __div__ def __eq__(self, other): return (self is other or (isinstance(other, EmissionLine) and self.wavelength == other.wavelength and self.fwhm == other.fwhm and self.flux == other.flux and self.wave_type == other.wave_type and self.flux_type == other.flux_type and self.redshift == other.redshift)) def __hash__(self): return hash(("galsim.EmissionLine", self.wavelength, self.fwhm, self.flux)) def __repr__(self): outstr = ('galsim.EmissionLine(%r, %r, flux=%r, wave_type=%r, flux_type=%r, redshift=%r,' ' fast=%r)')%( self.wavelength, self.fwhm, self.flux, self.wave_type, self._flux_type, self.redshift, self.fast) return outstr def __str__(self): outstr = 'galsim.EmissionLine(wavelength=%s, fwhm=%s'%(self.wavelength, self.fwhm) if self.flux != 1.0: outstr += ', flux=%s'%self.flux if self.redshift != 0.0: outstr += ', redshift=%s'%self.redshift outstr += ')' return outstr
# Put this at the bottom to avoid circular import errors. from .bandpass import Bandpass