Skip to content

FITS to Zarr (Low-Level)

The ovro_lwa_portal.fits_to_zarr_xradio module provides the low-level functions for converting OVRO-LWA FITS image files to Zarr format using xradio.

Prefer the high-level API for most use cases

The FITSToZarrConverter class in the ingest module wraps these functions with FileLock-based concurrency protection, progress callbacks, and a simpler interface. Use the low-level functions here only when you need fine-grained control over the conversion process.

Quick Reference

from pathlib import Path
from ovro_lwa_portal.fits_to_zarr_xradio import (
    convert_fits_dir_to_zarr,
    fix_fits_headers,
)

# Fix headers first (optional — convert_fits_dir_to_zarr can do this on-demand)
fits_files = sorted(Path("/data/fits").glob("*.fits"))
fixed = fix_fits_headers(fits_files, Path("/data/fixed_fits"))

# Convert to Zarr
result = convert_fits_dir_to_zarr(
    input_dir="/data/fits",
    out_dir="/data/output",
    fixed_dir="/data/fixed_fits",
    fix_headers_on_demand=False,  # already fixed above
)

API Reference

convert_fits_dir_to_zarr

convert_fits_dir_to_zarr(input_dir, out_dir, zarr_name='ovro_lwa_full_lm_only.zarr', fixed_dir='fixed_fits', chunk_lm=1024, rebuild=False, resume=False, fix_headers_on_demand=True, cleanup_fixed_fits=False, progress_callback=None, duplicate_resolver=None, discovery_freq_bin_hz=_DISCOVERY_FREQ_BIN_HZ, time_keys_only=None, lm_reference_ds=None, lm_reference_target_size=None, group_metadata_source='fits')

Convert all matching FITS in a directory into a single LM-only Zarr store.

Parameters:

Name Type Description Default
input_dir str | Path

Directory containing input FITS files.

required
out_dir str | Path

Directory where the Zarr store will be written.

required
zarr_name str

Name of the Zarr store directory (under out_dir).

'ovro_lwa_full_lm_only.zarr'
fixed_dir str | Path

Directory to place generated *_fixed.fits files.

'fixed_fits'
chunk_lm int

Optional LM chunk size for the in-memory xarray datasets (0 disables).

1024
rebuild bool

If True, overwrite any existing Zarr; otherwise append to it.

False
fix_headers_on_demand bool

If True, fix FITS headers on-demand during conversion if they don't exist. If False, assume headers are already fixed using :func:fix_fits_headers. Default is True.

True
cleanup_fixed_fits bool

If True (and fix_headers_on_demand is enabled), delete temporary *_fixed.fits files created during each time-step after writing that step to Zarr. Use this to reduce peak disk usage.

False
progress_callback Optional[Callable[[str, int, int, str], None]]

Optional callback function for progress reporting. Should accept (stage: str, current: int, total: int, message: str).

None
duplicate_resolver Optional[Callable[[str, float, List[Path]], Path]]

Optional callback to resolve duplicate files that map to the same time/frequency group. Signature: (time_key, frequency_hz, candidates) -> selected_path.

None
discovery_freq_bin_hz float

Bin width in Hz for treating header frequencies as the same subband during discovery (default 23~kHz). Must be positive.

_DISCOVERY_FREQ_BIN_HZ
time_keys_only Optional[Sequence[str]]

If set, only time keys in this collection are converted (after discovery). Used for incremental pipelines (e.g. dewarp one time step, then append Zarr).

None
lm_reference_ds Optional[Dataset]

If provided, skip the global LM reference scan and use this dataset instead. Must match the grid chosen for the full run (typically built once from the same input layout before dewarping). Callers should pass a deep-copied dataset if the same object might be mutated elsewhere.

None
lm_reference_target_size int | None

When building the global LM reference (lm_reference_ds is None), reproject that reference onto this square grid size. Use the same value as dewarp target_size so staged FITS and the Zarr LM grid stay aligned.

None
group_metadata_source Literal['fits', 'filename']

"fits" (default): discover and sort by time/frequency using FITS headers (with filename fallbacks) as in :func:_discover_groups. "filename": group and order files using only basename -image- time and _NNNMHz_ tokens, avoiding fits.getheader during discovery and frequency sorting.

