# Source code for galsim.config.extra

# Copyright (c) 2012-2023 by the GalSim developers team on GitHub
# https://github.com/GalSim-developers
#
# This file is part of GalSim: The modular galaxy image simulation toolkit.
# https://github.com/GalSim-developers/GalSim
#
# GalSim is free software: redistribution and use in source and binary forms,
# with or without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
#    list of conditions, and the disclaimer given in the accompanying LICENSE
#    file.
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions, and the disclaimer given in the documentation
#    and/or other materials provided with the distribution.
#

import os
import logging
import inspect
from multiprocessing.managers import ListProxy, DictProxy

from .util import LoggerWrapper, SetDefaultExt, RetryIO, SafeManager, single_threaded
from .value import ParseValue
from .image import GetNObjForImage
from ..utilities import ensure_dir
from ..errors import GalSimConfigValueError, GalSimConfigError
from ..fits import writeMulti

# This file handles the processing of extra output items in addition to the primary output file
# in config['output']. The ones that are defined natively in GalSim are psf, weight, badpix,
# and truth.  See extra_*.py for the specific functions for each of these.

# This module-level dict will store all the registered "extra" output types.
# See the RegisterExtraOutput function at the end of this file.
# The keys will be the (string) names of the extra output types, and the values will be
# builder classes that will perform the different processing functions.
# (Specifically, each value is an instance of a subclass of ExtraOutputBuilder.)
valid_extra_outputs = {}

def SetupExtraOutput(config, logger=None):
    """Set up the extra output items as necessary, including building Managers for the work
    space so they can work safely in multi-processing mode.

    Each builder will be placed in config['extra_builder'][key] where key is the key in
    galsim.config.valid_extra_outputs.

    Parameters:
        config:     The configuration dict.
        logger:     If given, a logger object to log progress. [default: None]
    """
    logger = LoggerWrapper(logger)
    output = config['output']
    file_num = config.get('file_num',0)

    # We'll iterate through this list of keys a few times
    all_keys = [ k for k in valid_extra_outputs.keys() if k in output ]

    # We don't need the manager stuff if we (a) are already in a multiprocessing Process, or
    # (b) config.image.nproc == 1.
    use_manager = (
            'current_nproc' not in config and
            'image' in config and 'nproc' in config['image'] and
            ParseValue(config['image'], 'nproc', config, int)[0] != 1 )

    if use_manager and 'output_manager' not in config:
        # A custom manager class so we can register the proxy types we need.
        class OutputManager(SafeManager): pass

        # We'll use a list and a dict as work space to do the extra output processing.
        OutputManager.register('dict', dict, DictProxy)
        OutputManager.register('list', list, ListProxy)

        # Start up the output_manager
        config['output_manager'] = OutputManager()
        with single_threaded():
            config['output_manager'].start()

    if 'extra_builder' not in config:
        config['extra_builder'] = {}

    # Keep track of any skipped obj_nums, since usually need to treat them differently.
    # Note: it would be slightly nicer to use a set here, but there isn't a pre-defined
    # multiprocessing.managers.SetProxy type, so we just use a dict like a set by giving
    # each item the value None.
    if '_skipped_obj_nums' in config:
        config['_skipped_obj_nums'].clear()
    elif use_manager:
        config['_skipped_obj_nums'] = config['output_manager'].dict()
    else:
        config['_skipped_obj_nums'] = dict()

    for key in all_keys:
        logger.debug('file %d: Setup output item %s',file_num,key)

        # Make the work space structures
        if use_manager:
            data = config['output_manager'].list()
            scratch = config['output_manager'].dict()
        else:
            data = list()
            scratch = dict()

        # Make the data list the right length now to avoid issues with multiple
        # processes trying to append at the same time.
        nimages = config.get('nimages', 1)
        for k in range(nimages):
            data.append(None)

        # Create the builder, giving it the data and scratch objects as work space.
        field = config['output'][key]
        builder = valid_extra_outputs[key]
        builder.initialize(data, scratch, field, config, logger)

        # And store it in the config dict
        config['extra_builder'][key] = builder
        logger.debug('file %d: Setup output %s object',file_num,key)
