# Copyright (c) 2012-2023 by the GalSim developers team on GitHub
# https://github.com/GalSim-developers
#
# This file is part of GalSim: The modular galaxy image simulation toolkit.
# https://github.com/GalSim-developers/GalSim
#
# GalSim is free software: redistribution and use in source and binary forms,
# with or without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions, and the disclaimer given in the accompanying LICENSE
# file.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions, and the disclaimer given in the documentation
# and/or other materials provided with the distribution.
#
# Most things require the full specification, e.g. galsim.fits.read.
# Only FitsHeader is brought into the main galsim namespace.
__all__ = [ 'FitsHeader' ]
import os
import io
import numpy as np
import copy
import subprocess
import gzip
import bz2
from io import BytesIO
from .image import Image
from .errors import GalSimError, GalSimValueError, GalSimIncompatibleValuesError, galsim_warn
from ._utilities import basestring, ensure_dir
from ._pyfits import pyfits
from .bounds import BoundsI
from .wcs import readFromFitsHeader
##############################################################################################
#
# We start off with some helper functions for some common operations that will be used in
# more than one of our primary read and write functions.
#
##############################################################################################
def _parse_compression(compression, file_name):
file_compress = None
pyfits_compress = None
if compression == 'rice' or compression == 'RICE_1': pyfits_compress = 'RICE_1'
elif compression == 'gzip_tile' or compression == 'GZIP_1': pyfits_compress = 'GZIP_1'
elif compression == 'hcompress' or compression == 'HCOMPRESS_1': pyfits_compress = 'HCOMPRESS_1'
elif compression == 'plio' or compression == 'PLIO_1': pyfits_compress = 'PLIO_1'
elif compression == 'gzip': file_compress = 'gzip'
elif compression == 'bzip2': file_compress = 'bzip2'
elif compression == 'none' or compression is None: pass
elif compression == 'auto':
if file_name:
if file_name.lower().endswith('.fz'): pyfits_compress = 'RICE_1'
elif file_name.lower().endswith('.gz'): file_compress = 'gzip'
elif file_name.lower().endswith('.bz2'): file_compress = 'bzip2'
else: # pragma: no cover (Not sure why codecov thinks this isn't covered.)
pass
else:
raise GalSimValueError("Invalid compression", compression,
('rice', 'gzip_tile', 'hcompress', 'plio', 'gzip', 'bzip2',
'none', 'auto'))
return file_compress, pyfits_compress
# This is a class rather than a def, since we want to store some variable, and that's easier
# to do with a class than a function. But this will be used as though it were a normal
# function: _read_file(file, file_compress)
class _ReadFile:
# There are two methods available for each of gzip and bzip2. Each is its own function.
# 1. The _call functions call out to an external program. gunzip, bunzip2 as appropriate.
# 2. The _in_mem functions use the corresponding python modules to do things in memory.
#
# As of commit 2e2d643b47fa27dbdcfcb1ba7bd, the write functions were generally faster using
# the in memory functions, but the reads were faster with the external functions.
# cf. GalSim/devel/time_zip.py for details about the timing tests.
def gunzip_call(self, file):
# cf. http://bugs.python.org/issue7471
# We use gunzip -c rather than zcat, since the latter is sometimes called gzcat
# (with zcat being a symlink to uncompress instead).
# Also, I'd rather all these use `with subprocess.Popen(...) as p:`, but that's not
# supported in 2.7. So need to keep things this way for now.
try:
p = subprocess.Popen(["gunzip", "-c", file], stdout=subprocess.PIPE,
stderr=subprocess.PIPE, close_fds=True)
except OSError:
# This OSError should mean that the gunzip call itself was invalid on this system.
# Convert to a NotImplementedError, so we can try a different method.
raise NotImplementedError()
ret = p.communicate()
if ret[0] == b'': # pragma: no cover
raise OSError("Error running gunzip. stderr output = %s"%ret[1])
if p.returncode != 0: # pragma: no cover
raise OSError("Error running gunzip. Return code = %s"%p.returncode)
fin = BytesIO(ret[0])
p.wait()
try:
hdu_list = pyfits.open(fin, 'readonly')
except (OSError, AttributeError, TypeError, ValueError): # pragma: no cover
# In case astropy fails.
raise NotImplementedError()
return hdu_list, fin
def gzip_in_mem(self, file): # pragma: no cover
fin = gzip.open(file, 'rb')
hdu_list = pyfits.open(fin, 'readonly')
# pyfits doesn't actually read the file yet, so we can't close fin here.
# Need to pass it back to the caller and let them close it when they are
# done with hdu_list.
return hdu_list, fin
def bunzip2_call(self, file):
try:
p = subprocess.Popen(["bunzip2", "-c", file], stdout=subprocess.PIPE,
stderr=subprocess.PIPE, close_fds=True)
except OSError:
# This OSError should mean that the gunzip call itself was invalid on this system.
# Convert to a NotImplementedError, so we can try a different method.
raise NotImplementedError()
ret = p.communicate()
if ret[0] == b'': # pragma: no cover
raise OSError("Error running bunzip2. stderr output = %s"%ret[1])
if p.returncode != 0: # pragma: no cover
raise OSError("Error running bunzip2. Return code = %s"%p.returncode)
fin = BytesIO(ret[0])
p.wait()
try:
hdu_list = pyfits.open(fin, 'readonly')
except (OSError, AttributeError, TypeError, ValueError): # pragma: no cover
# In case astropy fails.
raise NotImplementedError()
return hdu_list, fin
def bz2_in_mem(self, file): # pragma: no cover
fin = bz2.BZ2File(file, 'rb')
hdu_list = pyfits.open(fin, 'readonly')
return hdu_list, fin
def __init__(self):
# We used to have multiple options for gzip and bzip2. However, with recent versions of
# astropy for the fits I/O, the in memory version should always work. So we first
# try the command line method, which is usually faster. Then if that fails, we let
# astropy do the compression.
self.gz_index = 0
self.bz2_index = 0
self.gz_methods = [self.gunzip_call, self.gzip_in_mem]
self.bz2_methods = [self.bunzip2_call, self.bz2_in_mem]
self.gz = self.gz_methods[0]
self.bz2 = self.bz2_methods[0]
def __call__(self, file, dir, file_compress):
if dir:
file = os.path.join(dir,file)
if not os.path.isfile(file):
raise OSError("File %s not found"%file)
if not file_compress:
hdu_list = pyfits.open(file, 'readonly')
return hdu_list, None
elif file_compress == 'gzip':
# First make sure the file exists and is readable.
# The easiest way to do this is to try to open it. Just let the open command return
# its normal error message if the file doesn't exist or cannot be opened.
with open(file) as fid: pass
while self.gz_index < len(self.gz_methods):
try:
return self.gz(file)
except (ImportError, NotImplementedError): # pragma: no cover
if self.gz_index == len(self.gz_methods)-1:
raise
else:
self.gz_index += 1
self.gz = self.gz_methods[self.gz_index]
else: # pragma: no cover
raise GalSimError("None of the options for gunzipping were successful.")
elif file_compress == 'bzip2':
with open(file) as fid: pass
while self.bz2_index < len(self.bz2_methods):
try:
return self.bz2(file)
except (ImportError, NotImplementedError): # pragma: no cover
if self.bz2_index == len(self.bz2_methods)-1:
raise
else:
self.bz2_index += 1
self.bz2 = self.bz2_methods[self.bz2_index]
else: # pragma: no cover
raise GalSimError("None of the options for bunzipping were successful.")
else: # pragma: no cover (can't get here from public API)
raise GalSimValueError("Unknown file_compression", file_compress, ('gzip', 'bzip2'))
_read_file = _ReadFile()
# Do the same trick for _write_file(file,hdu_list,clobber,file_compress,pyfits_compress):
class _WriteFile:
# kwargs to use for the writeto function
kw = {'output_verify' : 'silentfix+ignore'}
# There are two methods available for each of gzip and bzip2. Each is its own function.
# 1. The _call functions call out to an external program. gzip, bzip2 as appropriate.
# 2. The _in_mem functions use the corresponding python modules to do things in memory.
#
# As of commit 2e2d643b47fa27dbdcfcb1ba7bd, the write functions were generally faster using
# the in memory functions, but the reads were faster with the external functions.
# cf. GalSim/devel/time_zip.py for details about the timing tests.
#
# As a result, we no longer try multple functions here (as we still do in _ReadFile).
# If the timing status chages, the above mentioned commit has the old code, which first tried
# the external function and the reverted to the in-memory version if that failed.
# No longer used, but preserved for timing tests to see if we should revisit this.
def gzip_call(self, hdu_list, file): # pragma: no cover
with open(file, 'wb') as fout:
try:
p = subprocess.Popen(["gzip", "-"], stdin=subprocess.PIPE, stdout=fout,
close_fds=True)
hdu_list.writeto(p.stdin, **self.kw)
except (OSError, AttributeError, TypeError, ValueError): # pragma: no cover
# This OSError should mean that the gunzip call itself was invalid on this system.
# Convert to a NotImplementedError, so we can try a different method.
# The others are in case astropy fails.
raise NotImplementedError()
p.communicate()
if p.returncode != 0: # pragma: no cover
raise OSError("Error running gzip. Return code = %s"%p.returncode)
p.wait()
def gzip_in_mem(self, hdu_list, file):
# The compression routines work better if we first write to an internal buffer
# and then output that to a file.
buf = io.BytesIO()
hdu_list.writeto(buf, **self.kw)
data = buf.getvalue()
# There is a compresslevel option (for both gzip and bz2), but we just use the
# default.
with gzip.open(file, 'wb') as fout:
fout.write(data)
def bzip2_call(self, hdu_list, file): # pragma: no cover
with open(file, 'wb') as fout:
try:
p = subprocess.Popen(["bzip2"], stdin=subprocess.PIPE, stdout=fout, close_fds=True)
hdu_list.writeto(p.stdin, **self.kw)
except (OSError, AttributeError, TypeError, ValueError): # pragma: no cover
# This OSError should mean that the gunzip call itself was invalid on this system.
# Convert to a NotImplementedError, so we can try a different method.
# The others are in case astropy fails.
raise NotImplementedError()
p.communicate()
if p.returncode != 0: # pragma: no cover
raise OSError("Error running bzip2. Return code = %s"%p.returncode)
p.wait()
def bz2_in_mem(self, hdu_list, file):
buf = io.BytesIO()
hdu_list.writeto(buf, **self.kw)
data = buf.getvalue()
with bz2.BZ2File(file, 'wb') as fout:
fout.write(data)
def __call__(self, file, dir, hdu_list, clobber, file_compress, pyfits_compress):
if dir:
file = os.path.join(dir,file)
ensure_dir(file)
if os.path.isfile(file):
if clobber:
os.remove(file)
else:
raise OSError('File %r already exists'%file)
if not file_compress:
hdu_list.writeto(file, **self.kw)
elif file_compress == 'gzip':
return self.gzip_in_mem(hdu_list, file)
elif file_compress == 'bzip2':
return self.bz2_in_mem(hdu_list, file)
else: # pragma: no cover (can't get here from public API)
raise GalSimValueError("Unknown file_compression", file_compress, ('gzip', 'bzip2'))
_write_file = _WriteFile()
def _add_hdu(hdu_list, data, pyfits_compress):
if pyfits_compress:
if len(hdu_list) == 0:
hdu_list.append(pyfits.PrimaryHDU()) # Need a blank PrimaryHDU
hdu = pyfits.CompImageHDU(data, compression_type=pyfits_compress)
else:
if len(hdu_list) == 0:
hdu = pyfits.PrimaryHDU(data)
else:
hdu = pyfits.ImageHDU(data)
hdu_list.append(hdu)
return hdu
def _add_to_header(hdu, image):
h = FitsHeader(hdu.header)
if hasattr(image, 'header'):
h.extend(image.header)
if image.wcs:
image.wcs.writeToFitsHeader(h, image.bounds)
def _check_hdu(hdu, pyfits_compress, header_only=False):
"""Check that an input ``hdu`` is valid
"""
# Check for fixable verify errors
# Astropy will automatically fix any errors it finds by just accessing the header and data.
hdu.header
if not header_only:
try:
hdu.data
except pyfits.VerifyError: # pragma: no cover
# If just reading the data raises a VerifyError, try again with fix.
# This seems to be OS-specific, and probably astropy-version specific.
# But it's at least possible for this to happen and the verify step to help.
hdu.verify('fix')
hdu.data
# Check that the specified compression is right for the given hdu type.
if pyfits_compress:
if not isinstance(hdu, pyfits.CompImageHDU):
raise OSError('Found invalid HDU type reading FITS file (expected a CompImageHDU)')
else:
if not isinstance(hdu, (pyfits.CompImageHDU, pyfits.ImageHDU, pyfits.PrimaryHDU)):
raise OSError('Found invalid HDU type reading FITS file (expected an ImageHDU)')
def _get_hdu(hdu_list, hdu, pyfits_compress, header_only=False):
if isinstance(hdu_list, pyfits.HDUList):
# Note: Nothing special needs to be done when reading a compressed hdu.
# However, such compressed hdu's may not be the PrimaryHDU, so if we think we are
# reading a compressed file, skip to hdu 1.
if hdu is None:
if pyfits_compress:
if len(hdu_list) <= 1:
raise OSError('Expecting at least one extension HDU in galsim.read')
hdu = 1
else:
hdu = 0
if len(hdu_list) <= hdu:
raise OSError('Expecting at least %d HDUs in galsim.read'%(hdu+1))
hdu = hdu_list[hdu]
elif isinstance(hdu_list, (pyfits.ImageHDU, pyfits.PrimaryHDU, pyfits.CompImageHDU)):
hdu = hdu_list
else:
raise TypeError("Invalid hdu_list: %s",hdu_list)
_check_hdu(hdu, pyfits_compress, header_only)
return hdu
# Unlike the other helpers, this one doesn't start with an underscore, since we make it
# available to people who use the function ReadFile.
[docs]def closeHDUList(hdu_list, fin):
"""If necessary, close the file handle that was opened to read in the ``hdu_list``"""
hdu_list.close()
if fin:
fin.close()
##############################################################################################
#
# Now the primary write functions. We have:
# write(image, ...)
# writeMulti(image_list, ...)
# writeCube(image_list, ...)
# writeFile(hdu_list, ...)
#
##############################################################################################
[docs]def write(image, file_name=None, dir=None, hdu_list=None, clobber=True, compression='auto'):
"""Write a single image to a FITS file.
Write the `Image` instance ``image`` to a FITS file, with details depending on the arguments.
This function can be called directly as ``galsim.fits.write(image, ...)``, with the image as the
first argument, or as an `Image` method: ``image.write(...)``.
Parameters:
image: The `Image` to write to file. Per the description of this method, it may be
given explicitly via ``galsim.fits.write(image, ...)`` or the method may be
called directly as an image method, ``image.write(...)``. Note that if the
image has a 'header' attribute containing a `FitsHeader`, then the
`FitsHeader` is written to the header in the PrimaryHDU, followed by the
WCS as usual.
file_name: The name of the file to write to. [Either ``file_name`` or ``hdu_list`` is
required.]
dir: Optionally a directory name can be provided if ``file_name`` does not
already include it. [default: None]
hdu_list: An astropy.io.fits.HDUList. If this is provided instead of ``file_name``,
then the `Image` is appended to the end of the HDUList as a new HDU. In
that case, the user is responsible for calling either
``hdu_list.writeto(...)`` or ``galsim.fits.writeFile(...)`` afterwards.
[Either ``file_name`` or ``hdu_list`` is required.]
clobber: Setting ``clobber=True`` will silently overwrite existing files.
[default: True]
compression: Which compression scheme to use (if any). Options are:
- None or 'none' = no compression
- 'rice' = use rice compression in tiles (preserves header readability)
- 'gzip' = use gzip to compress the full file
- 'bzip2' = use bzip2 to compress the full file
- 'gzip_tile' = use gzip in tiles (preserves header readability)
- 'hcompress' = use hcompress in tiles (only valid for 2-d images)
- 'plio' = use plio compression in tiles (only valid for pos integer data)
- 'auto' = determine the compression from the extension of the file name
(requires ``file_name`` to be given):
- '.fz' => 'rice'
- '.gz' => 'gzip'
- '.bz2' => 'bzip2'
- otherwise None
[default: 'auto']
"""
if image.iscomplex:
raise GalSimValueError("Cannot write complex Images to a fits file. "
"Write image.real and image.imag separately.", image)
file_compress, pyfits_compress = _parse_compression(compression,file_name)
if file_name and hdu_list is not None:
raise GalSimIncompatibleValuesError(
"Cannot provide both file_name and hdu_list", file_name=file_name, hdu_list=hdu_list)
if not (file_name or hdu_list is not None):
raise GalSimIncompatibleValuesError(
"Must provide either file_name or hdu_list", file_name=file_name, hdu_list=hdu_list)
if hdu_list is None:
hdu_list = pyfits.HDUList()
hdu = _add_hdu(hdu_list, image.array, pyfits_compress)
_add_to_header(hdu, image)
if file_name:
_write_file(file_name, dir, hdu_list, clobber, file_compress, pyfits_compress)
[docs]def writeMulti(image_list, file_name=None, dir=None, hdu_list=None, clobber=True,
compression='auto'):
"""Write a Python list of images to a multi-extension FITS file.
The details of how the images are written to file depends on the arguments.
.. note::
This function along with `readMulti` can be used to effect the equivalent of a simple
version of fpack or funpack. To Rice compress a fits file, you can call::
fname = 'some_image_file.fits'
galsim.fits.writeMulti(galsim.fits.readMulti(fname, read_headers=True), fname+'.fz')
To uncompress::
fname = 'some_image_file.fits.fz'
galsim.fits.writeMulti(galsim.fits.readMulti(fname, read_headers=True), fname[:-3])
Parameters:
image_list: A Python list of `Image` instances. (For convenience, some items in this
list may be HDUs already. Any `Image` will be converted into an
astropy.io.fits.HDU.)
file_name: The name of the file to write to. [Either ``file_name`` or ``hdu_list`` is
required.]
dir: Optionally a directory name can be provided if ``file_name`` does not
already include it. [default: None]
hdu_list: An astropy.io.fits.HDUList. If this is provided instead of ``file_name``,
then the `Image` is appended to the end of the HDUList as a new HDU. In
that case, the user is responsible for calling either
``hdu_list.writeto(...)`` or ``galsim.fits.writeFile(...)`` afterwards.
[Either ``file_name`` or ``hdu_list`` is required.]
clobber: Setting ``clobber=True`` will silently overwrite existing files.
[default: True]
compression: Which compression scheme to use (if any). Options are:
- None or 'none' = no compression
- 'rice' = use rice compression in tiles (preserves header readability)
- 'gzip' = use gzip to compress the full file
- 'bzip2' = use bzip2 to compress the full file
- 'gzip_tile' = use gzip in tiles (preserves header readability)
- 'hcompress' = use hcompress in tiles (only valid for 2-d images)
- 'plio' = use plio compression in tiles (only valid for pos integer data)
- 'auto' = determine the compression from the extension of the file name
(requires ``file_name`` to be given):
- '.fz' => 'rice'
- '.gz' => 'gzip'
- '.bz2' => 'bzip2'
- otherwise None
[default: 'auto']
"""
if any(image.iscomplex for image in image_list if isinstance(image, Image)):
raise GalSimValueError("Cannot write complex Images to a fits file. "
"Write image.real and image.imag separately.", image_list)
file_compress, pyfits_compress = _parse_compression(compression,file_name)
if file_name and hdu_list is not None:
raise GalSimIncompatibleValuesError(
"Cannot provide both file_name and hdu_list", file_name=file_name, hdu_list=hdu_list)
if not (file_name or hdu_list is not None):
raise GalSimIncompatibleValuesError(
"Must provide either file_name or hdu_list", file_name=file_name, hdu_list=hdu_list)
if hdu_list is None:
hdu_list = pyfits.HDUList()
for image in image_list:
if isinstance(image, Image):
hdu = _add_hdu(hdu_list, image.array, pyfits_compress)
_add_to_header(hdu, image)
else:
# Assume that image is really an HDU. If not, this should give a reasonable error
# message. (The base type of HDUs vary among versions of pyfits, so it's hard to
# check explicitly with an isinstance call. For newer pyfits versions, it is
# pyfits.hdu.base.ExtensionHDU, but not in older versions.)
hdu_list.append(image)
if file_name:
_write_file(file_name, dir, hdu_list, clobber, file_compress, pyfits_compress)
[docs]def writeCube(image_list, file_name=None, dir=None, hdu_list=None, clobber=True,
compression='auto'):
"""Write a Python list of images to a FITS file as a data cube.
The details of how the images are written to file depends on the arguments. Unlike for
writeMulti, when writing a data cube it is necessary that each `Image` in ``image_list`` has
the same size ``(nx, ny)``. No check is made to confirm that all images have the same origin
and pixel scale (or WCS).
In fact, the WCS of the first image is the one that gets put into the FITS header (since only
one WCS can be put into a FITS header). Thus, if the images have different WCS functions,
only the first one will be rendered correctly by plotting programs such as ds9. The FITS
standard does not support any way to have the various images in a data cube to have different
WCS solutions.
Parameters:
image_list: The ``image_list`` can also be either an array of NumPy arrays or a 3d NumPy
array, in which case this is written to the fits file directly. In the
former case, no explicit check is made that the NumPy arrays are all the
same shape, but a NumPy exception will be raised which we let pass upstream
unmolested.
file_name: The name of the file to write to. [Either ``file_name`` or ``hdu_list`` is
required.]
dir: Optionally a directory name can be provided if ``file_name`` does not
already include it. [default: None]
hdu_list: An astropy.io.fits.HDUList. If this is provided instead of ``file_name``,
then the cube is appended to the end of the HDUList as a new HDU. In that
case, the user is responsible for calling either ``hdu_list.writeto(...)``
or ``galsim.fits.writeFile(...)`` afterwards. [Either ``file_name`` or
``hdu_list`` is required.]
clobber: Setting ``clobber=True`` will silently overwrite existing files.
[default: True]
compression: Which compression scheme to use (if any). Options are:
- None or 'none' = no compression
- 'rice' = use rice compression in tiles (preserves header readability)
- 'gzip' = use gzip to compress the full file
- 'bzip2' = use bzip2 to compress the full file
- 'gzip_tile' = use gzip in tiles (preserves header readability)
- 'hcompress' = use hcompress in tiles (only valid for 2-d images)
- 'plio' = use plio compression in tiles (only valid for pos integer data)
- 'auto' = determine the compression from the extension of the file name
(requires ``file_name`` to be given):
- '.fz' => 'rice'
- '.gz' => 'gzip'
- '.bz2' => 'bzip2'
- otherwise None
[default: 'auto']
"""
if isinstance(image_list, np.ndarray):
is_all_numpy = True
if image_list.dtype.kind == 'c':
raise GalSimValueError("Cannot write complex numpy arrays to a fits file. "
"Write array.real and array.imag separately.", image_list)
elif len(image_list) == 0:
raise GalSimValueError("In writeCube: image_list has no images", image_list)
elif all(isinstance(item, np.ndarray) for item in image_list):
is_all_numpy = True
if any(a.dtype.kind == 'c' for a in image_list):
raise GalSimValueError("Cannot write complex numpy arrays to a fits file. "
"Write array.real and array.imag separately.", image_list)
else:
is_all_numpy = False
if any(im.iscomplex for im in image_list):
raise GalSimValueError("Cannot write complex images to a fits file. "
"Write image.real and image.imag separately.", image_list)
file_compress, pyfits_compress = _parse_compression(compression,file_name)
if file_name and hdu_list is not None:
raise GalSimIncompatibleValuesError(
"Cannot provide both file_name and hdu_list", file_name=file_name, hdu_list=hdu_list)
if not (file_name or hdu_list is not None):
raise GalSimIncompatibleValuesError(
"Must provide either file_name or hdu_list", file_name=file_name, hdu_list=hdu_list)
if hdu_list is None:
hdu_list = pyfits.HDUList()
if is_all_numpy:
cube = np.asarray(image_list)
nimages = cube.shape[0]
nx = cube.shape[1]
ny = cube.shape[2]
# Use default values for bounds
bounds = BoundsI(1,nx,1,ny)
wcs = None
else:
nimages = len(image_list)
im = image_list[0]
dtype = im.array.dtype
nx = im.xmax - im.xmin + 1
ny = im.ymax - im.ymin + 1
# Use the first image's wcs and bounds
wcs = im.wcs
bounds = im.bounds
# Note: numpy shape is y,x
array_shape = (nimages, ny, nx)
cube = np.zeros(array_shape, dtype=dtype)
for k in range(nimages):
im = image_list[k]
nx_k = im.xmax-im.xmin+1
ny_k = im.ymax-im.ymin+1
if nx_k != nx or ny_k != ny:
raise GalSimValueError("In writeCube: image %d has the wrong shape. "
"Shape is (%d,%d) should be (%d,%d)"%(k,nx_k,ny_k,nx,ny),
im)
cube[k,:,:] = image_list[k].array
hdu = _add_hdu(hdu_list, cube, pyfits_compress)
if not is_all_numpy:
# Use any header/wcs in the first image
_add_to_header(hdu, image_list[0])
if file_name:
_write_file(file_name, dir, hdu_list, clobber, file_compress, pyfits_compress)
[docs]def writeFile(file_name, hdu_list, dir=None, clobber=True, compression='auto'):
"""Write a Pyfits hdu_list to a FITS file, taking care of the GalSim compression options.
If you have used the write(), writeMulti() or writeCube() functions with the ``hdu_list``
option rather than writing directly to a file, you may subsequently use the command
``hdu_list.writeto(...)``. However, it may be more convenient to use this function, writeFile()
instead, since it treats the compression option consistently with how that option is handled in
the above functions.
Parameters:
file_name: The name of the file to write to.
hdu_list: An astropy.io.fits.HDUList.
dir: Optionally a directory name can be provided if ``file_name`` does not
already include it. [default: None]
clobber: Setting ``clobber=True`` will silently overwrite existing files.
[default: True]
compression: Which compression scheme to use (if any). Options are:
- None or 'none' = no compression
- 'gzip' = use gzip to compress the full file
- 'bzip2' = use bzip2 to compress the full file
- 'auto' = determine the compression from the extension of the file name
(requires ``file_name`` to be given):
- '.gz' => 'gzip'
- '.bz2' => 'bzip2'
- otherwise None
Note that the other options, such as 'rice', that operate on the image
directly are not available at this point. If you want to use one of them,
it must be applied when writing each hdu.
[default: 'auto']
"""
file_compress, pyfits_compress = _parse_compression(compression,file_name)
if pyfits_compress and compression != 'auto':
# If compression is auto and it determined that it should use rice, then we
# should presume that the hdus were already rice compressed, so we can ignore it here.
# Otherwise, any pyfits_compression options are invalid.
raise GalSimValueError("Compression %s is invalid for writeFile",compression)
_write_file(file_name, dir, hdu_list, clobber, file_compress, pyfits_compress)
##############################################################################################
#
# Now the primary read functions. We have:
# image = read(...)
# image_list = readMulti(...)
# image_list = readCube(...)
# hdu, hdu_list, fin = readFile(...)
#
##############################################################################################
[docs]def read(file_name=None, dir=None, hdu_list=None, hdu=None, compression='auto',
read_header=False, suppress_warning=True):
"""Construct an `Image` from a FITS file or HDUList.
The normal usage for this function is to read a fits file and return the image contained
therein, automatically decompressing it if necessary. However, you may also pass it
an HDUList, in which case it will select the indicated hdu (with the ``hdu`` parameter)
from that.
Not all FITS pixel types are supported -- only ``short``, ``int``, ``unsigned short``,
``unsigned int``, ``float``, and ``double``.
If the FITS header has keywords that start with ``GS_``, these will be used to initialize the
bounding box and WCS. If these are absent, the code will try to read whatever WCS is given
in the FITS header. cf. `galsim.wcs.readFromFitsHeader`. The default bounding box will have
``(xmin,ymin)`` at ``(1,1)``. The default WCS, if there is no WCS information in the FITS
file, will be PixelScale(1.0).
This function is called as ``im = galsim.fits.read(...)``
Parameters:
file_name: The name of the file to read in. [Either ``file_name`` or ``hdu_list`` is
required.]
dir: Optionally a directory name can be provided if ``file_name`` does not
already include it. [default: None]
hdu_list: Either an astropy.io.fits.HDUList, astropy.io.fits.PrimaryHDU, or
astropy.io.fits..ImageHDU. In the former case, the ``hdu`` in the list
will be selected. In the latter two cases, the ``hdu`` parameter is
ignored. [Either ``file_name`` or ``hdu_list`` is required.]
hdu: The number of the HDU to return. [default: None, which means to return
either the primary or first extension as appropriate for the given
compression. (e.g. for 'rice', the first extension is the one you normally
want.)]
compression: Which decompression scheme to use (if any). Options are:
- None or 'none' = no decompression
- 'rice' = use rice decompression in tiles
- 'gzip' = use gzip to decompress the full file
- 'bzip2' = use bzip2 to decompress the full file
- 'gzip_tile' = use gzip decompression in tiles
- 'hcompress' = use hcompress decompression in tiles
- 'plio' = use plio decompression in tiles
- 'auto' = determine the decompression from the extension of the file name
(requires ``file_name`` to be given).
- '.fz' => 'rice'
- '.gz' => 'gzip'
- '.bz2' => 'bzip2'
- otherwise None
[default: 'auto']
read_header: Whether to read the header and store it as image.header.[default: False]
suppress_warning: Whether to suppress a warning that the WCS could not be read from the
FITS header, so the WCS defaulted to either a `PixelScale` or
`AffineTransform`. [default: True]
Returns:
the image as an `Image` instance.
"""
file_compress, pyfits_compress = _parse_compression(compression,file_name)
if file_name and hdu_list is not None:
raise GalSimIncompatibleValuesError(
"Cannot provide both file_name and hdu_list", file_name=file_name, hdu_list=hdu_list)
if not (file_name or hdu_list is not None):
raise GalSimIncompatibleValuesError(
"Must provide either file_name or hdu_list", file_name=file_name, hdu_list=hdu_list)
if file_name:
hdu_list, fin = _read_file(file_name, dir, file_compress)
try:
hdu = _get_hdu(hdu_list, hdu, pyfits_compress)
if hdu.data is None:
raise OSError("HDU is empty. (data is None)")
wcs, origin = readFromFitsHeader(hdu.header, suppress_warning)
dt = hdu.data.dtype.type
if dt in Image.valid_dtypes:
data = hdu.data
else:
galsim_warn("No C++ Image template instantiation for data type %s. "
"Using numpy.float64 instead."%(dt))
data = hdu.data.astype(np.float64)
image = Image(array=data)
image.setOrigin(origin)
image.wcs = wcs
if read_header:
image.header = FitsHeader(hdu.header)
finally:
# If we opened a file, don't forget to close it.
if file_name:
closeHDUList(hdu_list, fin)
return image
[docs]def readMulti(file_name=None, dir=None, hdu_list=None, compression='auto',
read_headers=False, suppress_warning=True):
"""Construct a list of `Image` instances from a FITS file or HDUList.
The normal usage for this function is to read a fits file and return a list of all the images
contained therein, automatically decompressing them if necessary. However, you may also pass
it an HDUList, in which case it will build the images from these directly.
Not all FITS pixel types are supported -- only ``short``, ``int``, ``unsigned short``,
``unsigned int``, ``float``, and ``double``.
If the FITS header has keywords that start with ``GS_``, these will be used to initialize the
bounding box and WCS. If these are absent, the code will try to read whatever WCS is given
in the FITS header. cf. `galsim.wcs.readFromFitsHeader`. The default bounding box will have
``(xmin,ymin)`` at ``(1,1)``. The default WCS, if there is no WCS information in the FITS
file, will be PixelScale(1.0).
This function is called as ``im = galsim.fits.readMulti(...)``
.. note::
This function along with `writeMulti` can be used to effect the equivalent of a simple
version of fpack or funpack. To Rice compress a fits file, you can call::
fname = 'some_image_file.fits'
galsim.fits.writeMulti(galsim.fits.readMulti(fname, read_headers=True), fname+'.fz')
To uncompress::
fname = 'some_image_file.fits.fz'
galsim.fits.writeMulti(galsim.fits.readMulti(fname, read_headers=True), fname[:-3])
Parameters:
file_name: The name of the file to read in. [Either ``file_name`` or ``hdu_list`` is
required.]
dir: Optionally a directory name can be provided if ``file_name`` does not
already include it. [default: None]
hdu_list: An astropy.io.fits.HDUList from which to read the images. [Either
``file_name`` or ``hdu_list`` is required.]
compression: Which decompression scheme to use (if any). Options are:
- None or 'none' = no decompression
- 'rice' = use rice decompression in tiles
- 'gzip' = use gzip to decompress the full file
- 'bzip2' = use bzip2 to decompress the full file
- 'gzip_tile' = use gzip decompression in tiles
- 'hcompress' = use hcompress decompression in tiles
- 'plio' = use plio decompression in tiles
- 'auto' = determine the decompression from the extension of the file name
(requires ``file_name`` to be given).
- '.fz' => 'rice'
- '.gz' => 'gzip'
- '.bz2' => 'bzip2'
- otherwise None
[default: 'auto']
read_headers: Whether to read the headers and store them as image.header.[default: False]
suppress_warning: Whether to suppress a warning that the WCS could not be read from the
FITS header, so the WCS defaulted to either a `PixelScale` or
`AffineTransform`. [default: True]
Returns:
a Python list of `Image` instances.
"""
file_compress, pyfits_compress = _parse_compression(compression,file_name)
if file_name and hdu_list is not None:
raise GalSimIncompatibleValuesError(
"Cannot provide both file_name and hdu_list", file_name=file_name, hdu_list=hdu_list)
if not (file_name or hdu_list is not None):
raise GalSimIncompatibleValuesError(
"Must provide either file_name or hdu_list", file_name=file_name, hdu_list=hdu_list)
if file_name:
hdu_list, fin = _read_file(file_name, dir, file_compress)
elif not isinstance(hdu_list, pyfits.HDUList):
raise TypeError("In readMulti, hdu_list is not an HDUList")
try:
image_list = []
if pyfits_compress:
first = 1
if len(hdu_list) <= 1:
raise OSError('Expecting at least one extension HDU in galsim.read')
else:
first = 0
if len(hdu_list) < 1:
raise OSError('Expecting at least one HDU in galsim.readMulti')
for hdu in range(first,len(hdu_list)):
image_list.append(read(hdu_list=hdu_list, hdu=hdu, compression=pyfits_compress,
read_header=read_headers, suppress_warning=suppress_warning))
finally:
# If we opened a file, don't forget to close it.
if file_name:
closeHDUList(hdu_list, fin)
return image_list
[docs]def readCube(file_name=None, dir=None, hdu_list=None, hdu=None, compression='auto',
suppress_warning=True):
"""Construct a Python list of `Image` instances from a FITS data cube.
Not all FITS pixel types are supported -- only ``short``, ``int``, ``unsigned short``,
``unsigned int``, ``float``, and ``double``.
If the FITS header has keywords that start with ``GS_``, these will be used to initialize the
bounding box and WCS. If these are absent, the code will try to read whatever WCS is given
in the FITS header. cf. `galsim.wcs.readFromFitsHeader`. The default bounding box will have
``(xmin,ymin)`` at ``(1,1)``. The default WCS, if there is no WCS information in the FITS
file, will be PixelScale(1.0).
This function is called as ``image_list = galsim.fits.readCube(...)``
Parameters:
file_name: The name of the file to read in. [Either ``file_name`` or ``hdu_list`` is
required.]
dir: Optionally a directory name can be provided if ``file_name`` does not
already include it. [default: None]
hdu_list: Either an astropy.io.fits.HDUList, an astropy.io.fits.PrimaryHDU, or
astropy.io.fits.ImageHDU. In the former case, the ``hdu`` in the list will
be selected. In the latter two cases, the ``hdu`` parameter is ignored.
[Either ``file_name`` or ``hdu_list`` is required.]
hdu: The number of the HDU to return. [default: None, which means to return
either the primary or first extension as appropriate for the given
compression. (e.g. for rice, the first extension is the one you normally
want.)]
compression: Which decompression scheme to use (if any). Options are:
- None or 'none' = no decompression
- 'rice' = use rice decompression in tiles
- 'gzip' = use gzip to decompress the full file
- 'bzip2' = use bzip2 to decompress the full file
- 'gzip_tile' = use gzip decompression in tiles
- 'hcompress' = use hcompress decompression in tiles
- 'plio' = use plio decompression in tiles
- 'auto' = determine the decompression from the extension of the file name
(requires ``file_name`` to be given).
- '.fz' => 'rice'
- '.gz' => 'gzip'
- '.bz2' => 'bzip2'
- otherwise None
[default: 'auto']
suppress_warning: Whether to suppress a warning that the WCS could not be read from the
FITS header, so the WCS defaulted to either a `PixelScale` or
`AffineTransform`. [default: True]
Returns:
a Python list of `Image` instances.
"""
file_compress, pyfits_compress = _parse_compression(compression,file_name)
if file_name and hdu_list is not None:
raise GalSimIncompatibleValuesError(
"Cannot provide both file_name and hdu_list", file_name=file_name, hdu_list=hdu_list)
if not (file_name or hdu_list is not None):
raise GalSimIncompatibleValuesError(
"Must provide either file_name or hdu_list", file_name=file_name, hdu_list=hdu_list)
if file_name:
hdu_list, fin = _read_file(file_name, dir, file_compress)
try:
hdu = _get_hdu(hdu_list, hdu, pyfits_compress)
if hdu.data is None:
raise OSError("HDU is empty. (data is None)")
wcs, origin = readFromFitsHeader(hdu.header, suppress_warning)
dt = hdu.data.dtype.type
if dt in Image.valid_dtypes:
data = hdu.data
else:
galsim_warn("No C++ Image template instantiation for data type %s. "
"Using numpy.float64 instead."%(dt))
data = hdu.data.astype(np.float64)
data = np.atleast_3d(data)
nimages = data.shape[0]
image_list = []
for k in range(nimages):
image = Image(array=data[k,:,:])
image.setOrigin(origin)
image.wcs = wcs
image_list.append(image)
finally:
# If we opened a file, don't forget to close it.
if file_name:
closeHDUList(hdu_list, fin)
return image_list
[docs]def readFile(file_name, dir=None, hdu=None, compression='auto'):
"""Read in a Pyfits hdu_list from a FITS file, taking care of the GalSim compression options.
If you want to do something different with an hdu or hdu_list than one of our other read
functions, you can use this function. It handles the compression options in the standard
GalSim way and just returns the hdu (and hdu_list) for you to use as you see fit.
This function is called as::
>>> hdu, hdu_list, fin = galsim.fits.readFile(...)
The first item in the returned tuple is the specified hdu (or the primary if none was
specifically requested). The other two are returned so you can properly close them.
They are the full HDUList and possibly a file handle. The appropriate cleanup can be
done with::
>>> galsim.fits.closeHDUList(hdu_list, fin)
Parameters:
file_name: The name of the file to read in.
dir: Optionally a directory name can be provided if ``file_name`` does not
already include it. [default: None]
hdu: The number of the HDU to return. [default: None, which means to return
either the primary or first extension as appropriate for the given
compression. (e.g. for rice, the first extension is the one you normally
want.)]
compression: Which decompression scheme to use (if any). Options are:
- None or 'none' = no decompression
- 'rice' = use rice decompression in tiles
- 'gzip' = use gzip to decompress the full file
- 'bzip2' = use bzip2 to decompress the full file
- 'gzip_tile' = use gzip decompression in tiles
- 'hcompress' = use hcompress decompression in tiles
- 'plio' = use plio decompression in tiles
- 'auto' = determine the decompression from the extension of the file name
(requires ``file_name`` to be given).
- '.fz' => 'rice'
- '.gz' => 'gzip'
- '.bz2' => 'bzip2'
- otherwise None
[default: 'auto']
Returns:
a tuple with three items: ``(hdu, hdu_list, fin)``.
"""
file_compress, pyfits_compress = _parse_compression(compression,file_name)
hdu_list, fin = _read_file(file_name, dir, file_compress)
hdu = _get_hdu(hdu_list, hdu, pyfits_compress)
return hdu, hdu_list, fin
##############################################################################################
#
# Finally, we have a class for handling FITS headers called FitsHeader.
#
##############################################################################################
# inject write as method of Image class
Image.write = write