Source code for galsim.transform

# Copyright (c) 2012-2023 by the GalSim developers team on GitHub
# https://github.com/GalSim-developers
#
# This file is part of GalSim: The modular galaxy image simulation toolkit.
# https://github.com/GalSim-developers/GalSim
#
# GalSim is free software: redistribution and use in source and binary forms,
# with or without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
#    list of conditions, and the disclaimer given in the accompanying LICENSE
#    file.
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions, and the disclaimer given in the documentation
#    and/or other materials provided with the distribution.
#

__all__ = [ 'Transform', 'Transformation', '_Transform', ]

import numpy as np
import math
import cmath
import copy

from . import _galsim
from .gsobject import GSObject
from .gsparams import GSParams
from .utilities import lazy_property, WeakMethod
from .position import PositionD, _PositionD, parse_pos_args
from .errors import GalSimError
from .sum import Sum
from .sed import SED
from .wcs import JacobianWCS
from .shear import Shear, _Shear
from .angle import Angle
from . import convolve
from . import chromatic as chrom

[docs]def Transform(obj, jac=None, offset=(0.,0.), flux_ratio=1., gsparams=None, propagate_gsparams=True): """A function for transforming either a `GSObject` or `ChromaticObject`. This function will inspect its input argument to decide if a `Transformation` object or a `ChromaticTransformation` object is required to represent the resulting transformed object. Note: the name of the flux_ratio parameter is technically wrong here if the jacobian has a non-unit determinant, since that would also scale the flux. The flux_ratio parameter actually only refers to an overall amplitude ratio for the surface brightness profile. The total flux scaling is actually ``|det(jac)| * flux_ratio``. Parameters: obj: The object to be transformed. jac: A list or tuple ( dudx, dudy, dvdx, dvdy ) describing the Jacobian of the transformation. Use None to indicate that the Jacobian is the 2x2 unit matrix. [default: None] offset: A galsim.PositionD or tuple giving the offset by which to shift the profile. [default: (0.,0.)] flux_ratio: A factor by which to multiply the surface brightness of the object. (Technically, not necessarily the flux. See above.) [default: 1] gsparams: An optional `GSParams` argument. [default: None] propagate_gsparams: Whether to propagate gsparams to the transformed object. This is normally a good idea, but there may be use cases where one would not want to do this. [default: True] Returns: a `Transformation` or `ChromaticTransformation` instance as appropriate. """ # Usual cases are a bit simpler to handle, so check for them first. if (isinstance(obj, (GSObject, chrom.SimpleChromaticTransformation)) and not hasattr(jac,'__call__') and not hasattr(offset,'__call__')): if isinstance(obj, GSObject) : if not hasattr(flux_ratio,'__call__'): # This is the usual achromatic case, so check for this first. return Transformation(obj, jac, offset, flux_ratio, gsparams, propagate_gsparams) elif jac is None and offset == (0.,0.): # This is obj * SED. Return a SimpleChromaticTransformation. if not isinstance(flux_ratio, SED): flux_ratio = SED(flux_ratio, 'nm', '1') return chrom.SimpleChromaticTransformation(obj, flux_ratio, gsparams=gsparams, propagate_gsparams=propagate_gsparams) else: # Too complicated for simple. return chrom.ChromaticTransformation(obj, jac, offset, flux_ratio, gsparams=gsparams, propagate_gsparams=propagate_gsparams) else: # obj is a SimpleChromaticTransformation # Try to keep it that way if possible. if not hasattr(flux_ratio,'__call__'): # Then transformations are all achromatic. Apply to original. new_obj = Transformation(obj.original, jac, offset, flux_ratio, gsparams, propagate_gsparams) return chrom.SimpleChromaticTransformation(new_obj, obj._flux_ratio, gsparams=gsparams, propagate_gsparams=propagate_gsparams) elif jac is None and offset == (0.,0.): # Just a flux_ratio. Apply to SED. new_sed = obj._flux_ratio * flux_ratio return chrom.SimpleChromaticTransformation(obj.original, new_sed, gsparams=gsparams, propagate_gsparams=propagate_gsparams) else: # Too complicated for simple. return chrom.ChromaticTransformation(obj, jac, offset, flux_ratio, gsparams=gsparams, propagate_gsparams=propagate_gsparams) elif not isinstance(obj, (GSObject, chrom.ChromaticObject)): raise TypeError("Argument to Transform must be either a GSObject or a ChromaticObject.") else: # Now we know we are dealing with a more complicated chromatic transformation. # Sometimes for Chromatic compound types, it is more efficient to apply the # transformation to the components rather than the whole. In particular, this can # help preserve separability in many cases. # Don't transform ChromaticSum object, better to just transform the arguments. if isinstance(obj, chrom.ChromaticSum): new_obj = chrom.ChromaticSum( [ Transform(o,jac,offset,flux_ratio,gsparams,propagate_gsparams) for o in obj.obj_list ]) if hasattr(obj, 'covspec'): if jac is None: new_obj.covspec = obj.covspec * flux_ratio**2 else: dudx, dudy, dvdx, dvdy = np.asarray(jac, dtype=float).flatten() new_obj.covspec = obj.covspec.transform(dudx, dudy, dvdx, dvdy) * flux_ratio**2 return new_obj # If we are just flux scaling, then a Convolution can do that to the first element. # NB. Even better, if the flux scaling is chromatic, would be to find a component # that is already non-separable. But we don't bother trying to do that currently. elif (isinstance(obj, chrom.ChromaticConvolution or isinstance(obj, convolve.Convolution)) and jac is None and offset == (0.,0.)): first = Transform(obj.obj_list[0], flux_ratio=flux_ratio, gsparams=gsparams, propagate_gsparams=propagate_gsparams) return chrom.ChromaticConvolution( [first] + [o for o in obj.obj_list[1:]] ) else: return chrom.ChromaticTransformation(obj, jac, offset, flux_ratio, gsparams=gsparams, propagate_gsparams=propagate_gsparams)
[docs]class Transformation(GSObject): """A class for modeling an affine transformation of a `GSObject` instance. Typically, you do not need to construct a Transformation object explicitly. This is the type returned by the various transformation methods of `GSObject` such as `GSObject.shear`, `GSObject.rotate`, `GSObject.shift`, `GSObject.transform`, etc. All the various transformations can be described as a combination of a jacobian matrix (i.e. `GSObject.transform`) and a translation (`GSObject.shift`), which are described by (dudx,dudy,dvdx,dvdy) and (dx,dy) respectively. .. note:: The name of the flux_ratio parameter is technically wrong here if the jacobian has a non-unit determinant, since that would also scale the flux. The flux_ratio parameter actually only refers to an overall amplitude ratio for the surface brightness profile. The total flux scaling is actually ``|det(jac)| * flux_ratio``. Parameters: obj: The object to be transformed. jac: A list, tuple or numpy array ( dudx, dudy, dvdx, dvdy ) describing the Jacobian of the transformation. Use None to indicate that the Jacobian is the 2x2 unit matrix. [default: None] offset: A galsim.PositionD or tuple giving the offset by which to shift the profile. [default: (0.,0.)] flux_ratio: A factor by which to multiply the surface brightness of the object. (Technically, not necessarily the flux. See above.) [default: 1] gsparams: An optional `GSParams` argument. [default: None] propagate_gsparams: Whether to propagate gsparams to the transformed object. This is normally a good idea, but there may be use cases where one would not want to do this. [default: True] Attributes: original: The original object that is being transformed. jac: The jacobian of the transformation matrix. offset: The offset being applied. flux_ratio: The amount by which the overall surface brightness amplitude is multiplied. gsparams: The usual gsparams attribute that all `GSObject` classes have. Note: if ``gsparams`` is unspecified (or None), then the Transformation instance inherits the `GSParams` from obj. Also, note that parameters related to the Fourier-space calculations must be set when initializing obj, NOT when creating the Transform (at which point the accuracy and threshold parameters will simply be ignored). """ unit_jac = np.array([[1,0],[0,1]], dtype=float) def __init__(self, obj, jac=None, offset=(0.,0.), flux_ratio=1., gsparams=None, propagate_gsparams=True): if jac is None: self._jac = None else: self._jac = np.asarray(jac, dtype=float).reshape(2,2) offset = PositionD(offset) self._dx = offset.x self._dy = offset.y self._flux_ratio = float(flux_ratio) self._gsparams = GSParams.check(gsparams, obj.gsparams) self._propagate_gsparams = propagate_gsparams if self._propagate_gsparams: obj = obj.withGSParams(self._gsparams) if isinstance(obj, Transformation): # Combine the two affine transformations into one. if obj._has_offset: if jac is None: dx1, dy1 = obj._dx, obj._dy else: dx1, dy1 = self._fwd_normal(obj._dx, obj._dy) self._dx += dx1 self._dy += dy1 if jac is None: self._jac = obj._jac else: self._jac = self._jac if obj._jac is None else self._jac.dot(obj.jac) self._flux_ratio *= obj._flux_ratio self._original = obj._original else: self._original = obj self._has_offset = (self._dx != 0. or self._dy != 0.) if self._det==0.: raise GalSimError("Attempt to Transform with degenerate matrix"); @property def original(self): """The original object being transformed. """ return self._original @property def jac(self): """The Jacobian of the transforamtion. """ return self.unit_jac if self._jac is None else self._jac @property def offset(self): """The offset of the transformation. """ return _PositionD(self._dx, self._dy) @property def flux_ratio(self): """The flux ratio of the transformation. """ return self._flux_ratio @lazy_property def _flux(self): return self._flux_scaling * self._original.flux @lazy_property def _sbp(self): _jac = 0 if self._jac is None else self._jac.__array_interface__['data'][0] return _galsim.SBTransform(self._original._sbp, _jac, self._dx, self._dy, self._flux_ratio, self.gsparams._gsp) @lazy_property def _noise(self): if self.original.noise is None: return None else: return BaseCorrelatedNoise( self.original.noise.rng, _Transform(self.original.noise._profile, self._jac, flux_ratio=self.flux_ratio**2), self.original.noise.wcs)
[docs] def withGSParams(self, gsparams=None, **kwargs): """Create a version of the current object with the given gsparams .. note:: Unless you set ``propagate_gsparams=False``, this method will also update the gsparams of the wrapped component object. """ if gsparams == self.gsparams: return self ret = copy.copy(self) ret._gsparams = GSParams.check(gsparams, self.gsparams, **kwargs) if self._propagate_gsparams: ret._original = self._original.withGSParams(ret._gsparams) return ret
def __eq__(self, other): return (self is other or (isinstance(other, Transformation) and self.original == other.original and np.array_equal(self.jac, other.jac) and np.array_equal(self.offset, other.offset) and self.flux_ratio == other.flux_ratio and self.gsparams == other.gsparams and self._propagate_gsparams == other._propagate_gsparams)) def __hash__(self): return hash(("galsim.Transformation", self.original, tuple(self.jac.ravel()), self._dx, self._dy, self.flux_ratio, self.gsparams, self._propagate_gsparams)) def __repr__(self): return ('galsim.Transformation(%r, jac=%r, offset=%r, flux_ratio=%r, gsparams=%r, ' 'propagate_gsparams=%r)')%( self.original, None if self._jac is None else self._jac.tolist(), self.offset, self.flux_ratio, self.gsparams, self._propagate_gsparams) @classmethod def _str_from_jac(cls, jac): dudx, dudy, dvdx, dvdy = jac.ravel() # Figure out the shear/rotate/dilate calls that are equivalent. jac = JacobianWCS(dudx,dudy,dvdx,dvdy) scale, shear, theta, flip = jac.getDecomposition() s = None if flip: s = 0 # Special value indicating to just use transform. if abs(theta.rad) > 1.e-12: if s is None: s = '.rotate(%s)'%theta else: s = 0 if shear.g > 1.e-12: if s is None: s = '.shear(%s)'%shear else: s = 0 if abs(scale-1.0) > 1.e-12: if s is None: s = '.expand(%s)'%scale else: s = 0 if s == 0: # If flip or there are two components, then revert to transform as simpler. s = '.transform(%s,%s,%s,%s)'%(dudx,dudy,dvdx,dvdy) if s is None: # If nothing is large enough to show up above, give full detail of transform s = '.transform(%r,%r,%r,%r)'%(dudx,dudy,dvdx,dvdy) return s def __str__(self): s = str(self.original) if self._jac is not None: s += self._str_from_jac(self._jac) if self._dx != 0 or self._dy != 0: s += '.shift(%s,%s)'%(self._dx,self._dy) if self.flux_ratio != 1.: s += ' * %s'%self.flux_ratio return s def _prepareDraw(self): self._original._prepareDraw() # Some lazy properties to calculate things as needed. @lazy_property def _det(self): if self._jac is None: return 1 if self._jac[0,1] == 0. and self._jac[1,0] == 0.: if self._jac[0,0] == 1. and self._jac[1,1] == 1.: # jac is (1,0,0,1) return 1. else: # jac is (a,0,0,b) return self._jac[0,0] * self._jac[1,1] else: # Fully general case return self._jac[0,0] * self._jac[1,1] - self._jac[0,1] * self._jac[1,0] @lazy_property def _invdet(self): return 1. if self._jac is None else 1./self._det @lazy_property def _invjac(self): invjac = np.array([[self._jac[1,1], -self._jac[0,1]], [-self._jac[1,0], self._jac[0,0]]]) invjac *= self._invdet return invjac # To avoid confusion with the flux vs amplitude scaling, we use these names below, rather # than flux_ratio, which is really an amplitude scaling. @property def _amp_scaling(self): return self._flux_ratio @lazy_property def _flux_scaling(self): return abs(self._det) * self._flux_ratio # Some helper attributes to make fwd and inv transformation quicker @lazy_property def _fwd(self): if self._jac is None: return WeakMethod(self._ident) elif self._jac[0,1] == 0. and self._jac[1,0] == 0.: return WeakMethod(self._fwd_diag) else: return WeakMethod(self._fwd_normal) @lazy_property def _fwdT(self): if self._jac is None: return WeakMethod(self._ident) elif self._jac[0,1] == 0. and self._jac[1,0] == 0.: return WeakMethod(self._fwd_diag) else: return WeakMethod(self._fwdT_normal) @lazy_property def _inv(self): if self._jac is None: return WeakMethod(self._ident) elif self._jac[0,1] == 0. and self._jac[1,0] == 0.: return WeakMethod(self._inv_diag) else: return WeakMethod(self._inv_normal) @lazy_property def _kfactor(self): if self._dx == self._dy == 0.: return WeakMethod(self._kf_nophase) else: return WeakMethod(self._kf_phase) def _ident(self, x, y): return x, y def _fwd_diag(self, x, y): x *= self._jac[0,0] y *= self._jac[1,1] return x, y def _fwd_normal(self, x, y): #return self._jac[0,0] * x + self._jac[0,1] * y, self._jac[1,0] * x + self._jac[1,1] * y # Do this as much in place as possible temp = self._jac[0,1] * y y *= self._jac[1,1] y += self._jac[1,0] * x x *= self._jac[0,0] x += temp return x, y def _fwdT_normal(self, x, y): #return self._jac[0,0] * x + self._jac[1,0] * y, self._jac[0,1] * x + self._jac[1,1] * y temp = self._jac[1,0] * y y *= self._jac[1,1] y += self._jac[0,1] * x x *= self._jac[0,0] x += temp return x, y def _inv_diag(self, x, y): x /= self._jac[0,0] y /= self._jac[1,1] return x, y def _inv_normal(self, x, y): #return (self._invdet * (self._jac[1,1] * x - self._jac[0,1] * y), # self._invdet * (-self._jac[1,0] * x + self._jac[0,0] * y)) temp = self._invjac[0,1] * y y *= self._invjac[1,1] y += self._invjac[1,0] * x x *= self._invjac[0,0] x += temp return x, y def _kf_nophase(self, kx, ky): return self._flux_scaling def _kf_phase(self, kx, ky): kx *= -1j * self._dx ky *= -1j * self._dy kx += ky return self._flux_scaling * np.exp(kx) def __getstate__(self): d = self.__dict__.copy() d.pop('_sbp',None) d.pop('_fwd',None) d.pop('_fwdT',None) d.pop('_inv',None) d.pop('_kfactor',None) return d def __setstate__(self, d): self.__dict__ = d def _major_minor(self): if not hasattr(self, "_major"): if self._jac is None: self._major = self._minor = 1. else: h1 = math.hypot(self._jac[0,0] + self._jac[1,1], self._jac[0,1] - self._jac[1,0]) h2 = math.hypot(self._jac[0,0] - self._jac[1,1], self._jac[0,1] + self._jac[1,0]) self._major = 0.5 * abs(h1+h2) self._minor = 0.5 * abs(h1-h2) @lazy_property def _maxk(self): self._major_minor() return self._original.maxk / self._minor @lazy_property def _stepk(self): self._major_minor() stepk = self._original.stepk / self._major # If we have a shift, we need to further modify stepk # stepk = Pi/R # R <- R + |shift| # stepk <- Pi/(Pi/stepk + |shift|) if self._dx != 0. or self._dy != 0.: dr = math.hypot(self._dx, self._dy) stepk = math.pi / (math.pi/stepk + dr) return stepk @property def _has_hard_edges(self): return self._original.has_hard_edges @property def _is_axisymmetric(self): return bool(self._original.is_axisymmetric and (self._jac is None or (self._jac[0,0] == self._jac[1,1] and self._jac[0,1] == -self._jac[1,0])) and self._dx == self._dy == 0.) @property def _is_analytic_x(self): return self._original.is_analytic_x @property def _is_analytic_k(self): return self._original.is_analytic_k @property def _centroid(self): cen = self._original.centroid cenx, ceny = self._fwd(cen.x, cen.y) cenx += self._dx ceny += self._dy return _PositionD(cenx,ceny) @property def _positive_flux(self): return self._flux_scaling * self._original.positive_flux @property def _negative_flux(self): return self._flux_scaling * self._original.negative_flux @lazy_property def _flux_per_photon(self): return self._calculate_flux_per_photon() @property def _max_sb(self): return self._amp_scaling * self._original.max_sb def _xValue(self, pos): x = pos.x - self._dx y = pos.y - self._dy inv_pos = _PositionD(*self._inv(x, y)) return self._original._xValue(inv_pos) * self._amp_scaling def _kValue(self, kpos): fwdT_kpos = _PositionD(*self._fwdT(kpos.x, kpos.y)) return self._original._kValue(fwdT_kpos) * self._kfactor(kpos.x, kpos.y) def _drawReal(self, image, jac=None, offset=(0.,0.), flux_scaling=1.): dx, dy = offset if self._has_offset: if jac is not None: x1 = jac[0,0] * self._dx + jac[0,1] * self._dy y1 = jac[1,0] * self._dx + jac[1,1] * self._dy else: x1, y1 = self._dx, self._dy dx += x1 dy += y1 flux_scaling *= self._flux_scaling jac = self._jac if jac is None else jac if self._jac is None else jac.dot(self._jac) self._original._drawReal(image, jac, (dx, dy), flux_scaling) def _shoot(self, photons, rng): self._original._shoot(photons, rng) photons.x, photons.y = self._fwd(photons.x, photons.y) photons.x += self._dx photons.y += self._dy photons.scaleFlux(self._flux_scaling) def _drawKImage(self, image, jac=None): jac1 = self._jac if jac is None else jac if self._jac is None else jac.dot(self._jac) self._original._drawKImage(image, jac1) if self._has_offset: _jac = 0 if jac is None else jac.__array_interface__['data'][0] _galsim.ApplyKImagePhases(image._image, image.scale, _jac, self._dx, self._dy, self._flux_scaling) elif abs(self._flux_scaling-1.) > self._gsparams.kvalue_accuracy: image *= self._flux_scaling
[docs]def _Transform(obj, jac=None, offset=(0.,0.), flux_ratio=1.): """Approximately equivalent to Transform, but without some of the sanity checks (such as checking for chromatic options) or setting a new gsparams. For a `ChromaticObject`, you must use the regular `Transform`. Parameters: obj: The object to be transformed. jac: A 2x2 numpy array describing the Jacobian of the transformation. Use None to indicate that the Jacobian is the 2x2 unit matrix. [default: None] offset: The offset to apply. [default (0.,0.)] flux_ratio: A factor by which to multiply the surface brightness of the object. [default: 1.] """ ret = Transformation.__new__(Transformation) ret._gsparams = obj.gsparams ret._propagate_gsparams = True ret._jac = jac ret._dx, ret._dy = offset if isinstance(obj, Transformation): if obj._has_offset: if jac is None: dx1, dy1 = obj._dx, obj._dy else: dx1, dy1 = ret._fwd_normal(obj._dx, obj._dy) ret._dx += dx1 ret._dy += dy1 if jac is None: ret._jac = obj._jac else: ret._jac = ret._jac if obj._jac is None else ret._jac.dot(obj.jac) ret._flux_ratio = flux_ratio * obj._flux_ratio ret._original = obj._original else: ret._flux_ratio = flux_ratio ret._original = obj ret._has_offset = (ret._dx != 0. or ret._dy != 0.) return ret
# Put this at the bottom to avoid circular import error. from .correlatednoise import BaseCorrelatedNoise