def SetupExtraOutputsForImage(config, logger=None):
    """Perform any necessary setup for the extra output items at the start of a new image.

    Parameters:
        config:     The configuration dict.
        logger:     If given, a logger object to log progress. [default: None]
    """
    if 'output' not in config:
        return
    # Lazily create the builders the first time we get here for this file.
    if 'extra_builder' not in config:
        SetupExtraOutput(config, logger)
    # Let each builder do its own per-image setup.
    for name, builder in config['extra_builder'].items():
        builder.setupImage(config['output'][name], config, logger)
def ProcessExtraOutputsForStamp(config, skip, logger=None):
    """Run the appropriate processing code for any extra output items that need to do
    something at the end of building each object.

    This gets called after all the object flux is added to the stamp, but before the sky
    level and noise are added.

    Parameters:
        config:     The configuration dict.
        skip:       Was the drawing of this object skipped?
        logger:     If given, a logger object to log progress. [default: None]
    """
    if 'output' not in config:
        return
    obj_num = config['obj_num']
    for name, builder in config.get('extra_builder',{}).items():
        params = config['output'][name]
        if skip:
            # Record the skip.  (The dict is used as a set; the value is irrelevant.)
            config['_skipped_obj_nums'][obj_num] = None
            builder.processSkippedStamp(obj_num, params, config, logger)
        else:
            builder.processStamp(obj_num, params, config, logger)
[docs]def ProcessExtraOutputsForImage(config, logger=None): """Run the appropriate processing code for any extra output items that need to do something at the end of building each image Parameters: config: The configuration dict. logger: If given, a logger object to log progress. [default: None] """ if 'output' in config: obj_nums = None for key, builder in config.get('extra_builder',{}).items(): image_num = config.get('image_num',0) start_image_num = config.get('start_image_num',0) if obj_nums is None: # Figure out which obj_nums were used for this image. file_num = config.get('file_num',0) start_obj_num = config.get('start_obj_num',0) nobj = config.get('nobj', [GetNObjForImage(config,image_num)]) k = image_num - start_image_num for i in range(k): start_obj_num += nobj[i] obj_nums = range(start_obj_num, start_obj_num+nobj[k]) # Omit skipped obj_nums skipped = config['_skipped_obj_nums'] obj_nums = [ n for n in obj_nums if n not in skipped ] field = config['output'][key] index = image_num - start_image_num builder.processImage(index, obj_nums, field, config, logger)
def WriteExtraOutputs(config, main_data, logger=None):
    """Write the extra output objects to files.

    This gets run at the end of the functions for building the regular output files.

    Parameters:
        config:     The configuration dict.
        main_data:  The main file data in case it is needed.
        logger:     If given, a logger object to log progress. [default: None]
    """
    logger = LoggerWrapper(logger)
    output = config['output']
    if 'retry_io' in output:
        ntries = ParseValue(config['output'],'retry_io',config,int)[0]
        # This is how many retries.  Do at least 1, so ntries is 1 more than this.
        ntries = ntries + 1
    else:
        ntries = 1

    # output.dir provides a default directory for any extra output that lacks its own.
    if 'dir' in output:
        default_dir = ParseValue(output,'dir',config,str)[0]
    else:
        default_dir = None

    if 'noclobber' in output:
        noclobber = ParseValue(output,'noclobber',config,bool)[0]
    else:
        noclobber = False

    # Remember the last file written for each key, so repeated writes can be skipped.
    if 'extra_last_file' not in config:
        config['extra_last_file'] = {}

    for key, builder in config['extra_builder'].items():
        field = output[key]
        if 'file_name' in field:
            SetDefaultExt(field, '.fits')
            file_name = ParseValue(field,'file_name',config,str)[0]
        else:  # pragma: no cover  (This is covered, but codecov thinks it isn't.)
            # If no file_name, then probably writing to hdu
            continue
        if 'dir' in field:
            dir = ParseValue(field,'dir',config,str)[0]
        else:
            dir = default_dir

        if dir is not None:
            file_name = os.path.join(dir,file_name)

        ensure_dir(file_name)

        if noclobber and os.path.isfile(file_name):
            logger.warning('Not writing %s file %d = %s because output.noclobber = True '
                           'and file exists',key,config['file_num'],file_name)
            continue

        if config['extra_last_file'].get(key, None) == file_name:
            # If we already wrote this file, skip it this time around.
            # (Typically this is applicable for psf, where we may only want 1 psf file.)
            logger.info('Not writing %s file %d = %s because already written',
                        key,config['file_num'],file_name)
            continue

        # Do any final processing that needs to happen.
        builder.ensureFinalized(field, config, main_data, logger)

        # Call the write function, possibly multiple times to account for IO failures.
        write_func = builder.writeFile
        args = (file_name,field,config,logger)
        RetryIO(write_func, args, ntries, file_name, logger)
        config['extra_last_file'][key] = file_name
        logger.debug('file %d: Wrote %s to %r',config['file_num'],key,file_name)