'fits'
After
required
join
required
store
required
When
required
that
required
resumed
required
steps
required
present
required
the
required
the
required
explicit
required
growing
required
Within
required
frame
required
slices
required
is
required
Mixed
required
largest
required
and
required
same
required
sky
required

Returns:

Type Description
Path

Path to the resulting Zarr store directory.

Raises:

Type Description
FileNotFoundError

If no matching FITS files are found.

RuntimeError

If LM grids differ across time steps.

Source code in src/ovro_lwa_portal/fits_to_zarr_xradio.py
def convert_fits_dir_to_zarr(
    input_dir: str | Path,
    out_dir: str | Path,
    zarr_name: str = "ovro_lwa_full_lm_only.zarr",
    fixed_dir: str | Path = "fixed_fits",
    chunk_lm: int = 1024,
    rebuild: bool = False,
    resume: bool = False,
    fix_headers_on_demand: bool = True,
    cleanup_fixed_fits: bool = False,
    progress_callback: Optional[Callable[[str, int, int, str], None]] = None,
    duplicate_resolver: Optional[Callable[[str, float, List[Path]], Path]] = None,
    discovery_freq_bin_hz: float = _DISCOVERY_FREQ_BIN_HZ,
    time_keys_only: Optional[Sequence[str]] = None,
    lm_reference_ds: Optional[xr.Dataset] = None,
    lm_reference_target_size: int | None = None,
    group_metadata_source: Literal["fits", "filename"] = "fits",
) -> Path:
    """Convert all matching FITS in a directory into a single LM-only Zarr store.

    Parameters
    ----------
    input_dir
        Directory containing input FITS files.
    out_dir
        Directory where the Zarr store will be written.
    zarr_name
        Name of the Zarr store directory (under ``out_dir``).
    fixed_dir
        Directory to place generated ``*_fixed.fits`` files.
    chunk_lm
        Optional LM chunk size for the in-memory xarray datasets (0 disables).
    rebuild
        If True, overwrite any existing Zarr; otherwise append to it.
    fix_headers_on_demand
        If True, fix FITS headers on-demand during conversion if they don't exist.
        If False, assume headers are already fixed using :func:`fix_fits_headers`.
        Default is True.
    cleanup_fixed_fits
        If True (and ``fix_headers_on_demand`` is enabled), delete temporary
        ``*_fixed.fits`` files created during each time-step after writing that
        step to Zarr. Use this to reduce peak disk usage.
    progress_callback
        Optional callback function for progress reporting. Should accept
        (stage: str, current: int, total: int, message: str).
    duplicate_resolver
        Optional callback to resolve duplicate files that map to the same
        time/frequency group. Signature: ``(time_key, frequency_hz, candidates) -> selected_path``.
    discovery_freq_bin_hz
        Bin width in Hz for treating header frequencies as the same subband during
        discovery (default 23~kHz). Must be positive.
    time_keys_only
        If set, only time keys in this collection are converted (after discovery).
        Used for incremental pipelines (e.g. dewarp one time step, then append Zarr).
    lm_reference_ds
        If provided, skip the global LM reference scan and use this dataset instead.
        Must match the grid chosen for the full run (typically built once from the
        same input layout before dewarping). Callers should pass a deep-copied dataset
        if the same object might be mutated elsewhere.
    lm_reference_target_size
        When building the global LM reference (``lm_reference_ds`` is None), reproject
        that reference onto this square grid size. Use the same value as dewarp
        ``target_size`` so staged FITS and the Zarr LM grid stay aligned.
    group_metadata_source
        ``"fits"`` (default): discover and sort by time/frequency using FITS headers
        (with filename fallbacks) as in :func:`_discover_groups`. ``"filename"``: group
        and order files using only basename ``-image-`` time and ``_NNNMHz_`` tokens,
        avoiding ``fits.getheader`` during discovery and frequency sorting.

    After discovery, files whose primary FITS header is missing or has a zero/negative
    ``BMAJ``/``BMIN`` are dropped via :func:`_filter_invalid_beam_files`. Their
    ``(time, frequency)`` cells are left unwritten so the Zarr append step's outer
    join fills them with the float ``NaN`` fill value instead of contaminating the
    store with placeholder zeros.

    When *rebuild* is False and ``out_dir / zarr_name`` already exists, time keys
    that are already present in the store are silently skipped via
    :func:`_filter_completed_time_keys`. An interrupted prior run can therefore be
    resumed by re-invoking with the same arguments; only the not-yet-written time
    steps are read, combined, and appended. If every discovered time key is already
    present, the function returns the existing Zarr path without writing.
    :func:`_write_or_append_zarr` overwrites an existing ``time`` row in-place when
    the same MJD timestamp is written again (``keep='last'`` semantics), so an
    explicit re-run that bypasses the resume filter can repair a time step without
    growing the ``time`` dimension twice.

    Within each observation time step, after subbands are stacked along ``frequency``,
    ``right_ascension`` / ``declination`` are reduced to a single ``(l, m)`` celestial
    frame taken from the lowest-frequency slice. If sampled on-sky positions in other
    slices disagree with that reference by more than about one arcminute, a warning
    is logged.

    Mixed-resolution inputs (different ``l``/``m`` pixel shapes) are supported: the
    largest LM grid among all selected files becomes the conversion-wide reference,
    and smaller images are linearly interpolated onto that grid before combine. The
    same reference is used for every time step so output Zarr has one consistent
    sky pixel grid.

    Returns
    -------
    Path
        Path to the resulting Zarr store directory.

    Raises
    ------
    FileNotFoundError
        If no matching FITS files are found.
    RuntimeError
        If LM grids differ across time steps.
    """
    input_dir = Path(input_dir)
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    fixed_dir = Path(fixed_dir)
    fixed_dir.mkdir(parents=True, exist_ok=True)
    out_zarr = out_dir / zarr_name

    if discovery_freq_bin_hz <= 0.0:
        msg = f"discovery_freq_bin_hz must be positive, got {discovery_freq_bin_hz}"
        raise ValueError(msg)
    if lm_reference_target_size is not None and int(lm_reference_target_size) <= 0:
        msg = f"lm_reference_target_size must be positive, got {lm_reference_target_size}"
        raise ValueError(msg)

    by_time = _discover_groups(
        input_dir,
        duplicate_resolver=duplicate_resolver,
        freq_bin_hz=discovery_freq_bin_hz,
        group_metadata_source=group_metadata_source,
    )
    if time_keys_only is not None:
        allowed = {str(k) for k in time_keys_only}
        by_time = {k: v for k, v in by_time.items() if k in allowed}
        missing = allowed - set(by_time.keys())
        if missing:
            logger.warning(
                "time_keys_only contained keys with no FITS in %s: %s",
                input_dir,
                ", ".join(sorted(missing)),
            )
    by_time = _filter_invalid_beam_files(by_time)
    total_files = sum(len(v) for v in by_time.values())
    logger.info(f"Discovered {total_files} FITS across {len(by_time)} time step(s).")
    for k, v in by_time.items():
        if group_metadata_source == "filename":
            freqs_hz = [_extract_group_metadata_filename_only(p)[1] for p in v]
        else:
            freqs_hz = [_extract_group_metadata(p)[1] for p in v]
        logger.info(f"  time {k}: {len(v)} file(s), frequencies (Hz): {freqs_hz}")

    if not by_time:
        raise FileNotFoundError(
            f"No matching FITS found in {input_dir} (none passed discovery and the "
            "BMAJ/BMIN beam-validity filter)."
        )

    by_time_for_global_freq = dict(by_time)

    # Resume: skip time keys already present in the output store.
    if resume and out_zarr.exists() and not rebuild:
        existing_time_keys = _existing_time_keys_from_zarr(out_zarr)
        by_time = {k: v for k, v in by_time.items() if k not in existing_time_keys}
        if not by_time:
            logger.info(
                "Nothing to do: every discovered time key is already present in %s. "
                "Pass rebuild=True to overwrite.",
                out_zarr,
            )
            return out_zarr
    elif not rebuild:
        by_time = _filter_completed_time_keys(
            by_time, out_zarr, rebuild=rebuild, context="convert"
        )
        if not by_time:
            logger.info(
                "Nothing to do: every discovered time key is already present in %s. "
                "Pass rebuild=True to overwrite.",
                out_zarr,
            )
            return out_zarr

    if _zarr_store_exists(out_zarr) and not rebuild:
        freq_coord_hz = _frequency_coord_hz_from_zarr(out_zarr)
        logger.info(
            "Using frequency axis from existing Zarr (%d channel(s)).",
            int(freq_coord_hz.size),
        )
    else:
        freq_coord_hz = _global_frequency_coord_hz(
            by_time_for_global_freq,
            group_metadata_source=group_metadata_source,
        )
        logger.info(
            "Built global frequency axis: %d channel(s), %.3f%.3f MHz",
            int(freq_coord_hz.size),
            float(np.min(freq_coord_hz)) / 1e6,
            float(np.max(freq_coord_hz)) / 1e6,
        )

    if lm_reference_ds is not None:
        lm_ref_ds = lm_reference_ds
    elif _zarr_store_exists(out_zarr) and not rebuild:
        lm_ref_ds = _lm_reference_from_existing_zarr(out_zarr)
        logger.info("LM (l, m) reference grid loaded from existing Zarr for resume.")
    else:
        lm_ref_ds = _load_global_lm_reference_dataset(
            by_time,
            fixed_dir,
            chunk_lm=chunk_lm,
            fix_headers_on_demand=fix_headers_on_demand,
            target_size=lm_reference_target_size,
            group_metadata_source=group_metadata_source,
        )
    lm_reference = (lm_ref_ds["l"].values.copy(), lm_ref_ds["m"].values.copy())

    # Decide whether we write a fresh store or append to an existing one
    first_write = not (out_zarr.exists() and not rebuild)

    total_time_steps = len(by_time)
    for idx, tkey in enumerate(sorted(by_time.keys())):
        files = by_time[tkey]

        logger.info(f"[read/combine] time {tkey}")
        xds_t, freqs, created_fixed_paths = _combine_time_step(
            files,
            fixed_dir,
            chunk_lm=chunk_lm,
            fix_headers_on_demand=fix_headers_on_demand,
            lm_reference_ds=lm_ref_ds,
            group_metadata_source=group_metadata_source,
        )
        xds_t = _align_time_step_to_frequency_grid(xds_t, freq_coord_hz)
        logger.info(f"  combined dims: {dict(xds_t.sizes)}")
        logger.info(f"  combined freqs (Hz): {freqs[:8]}{' ...' if len(freqs) > 8 else ''}")

        lm_current = (xds_t["l"].values, xds_t["m"].values)
        _assert_same_lm(lm_reference, lm_current)
        logger.info("  l/m grid matches global reference")

        logger.info(f"[{'write new' if first_write else 'append'}] {out_zarr}")
        _write_or_append_zarr(xds_t, out_zarr, first_write=first_write, chunk_lm=chunk_lm)
        first_write = False
        if cleanup_fixed_fits and fix_headers_on_demand and created_fixed_paths:
            removed = 0
            for fixed_path in created_fixed_paths:
                try:
                    fixed_path.unlink(missing_ok=True)
                    removed += 1
                except OSError as exc:
                    logger.warning("Could not remove temporary fixed FITS %s: %s", fixed_path, exc)
            logger.info("Cleaned up %d temporary fixed FITS file(s) for time %s", removed, tkey)

        # Report progress after completing this time step
        if progress_callback:
            progress_callback(
                "converting",
                idx + 1,
                total_time_steps,
                f"Completed time step {idx + 1}/{total_time_steps}"
            )

    logger.info(f"[done] All times appended into: {out_zarr}")
    return out_zarr

