Source code for galsim.sum

# Copyright (c) 2012-2023 by the GalSim developers team on GitHub
# https://github.com/GalSim-developers
#
# This file is part of GalSim: The modular galaxy image simulation toolkit.
# https://github.com/GalSim-developers/GalSim
#
# GalSim is free software: redistribution and use in source and binary forms,
# with or without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
#    list of conditions, and the disclaimer given in the accompanying LICENSE
#    file.
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions, and the disclaimer given in the documentation
#    and/or other materials provided with the distribution.
#

__all__ = [ 'Add', 'Sum', ]

import numpy as np
import copy

from .gsparams import GSParams
from .gsobject import GSObject
from .utilities import lazy_property
from . import _galsim
from .photon_array import PhotonArray
from .random import UniformDeviate
from . import chromatic as chrom


[docs]def Add(*args, **kwargs): """A function for adding 2 or more `GSObject` or `ChromaticObject` instances. This function will inspect its input arguments to decide if a `Sum` object or a `ChromaticSum` object is required to represent the sum of surface brightness profiles. Typically, you do not need to call Add() explicitly. Normally, you would just use the + operator, which returns a Sum:: >>> bulge = galsim.Sersic(n=3, half_light_radius=0.8) >>> disk = galsim.Exponential(half_light_radius=1.4) >>> gal = bulge + disk >>> psf = galsim.Gaussian(sigma=0.3, flux=0.3) + galsim.Gaussian(sigma=0.8, flux=0.7) If one of the items is chromatic, it will return a `ChromaticSum`:: >>> disk = galsim.Exponential(half_light_radius=1.4) * galsim.SED(sed_file) >>> gal = bulge + disk Parameters: args: Unnamed args should be a list of objects to add. gsparams: An optional `GSParams` argument. [default: None] propagate_gsparams: Whether to propagate gsparams to each of the components. This is normally a good idea, but there may be use cases where one would not want to do this. [default: True] Returns: a `Sum` or `ChromaticSum` instance as appropriate. """ if len(args) == 0: raise TypeError("At least one ChromaticObject or GSObject must be provided.") elif len(args) == 1: # 1 argument. Should be either a GSObject or a list of GSObjects if isinstance(args[0], (GSObject, chrom.ChromaticObject)): args = [args[0]] elif isinstance(args[0], list) or isinstance(args[0], tuple): args = args[0] else: raise TypeError("Single input argument must be a GSObject, ChromaticObject or " + "a (possibly mixed) list of them.") # else args is already the list of objects if any([isinstance(a, chrom.ChromaticObject) for a in args]): return chrom.ChromaticSum(*args, **kwargs) else: return Sum(*args, **kwargs)
[docs]class Sum(GSObject): """A class for adding 2 or more `GSObject` instances. The Sum class is used to represent the sum of multiple `GSObject` instances. For example, it might be used to represent a multiple-component galaxy as the sum of an Exponential and a DeVaucouleurs, or to represent a PSF as the sum of multiple Gaussian objects. Typically, you do not need to construct a Sum object explicitly. Normally, you would just use the + operator, which returns a Sum:: >>> bulge = galsim.Sersic(n=3, half_light_radius=0.8) >>> disk = galsim.Exponential(half_light_radius=1.4) >>> gal = bulge + disk >>> psf = galsim.Gaussian(sigma=0.3, flux=0.3) + galsim.Gaussian(sigma=0.8, flux=0.7) You can also use the Add() factory function, which returns a Sum object if none of the individual objects are chromatic:: >>> gal = galsim.Add([bulge,disk]) Parameters: args: Unnamed args should be a list of objects to add. gsparams: An optional `GSParams` argument. [default: None] propagate_gsparams: Whether to propagate gsparams to each of the components. This is normally a good idea, but there may be use cases where one would not want to do this. [default: True] Note: if ``gsparams`` is unspecified (or None), then the Sum instance will use the most restrictive combination of parameters from each of the component objects. Normally, this means the smallest numerical value (e.g. folding_threshold, xvalue_accuracy, etc.), but for a few parameters, the largest numerical value is used. See `GSParams.combine` for details. Furthermore, the gsparams used for the Sum (either given explicitly or derived from the components) will normally be applied to each of the components. It doesn't usually make much sense to apply stricter-than-normal accuracy or threshold values to one component but not another in a Sum, so this ensures that they all have consistent rendering behavior. However, if you want to keep the existing gsparams of the component objects (e.g. because one object is much fainter and can thus use looser accuracy tolerances), then you may set ``propagate_gsparams=False``. """ def __init__(self, *args, **kwargs): # Check kwargs first: gsparams = kwargs.pop("gsparams", None) self._propagate_gsparams = kwargs.pop('propagate_gsparams', True) # Make sure there is nothing left in the dict. if kwargs: raise TypeError( "Sum constructor got unexpected keyword argument(s): %s"%kwargs.keys()) if len(args) == 0: raise TypeError("At least one ChromaticObject or GSObject must be provided.") elif len(args) == 1: # 1 argument. Should be either a GSObject or a list of GSObjects if isinstance(args[0], GSObject): args = [args[0]] elif isinstance(args[0], list) or isinstance(args[0], tuple): args = args[0] else: raise TypeError("Single input argument must be a GSObject or list of them.") # else args is already the list of objects # Consolidate args for Sums of Sums... new_args = [] for a in args: if isinstance(a, Sum): new_args.extend(a._obj_list) else: new_args.append(a) args = new_args # Save the list as an attribute, so it can be inspected later if necessary. self._obj_list = args for obj in args: if not isinstance(obj, GSObject): raise TypeError("Arguments to Sum must be GSObjects, not %s"%obj) # Figure out what gsparams to use if gsparams is None: # If none is given, take the most restrictive combination from the obj_list. self._gsparams = GSParams.combine([obj.gsparams for obj in args]) else: # If something explicitly given, then use that. self._gsparams = GSParams.check(gsparams) # Apply gsparams to all in obj_list. if self._propagate_gsparams: self._obj_list = [obj.withGSParams(self._gsparams) for obj in args] else: self._obj_list = args @property def obj_list(self): """The list of objects being added. """ return self._obj_list @lazy_property def _sbp(self): # NB. I only need this until compound and transform are reimplemented in Python... sb_list = [obj._sbp for obj in self.obj_list] return _galsim.SBAdd(sb_list, self.gsparams._gsp) @lazy_property def _flux(self): flux_list = [obj.flux for obj in self.obj_list] return np.sum(flux_list) @lazy_property def _noise(self): # If any of the objects have a noise attribute, then we propagate the sum of the # noises (they add like variances) to the final sum. _noise = None for obj in self.obj_list: if obj.noise is not None: if _noise is None: _noise = obj.noise else: _noise += obj.noise return _noise
[docs] def withGSParams(self, gsparams=None, **kwargs): """Create a version of the current object with the given gsparams .. note:: Unless you set ``propagate_gsparams=False``, this method will also update the gsparams of each object being added. """ if gsparams == self.gsparams: return self ret = copy.copy(self) ret._gsparams = GSParams.check(gsparams, self.gsparams, **kwargs) if self._propagate_gsparams: ret._obj_list = [ obj.withGSParams(ret._gsparams) for obj in self.obj_list ] return ret
def __eq__(self, other): return (self is other or (isinstance(other, Sum) and self.obj_list == other.obj_list and self.gsparams == other.gsparams and self._propagate_gsparams == other._propagate_gsparams)) def __hash__(self): return hash(("galsim.Sum", tuple(self.obj_list), self.gsparams, self._propagate_gsparams)) def __repr__(self): return 'galsim.Sum(%r, gsparams=%r, propagate_gsparams=%r)'%(self.obj_list, self.gsparams, self._propagate_gsparams) def __str__(self): str_list = [ str(obj) for obj in self.obj_list ] return '(' + ' + '.join(str_list) + ')' def _prepareDraw(self): for obj in self.obj_list: obj._prepareDraw() @lazy_property def _maxk(self): maxk_list = [obj.maxk for obj in self.obj_list] return np.max(maxk_list) @lazy_property def _stepk(self): stepk_list = [obj.stepk for obj in self.obj_list] return np.min(stepk_list) @lazy_property def _has_hard_edges(self): hard_list = [obj.has_hard_edges for obj in self.obj_list] return bool(np.any(hard_list)) @lazy_property def _is_axisymmetric(self): axi_list = [obj.is_axisymmetric for obj in self.obj_list] return bool(np.all(axi_list)) @lazy_property def _is_analytic_x(self): ax_list = [obj.is_analytic_x for obj in self.obj_list] return bool(np.all(ax_list)) @lazy_property def _is_analytic_k(self): ak_list = [obj.is_analytic_k for obj in self.obj_list] return bool(np.all(ak_list)) @lazy_property def _centroid(self): cen_list = [obj.centroid * obj.flux for obj in self.obj_list] return sum(cen_list[1:], cen_list[0]) / self.flux @lazy_property def _positive_flux(self): pflux_list = [obj.positive_flux for obj in self.obj_list] return np.sum(pflux_list) @lazy_property def _negative_flux(self): nflux_list = [obj.negative_flux for obj in self.obj_list] return np.sum(nflux_list) @lazy_property def _flux_per_photon(self): return self._calculate_flux_per_photon() @lazy_property def _max_sb(self): sb_list = [obj.max_sb for obj in self.obj_list] return np.sum(sb_list) def _xValue(self, pos): xv_list = [obj.xValue(pos) for obj in self.obj_list] return np.sum(xv_list) def _kValue(self, pos): kv_list = [obj.kValue(pos) for obj in self.obj_list] return np.sum(kv_list) def _drawReal(self, image, jac=None, offset=(0.,0.), flux_scaling=1.): self.obj_list[0]._drawReal(image, jac, offset, flux_scaling) if len(self.obj_list) > 1: im1 = image.copy() for obj in self.obj_list[1:]: obj._drawReal(im1, jac, offset, flux_scaling) image += im1 def _shoot(self, photons, rng): # We probabilistically choose a component for each photon based on the # relative flux density of that component for the given wavelength. comp_flux = np.array([obj.positive_flux + obj.negative_flux for obj in self._obj_list]) total_flux = np.sum(comp_flux) prob = comp_flux / total_flux cdf = np.cumsum(prob) u = np.empty(len(photons)) UniformDeviate(rng).generate(u) use_k = np.sum((u >= cdf[:,None]), axis=0) n = len(self._obj_list) use_k[use_k==n] = n-1 # This shouldn't be possible, but maybe with numerical rounding... fluxPerPhoton = total_flux / len(photons) # Shoot the corresponding photons from each component. for kk, obj in enumerate(self.obj_list): use = (use_k == kk) # True for each photon where this is the object to shoot from this_n = np.sum(use) if this_n == 0: continue temp = PhotonArray(this_n) temp._copyFrom(photons, slice(None), use, do_xy=False, do_flux=False) obj._shoot(temp, rng) temp.flux = fluxPerPhoton photons._copyFrom(temp, use, slice(None)) def _drawKImage(self, image, jac=None): self.obj_list[0]._drawKImage(image, jac) if len(self.obj_list) > 1: im1 = image.copy() for obj in self.obj_list[1:]: obj._drawKImage(im1, jac) image += im1 def __getstate__(self): d = self.__dict__.copy() d.pop('_sbp',None) return d def __setstate__(self, d): self.__dict__ = d