def AddExtraOutputHDUs(config, main_data, logger=None):
    """Write the extra output objects to either HDUS or images as appropriate and add them
    to the existing data.

    This gets run at the end of the functions for building the regular output files.

    Note: the extra items must have hdu numbers ranging continuously (in any order)
    starting at len(data).  Typically first = 1, since the main image is the primary HDU,
    numbered 0.

    Parameters:
        config:     The configuration dict.
        main_data:  The main file data as a list of images.  Usually just [image] where
                    image is the primary image to be written to the output file.
        logger:     If given, a logger object to log progress. [default: None]

    Returns:
        data with additional hdus added
    """
    output = config['output']
    hdu_map = {}
    for key, builder in config['extra_builder'].items():
        field = output[key]
        if 'hdu' not in field:  # pragma: no cover  (This is covered, but codecov thinks it isn't.)
            # If no hdu, then probably writing to file
            continue
        hdu = ParseValue(field,'hdu',config,int)[0]
        if hdu <= 0 or hdu in hdu_map:
            raise GalSimConfigValueError("hdu is invalid or a duplicate.",hdu)

        # Do any final processing that needs to happen, then build the HDU.
        builder.ensureFinalized(field, config, main_data, logger)
        hdu_map[hdu] = builder.writeHdu(field,config,logger)

    # The extra HDUs must fill the numbers first..first+len-1 contiguously.
    first = len(main_data)
    hdulist = []
    for h in range(first, first + len(hdu_map)):
        if h not in hdu_map:
            raise GalSimConfigError("Cannot skip hdus. No output found for hdu %d"%h)
        hdulist.append(hdu_map[h])
    return main_data + hdulist
def CheckNoExtraOutputHDUs(config, output_type, logger=None):
    """Check that none of the extra output objects want to add to the HDU list.

    Raises an exception if one of them has an hdu field.

    Parameters:
        config:       The configuration dict.
        output_type:  A string to use in the error message to indicate which output type
                      had a problem.
        logger:       If given, a logger object to log progress. [default: None]
    """
    logger = LoggerWrapper(logger)
    output = config['output']
    for key in config['extra_builder'].keys():
        field = output[key]
        if 'hdu' not in field:
            continue
        # Found an hdu request, which is not allowed for this output type.
        hdu = ParseValue(field,'hdu',config,int)[0]
        logger.error("Extra output %s requesting to write to hdu %d", key, hdu)
        raise GalSimConfigError(
            "Output type %s cannot add extra images as HDUs"%output_type)