fix_fits_headers

fix_fits_headers(files, fixed_dir, *, skip_existing=True, group_metadata_source='fits')

Fix FITS headers for a list of files, creating *_fixed.fits files.

This function processes FITS files to ensure they have the necessary headers for xradio conversion. It can be run ahead of time before calling :func:convert_fits_dir_to_zarr to separate the header fixing step from the conversion process.

Parameters:

Name Type Description Default
files List[Path]

List of FITS file paths to process.

required
fixed_dir Path

Directory where *_fixed.fits files will be written.

required
skip_existing bool

If True, skip files that already have corresponding fixed versions. Default is True.

True
group_metadata_source Literal['fits', 'filename']

Frequency sort order for processing files (see :func:_discover_groups). Default "fits" uses header-backed keys; "filename" uses basename MHz only.

'fits'

Returns:

Type Description
List[Path]

List of paths to the fixed FITS files.

Notes
  • Files already ending with _fixed.fits are considered already fixed and are returned as-is.
  • The :func:_fix_headers function applies BSCALE/BZERO and adds minimal WCS/spectral keywords required by xradio.
  • Files whose primary header lacks a real BMAJ/BMIN (missing or non-positive) raise :class:InvalidBeamError inside :func:_fix_headers; they are logged at WARNING level, omitted from the returned list, and any partially-written *_fixed.fits is removed so downstream consumers see only files with a real synthesized beam.

