# Copyright (c) 2012-2023 by the GalSim developers team on GitHub
# https://github.com/GalSim-developers
#
# This file is part of GalSim: The modular galaxy image simulation toolkit.
# https://github.com/GalSim-developers/GalSim
#
# GalSim is free software: redistribution and use in source and binary forms,
# with or without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions, and the disclaimer given in the accompanying LICENSE
# file.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions, and the disclaimer given in the documentation
# and/or other materials provided with the distribution.
#
import logging
import numpy as np
import math
from .util import LoggerWrapper, GetIndex, GetRNG
from .value import ParseValue, GetCurrentValue, GetAllParams
from .input import RegisterInputConnectedType
from ..errors import GalSimConfigError, GalSimConfigValueError
from ..image import Image
from ..position import PositionI
from ..random import PoissonDeviate
from ..noise import GaussianNoise, PoissonNoise, DeviateNoise, CCDNoise
from ..correlatednoise import getCOSMOSNoise, BaseCorrelatedNoise, UncorrelatedNoise
# This file handles the functionality for adding noise and the sky to an image after
# drawing the objects.
# This module-level dict will store all the registered noise types.
# See the RegisterNoiseType function at the end of this file.
# The keys are the (string) names of the noise types, and the values will be builder objects
# that will perform the different functions related to adding noise to images.
valid_noise_types = {}
#
# First the driver functions:
#
[docs]def AddSky(config, im):
"""Add the sky level to the image
Parameters:
config: The (base) configuration dict
im: The image onto which to add the sky
"""
index, orig_index_key = GetIndex(config['image'], config)
config['index_key'] = 'image_num'
sky = GetSky(config['image'], config, full=True)
if sky:
im += sky
config['index_key'] = orig_index_key
[docs]def AddNoise(config, im, current_var=0., logger=None):
"""
Add noise to an image according to the noise specifications in the noise dict.
Parameters:
config: The (base) configuration dict
im: The image onto which to add the noise
current_var: The current noise variance present in the image already [default: 0]
logger: If given, a logger object to log progress. [default: None]
Returns:
Variance added to the image (units are ADU if gain != 1)
"""
from .stamp import SetupConfigObjNum
logger = LoggerWrapper(logger)
if 'noise' in config['image']:
noise = config['image']['noise']
else: # No noise.
return 0
if not isinstance(noise, dict):
raise GalSimConfigError("image.noise is not a dict.")
# Default is Poisson
noise_type = noise.get('type', 'Poisson')
if noise_type not in valid_noise_types:
raise GalSimConfigValueError("Invalid noise.type.", noise_type,
list(valid_noise_types.keys()))
# We need to use image_num for the index_key, but if we are running this from the stamp
# building phase, then we want to use obj_num_rng for the noise rng. So get the rng now
# before we change config['index_key'].
index, orig_index_key = GetIndex(noise, config)
rng = GetRNG(noise, config)
# This makes sure draw_method is properly copied over and given a default value.
SetupConfigObjNum(config, config.get('obj_num',0), logger)
draw_method = GetCurrentValue('draw_method', config['stamp'], str, config)
builder = valid_noise_types[noise_type]
config['index_key'] = 'image_num'
var = builder.addNoise(noise, config, im, rng, current_var, draw_method, logger)
config['index_key'] = orig_index_key
return var
[docs]def CalculateNoiseVariance(config, full=False):
"""
Calculate the noise variance from the noise specified in the noise dict.
Parameters:
config: The (base) configuration dict
full: If the noise is variable across the image, return the full image with the
noise variance at every pixel. Otherwise, just return the value at the center.
Returns:
the noise variance (units are ADU if gain != 1)
"""
if 'noise' in config['image']:
noise = config['image']['noise']
else: # No noise.
return 0
if not isinstance(noise, dict):
raise GalSimConfigError("image.noise is not a dict.")
noise_type = noise.get('type', 'Poisson')
if noise_type not in valid_noise_types:
raise GalSimConfigValueError("Invalid noise.type.", noise_type,
list(valid_noise_types.keys()))
index, orig_index_key = GetIndex(noise, config)
config['index_key'] = 'image_num'
builder = valid_noise_types[noise_type]
var = builder.getNoiseVariance(noise, config, full=full)
config['index_key'] = orig_index_key
return var
[docs]def AddNoiseVariance(config, im, include_obj_var=False, logger=None):
"""
Add the noise variance to an image according to the noise specifications in the noise dict.
Typically, this is used for building a weight map, which is typically the inverse variance.
Parameters:
config: The (base) configuration dict
im: The image onto which to add the variance values
include_obj_var: Whether to add the variance from the object photons for noise
models that have a component based on the number of photons.
Note: if this is True, the returned variance will not include this
contribution to the noise variance. [default: False]
logger: If given, a logger object to log progress. [default: None]
Returns:
the variance in the image (units are ADU if gain != 1)
"""
logger = LoggerWrapper(logger)
if 'noise' in config['image']:
noise = config['image']['noise']
else: # No noise.
return
if not isinstance(noise, dict):
raise GalSimConfigError("image.noise is not a dict.")
noise_type = noise.get('type', 'Poisson')
if noise_type not in valid_noise_types:
raise GalSimConfigValueError("Invalid noise.type.", noise_type,
list(valid_noise_types.keys()))
index, orig_index_key = GetIndex(noise, config)
config['index_key'] = 'image_num'
builder = valid_noise_types[noise_type]
builder.addNoiseVariance(noise, config, im, include_obj_var, logger)
config['index_key'] = orig_index_key
[docs]def GetSky(config, base, logger=None, full=False):
"""Parse the sky information and return either a float value for the sky level per pixel
or an image, as needed.
If an image is required (because wcs is not uniform) then it will use the presence of
base['image_pos'] to determine what size image to return (stamp or full). If there is
a current image_pos, then we are doing a stamp. Otherwise a full image.
Parameters:
config: The configuration field with the sky specification, which can be either
base['image'] or base['image']['noise']
base: The base configuration dict
logger: If given, a logger object to log progress. [default: None]
full: If the sky level is variable across the image, return the full
image with the sky at every pixel. Otherwise, just return the
sky at the image center.
Returns:
sky, either a float value or an Image. (The latter only if full=True)
"""
logger = LoggerWrapper(logger)
if 'sky_level' in config:
if 'sky_level_pixel' in config:
raise GalSimConfigValueError(
"Cannot specify both sky_level and sky_level_pixel",
(config['sky_level'], config['sky_level_pixel']))
sky_level = ParseValue(config,'sky_level',base,float)[0]
logger.debug('image %d, obj %d: sky_level = %f',
base.get('image_num',0),base.get('obj_num',0), sky_level)
wcs = base['wcs']
if wcs._isUniform:
sky_level_pixel = sky_level * wcs.pixelArea()
logger.debug('image %d, obj %d: Uniform: sky_level_pixel = %f',
base.get('image_num',0),base.get('obj_num',0), sky_level_pixel)
return sky_level_pixel
elif full:
# This calculation is non-trivial, so store this in case we need it again.
tag = (id(base), base.get('file_num',0), base.get('image_num',0), base.get('obj_num',0))
if config.get('_current_sky_tag',None) == tag:
logger.debug('image %d, obj %d: Using saved sky image',
base.get('image_num',0),base.get('obj_num',0))
return config['_current_sky']
else:
logger.debug('image %d, obj %d: Non-uniform wcs. Making sky image',
base.get('image_num',0),base.get('obj_num',0))
bounds = base['current_noise_image'].bounds
sky = Image(bounds, wcs=wcs)
wcs.makeSkyImage(sky, sky_level)
sensor = base.get('sensor', None)
if sensor is not None:
center = base.get('image_origin', PositionI(1,1)) - sky.origin
use_flux = config.get('use_flux_sky_areas', False)
# TODO: If use_flux_sky_areas = True, then we should really build this up
# in steps. E.g. for a flat field.
# This one step calcualtion isn't right.
area = sensor.calculate_pixel_areas(sky, orig_center=center, use_flux=use_flux)
sky *= area
config['_current_sky_tag'] = tag
config['_current_sky'] = sky
return sky
else:
center = base['current_noise_image'].bounds.true_center
return wcs.local(image_pos=center).pixelArea() * sky_level
elif 'sky_level_pixel' in config:
sky_level_pixel = ParseValue(config,'sky_level_pixel',base,float)[0]
logger.debug('image %d, obj %d: sky_level_pixel = %f',
base.get('image_num',0),base.get('obj_num',0), sky_level_pixel)
return sky_level_pixel
else:
return 0.
# items that are parsed separately from the normal noise function
noise_ignore = [ 'whiten', 'symmetrize', 'use_flux_sky_areas' ]
[docs]class NoiseBuilder:
"""A base class for building noise objects and applying the noise to images.
The base class doesn't do anything, but it defines the call signatures of the methods
that derived classes should use for the different specific noise types.
"""
[docs] def addNoise(self, config, base, im, rng, current_var, draw_method, logger):
"""Read the noise parameters from the config dict and add the appropriate noise to the
given image.
Parameters:
config: The configuration dict for the noise field.
base: The base configuration dict.
im: The image onto which to add the noise
rng: The random number generator to use for adding the noise.
current_var: The current noise variance present in the image already.
draw_method: The method that was used to draw the objects on the image.
logger: If given, a logger object to log progress.
Returns:
the variance of the noise model (units are ADU if gain != 1)
"""
raise NotImplementedError("The %s class has not overridden addNoise"%self.__class__)
[docs] def getNoiseVariance(self, config, base, full=False):
"""Read the noise parameters from the config dict and return the variance.
Parameters:
config: The configuration dict for the noise field.
base: The base configuration dict.
full: If the noise is variable across the image, return the full image with the
noise variance at every pixel. Otherwise, just return the value at the
center.
Returns:
the variance of the noise model (units are ADU if gain != 1)
"""
raise NotImplementedError("The %s class has not overridden addNoise"%self.__class__)
[docs] def addNoiseVariance(self, config, base, im, include_obj_var, logger):
"""Read the noise parameters from the config dict and add the appropriate noise variance
to the given image.
This is used for constructing the weight map iamge. It doesn't add a random value to
each pixel. Rather, it adds the variance of the noise that was used in the main image to
each pixel in this image.
This method has a default implemenation that is appropriate for noise models that have
a constant noise variance. It just gets the variance from getNoiseVariance and adds
that constant value to every pixel.
Parameters:
config: The configuration dict for the noise field.
base: The base configuration dict.
im: The image onto which to add the noise variance
include_obj_var: Whether the noise variance values should the photon noise from
object flux in addition to the sky flux. Only relevant for
noise models that are based on the image flux values such as
Poisson and CCDNoise.
logger: If given, a logger object to log progress.
"""
im += self.getNoiseVariance(config, base, full=True)
#
# Gaussian
#
[docs]class GaussianNoiseBuilder(NoiseBuilder):
def addNoise(self, config, base, im, rng, current_var, draw_method, logger):
# Read the noise variance
var = self.getNoiseVariance(config, base)
ret = var # save for the return value
# If we already have some variance in the image (from whitening), then we subtract this much
# from sigma**2.
if current_var:
logger.debug('image %d, obj %d: Target variance is %f, current variance is %f',
base.get('image_num',0),base.get('obj_num',0),var,current_var)
if var < current_var:
raise GalSimConfigError(
"Whitening already added more noise than the requested Gaussian noise.")
var -= current_var
# Now apply the noise.
sigma = math.sqrt(var)
im.addNoise(GaussianNoise(rng,sigma=sigma))
logger.debug('image %d, obj %d: Added Gaussian noise with var = %f',
base.get('image_num',0),base.get('obj_num',0),var)
return ret
def getNoiseVariance(self, config, base, full=False):
# The noise level can be specified either as a sigma or a variance. Here we just calculate
# the value of the variance from either one.
single = [ { 'sigma' : float , 'variance' : float } ]
params = GetAllParams(config, base, single=single, ignore=noise_ignore)[0]
if 'sigma' in params:
sigma = params['sigma']
return sigma * sigma
else:
return params['variance']
#
# Poisson
#
[docs]class PoissonNoiseBuilder(NoiseBuilder):
def addNoise(self, config, base, im, rng, current_var, draw_method, logger):
# Get how much extra sky to assume from the image.noise attribute.
sky = GetSky(base['image'], base, logger, full=True)
extra_sky = GetSky(config, base, logger, full=True)
total_sky = sky + extra_sky # for the return value
if isinstance(total_sky, Image):
var = np.mean(total_sky.array)
else:
var = total_sky
# (This could be zero, in which case we only add poisson noise for the object photons)
# If we already have some variance in the image (from whitening), then we subtract this
# much off of the sky level. It's not precisely accurate, since the existing variance is
# Gaussian, rather than Poisson, but it's the best we can do.
if current_var:
logger.debug('image %d, obj %d: Target variance is %f, current variance is %f',
base.get('image_num',0),base.get('obj_num',0), var, current_var)
if isinstance(total_sky, Image):
test = np.any(total_sky.array < current_var)
else:
test = (total_sky < current_var)
if test:
raise GalSimConfigError(
"Whitening already added more noise than the requested Poisson noise.")
total_sky -= current_var
extra_sky -= current_var
# At this point, there is a slight difference between fft and phot. For photon shooting,
# the galaxy already has Poisson noise, so we want to make sure not to add that again!
if draw_method == 'phot':
# Only add in the noise from the sky.
if isinstance(total_sky, Image):
noise_im = total_sky.copy()
noise_im.addNoise(PoissonNoise(rng))
noise_im -= total_sky
# total_sky should now have zero mean, but with the noise of the total sky level.
im += noise_im
else:
im.addNoise(DeviateNoise(PoissonDeviate(rng, mean=total_sky)))
# This deviate adds a noisy version of the sky, so need to subtract the mean
# back off.
im -= total_sky
else:
# Do the normal PoissonNoise calculation.
if isinstance(total_sky, Image):
im += extra_sky
im.addNoise(PoissonNoise(rng))
im -= extra_sky
else:
im.addNoise(PoissonNoise(rng, sky_level=extra_sky))
logger.debug('image %d, obj %d: Added Poisson noise',
base.get('image_num',0),base.get('obj_num',0))
return var
def getNoiseVariance(self, config, base, full=False):
# The noise variance is the net sky level per pixel
sky = GetSky(base['image'], base, full=full)
sky += GetSky(config, base, full=full)
return sky
def addNoiseVariance(self, config, base, im, include_obj_var, logger):
if include_obj_var:
# The current image at this point should be the noise-free, sky-free image,
# which is the object variance in each pixel.
im += base['current_noise_image']
# Note: For the phot case, we don't actually have an exact value for the variance in
# each pixel, but the drawn image before adding the Poisson noise is our best guess for
# the variance from the object's flux, so if we want the object variance included, this
# is still the best we can do.
# Add the total sky level
im += self.getNoiseVariance(config, base, full=True)
#
# CCD
#
[docs]class CCDNoiseBuilder(NoiseBuilder):
def getCCDNoiseParams(self, config, base):
opt = { 'gain' : float , 'read_noise' : float }
ignore = ['sky_level', 'sky_level_pixel']
params = GetAllParams(config, base, opt=opt, ignore=noise_ignore + ignore)[0]
gain = params.get('gain',1.0)
read_noise = params.get('read_noise',0.0)
read_noise_var = read_noise**2
return gain, read_noise, read_noise_var
def addNoise(self, config, base, im, rng, current_var, draw_method, logger):
# This process goes a lot like the Poisson routine. There are just two differences.
# First, the Poisson noise is in electrons, not ADU, and now we allow for a gain = e-/ADU,
# so we need to account for that properly. Second, we also allow for an additional Gaussian
# read noise.
gain, read_noise, read_noise_var = self.getCCDNoiseParams(config, base)
# Get how much extra sky to assume from the image.noise attribute.
sky = GetSky(base['image'], base, logger, full=True)
extra_sky = GetSky(config, base, logger, full=True)
total_sky = sky + extra_sky
if isinstance(total_sky, Image):
var_adu = np.mean(total_sky.array) / gain + read_noise_var / gain**2
else:
var_adu = total_sky / gain + read_noise_var / gain**2
# If we already have some variance in the image (from whitening), then we try to subtract
# it from the read noise if possible. If not, we subtract the rest off of the sky level.
# It's not precisely accurate, since the existing variance is Gaussian, rather than
# Poisson, but it's the best we can do.
if current_var:
logger.debug('image %d, obj %d: Target variance is %f, current variance is %f',
base.get('image_num',0),base.get('obj_num',0), var_adu, current_var)
read_noise_var_adu = read_noise_var / gain**2
if isinstance(total_sky, Image):
test = np.any(total_sky.array/gain + read_noise_var_adu < current_var)
else:
logger.debug('image %d, obj %d: Target variance is %f, current variance is %f',
base.get('image_num',0),base.get('obj_num',0),
var_adu, current_var)
test = var_adu < current_var
if test:
raise GalSimConfigError(
"Whitening already added more noise than the requested CCD noise.")
if read_noise_var_adu >= current_var:
# First try to take away from the read_noise, since this one is actually Gaussian.
read_noise_var -= current_var * gain**2
read_noise = math.sqrt(read_noise_var)
else:
# Take read_noise down to zero, since already have at least that much already.
current_var -= read_noise_var_adu
read_noise = 0
read_noise_var = 0
# Take the rest away from the sky level
total_sky -= current_var * gain
extra_sky -= current_var * gain
# At this point, there is a slight difference between fft and phot. For photon shooting,
# the galaxy already has Poisson noise, so we want to make sure not to add that again!
if draw_method == 'phot':
# Add in the noise from the sky.
if isinstance(total_sky, Image):
noise_im = total_sky.copy()
if gain != 1.0: noise_im *= gain
noise_im.addNoise(PoissonNoise(rng))
if gain != 1.0: noise_im /= gain
noise_im -= total_sky
# total_sky should now have zero mean, but with the noise of the total sky level.
im += noise_im
else:
if gain != 1.0: im *= gain
pd = PoissonDeviate(rng, mean=total_sky*gain)
im.addNoise(DeviateNoise(pd))
if gain != 1.0: im /= gain
im -= total_sky
# And add the read noise
if read_noise != 0.:
im.addNoise(GaussianNoise(rng, sigma=read_noise/gain))
else:
# Do the normal CCDNoise calculation.
if isinstance(total_sky, Image):
im += extra_sky
im.addNoise(CCDNoise(rng, gain=gain, read_noise=read_noise))
im -= extra_sky
else:
im.addNoise(CCDNoise(rng, gain=gain, read_noise=read_noise, sky_level=extra_sky))
logger.debug('image %d, obj %d: Added CCD noise with gain = %f, read_noise = %f',
base.get('image_num',0),base.get('obj_num',0), gain, read_noise)
return var_adu
def getNoiseVariance(self, config, base, full=False):
# The noise variance is sky / gain + (read_noise/gain)**2
gain, read_noise, read_noise_var = self.getCCDNoiseParams(config, base)
# Start with the background sky level for the image
sky = GetSky(base['image'], base, full=full)
sky += GetSky(config, base, full=full)
# Account for the gain and read_noise
read_noise_var_adu = read_noise_var / gain**2
return sky / gain + read_noise_var_adu
def addNoiseVariance(self, config, base, im, include_obj_var, logger):
gain, read_noise, read_noise_var = self.getCCDNoiseParams(config, base)
if include_obj_var:
# The current image at this point should be the noise-free, sky-free image,
# which is the object variance in each pixel.
if gain != 1.0:
im += base['current_noise_image']/gain
else:
im += base['current_noise_image']
# Now add on the regular CCDNoise from the sky and read noise.
im += self.getNoiseVariance(config, base, full=True)
#
# Correlated
#
class CorrelatedNoiseBuilder(NoiseBuilder):
def getNoise(self, config, base, rng=None):
# Save the constructed CorrelatedNoise object, since we might need it again.
tag = (id(base), base['file_num'], base['image_num'])
if base.get('_current_cn_tag',None) == tag:
return base['_current_cn']
else:
cn = self._readNoiseFile(config, base, rng)
base['_current_cn'] = cn
base['_current_cn_tag'] = tag
return cn
def _readNoiseFile(self, config, base, rng=None):
req = { 'file_name': str, 'pixel_scale': float }
opt = { 'variance' : float }
kwargs = GetAllParams(config, base, req=req, opt=opt, ignore=noise_ignore)[0]
if rng is None:
rng = GetRNG(config, base)
return BaseCorrelatedNoise.from_file(rng=rng, **kwargs)
def addNoise(self, config, base, im, rng, current_var, draw_method, logger):
# Build the correlated noise
cn = self.getNoise(config,base,rng)
var = cn.getVariance()
# Subtract off the current variance if any
if current_var:
logger.debug('image %d, obj %d: Target variance is %f, current variance is %f',
base.get('image_num',0),base.get('obj_num',0), var, current_var)
if var < current_var:
raise GalSimConfigError(
"Whitening already added more noise than the requested COSMOS noise.")
cn -= UncorrelatedNoise(current_var, rng=rng, wcs=cn.wcs)
# Add the noise to the image
im.addNoise(cn)
logger.debug('image %d, obj %d: Added COSMOS correlated noise with variance = %f',
base.get('image_num',0),base.get('obj_num',0), var)
return var
def getNoiseVariance(self, config, base, full=False):
cn = self.getNoise(config,base)
return cn.getVariance()
# Specialization for COSMOS noise in particular (the OG version of this noise type).
[docs]class COSMOSNoiseBuilder(CorrelatedNoiseBuilder):
def _readNoiseFile(self, config, base, rng=None):
opt = { 'file_name' : str, 'cosmos_scale' : float, 'variance' : float }
kwargs = GetAllParams(config, base, opt=opt, ignore=noise_ignore)[0]
if rng is None:
rng = GetRNG(config, base)
return getCOSMOSNoise(rng=rng, **kwargs)
[docs]def RegisterNoiseType(noise_type, builder, input_type=None):
"""Register a noise type for use by the config apparatus.
Parameters:
noise_type: The name of the type in config['image']['noise']
builder: A builder object to use for building the noise. It should be an
instance of a subclass of NoiseBuilder.
input_type: If the builder utilises an input object, give the key name of the
input type here. (If it uses more than one, this may be a list.)
[default: None]
"""
valid_noise_types[noise_type] = builder
RegisterInputConnectedType(input_type, noise_type)
RegisterNoiseType('Gaussian', GaussianNoiseBuilder())
RegisterNoiseType('Poisson', PoissonNoiseBuilder())
RegisterNoiseType('CCD', CCDNoiseBuilder())
RegisterNoiseType('Correlated', CorrelatedNoiseBuilder())
RegisterNoiseType('COSMOS', COSMOSNoiseBuilder())