def GetFinalExtraOutput(key, config, main_data=None, logger=None):
    """Get the finalized output object for the given extra output key

    Parameters:
        key:        The name of the output field in config['output']
        config:     The configuration dict.
        main_data:  The main file data in case it is needed. [default: None, which is
                    treated as an empty list]
        logger:     If given, a logger object to log progress. [default: None]

    Returns:
        the final data to be output.
    """
    # Use a None sentinel rather than a mutable default argument ([]), which would be
    # shared across calls and could be mutated by a builder's finalize implementation.
    if main_data is None:
        main_data = []
    field = config['output'][key]
    return config['extra_builder'][key].ensureFinalized(field, config, main_data, logger)
class ExtraOutputBuilder:
    """A base class for building some kind of extra output object along with the main output.

    The base class doesn't do anything, but it defines the function signatures that a derived
    class can override to perform specific processing at any of several steps in the
    processing.

    The builder gets initialized with a list and a dict to use as work space.
    The typical work flow is to save something in scratch[obj_num] for each object built, and
    then process them all at the end of each image into data[k].  Then finalize may do
    something additional at the end of the processing to prepare the data to be written.

    It's worth remembering that the objects could potentially be processed in a random order
    if multiprocessing is being used.  The above work flow will thus work regardless of the
    order that the stamps and/or images are processed.

    Also, because of how objects are duplicated across processes during multiprocessing, you
    should not count on attributes you set in the builder object during the stamp or image
    processing stages to be present in the later finalize or write stages.  You should write
    any information you want to persist into the scratch or data objects, which are set up
    to handle the multiprocessing communication properly.
    """

    def initialize(self, data, scratch, config, base, logger):
        """Do any initial setup for this builder at the start of a new output file.

        The base class implementation saves two work space items into self.data and
        self.scratch that can be used to safely communicate across multiple processes.

        Parameters:
            data:       An empty list of length nimages to use as work space.
            scratch:    An empty dict that can be used as work space.
            config:     The configuration field for this output object.
            base:       The base configuration dict.
            logger:     If given, a logger object to log progress. [default: None]
        """
        self.data = data
        self.scratch = scratch
        # None is the sentinel meaning finalize has not yet been run (see ensureFinalized).
        self.final_data = None

    def setupImage(self, config, base, logger):
        """Perform any necessary setup at the start of an image.

        This function will be called at the start of each image to allow for any setup that
        needs to happen at this point in the processing.

        Parameters:
            config:     The configuration field for this output object.
            base:       The base configuration dict.
            logger:     If given, a logger object to log progress. [default: None]
        """
        pass

    def processStamp(self, obj_num, config, base, logger):
        """Perform any necessary processing at the end of each stamp construction.

        This function will be called after each stamp is built, but before the noise is
        added, so the existing stamp image has the true surface brightness profile (unless
        photon shooting was used, in which case there will necessarily be noise from that
        process).

        Remember, these stamps may be processed out of order.  Saving data to the scratch
        dict is safe, even if multiprocessing is being used.

        Parameters:
            obj_num:    The object number
            config:     The configuration field for this output object.
            base:       The base configuration dict.
            logger:     If given, a logger object to log progress. [default: None]
        """
        pass  # pragma: no cover  (all our ExtraBuilders override this function.)

    def processSkippedStamp(self, obj_num, config, base, logger):
        """Perform any necessary processing for stamps that were skipped in the normal
        processing.

        This function will be called for stamps that are not built because they were skipped
        for some reason.  Normally, you would not want to do anything for the extra outputs
        in these cases, but in case some module needs to do something in these cases as well,
        this method can be overridden.

        Parameters:
            obj_num:    The object number
            config:     The configuration field for this output object.
            base:       The base configuration dict.
            logger:     If given, a logger object to log progress. [default: None]
        """
        pass

    def processImage(self, index, obj_nums, config, base, logger):
        """Perform any necessary processing at the end of each image construction.

        This function will be called after each full image is built.

        Remember, these images may be processed out of order.  But if using the default
        constructor, the data list is already set to be the correct size, so it is safe to
        access self.data[k], where k = base['image_num'] - base['start_image_num'] is the
        appropriate index to use for this image.

        Parameters:
            index:      The index in self.data to use for this image.  This isn't the
                        image_num (which can be accessed at base['image_num'] if needed), but
                        rather an index that starts at 0 for the first image being worked on
                        and goes up to nimages-1.
            obj_nums:   The object numbers that were used for this image.
            config:     The configuration field for this output object.
            base:       The base configuration dict.
            logger:     If given, a logger object to log progress. [default: None]
        """
        pass

    def ensureFinalized(self, config, base, main_data, logger):
        """A helper function in the base class to make sure finalize only gets called once
        by the different possible locations that might need it to have been called.

        Parameters:
            config:     The configuration field for this output object.
            base:       The base configuration dict.
            main_data:  The main file data in case it is needed.
            logger:     If given, a logger object to log progress. [default: None]

        Returns:
            the final version of the object.
        """
        if self.final_data is None:
            self.final_data = self.finalize(config, base, main_data, logger)
        return self.final_data

    def finalize(self, config, base, main_data, logger):
        """Perform any final processing at the end of all the image processing.

        This function will be called after all images have been built.

        It returns some sort of final version of the object.  In the base class, it just
        returns self.data, but depending on the meaning of the output object, something else
        might be more appropriate.

        Parameters:
            config:     The configuration field for this output object.
            base:       The base configuration dict.
            main_data:  The main file data in case it is needed.
            logger:     If given, a logger object to log progress. [default: None]

        Returns:
            The final version of the object.
        """
        return self.data

    def writeFile(self, file_name, config, base, logger):
        """Write this output object to a file.

        The base class implementation is appropriate for the case that the result of finalize
        is a list of images to be written to a FITS file.

        Parameters:
            file_name:  The file to write to.
            config:     The configuration field for this output object.
            base:       The base configuration dict.
            logger:     If given, a logger object to log progress. [default: None]
        """
        writeMulti(self.final_data, file_name)

    def writeHdu(self, config, base, logger):
        """Write the data to a FITS HDU with the data for this output object.

        The base class implementation is appropriate for the case that the result of finalize
        is a list of images of length 1 to be written to a FITS file.

        Parameters:
            config:     The configuration field for this output object.
            base:       The base configuration dict.
            logger:     If given, a logger object to log progress. [default: None]

        Returns:
            an HDU with the output data.
        """
        if len(self.data) != 1:  # pragma: no cover  (Not sure if this is possible.)
            # Bug fix: the original referenced an undefined name ``n`` here, so this path
            # would have raised NameError instead of the intended GalSimConfigError.
            # (_extra_output_key is set on the builder by RegisterExtraOutput.)
            raise GalSimConfigError(
                "%d %s images were created. Expecting 1."%(
                    len(self.data), self._extra_output_key))
        return self.data[0]
def RegisterExtraOutput(key, builder):
    """Register an extra output field for use by the config apparatus.

    The builder parameter should be a subclass of galsim.config.ExtraOutputBuilder.
    See that class for the functions that should be defined and their signatures.
    Not all functions need to be overridden.  If nothing needs to be done at a
    particular place in the processing, you can leave the base class function, which
    doesn't do anything.

    Parameters:
        key:        The name of the output field in config['output']
        builder:    A builder object to use for building the extra output object.
                    It should be an instance of a subclass of ExtraOutputBuilder.
    """
    # Tag the builder with its registry key (used e.g. in error messages), then record
    # it in the module-level registry of extra output types.
    builder._extra_output_key = key
    valid_extra_outputs[key] = builder
# Nothing is registered here. The appropriate items are registered in extra_*.py.