Examples:

>>> from pathlib import Path
>>> from ovro_lwa_portal.fits_to_zarr_xradio import fix_fits_headers
>>> input_files = list(Path("input").glob("*.fits"))
>>> fixed_dir = Path("fixed_fits")
>>> fixed_dir.mkdir(exist_ok=True)
>>> fixed_paths = fix_fits_headers(input_files, fixed_dir)
>>> print(f"Fixed {len(fixed_paths)} files")
Source code in src/ovro_lwa_portal/fits_to_zarr_xradio.py
def fix_fits_headers(
    files: List[Path],
    fixed_dir: Path,
    *,
    skip_existing: bool = True,
    group_metadata_source: Literal["fits", "filename"] = "fits",
) -> List[Path]:
    """Fix FITS headers for a list of files, creating ``*_fixed.fits`` files.

    This function processes FITS files to ensure they have the necessary
    headers for xradio conversion. It can be run ahead of time before
    calling :func:`convert_fits_dir_to_zarr` to separate the header
    fixing step from the conversion process.

    Parameters
    ----------
    files : List[Path]
        List of FITS file paths to process.
    fixed_dir : Path
        Directory where ``*_fixed.fits`` files will be written.
    skip_existing : bool, optional
        If True, skip files that already have corresponding fixed versions.
        Default is True.
    group_metadata_source
        Frequency sort order for processing files (see :func:`_discover_groups`).
        Default ``"fits"`` uses header-backed keys; ``"filename"`` uses basename MHz only.

    Returns
    -------
    List[Path]
        List of paths to the fixed FITS files.

    Notes
    -----
    * Files already ending with ``_fixed.fits`` are considered already fixed
      and are returned as-is.
    * The :func:`_fix_headers` function applies BSCALE/BZERO and adds minimal
      WCS/spectral keywords required by xradio.
    * Files whose primary header lacks a real ``BMAJ``/``BMIN`` (missing or
      non-positive) raise :class:`InvalidBeamError` inside :func:`_fix_headers`;
      they are logged at WARNING level, omitted from the returned list, and any
      partially-written ``*_fixed.fits`` is removed so downstream consumers see
      only files with a real synthesized beam.

    Examples
    --------
    >>> from pathlib import Path
    >>> from ovro_lwa_portal.fits_to_zarr_xradio import fix_fits_headers
    >>> input_files = list(Path("input").glob("*.fits"))
    >>> fixed_dir = Path("fixed_fits")
    >>> fixed_dir.mkdir(exist_ok=True)
    >>> fixed_paths = fix_fits_headers(input_files, fixed_dir)
    >>> print(f"Fixed {len(fixed_paths)} files")
    """
    fixed_dir.mkdir(parents=True, exist_ok=True)
    fixed_paths: List[Path] = []

    sort_key = lambda p: _discovery_frequency_sort_tuple(p, group_metadata_source=group_metadata_source)
    for f in sorted(files, key=sort_key):
        if f.name.endswith("_fixed.fits"):
            # Already fixed, use as-is
            fixed_paths.append(f)
            logger.debug(f"Skipping already-fixed file: {f.name}")
        else:
            fixed = fixed_dir / (f.stem + "_fixed.fits")
            if skip_existing and fixed.exists():
                logger.debug(f"Skipping existing fixed file: {fixed.name}")
                fixed_paths.append(fixed)
            else:
                logger.info(f"Fixing headers: {f.name} -> {fixed.name}")
                try:
                    _fix_headers(f, fixed)
                except InvalidBeamError as exc:
                    logger.warning(
                        "Skipping %s: %s; excluded from the fixed-FITS set.",
                        f.name,
                        exc,
                    )
                    # Make sure no partial output is left behind for resume-style
                    # invocations (``skip_existing=True``) on a later run.
                    try:
                        fixed.unlink(missing_ok=True)
                    except OSError:
                        pass
                    continue
                fixed_paths.append(fixed)

    return fixed_paths