from __future__ import division
# =============================================================================
# IMPORTS
# =============================================================================
# general imports
import os
import glob
import copy
import logging
import numpy as np
# astropy imports
import pysiaf
from astropy.io import fits
from astropy.table import Table
# jwst imports
from jwst.pipeline import Detector1Pipeline
from stdatamodels.jwst import datamodels
# webbpsf_ext imports
from webbpsf_ext.logging_utils import setup_logging
from webbpsf_ext.imreg_tools import get_files
from webbpsf_ext.imreg_tools import get_coron_apname as nircam_apname
# helper functions
from .utils import get_nrcmask_from_apname, get_filter_info, get_pce_info, config_stpipe_log
# Set up log.
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
# =============================================================================
# MAIN
# =============================================================================
# Initialize SIAF instruments.
siaf_nrc = pysiaf.Siaf('NIRCam')
siaf_nis = pysiaf.Siaf('NIRISS')
siaf_mir = pysiaf.Siaf('MIRI')
setup_logging('WARN', verbose=False)
[docs]
class Database():
"""
The central spaceKLIP database class.
"""
def __init__(self,
output_dir):
"""
Initialize the central spaceKLIP database class. It stores the
observational metadata and keeps track of the reduction steps.
The pre-PSF subtraction data is stored in the Database.obs dictionary
and the post-PSF subtraction data is stored in the Database.red
dictionary. They contain a table of metadata for each concatenation,
which are identified automatically based on instrument, filter, pupil
mask, and image mask. The tables can be edited by the user at any
stage of the data reduction process and spaceKLIP will continue with
the updated metadata (e.g., modified host star position).
A non-verbose mode is available by setting Database.verbose = False.
Parameters
----------
output_dir : path
Directory where the reduction products shall be saved.
Returns
-------
None.
"""
# Output directory for saving reduction products.
self.output_dir = output_dir
# Check if output directory exists
if not os.path.isdir(self.output_dir):
log.warning(f'Output directory does not exist. Creating {self.output_dir}.')
os.makedirs(self.output_dir)
# Initialize observations dictionary which contains the individual
# concatenations.
self.obs = {}
# Initialize reductions dictionary which contains the individual
# concatenations.
self.red = {}
# Initialize source dictionary which contains the individual
# companions.
self.src = {}
# Verbose mode?
self.verbose = True
pass
[docs]
def read_jwst_s012_data(self,
datapaths,
psflibpaths=None,
bgpaths=None,
cr_from_siaf=False,
assoc_using_targname=True):
"""
Read JWST stage 0 (uncal), 1 (rate or rateints), or 2 (cal or
calints) data into the Database.obs dictionary. It contains a table of
metadata for each concatenation, which are identified automatically
based on instrument, filter, pupil mask, and image mask. The tables can
be edited by the user at any stage of the data reduction process and
spaceKLIP will continue with the updated metadata (e.g., modified host
star position).
Parameters
----------
datapaths : list of paths
List of paths of the input JWST data. SpaceKLIP will try to
automatically identify science data, PSF references, target
acquisition frames, and MIRI background observations. If spaceKLIP
does not get things right, you may use the 'psflibpaths' and
'bgpaths' keywords below.
psflibpaths : list of paths, optional
List of paths of the input PSF references. Make sure that they are
NOT duplicated in the 'datapaths'. The default is None.
bgpaths : list of paths, optional
List of paths of the input MIRI background observations. Make sure
that they ARE duplicated in the 'datapaths' or 'psflibpaths'. The
default is None.
assoc_using_targname : bool, optional
If True associate TA and BG observations to their corresponding
SCI and REF observations based on the target name from the APT
file. Otherwise, only consider the instrument parameters to
distinguish between SCI and REF TA/BG. The default is True.
Returns
-------
None.
"""
# Check input.
if isinstance(datapaths, str):
datapaths = [datapaths]
if len(datapaths) == 0:
raise UserWarning('Could not find any data paths')
if isinstance(psflibpaths, str):
psflibpaths = [psflibpaths]
if isinstance(bgpaths, str):
bgpaths = [bgpaths]
if bgpaths is not None:
for i in range(len(bgpaths)):
if psflibpaths is not None:
if bgpaths[i] not in datapaths and bgpaths[i] not in psflibpaths:
raise UserWarning('Background path ' + bgpaths[i] + ' does not occur in data or PSF library paths')
else:
if bgpaths[i] not in datapaths:
raise UserWarning('Background path ' + bgpaths[i] + ' does not occur in data paths')
# Read FITS headers.
DATAMODL = []
TELESCOP = []
TARGPROP = []
TARG_RA = [] # deg
TARG_DEC = [] # deg
INSTRUME = []
DETECTOR = []
FILTER = []
CWAVEL = [] # micron
DWAVEL = [] # micron
PUPIL = []
CORONMSK = []
EXP_TYPE = []
EXPSTART = [] # MJD
NINTS = []
EFFINTTM = [] # s
IS_PSF = []
SELFREF = []
SUBARRAY = []
NUMDTHPT = []
XOFFSET = [] # arcsec
YOFFSET = [] # arcsec
APERNAME = []
PPS_APER = []
PIXSCALE = [] # arcsec
PIXAR_SR = [] # sr
BUNIT = []
CRPIX1 = [] # pix
CRPIX2 = [] # pix
MASKCENX = [] # pix
MASKCENY = [] # pix
NANMASKCENX = [] # pix
NANMASKCENY = []
STARCENX = [] # pix
STARCENY = [] # pix
CROP_SHIFTX = [] # pix
CROP_SHIFTY = [] # pix
ALIGN_SHIFT = [] # pix
CENTER_SHIFT = [] # pix
ALIGN_MASK = [] # pix
CENTER_MASK = [] # pix
VPARITY = []
V3I_YANG = [] # deg
RA_REF = [] # deg
DEC_REF = [] # deg
ROLL_REF = [] # deg
BLURFWHM = [] # pix
HASH = []
if psflibpaths is not None:
allpaths = np.array(datapaths + psflibpaths)
else:
allpaths = np.array(datapaths)
Nallpaths = len(allpaths)
for i in range(Nallpaths):
hdul = fits.open(allpaths[i])
head = hdul[0].header
# Only read in the data if needed.
data = hdul['SCI'].data if not head.get('NINTS') else None
if 'uncal' in allpaths[i]:
DATAMODL += ['STAGE0']
elif 'rate' in allpaths[i] or 'rateints' in allpaths[i]:
DATAMODL += ['STAGE1']
elif 'cal' in allpaths[i] or 'calints' in allpaths[i]:
DATAMODL += ['STAGE2']
else:
raise UserWarning('File name must contain one of the following: uncal, rate, rateints, cal, calints')
TELESCOP += [head.get('TELESCOP', 'JWST')]
TARGPROP += [head.get('TARGPROP', 'UNKNOWN')]
TARG_RA += [head.get('TARG_RA', np.nan)]
TARG_DEC += [head.get('TARG_DEC', np.nan)]
INSTRUME += [head.get('INSTRUME', 'UNKNOWN')]
DETECTOR += [head.get('DETECTOR', 'UNKNOWN')]
FILTER += [head['FILTER']]
PUPIL += [head.get('PUPIL', 'NONE')]
if TELESCOP[-1] == 'JWST':
if INSTRUME[-1] in ('NIRCAM', 'MIRI'):
try:
_pce = get_pce_info(INSTRUME[-1], FILTER[-1], DETECTOR[-1],
head.get('EXP_TYPE', 'UNKNOWN'))
CWAVEL += [_pce['WavelengthPivot'] / 1e4] # angstrom -> micron
DWAVEL += [_pce['WidthEff'] / 1e4] # angstrom -> micron
except KeyError:
log.warning(f'PCE data not found for {INSTRUME[-1]} {FILTER[-1]} '
f'{DETECTOR[-1]}. Using NaN for CWAVEL/DWAVEL.')
CWAVEL += [np.nan]
DWAVEL += [np.nan]
elif INSTRUME[-1] == 'NIRISS':
log.warning('NIRISS PCE data not available. Using NaN for CWAVEL/DWAVEL.')
CWAVEL += [np.nan]
DWAVEL += [np.nan]
else:
raise UserWarning('Data originates from unknown JWST instrument')
else:
raise UserWarning('Data originates from unknown telescope')
EXP_TYPE += [head.get('EXP_TYPE', 'UNKNOWN')]
EXPSTART += [head.get('EXPSTART', np.nan)]
nintegrations = head.get('NINTS', data.shape[0] if data is not None and data.ndim == 3 else 1)
NINTS += [nintegrations]
EFFINTTM += [head.get('EFFINTTM', np.nan)]
IS_PSF += [head.get('IS_PSF', 'NONE')]
SELFREF += [head.get('SELFREF', 'NONE')]
SUBARRAY += [head.get('SUBARRAY', 'UNKNOWN')]
NUMDTHPT += [head.get('NUMDTHPT', 1)]
XOFFSET += [head.get('XOFFSET', 0.)]
YOFFSET += [head.get('YOFFSET', 0.)]
apname = nircam_apname(head) if INSTRUME[-1] == 'NIRCAM' else head.get('APERNAME', 'UNKNOWN')
APERNAME += [apname]
PPS_APER += [head.get('PPS_APER', 'UNKNOWN')]
coronmask = get_nrcmask_from_apname(PPS_APER[-1]) if INSTRUME[-1] == 'NIRCAM' else head.get('CORONMSK', 'NONE')
CORONMSK += [coronmask]
if TELESCOP[-1] == 'JWST':
if INSTRUME[-1] == 'NIRCAM':
ap = siaf_nrc[apname]
elif INSTRUME[-1] == 'NIRISS':
ap = siaf_nis[apname]
elif INSTRUME[-1] == 'MIRI':
ap = siaf_mir[apname]
else:
raise UserWarning('Data originates from unknown JWST instrument')
# Save the average of the X and Y pixel scales.
PIXSCALE += [(ap.XSciScale + ap.YSciScale) / 2.]
else:
raise UserWarning('Data originates from unknown telescope')
BLURFWHM += [head.get('BLURFWHM', np.nan)]
head = hdul['SCI'].header
PIXAR_SR += [head.get('PIXAR_SR', np.nan)]
BUNIT += [head.get('BUNIT', 'NONE')]
if cr_from_siaf:
CRPIX1 += [ap.XSciRef]
CRPIX2 += [ap.YSciRef]
else:
CRPIX1 += [head.get('CRPIX1', np.nan)]
CRPIX2 += [head.get('CRPIX2', np.nan)]
MASKCENX += [head.get('MASKCENX', float(CRPIX1[i]))]
MASKCENY += [head.get('MASKCENY', float(CRPIX2[i]))]
NANMASKCENX += [head.get('NANMASKCENX', float(CRPIX1[i]))]
NANMASKCENY += [head.get('NANMASKCENY', float(CRPIX2[i]))]
STARCENX += [head.get('STARCENX', MASKCENX[-1])]
STARCENY += [head.get('STARCENY', MASKCENY[-1])]
CROP_SHIFTX += [head.get('CROP_SHIFTX', np.nan)]
CROP_SHIFTY += [head.get('CROP_SHIFTY', np.nan)]
ALIGN_SHIFT += [head.get('ALIGN_SHIFT', np.zeros((nintegrations, 2)))]
CENTER_SHIFT += [head.get('CENTER_SHIFT', np.zeros((nintegrations, 2)))]
ALIGN_MASK += [head.get('ALIGN_MASK', np.zeros((2)))]
CENTER_MASK += [head.get('CENTER_MASK', np.zeros((2)))]
VPARITY += [head.get('VPARITY', -1)]
V3I_YANG += [head.get('V3I_YANG', 0.)]
RA_REF += [head.get('RA_REF', np.nan)]
DEC_REF += [head.get('DEC_REF', np.nan)]
ROLL_REF += [head.get('ROLL_REF', 0.)]
HASH += [TELESCOP[-1] + '_' + INSTRUME[-1] + '_' + DETECTOR[-1] + '_' + FILTER[-1] + '_' + PUPIL[-1] + '_' + CORONMSK[-1] + '_' + SUBARRAY[-1]]
hdul.close()
DATAMODL = np.array(DATAMODL)
TELESCOP = np.array(TELESCOP)
TARGPROP = np.array(TARGPROP)
TARG_RA = np.array(TARG_RA)
TARG_DEC = np.array(TARG_DEC)
INSTRUME = np.array(INSTRUME)
DETECTOR = np.array(DETECTOR)
FILTER = np.array(FILTER)
CWAVEL = np.array(CWAVEL)
DWAVEL = np.array(DWAVEL)
PUPIL = np.array(PUPIL)
CORONMSK = np.array(CORONMSK)
EXP_TYPE = np.array(EXP_TYPE)
EXPSTART = np.array(EXPSTART)
NINTS = np.array(NINTS)
EFFINTTM = np.array(EFFINTTM)
IS_PSF = np.array(IS_PSF, dtype='<U5')
SELFREF = np.array(SELFREF, dtype='<U5')
SUBARRAY = np.array(SUBARRAY)
NUMDTHPT = np.array(NUMDTHPT)
XOFFSET = np.array(XOFFSET)
YOFFSET = np.array(YOFFSET)
APERNAME = np.array(APERNAME)
PPS_APER = np.array(PPS_APER)
PIXSCALE = np.array(PIXSCALE)
PIXAR_SR = np.array(PIXAR_SR)
BUNIT = np.array(BUNIT)
CRPIX1 = np.array(CRPIX1)
CRPIX2 = np.array(CRPIX2)
MASKCENX = np.array(MASKCENX)
MASKCENY = np.array(MASKCENY)
NANMASKCENX = np.array(NANMASKCENX)
NANMASKCENY = np.array(NANMASKCENY)
STARCENX = np.array(STARCENX)
STARCENY = np.array(STARCENY)
CROP_SHIFTX = np.array(CROP_SHIFTX)
CROP_SHIFTY = np.array(CROP_SHIFTY)
ALIGN_SHIFT = np.array(ALIGN_SHIFT, dtype='object')
CENTER_SHIFT = np.array(CENTER_SHIFT, dtype='object')
ALIGN_MASK = np.array(ALIGN_MASK, dtype='object')
CENTER_MASK = np.array(CENTER_MASK, dtype='object')
VPARITY = np.array(VPARITY)
V3I_YANG = np.array(V3I_YANG)
RA_REF = np.array(RA_REF)
DEC_REF = np.array(DEC_REF)
ROLL_REF = np.array(ROLL_REF)
BLURFWHM = np.array(BLURFWHM)
HASH = np.array(HASH)
# Find unique concatenations.
HASH_unique = np.unique(HASH)
NHASH_unique = len(HASH_unique)
# Associate TA files with science or reference files.
ww = []
for i in range(NHASH_unique):
if 'MASKRND_NONE' in HASH_unique[i] or 'MASKBAR_NONE' in HASH_unique[i] or 'NONE_MASK1065' in HASH_unique[i] or 'NONE_MASK1140' in HASH_unique[i] or 'NONE_MASK1550' in HASH_unique[i] or 'NONE_MASKLYOT' in HASH_unique[i]:
ww += [-1]
HASH_i_split = HASH_unique[i].split('_')
for j in range(NHASH_unique):
HASH_j_split = HASH_unique[j].split('_')
if HASH_j_split[0] == HASH_i_split[0] and HASH_j_split[1] == HASH_i_split[1] and HASH_j_split[2] == HASH_i_split[2] and HASH_j_split[4] == HASH_i_split[4] and HASH_j_split[5] != 'NONE' and HASH_j_split[6][-4:] in HASH_i_split[6]:
ww[-1] = i
break
if ww[-1] != -1:
HASH[HASH == HASH_unique[i]] = HASH_unique[j]
else:
raise UserWarning('Could not associate TA files with science or reference files')
HASH_unique = np.delete(HASH_unique, ww)
NHASH_unique = len(HASH_unique)
# Get PSF mask directory.
maskbase = os.path.split(os.path.abspath(__file__))[0]
maskbase = os.path.join(maskbase, 'resources/transmissions/')
# Loop through concatenations.
for i in range(NHASH_unique):
ww = HASH == HASH_unique[i]
# Find science and reference files.
if psflibpaths is not None:
ww_sci = []
ww_ref = []
for j in range(len(allpaths[ww])):
if allpaths[ww][j] in psflibpaths:
ww_ref.append(j)
else:
ww_sci.append(j)
ww_sci = np.array(ww_sci, dtype='int')
ww_ref = np.array(ww_ref, dtype='int')
else:
is_psf = IS_PSF[ww]
exp_type = EXP_TYPE[ww]
for j in range(len(exp_type)):
if 'TA' in exp_type[j]:
is_psf[j] = 'False'
if 'NONE' not in is_psf:
ww_sci = np.where(is_psf == 'False')[0]
ww_ref = np.where(is_psf == 'True')[0]
else:
log.warning(' --> Could not find IS_PSF header keyword')
numdthpt = NUMDTHPT[ww]
numdthpt_unique = np.unique(numdthpt)
if len(numdthpt_unique) == 2 and numdthpt_unique[0] == 1:
ww_sci = np.where(numdthpt == numdthpt_unique[0])[0]
ww_ref = np.where(numdthpt == numdthpt_unique[1])[0]
else:
ww_sci = np.where(numdthpt == numdthpt_unique[0])[0]
ww_ref = None
log.warning(' --> Could not identify science and reference files based on dither pattern.'
'Please use psflibpaths to specify reference files before running klip subtraction step')
# Make Astropy tables for concatenations.
tab = Table(names=('TYPE',
'EXP_TYPE',
'DATAMODL',
'TELESCOP',
'TARGPROP',
'TARG_RA',
'TARG_DEC',
'INSTRUME',
'DETECTOR',
'FILTER',
'CWAVEL',
'DWAVEL',
'PUPIL',
'CORONMSK',
'EXPSTART',
'NINTS',
'EFFINTTM',
'SUBARRAY',
'NUMDTHPT',
'XOFFSET',
'YOFFSET',
'APERNAME',
'PPS_APER',
'PIXSCALE',
'PIXAR_SR',
'BUNIT',
'CRPIX1',
'CRPIX2',
'MASKCENX',
'MASKCENY',
'NANMASKCENX',
'NANMASKCENY',
'STARCENX',
'STARCENY',
'CROP_SHIFTX',
'CROP_SHIFTY',
'ALIGN_SHIFT',
'CENTER_SHIFT',
'ALIGN_MASK',
'CENTER_MASK',
'RA_REF',
'DEC_REF',
'ROLL_REF',
'BLURFWHM',
'FITSFILE',
'MASKFILE',
'NANMASKFILE'),
dtype=('object',
'object',
'object',
'object',
'object',
'float',
'float',
'object',
'object',
'object',
'float',
'float',
'object',
'object',
'float',
'int',
'float',
'object',
'int',
'float',
'float',
'object',
'object',
'float',
'float',
'object',
'float',
'float',
'float',
'float',
'float',
'float',
'float',
'float',
'float',
'float',
'object',
'object',
'object',
'object',
'float',
'float',
'float',
'float',
'object',
'object',
'object'))
ww_all = ww_sci if ww_ref is None else np.append(ww_sci, ww_ref)
for j in ww_all:
if j in ww_sci:
sci = True
else:
sci = False
if 'TA' in EXP_TYPE[ww][j]:
if sci:
tt = 'SCI_TA'
else:
tt = 'REF_TA'
elif any(bgid in TARGPROP[ww][j].upper() for bgid in ('BG', 'BKG', 'BACK', 'BACKGROUND')):
if sci:
tt = 'SCI_BG'
else:
tt = 'REF_BG'
else:
if sci:
tt = 'SCI'
else:
tt = 'REF'
if bgpaths is not None:
if allpaths[ww][j] in bgpaths:
if sci:
tt = 'SCI_BG'
else:
tt = 'REF_BG'
maskfile = allpaths[ww][j].replace('.fits', '_psfmask.fits')
if not os.path.exists(maskfile):
if EXP_TYPE[ww][j] == 'NRC_CORON':
config_stpipe_log(suppress=True) # Suppress logging.
pipeline = Detector1Pipeline()
input = datamodels.open(allpaths[ww][j])
maskfile = pipeline.get_reference_file(input, 'psfmask')
if (maskfile is None) or (not os.path.exists(maskfile)):
maskfile = 'NONE'
config_stpipe_log(suppress=False) # Revert to default logging.
elif EXP_TYPE[ww][j] == 'MIR_4QPM' or EXP_TYPE[ww][j] == 'MIR_LYOT':
if APERNAME[ww][j] == 'MIRIM_MASK1065':
maskpath = 'JWST_MIRI_F1065C_transmission_webbpsf-ext_v2.fits'
elif APERNAME[ww][j] == 'MIRIM_MASK1140':
maskpath = 'JWST_MIRI_F1140C_transmission_webbpsf-ext_v2.fits'
elif APERNAME[ww][j] == 'MIRIM_MASK1550':
maskpath = 'JWST_MIRI_F1550C_transmission_webbpsf-ext_v2.fits'
elif APERNAME[ww][j] == 'MIRIM_MASKLYOT':
maskpath = 'jwst_miri_psfmask_0009.fits' # FIXME!
maskfile = os.path.join(maskbase, maskpath)
else:
maskfile = 'NONE'
nanmaskfile = allpaths[ww][j].replace('.fits', '_nanmask.fits')
if not os.path.exists(nanmaskfile):
nanmaskfile = 'NONE'
tab.add_row((tt,
EXP_TYPE[ww][j],
DATAMODL[ww][j],
TELESCOP[ww][j],
TARGPROP[ww][j],
TARG_RA[ww][j],
TARG_DEC[ww][j],
INSTRUME[ww][j],
DETECTOR[ww][j],
FILTER[ww][j],
CWAVEL[ww][j],
DWAVEL[ww][j],
PUPIL[ww][j],
CORONMSK[ww][j],
EXPSTART[ww][j],
NINTS[ww][j],
EFFINTTM[ww][j],
SUBARRAY[ww][j],
NUMDTHPT[ww][j],
XOFFSET[ww][j],
YOFFSET[ww][j],
APERNAME[ww][j],
PPS_APER[ww][j],
PIXSCALE[ww][j],
PIXAR_SR[ww][j],
BUNIT[ww][j],
CRPIX1[ww][j],
CRPIX2[ww][j],
MASKCENX[ww][j],
MASKCENY[ww][j],
NANMASKCENX[ww][j],
NANMASKCENY[ww][j],
STARCENX[ww][j],
STARCENY[ww][j],
CROP_SHIFTX[ww][j],
CROP_SHIFTY[ww][j],
ALIGN_SHIFT[ww][j],
CENTER_SHIFT[ww][j],
ALIGN_MASK[ww][j],
CENTER_MASK[ww][j],
RA_REF[ww][j],
DEC_REF[ww][j],
ROLL_REF[ww][j] - V3I_YANG[ww][j] * VPARITY[ww][j],
BLURFWHM[ww][j],
allpaths[ww][j],
maskfile,
nanmaskfile))
self.obs[HASH_unique[i]] = tab.copy()
del tab
# Associate background files with science or reference files.
for j in range(len(self.obs[HASH_unique[i]])):
if self.obs[HASH_unique[i]]['TYPE'][j] == 'SCI_BG':
if (self.obs[HASH_unique[i]]['EFFINTTM'][j] not in self.obs[HASH_unique[i]]['EFFINTTM'][self.obs[HASH_unique[i]]['TYPE'] == 'SCI']) or (self.obs[HASH_unique[i]]['NINTS'][j] not in self.obs[HASH_unique[i]]['NINTS'][self.obs[HASH_unique[i]]['TYPE'] == 'SCI']):
if (self.obs[HASH_unique[i]]['EFFINTTM'][j] in self.obs[HASH_unique[i]]['EFFINTTM'][self.obs[HASH_unique[i]]['TYPE'] == 'REF']) and (self.obs[HASH_unique[i]]['NINTS'][j] in self.obs[HASH_unique[i]]['NINTS'][self.obs[HASH_unique[i]]['TYPE'] == 'REF']):
self.obs[HASH_unique[i]]['TYPE'][j] = 'REF_BG'
else:
raise UserWarning('Background exposure ' + self.obs[HASH_unique[i]]['FITSFILE'][j] + ' could not be matched with PSF')
elif self.obs[HASH_unique[i]]['TYPE'][j] == 'REF_BG':
if (self.obs[HASH_unique[i]]['EFFINTTM'][j] not in self.obs[HASH_unique[i]]['EFFINTTM'][self.obs[HASH_unique[i]]['TYPE'] == 'REF']) or (self.obs[HASH_unique[i]]['NINTS'][j] not in self.obs[HASH_unique[i]]['NINTS'][self.obs[HASH_unique[i]]['TYPE'] == 'REF']):
if (self.obs[HASH_unique[i]]['EFFINTTM'][j] in self.obs[HASH_unique[i]]['EFFINTTM'][self.obs[HASH_unique[i]]['TYPE'] == 'SCI']) and (self.obs[HASH_unique[i]]['NINTS'][j] in self.obs[HASH_unique[i]]['NINTS'][self.obs[HASH_unique[i]]['TYPE'] == 'SCI']):
self.obs[HASH_unique[i]]['TYPE'][j] = 'SCI_BG'
else:
raise UserWarning('Background exposure ' + self.obs[HASH_unique[i]]['FITSFILE'][j] + ' could not be matched with PSF')
# Reassociate TA and background files with science or reference
# files based on target name.
if assoc_using_targname:
for j in range(len(self.obs[HASH_unique[i]])):
if self.obs[HASH_unique[i]]['TYPE'][j] in ['SCI_TA', 'SCI_BG']:
targprop = self.obs[HASH_unique[i]]['TARGPROP'][self.obs[HASH_unique[i]]['TYPE'] == 'SCI']
ww = np.array([s == self.obs[HASH_unique[i]]['TARGPROP'][j] for s in targprop])
if np.sum(ww) == 0:
targprop = self.obs[HASH_unique[i]]['TARGPROP'][self.obs[HASH_unique[i]]['TYPE'] == 'REF']
ww = np.array([s == self.obs[HASH_unique[i]]['TARGPROP'][j] for s in targprop])
if np.sum(ww) != 0:
self.obs[HASH_unique[i]]['TYPE'][j] = self.obs[HASH_unique[i]]['TYPE'][j].replace('SCI', 'REF')
if self.obs[HASH_unique[i]]['TYPE'][j] in ['REF_TA', 'REF_BG']:
targprop = self.obs[HASH_unique[i]]['TARGPROP'][self.obs[HASH_unique[i]]['TYPE'] == 'REF']
ww = np.array([s == self.obs[HASH_unique[i]]['TARGPROP'][j] for s in targprop])
if np.sum(ww) == 0:
targprop = self.obs[HASH_unique[i]]['TARGPROP'][self.obs[HASH_unique[i]]['TYPE'] == 'SCI']
ww = np.array([s == self.obs[HASH_unique[i]]['TARGPROP'][j] for s in targprop])
if np.sum(ww) != 0:
self.obs[HASH_unique[i]]['TYPE'][j] = self.obs[HASH_unique[i]]['TYPE'][j].replace('REF', 'SCI')
# Print Astropy tables for concatenations.
if self.verbose:
self.print_obs()
pass
[docs]
def read_jwst_s3_data(self,
datapaths,
cr_from_siaf=False):
"""
Read JWST stage 3 data (this can be i2d data from the official JWST
pipeline, or data products from the pyKLIP and classical PSF
subtraction pipelines implemented in spaceKLIP) into the Database.red
dictionary. It contains a table of metadata for each concatenation,
which are identified automatically based on instrument, filter, pupil
mask, and image mask. The tables can be edited by the user at any stage
of the data reduction process and spaceKLIP will continue with the
updated metadata (e.g., modified host star position).
Parameters
----------
datapaths : list of paths
List of paths of the input JWST data.
Returns
-------
None.
"""
# Check input.
if isinstance(datapaths, str):
datapaths = [datapaths]
if len(datapaths) == 0:
raise UserWarning('Could not find any data paths')
# Read FITS headers.
TYPE = []
DATAMODL = []
TELESCOP = []
TARGPROP = []
TARG_RA = [] # deg
TARG_DEC = [] # deg
INSTRUME = []
DETECTOR = []
FILTER = []
CWAVEL = [] # micron
DWAVEL = [] # micron
PUPIL = []
CORONMSK = []
EXP_TYPE = []
EXPSTART = [] # MJD
NINTS = []
EFFINTTM = [] # s
SUBARRAY = []
APERNAME = []
PPS_APER = []
PIXSCALE = [] # arcsec
PIXAR_SR = [] # sr
MODE = []
ANNULI = []
SUBSECTS = []
KLMODES = []
BUNIT = []
CRPIX1 = [] # pix
CRPIX2 = [] # pix
MASKCENX = [] # pix
MASKCENY = [] # pix
NANMASKCENX = [] # pix
NANMASKCENY = [] # pix
STARCENX = [] # pix
STARCENY = [] # pix
CROP_SHIFTX = [] # pix
CROP_SHIFTY = [] # pix
BLURFWHM = [] # pix
HASH = []
Ndatapaths = len(datapaths)
for i in range(Ndatapaths):
hdul = fits.open(datapaths[i])
head = hdul[0].header
if datapaths[i].endswith('i2d.fits'):
TYPE += ['CORON3']
elif datapaths[i].endswith('KLmodes-all.fits'):
TYPE += ['PYKLIP']
else:
raise UserWarning('File must have one of the following endings: i2d.fits, KLmodes-all.fits')
DATAMODL += ['STAGE3']
TELESCOP += [head.get('TELESCOP', 'JWST')]
TARGPROP += [head.get('TARGPROP', 'UNKNOWN')]
TARG_RA += [head.get('TARG_RA', np.nan)]
TARG_DEC += [head.get('TARG_DEC', np.nan)]
INSTRUME += [head.get('INSTRUME', 'UNKNOWN')]
DETECTOR += [head.get('DETECTOR', 'UNKNOWN')]
FILTER += [head['FILTER']]
PUPIL += [head.get('PUPIL', 'NONE')]
if TELESCOP[-1] == 'JWST':
if INSTRUME[-1] in ('NIRCAM', 'MIRI'):
try:
_pce = get_pce_info(INSTRUME[-1], FILTER[-1], DETECTOR[-1],
head.get('EXP_TYPE', 'UNKNOWN'))
CWAVEL += [_pce['WavelengthPivot'] / 1e4] # angstrom -> micron
DWAVEL += [_pce['WidthEff'] / 1e4] # angstrom -> micron
except KeyError:
log.warning(f'PCE data not found for {INSTRUME[-1]} {FILTER[-1]} '
f'{DETECTOR[-1]}. Using NaN for CWAVEL/DWAVEL.')
CWAVEL += [np.nan]
DWAVEL += [np.nan]
elif INSTRUME[-1] == 'NIRISS':
log.warning('NIRISS PCE data not available. Using NaN for CWAVEL/DWAVEL.')
CWAVEL += [np.nan]
DWAVEL += [np.nan]
else:
raise UserWarning('Data originates from unknown JWST instrument')
else:
raise UserWarning('Data originates from unknown telescope')
EXP_TYPE += [head.get('EXP_TYPE', 'UNKNOWN')]
EXPSTART += [head.get('EXPSTART', np.nan)]
NINTS += [head.get('NINTS', 1)]
EFFINTTM += [head.get('EFFINTTM', np.nan)]
SUBARRAY += [head.get('SUBARRAY', 'UNKNOWN')]
apname = nircam_apname(head) if INSTRUME[-1] == 'NIRCAM' else head.get('APERNAME', 'UNKNOWN')
APERNAME += [apname]
PPS_APER += [head.get('PPS_APER', 'UNKNOWN')]
coronmask = get_nrcmask_from_apname(PPS_APER[-1]) if INSTRUME[-1] == 'NIRCAM' else head.get('CORONMSK', 'NONE')
CORONMSK += [coronmask]
if TELESCOP[-1] == 'JWST':
if INSTRUME[-1] == 'NIRCAM':
ap = siaf_nrc[apname]
elif INSTRUME[-1] == 'NIRISS':
ap = siaf_nis[apname]
elif INSTRUME[-1] == 'MIRI':
ap = siaf_mir[apname]
else:
raise UserWarning('Data originates from unknown JWST instrument')
# Save the average of the X and Y pixel scales.
PIXSCALE += [(ap.XSciScale + ap.YSciScale) / 2.]
else:
raise UserWarning('Data originates from unknown telescope')
if TYPE[-1] == 'CORON3':
MODE += ['RDI']
ANNULI += [1]
SUBSECTS += [1]
try:
KLMODES += [str(head['KLMODE0'])]
except KeyError:
log.warning(' --> Could not find KL mode in header, assuming default value of 50')
KLMODES += ['50']
elif TYPE[-1] == 'PYKLIP':
MODE += [head['MODE']]
ANNULI += [head['ANNULI']]
SUBSECTS += [head['SUBSECTS']]
klmodes = str(head['KLMODE0'])
j = 1
while True:
try:
klmodes += ',' + str(head['KLMODE{0}'.format(j)])
except KeyError:
break
j += 1
KLMODES += [klmodes]
else:
raise UserWarning('File must have one of the following types: CORON3, PYKLIP')
BLURFWHM += [head.get('BLURFWHM', np.nan)]
if TYPE[-1] == 'CORON3':
head = hdul['SCI'].header
PIXAR_SR += [head.get('PIXAR_SR', np.nan)]
BUNIT += [head.get('BUNIT', 'NONE')]
if cr_from_siaf:
CRPIX1 += [ap.XSciRef]
CRPIX2 += [ap.YSciRef]
else:
CRPIX1 += [head.get('CRPIX1', np.nan)]
CRPIX2 += [head.get('CRPIX2', np.nan)]
MASKCENX += [head.get('MASKCENX', CRPIX1[i])]
MASKCENY += [head.get('MASKCENY', CRPIX2[i])]
NANMASKCENX += [head.get('NANMASKCENX', CRPIX1[i])]
NANMASKCENY += [head.get('NANMASKCENY', CRPIX2[i])]
STARCENX += [head.get('STARCENX', np.nan)]
STARCENY += [head.get('STARCENY', np.nan)]
CROP_SHIFTX += [head.get('CROP_SHIFTX', 0.)]
CROP_SHIFTY += [head.get('CROP_SHIFTY', 0.)]
HASH += [TELESCOP[-1] + '_' + INSTRUME[-1] + '_' + DETECTOR[-1] + '_' + FILTER[-1] + '_' + PUPIL[-1] + '_' + CORONMSK[-1] + '_' + SUBARRAY[-1]]
hdul.close()
TYPE = np.array(TYPE)
DATAMODL = np.array(DATAMODL)
TELESCOP = np.array(TELESCOP)
TARGPROP = np.array(TARGPROP)
TARG_RA = np.array(TARG_RA)
TARG_DEC = np.array(TARG_DEC)
INSTRUME = np.array(INSTRUME)
DETECTOR = np.array(DETECTOR)
FILTER = np.array(FILTER)
CWAVEL = np.array(CWAVEL)
DWAVEL = np.array(DWAVEL)
PUPIL = np.array(PUPIL)
CORONMSK = np.array(CORONMSK)
EXP_TYPE = np.array(EXP_TYPE)
EXPSTART = np.array(EXPSTART)
NINTS = np.array(NINTS)
EFFINTTM = np.array(EFFINTTM)
SUBARRAY = np.array(SUBARRAY)
APERNAME = np.array(APERNAME)
PPS_APER = np.array(PPS_APER)
PIXSCALE = np.array(PIXSCALE)
PIXAR_SR = np.array(PIXAR_SR)
MODE = np.array(MODE)
ANNULI = np.array(ANNULI)
SUBSECTS = np.array(SUBSECTS)
KLMODES = np.array(KLMODES)
BUNIT = np.array(BUNIT)
CRPIX1 = np.array(CRPIX1)
CRPIX2 = np.array(CRPIX2)
MASKCENX = np.array(MASKCENX)
MASKCENY = np.array(MASKCENY)
NANMASKCENX = np.array(NANMASKCENX)
NANMASKCENY = np.array(NANMASKCENY)
STARCENX = np.array(STARCENX)
STARCENY = np.array(STARCENY)
CROP_SHIFTX = np.array(CROP_SHIFTX)
CROP_SHIFTY = np.array(CROP_SHIFTY)
BLURFWHM = np.array(BLURFWHM)
HASH = np.array(HASH)
# Find unique concatenations.
HASH_unique = np.unique(HASH)
NHASH_unique = len(HASH_unique)
# Loop through concatenations.
for i in range(NHASH_unique):
ww = np.where(HASH == HASH_unique[i])[0]
# Make Astropy tables for concatenations.
if HASH_unique[i] not in self.red.keys():
tab = Table(names=('TYPE',
'EXP_TYPE',
'DATAMODL',
'TELESCOP',
'TARGPROP',
'TARG_RA',
'TARG_DEC',
'INSTRUME',
'DETECTOR',
'FILTER',
'CWAVEL',
'DWAVEL',
'PUPIL',
'CORONMSK',
'EXPSTART',
'NINTS',
'EFFINTTM',
'SUBARRAY',
'APERNAME',
'PPS_APER',
'PIXSCALE',
'PIXAR_SR',
'STARCENX',
'STARCENY',
'MODE',
'ANNULI',
'SUBSECTS',
'KLMODES',
'BUNIT',
'BLURFWHM',
'FITSFILE',
'MASKFILE'),
dtype=('object',
'object',
'object',
'object',
'object',
'float',
'float',
'object',
'object',
'object',
'float',
'float',
'object',
'object',
'float',
'int',
'float',
'object',
'object',
'object',
'float',
'float',
'float',
'float',
'object',
'int',
'int',
'object',
'object',
'float',
'object',
'object'))
else:
tab = self.red[HASH_unique[i]].copy()
for j in range(len(ww)):
maskfile = os.path.join(os.path.split(datapaths[ww[j]])[0], HASH_unique[i] + '_psfmask.fits')
if not os.path.exists(maskfile):
maskfile = 'NONE'
tab.add_row((TYPE[ww[j]],
EXP_TYPE[ww[j]],
DATAMODL[ww[j]],
TELESCOP[ww[j]],
TARGPROP[ww[j]],
TARG_RA[ww[j]],
TARG_DEC[ww[j]],
INSTRUME[ww[j]],
DETECTOR[ww[j]],
FILTER[ww[j]],
CWAVEL[ww[j]],
DWAVEL[ww[j]],
PUPIL[ww[j]],
CORONMSK[ww[j]],
EXPSTART[ww[j]],
NINTS[ww[j]],
EFFINTTM[ww[j]],
SUBARRAY[ww[j]],
APERNAME[ww[j]],
PPS_APER[ww[j]],
PIXSCALE[ww[j]],
PIXAR_SR[ww[j]],
STARCENX[ww[j]],
STARCENY[ww[j]],
MODE[ww[j]],
ANNULI[ww[j]],
SUBSECTS[ww[j]],
KLMODES[ww[j]],
BUNIT[ww[j]],
BLURFWHM[ww][j],
datapaths[ww[j]],
maskfile))
self.red[HASH_unique[i]] = tab.copy()
del tab
# Read corresponding observations database.
if HASH_unique[i] not in self.obs.keys():
try:
file = os.path.join(os.path.split(datapaths[ww[j]])[0], HASH_unique[i] + '.dat')
self.obs[HASH_unique[i]] = Table.read(file, format='ascii')
self.obs[HASH_unique[i]]['FITSFILE'] = self.obs[HASH_unique[i]]['FITSFILE'].astype(object)
self.obs[HASH_unique[i]]['MASKFILE'] = self.obs[HASH_unique[i]]['MASKFILE'].astype(object)
except FileNotFoundError:
raise UserWarning('Observations database for concatenation ' + HASH_unique[i] + ' not found')
# Print Astropy tables for concatenations.
if self.verbose:
self.print_red()
pass
[docs]
def read_jwst_s4_data(self,
datapaths):
"""
Read JWST stage 4 data (spaceKLIP PSF fitting products) into the
Database.src dictionary. It contains a list of tables of metadata for
each concatenation, which are identified automatically based on
instrument, filter, pupil mask, and image mask.
Parameters
----------
datapaths : list of paths
List of paths of the input JWST data.
Returns
-------
None.
"""
# Check input.
if isinstance(datapaths, str):
datapaths = [datapaths]
if len(datapaths) == 0:
raise UserWarning('Could not find any data paths')
# Find unique concatenations.
HASH = []
Ndatapaths = len(datapaths)
for i in range(Ndatapaths):
temp = os.path.split(datapaths[i])[1]
ww = temp.find('-fitpsf_')
HASH += [temp[:ww]]
HASH_unique = np.unique(np.array(HASH))
NHASH_unique = len(HASH_unique)
# Loop through concatenations.
for i in range(NHASH_unique):
# Make Astropy tables for concatenations.
if HASH_unique[i] not in self.src.keys():
self.src[HASH_unique[i]] = []
tab = Table(names=('ID',
'RA',
'RA_ERR',
'DEC',
'DEC_ERR',
'CON',
'CON_ERR',
'DELMAG',
'DELMAG_ERR',
'APPMAG',
'APPMAG_ERR',
'MSTAR',
'MSTAR_ERR',
'SNR',
'LN(Z/Z0)',
'FITSFILE'),
dtype=('int',
'float',
'float',
'float',
'float',
'float',
'float',
'float',
'float',
'float',
'float',
'float',
'float',
'float',
'float',
'object'))
# Read FITS headers.
for j in range(Ndatapaths):
if HASH[j] == HASH_unique[i]:
hdul = fits.open(datapaths[j])
head = hdul[0].header
if head['LN(Z/Z0)'] == 'NONE':
evidence_ratio = np.nan
else:
evidence_ratio = head['LN(Z/Z0)']
tab.add_row((head['ID'],
head['RA'],
head['RA_ERR'],
head['DEC'],
head['DEC_ERR'],
head['CON'],
head['CON_ERR'],
head['DELMAG'],
head['DELMAG_ERR'],
head['APPMAG'],
head['APPMAG_ERR'],
head['MSTAR'],
head['MSTAR_ERR'],
head['SNR'],
evidence_ratio,
head['FITSFILE']))
self.src[HASH_unique[i]] += [tab.copy()]
del tab
# Print Astropy tables for concatenations.
if self.verbose:
self.print_src()
pass
[docs]
def print_obs(self,
include_fitsfiles=False):
"""
Print an abbreviated version of the observations database.
Parameters
----------
include_fitsfiles : bool, optional
Include the FITS file and PSF mask paths in the output. The default
is False.
Returns
-------
None.
"""
# Print Astropy tables for concatenations.
log.info('--> Identified %.0f concatenation(s)' % len(self.obs))
for i, key in enumerate(self.obs.keys()):
log.info(' --> Concatenation %.0f: ' % (i + 1) + key)
print_tab = copy.deepcopy(self.obs[key])
if include_fitsfiles:
print_tab.remove_columns(['TARG_RA', 'TARG_DEC', 'EXPSTART', 'APERNAME', 'PPS_APER',
'CRPIX1', 'CRPIX2', 'MASKCENX', 'MASKCENY', 'NANMASKCENX', 'NANMASKCENY', 'STARCENX', 'STARCENY', 'RA_REF', 'DEC_REF'])
else:
print_tab.remove_columns(['TARG_RA', 'TARG_DEC', 'EXPSTART', 'APERNAME', 'PPS_APER',
'CRPIX1', 'CRPIX2', 'MASKCENX', 'MASKCENY', 'NANMASKCENX', 'NANMASKCENY', 'STARCENX', 'STARCENY', 'RA_REF', 'DEC_REF', 'FITSFILE', 'MASKFILE'])
print_tab['XOFFSET'] *= 1e3
print_tab['XOFFSET'] = np.round(print_tab['XOFFSET'])
print_tab['XOFFSET'][print_tab['XOFFSET'] == 0.] = 0.
print_tab['YOFFSET'] *= 1e3
print_tab['YOFFSET'] = np.round(print_tab['YOFFSET'])
print_tab['YOFFSET'][print_tab['YOFFSET'] == 0.] = 0.
print_tab.pprint()
pass
[docs]
def print_red(self,
include_fitsfiles=False):
"""
Print an abbreviated version of the reductions database.
Parameters
----------
include_fitsfiles : bool, optional
Include the FITS file and PSF mask paths in the output. The default
is False.
Returns
-------
None.
"""
# Print Astropy tables for concatenations.
log.info('--> Identified %.0f concatenation(s)' % len(self.red))
for i, key in enumerate(self.red.keys()):
log.info(' --> Concatenation %.0f: ' % (i + 1) + key)
print_tab = copy.deepcopy(self.red[key])
if include_fitsfiles:
print_tab.remove_columns(['TARG_RA', 'TARG_DEC', 'EXPSTART', 'APERNAME', 'PPS_APER'])
else:
print_tab.remove_columns(['TARG_RA', 'TARG_DEC', 'EXPSTART', 'APERNAME', 'PPS_APER', 'FITSFILE', 'MASKFILE'])
print_tab.pprint()
pass
[docs]
def print_src(self,
include_fitsfiles=False):
"""
Print an abbreviated version of the source database.
Parameters
----------
include_fitsfiles : bool, optional
Include the FITS file paths in the output. The default is False.
Returns
-------
None.
"""
# Print Astropy tables for concatenations.
log.info('--> Identified %.0f concatenation(s)' % len(self.src))
for i, key in enumerate(self.src.keys()):
log.info(' --> Concatenation %.0f: ' % (i + 1) + key)
log.info(' --> Identified %.0f system(s)' % len(self.src[key]))
for j in range(len(self.src[key])):
log.info(' --> System %.0f:' % (j + 1))
print_tab = copy.deepcopy(self.src[key][j])
if include_fitsfiles:
pass
else:
print_tab.remove_columns(['FITSFILE'])
print_tab.pprint()
pass
[docs]
def update_obs(self,
key,
index,
fitsfile,
maskfile=None,
nanmaskfile=None,
nints=None,
effinttm=None,
xoffset=None,
yoffset=None,
crpix1=None,
crpix2=None,
maskcenx=None,
maskceny=None,
nanmaskcenx=None,
nanmaskceny=None,
starcenx=None,
starceny=None,
crop_shiftx=None,
crop_shifty=None,
align_shift=None,
center_shift=None,
align_mask=None,
center_mask=None,
blurfwhm=None,
update_pxar=False):
"""
Update the content of the observations database.
Parameters
----------
key : str
Database key of the observation to be updated.
index : int
Database index of the observation to be updated.
fitsfile : path
New FITS file path for the observation to be updated.
maskfile : path, optional
New PSF mask path for the observation to be updated. The default is
None.
nanmaskfile : path, optional
New NaNs mask path for the observation to be updated. The default is
None.
nints : int, optional
New number of integrations for the observation to be updated. The
default is None.
effinttm : float, optional
New effective integration time (s) for the observation to be
updated. The default is None.
xoffset : float, optional
New PSF x-offset (mas) for the observation to be updated. The
default is None.
yoffset : float, optional
New PSF y-offset (mas) for the observation to be updated. The
default is None.
crpix1 : float, optional
New WCS reference x-position (pix, 1-indexed) for the observation to be
updated. The default is None.
crpix2 : float, optional
New WCS reference y-position (pix, 1-indexed) for the observation to be
updated. The default is None.
maskcenx : float, optional
New mask x-position (pix, 1-indexed) for the observation to be
updated. The default is None.
maskceny : float, optional
New mask y-position (pix, 1-indexed) for the observation to be
updated. The default is None.
nanmaskcenx : float, optional
New nanmask x-position (pix, 1-indexed) for the observation to be
updated. The default is None.
nanmaskceny : float, optional
New nanmask y-position (pix, 1-indexed) for the observation to be
updated. The default is None.
starcenx : float, optional
New star x-position (pix, 1-indexed) for the observation to be
updated. The default is None.
starceny : float, optional
New star y-position (pix, 1-indexed) for the observation to be
updated. The default is None.
crop_shiftx : tuple, optional
Stores shifts in x due to any cropping. The default is None.
crop_shifty : tuple, optional
Stores shifts in y due to any cropping. The default is None.
align_shift: object
Array of shift values to recenter science frames. The default is None.
center_shift: object
Array of shift values to align science frames. The default is None.
align_mask: object
Array of shift values to align masks. The default is None.
center_mask: object
Array of shift values to recenter masks. The default is None.
blurfwhm : float, optional
New FWHM for the Gaussian filter blurring (pix) for the observation
to be updated. The default is None.
update_pxar : bool, optional
Update the pixel area column of the database based on the FITS file
header information? The default is False.
Returns
-------
None.
"""
# Update spaceKLIP database.
if 'uncal' in fitsfile:
DATAMODL = 'STAGE0'
elif 'rate' in fitsfile or 'rateints' in fitsfile:
DATAMODL = 'STAGE1'
elif 'cal' in fitsfile or 'calints' in fitsfile:
DATAMODL = 'STAGE2'
else:
raise UserWarning('File name must contain one of the following: uncal, rate, rateints, cal, calints')
hdul = fits.open(fitsfile)
self.obs[key]['DATAMODL'][index] = DATAMODL
if nints is not None:
self.obs[key]['NINTS'][index] = nints
if effinttm is not None:
self.obs[key]['EFFINTTM'][index] = effinttm
self.obs[key]['BUNIT'][index] = hdul['SCI'].header['BUNIT']
if xoffset is not None:
self.obs[key]['XOFFSET'][index] = xoffset
if yoffset is not None:
self.obs[key]['YOFFSET'][index] = yoffset
if crpix1 is not None:
self.obs[key]['CRPIX1'][index] = crpix1
if crpix2 is not None:
self.obs[key]['CRPIX2'][index] = crpix2
if maskcenx is not None:
self.obs[key]['MASKCENX'][index] = maskcenx
if maskceny is not None:
self.obs[key]['MASKCENY'][index] = maskceny
if nanmaskcenx is not None:
self.obs[key]['NANMASKCENX'][index] = nanmaskcenx
if nanmaskceny is not None:
self.obs[key]['NANMASKCENY'][index] = nanmaskceny
if starcenx is not None:
self.obs[key]['STARCENX'][index] = starcenx
if starceny is not None:
self.obs[key]['STARCENY'][index] = starceny
if crop_shiftx is not None:
self.obs[key]['CROP_SHIFTX'][index] = crop_shiftx
if crop_shifty is not None:
self.obs[key]['CROP_SHIFTY'][index] = crop_shifty
if align_shift is not None:
self.obs[key]['ALIGN_SHIFT'][index] = align_shift
if center_shift is not None:
self.obs[key]['CENTER_SHIFT'][index] = center_shift
if align_mask is not None:
self.obs[key]['ALIGN_MASK'][index] = align_mask
if center_mask is not None:
self.obs[key]['CENTER_MASK'][index] = center_mask
if blurfwhm is not None:
self.obs[key]['BLURFWHM'][index] = blurfwhm
self.obs[key]['FITSFILE'][index] = fitsfile
if maskfile is not None:
self.obs[key]['MASKFILE'][index] = maskfile
if nanmaskfile is not None:
self.obs[key]['NANMASKFILE'][index] = nanmaskfile
if update_pxar:
try:
pxar = fits.getheader(self.obs[key]['FITSFILE'][index], 'SCI')['PIXAR_SR']
self.obs[key]['PIXAR_SR'][index] = pxar
except:
pass
hdul.close()
pass
[docs]
def update_src(self,
key,
index,
tab):
"""
Update the content of the source database.
Parameters
----------
key : str
Database key of the source to be updated.
index : int
Database index of the source to be updated.
tab : astropy.table.Table
Astropy table of the companions to be saved to the source database.
Returns
-------
None.
"""
# Update spaceKLIP database.
try:
if isinstance(self.src[key], list):
nindex = len(self.src[key])
if nindex > index:
self.src[key][index] = tab
else:
self.src[key] += [None] * (index - nindex) + [tab]
except KeyError:
self.src[key] = [None] * index + [tab]
pass
[docs]
def adjust_dir(self, dir_path, data_type):
"""
Adjust the directory of the data products.
Parameters
----------
dir_path : str
New directory of the data products.
data_type : str
Type of data to adjust ('red' for reduced data, 'obs' for observations data, 'src' for source data).
Returns
-------
None.
"""
# Adjust the directory of the data products.
try:
data_dict = getattr(self, data_type)
for key in data_dict.keys():
for i in range(len(data_dict[key])):
data_dict[key]['FITSFILE'][i] = os.path.join(dir_path,
os.path.split(data_dict[key]['FITSFILE'][i])[1])
data_dict[key]['MASKFILE'][i] = os.path.join(dir_path,
os.path.split(data_dict[key]['MASKFILE'][i])[1])
except Exception as e:
raise UserWarning(f"Invalid directory: {dir_path}. Error: {e}")
pass
[docs]
def summarize(self):
"""
Succinctly summarize the contents of the observations database, i.e.,
how many files are present at each level of reduction, what kind (SCI,
REF, TA), etc.
Returns
-------
None.
"""
def short_concat_name(concat_name):
"""
Return a shorter, less redundant name for a coronagraphic mode,
useful for display.
Parameters
----------
concat_name : str
Concatenation name.
Returns
-------
concat_name : str
Less redundant concatenation name.
"""
parts = concat_name.split('_')
return "_".join([parts[1], parts[3], parts[5], ])
for mode in self.obs:
print(short_concat_name(mode))
tab = self.obs[mode]
for stage in [0, 1, 2]:
stagetab = tab[tab['DATAMODL'] == f'STAGE{stage}']
if len(stagetab):
nsci = np.sum(stagetab['TYPE'] == 'SCI')
nref = np.sum(stagetab['TYPE'] == 'REF')
nta = np.sum((stagetab['TYPE'] == 'SCI_TA') | (stagetab['TYPE'] == 'REF_TA'))
nbg = np.sum((stagetab['TYPE'] == 'SCI_BG') | (stagetab['TYPE'] == 'REF_BG'))
summarystr = f'\tSTAGE{stage}: {len(stagetab)} files;\t{nsci} SCI, {nref} REF'
if nta:
summarystr += f', {nta} TA'
if nbg:
summarystr += f', {nbg} BG'
print(summarystr)
if hasattr(self, 'red') and mode in self.red:
tab = self.red[mode]
stage = 3
stagetab = tab[tab['DATAMODL'] == f'STAGE{stage}']
if len(stagetab):
s3types = sorted(list(set(tab['TYPE'].value)))
nta = np.sum((stagetab['TYPE'] == 'SCI_TA') | (stagetab['TYPE'] == 'REF_TA'))
nbg = np.sum((stagetab['TYPE'] == 'SCI_BG') | (stagetab['TYPE'] == 'REF_BG'))
summarystr = f'\tSTAGE{stage}: {len(stagetab)} files;\t'
for i, typestr in enumerate(s3types):
ntype = np.sum(stagetab['TYPE'] == typestr)
summarystr += (', ' if i > 0 else '') + f'{ntype} {typestr}'
print(summarystr)
[docs]
def create_database(output_dir,
pid=None,
obsids=None,
input_dir=None,
psflibpaths=None,
bgpaths=None,
assoc_using_targname=True,
verbose=True,
readlevel='012',
cr_from_siaf=False,
**kwargs):
"""
Create a spaceKLIP database from JWST data
Automatically searches for uncal.fits in the input directory and creates
a database of the JWST data. Only works for stage0, stage1, or stage2 data
by default; set readlevel=3 to read in stage 3 outputs.
Parameters
----------
output_dir : str
Directory to save the database.
pid : str
Program ID.
obsids : list of ints, optional
List of observation numbers. If not set, will search for all
observations in the input directory.
input_dir : str
Directory containing the JWST data. If not set, will search for
MAST directory.
psflibpaths : list of paths, optional
List of paths of the input PSF references. Make sure that they are
NOT duplicated in the 'datapaths'. The default is None.
bgpaths : list of paths, optional
List of paths of the input MIRI background observations. Make sure
that they ARE duplicated in the 'datapaths' or 'psflibpaths'. The
default is None.
assoc_using_targname : bool, optional
Associate observations using the TARGNAME keyword. The default is True.
readlevel : str or int
Level of data products to read in, either '012' (or an integer 0,1,2) to read in
individual exposures, or '3' to read in level-3 PSF-subtracted products,
or '4' to read in extracted PSF fitting products.
verbose : bool, optional
Print information to the screen. The default is True.
Keyword Arguments
-----------------
sca : str
Name of detector (e.g., 'along' or 'a3')
filt : str
Return files observed in given filter.
file_type : str
'uncal.fits', 'rateints.fits', 'calints.fits', etc.
exp_type : str
Exposure type such as NRC_TACQ, NRC_TACONFIRM
vst_grp_act : str
The _<gg><s><aa>_ portion of the file name.
hdr0['VISITGRP'] + hdr0['SEQ_ID'] + hdr0['ACT_ID']
apername : str
Name of aperture (e.g., NRCA5_FULL)
apername_pps : str
Name of aperture from PPS (e.g., NRCA5_FULL)
readlevel : str or int
Set this to 3 invoke the code for re-reading in level 3 output
products. By default, only levels 0,1,2 data will be read and indexed.
"""
if (pid is None) and (input_dir is None):
raise ValueError("Must provide either a pid or an input_dir")
elif input_dir is None:
mast_dir = os.getenv('JWSTDOWNLOAD_OUTDIR')
input_dir = os.path.join(mast_dir, f'{pid:05d}')
# Check if obsids is not a list, tuple, or numpy array
if not isinstance(obsids, (list, tuple, np.ndarray)):
obsids = [obsids]
# Cycle through all obsids and get the files in a single list
fitsfiles = [get_files(input_dir, pid=pid, obsid=oid, **kwargs) for oid in obsids]
fitsfiles = [f for sublist in fitsfiles for f in sublist]
datapaths = [os.path.join(input_dir, f) for f in fitsfiles]
# Initialize the spaceKLIP database and read the input FITS files.
db = Database(output_dir=output_dir)
db.verbose = verbose
if str(readlevel) in '012' or str(readlevel) == '012':
db.read_jwst_s012_data(datapaths=datapaths,
psflibpaths=psflibpaths,
bgpaths=bgpaths,
cr_from_siaf=cr_from_siaf,
assoc_using_targname=assoc_using_targname)
elif str(readlevel) == '3':
# the above get_files usage won't match KLIP outputsa, so find them here
datapaths_klip = sorted(glob.glob(os.path.join(input_dir, "*KLmodes-all.fits")))
db.read_jwst_s3_data(datapaths=datapaths+datapaths_klip,
cr_from_siaf=cr_from_siaf,)
elif str(readlevel) == '4':
db.read_jwst_s4_data(datapaths=datapaths,)
else:
raise ValueError("Invalid/unknown value for readlevel parameter")
return db