Ingest Module

The ovro_lwa_portal.ingest package provides tools for converting OVRO-LWA FITS image files to cloud-optimized Zarr format, with support for incremental processing, WCS coordinate preservation, and concurrent write protection.

CLI Reference

The ovro-ingest command-line tool is installed automatically with the package.

ovro-ingest convert

Convert FITS files to a single Zarr store:

ovro-ingest convert INPUT_DIR OUTPUT_DIR [OPTIONS]

| Option | Short | Default | Description |
|---|---|---|---|
| --zarr-name | -z | ovro_lwa_full_lm_only.zarr | Name of output Zarr store |
| --fixed-dir | -f | OUTPUT_DIR/fixed_fits | Directory for fixed FITS files |
| --chunk-lm | -c | 1024 | Chunk size for l/m dimensions (0 to disable) |
| --rebuild | -r | False | Overwrite existing store instead of appending |
| --skip-header-fixing | -s | False | Skip header fixing (assume pre-fixed) |
| --log-level | -l | info | Logging level (debug/info/warning/error) |

ovro-ingest fix-headers

Fix FITS headers as a separate step before conversion:

ovro-ingest fix-headers INPUT_DIR FIXED_DIR [OPTIONS]

| Option | Short | Default | Description |
|---|---|---|---|
| --skip-existing/--overwrite | | --skip-existing | Skip files with existing fixed versions |
| --log-level | -l | info | Logging level |

Examples

# Basic conversion (fixes headers on-demand)
ovro-ingest convert /data/fits /data/output

# Rebuild with verbose logging
ovro-ingest convert /data/fits /data/output --rebuild --log-level debug

# Custom Zarr name and chunk size
ovro-ingest convert /data/fits /data/output \
    --zarr-name my_data.zarr --chunk-lm 2048

# Two-step workflow: fix headers first, then convert
ovro-ingest fix-headers /data/fits /data/fixed_fits
ovro-ingest convert /data/fits /data/output \
    --fixed-dir /data/fixed_fits --skip-header-fixing
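
When the CLI is driven from a script, the option table above maps naturally onto an argument list. The sketch below assembles an ovro-ingest convert command using only the flags documented in this section. The helper name build_convert_command is hypothetical, and nothing is executed unless the resulting list is handed to subprocess.run.

```python
from __future__ import annotations

import shlex
from pathlib import Path


def build_convert_command(
    input_dir: str | Path,
    output_dir: str | Path,
    *,
    zarr_name: str | None = None,
    fixed_dir: str | Path | None = None,
    chunk_lm: int | None = None,
    rebuild: bool = False,
    skip_header_fixing: bool = False,
    log_level: str | None = None,
) -> list[str]:
    """Assemble an ``ovro-ingest convert`` argument list (hypothetical helper)."""
    cmd = ["ovro-ingest", "convert", str(input_dir), str(output_dir)]
    # Only emit a flag when the caller overrides the documented default.
    if zarr_name is not None:
        cmd += ["--zarr-name", zarr_name]
    if fixed_dir is not None:
        cmd += ["--fixed-dir", str(fixed_dir)]
    if chunk_lm is not None:
        cmd += ["--chunk-lm", str(chunk_lm)]
    if rebuild:
        cmd.append("--rebuild")
    if skip_header_fixing:
        cmd.append("--skip-header-fixing")
    if log_level is not None:
        cmd += ["--log-level", log_level]
    return cmd


cmd = build_convert_command(
    "/data/fits", "/data/output", zarr_name="my_data.zarr", chunk_lm=2048
)
# Print a shell-quoted form for logging; pass `cmd` itself to subprocess.run.
print(shlex.join(cmd))
```

Passing the list to subprocess.run(cmd, check=True) avoids shell quoting problems with paths that contain spaces.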

Python API

FITSToZarrConverter

Orchestrates FITS to Zarr conversion with progress tracking and locking.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| config | ConversionConfig | Conversion configuration. | required |
| progress_callback | ProgressCallback \| None | Optional callback for progress reporting. | None |

Source code in src/ovro_lwa_portal/ingest/core.py
class FITSToZarrConverter:
    """Orchestrates FITS to Zarr conversion with progress tracking and locking.

    Parameters
    ----------
    config : ConversionConfig
        Conversion configuration.
    progress_callback : ProgressCallback | None, optional
        Optional callback for progress reporting.
    """

    def __init__(
        self,
        config: ConversionConfig,
        progress_callback: ProgressCallback | None = None,
    ) -> None:
        self.config = config
        self.progress_callback = progress_callback

        # Configure logging
        log_level = logging.DEBUG if config.verbose else logging.INFO
        logging.basicConfig(
            level=log_level,
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        )

    def _report_progress(self, stage: str, current: int, total: int, message: str) -> None:
        """Report progress if callback is configured."""
        if self.progress_callback:
            self.progress_callback(stage, current, total, message)

    def convert(self) -> Path:
        """Execute the FITS to Zarr conversion.

        Returns
        -------
        Path
            Path to the output Zarr store.

        Raises
        ------
        FileNotFoundError
            If no matching FITS files are found.
        RuntimeError
            If conversion fails or another process is writing to the same output.
        """
        # Validate configuration
        self.config.validate()

        # Create output directory
        self.config.output_dir.mkdir(parents=True, exist_ok=True)

        # Acquire lock to prevent concurrent writes
        lock_path = self.config.output_dir / f".{self.config.zarr_name}.lock"

        with FileLock(lock_path):
            logger.info("Starting FITS to Zarr conversion")
            logger.info(f"  Input: {self.config.input_dir}")
            logger.info(f"  Output: {self.config.zarr_path}")
            logger.info(f"  Mode: {'rebuild' if self.config.rebuild else 'append'}")

            self._report_progress("start", 0, 1, "Starting conversion")

            try:
                result = convert_fits_dir_to_zarr(
                    input_dir=self.config.input_dir,
                    out_dir=self.config.output_dir,
                    zarr_name=self.config.zarr_name,
                    fixed_dir=self.config.fixed_dir,
                    chunk_lm=self.config.chunk_lm,
                    rebuild=self.config.rebuild,
                    resume=self.config.resume,
                    fix_headers_on_demand=self.config.fix_headers_on_demand,
                    cleanup_fixed_fits=self.config.cleanup_fixed_fits,
                    progress_callback=self.progress_callback,
                    duplicate_resolver=self.config.duplicate_resolver,
                    discovery_freq_bin_hz=self.config.discovery_freq_bin_hz,
                    time_keys_only=self.config.time_keys_only,
                    lm_reference_ds=self.config.lm_reference_ds,
                    group_metadata_source=self.config.group_metadata_source,
                )

                self._report_progress("complete", 1, 1, "Conversion complete")
                logger.info(f"Conversion successful: {result}")
                return result

            except FileNotFoundError as e:
                logger.error(f"No matching FITS files found in {self.config.input_dir}")
                self._report_progress("error", 0, 1, f"Error: {e}")
                raise
            except RuntimeError as e:
                logger.error(f"Conversion failed: {e}")
                self._report_progress("error", 0, 1, f"Error: {e}")
                raise
            except Exception as e:
                logger.exception("Unexpected error during conversion")
                self._report_progress("error", 0, 1, f"Unexpected error: {e}")
                raise RuntimeError(f"Conversion failed: {e}") from e

convert()

Execute the FITS to Zarr conversion.

Returns:

| Type | Description |
|---|---|
| Path | Path to the output Zarr store. |

Raises:

| Type | Description |
|---|---|
| FileNotFoundError | If no matching FITS files are found. |
| RuntimeError | If conversion fails or another process is writing to the same output. |
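
A caller will often want to treat the two documented exceptions differently: a missing input is a setup problem, while a RuntimeError signals a failed or contended conversion. The sketch below wraps any object that exposes convert(). The names run_and_report and _StubConverter are hypothetical, the exit codes are arbitrary, and the stub stands in for a real FITSToZarrConverter so the example runs without the package installed.

```python
from __future__ import annotations

from pathlib import Path
from typing import Protocol


class HasConvert(Protocol):
    """Anything with a convert() method returning the store path."""

    def convert(self) -> Path: ...


def run_and_report(converter: HasConvert) -> tuple[int, str]:
    """Run convert() and map the documented exceptions to (exit_code, message)."""
    try:
        store = converter.convert()
    except FileNotFoundError as exc:
        return 2, f"no input: {exc}"
    except RuntimeError as exc:
        return 1, f"conversion failed: {exc}"
    return 0, f"wrote {store}"


class _StubConverter:
    """Stand-in for FITSToZarrConverter (hypothetical, for illustration only)."""

    def __init__(self, outcome: Path | Exception) -> None:
        self.outcome = outcome

    def convert(self) -> Path:
        if isinstance(self.outcome, Exception):
            raise self.outcome
        return self.outcome


ok = run_and_report(_StubConverter(Path("/data/output/ovro_lwa_full_lm_only.zarr")))
missing = run_and_report(_StubConverter(FileNotFoundError("no FITS files")))
busy = run_and_report(_StubConverter(RuntimeError("store is locked")))
print(ok, missing, busy, sep="\n")
```

In the class source shown earlier, convert() calls config.validate() before entering its try block, so a ValueError from validation propagates unchanged and would need its own except clause.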

ConversionConfig

Configuration for FITS to Zarr conversion.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| input_dir | Path | Directory containing input FITS files. | required |
| output_dir | Path | Directory where the Zarr store will be written. | required |
| zarr_name | str | Name of the output Zarr store. Defaults to "ovro_lwa_full_lm_only.zarr". | 'ovro_lwa_full_lm_only.zarr' |
| fixed_dir | Path \| None | Directory for storing fixed FITS files. If None, creates a "fixed_fits" subdirectory in output_dir. | None |
| chunk_lm | int | Chunk size for l and m spatial dimensions. Defaults to 1024. | 1024 |
| rebuild | bool | If True, overwrite existing Zarr store. If False, append new data. Defaults to False. | False |
| fix_headers_on_demand | bool | If True, fix headers during conversion if they don't exist. If False, assume headers are already fixed. Defaults to True. | True |
| resume | bool | If True, skip discovered FITS time steps that already exist in the output Zarr time coordinate. Defaults to False. | False |
| cleanup_fixed_fits | bool | If True, delete temporary *_fixed.fits files created during on-demand conversion after each time step is written. Defaults to False. | False |
| duplicate_resolver | Callable[[str, float, list[Path]], Path] \| None | Resolves duplicate FITS files in the same (time, frequency) group. | None |
| discovery_freq_bin_hz | float | Bin width in Hz when grouping FITS by header frequency during discovery. Defaults to the library default (23 kHz). | _DISCOVERY_FREQ_BIN_HZ |
| verbose | bool | Enable verbose logging. Defaults to False. | False |
| time_keys_only | Sequence[str] \| None | If set, only these observation time keys are converted (after discovery). Use with lm_reference_ds for incremental dewarp→Zarr workflows. | None |
| lm_reference_ds | xarray.Dataset \| None | Precomputed global LM reference. When set, conversion skips the reference scan over input_dir and uses this grid for each time step. | None |
| group_metadata_source | {"fits", "filename"} | How to discover observation time / subband for grouping and frequency ordering. "fits" (default) reads FITS headers (with filename fallbacks). "filename" uses only basename -image- and _NNNMHz_ tokens (no header reads during discovery). Match the value used when building lm_reference_ds. | "fits" |

Source code in src/ovro_lwa_portal/ingest/core.py
class ConversionConfig:
    """Configuration for FITS to Zarr conversion.

    Parameters
    ----------
    input_dir : Path
        Directory containing input FITS files.
    output_dir : Path
        Directory where the Zarr store will be written.
    zarr_name : str, optional
        Name of the output Zarr store. Defaults to "ovro_lwa_full_lm_only.zarr".
    fixed_dir : Path | None, optional
        Directory for storing fixed FITS files. If None, creates a "fixed_fits"
        subdirectory in output_dir.
    chunk_lm : int, optional
        Chunk size for l and m spatial dimensions. Defaults to 1024.
    rebuild : bool, optional
        If True, overwrite existing Zarr store. If False, append new data. Defaults to False.
    fix_headers_on_demand : bool, optional
        If True, fix headers during conversion if they don't exist. If False, assume
        headers are already fixed. Defaults to True.
    resume : bool, optional
        If True, skip discovered FITS time steps that already exist in the output
        Zarr time coordinate. Defaults to False.
    cleanup_fixed_fits : bool, optional
        If True, delete temporary ``*_fixed.fits`` files created during on-demand
        conversion after each time-step is written. Defaults to False.
    discovery_freq_bin_hz : float, optional
        Bin width in Hz when grouping FITS by header frequency during discovery.
        Defaults to the library default (23~kHz).
    verbose : bool, optional
        Enable verbose logging. Defaults to False.
    time_keys_only : sequence of str | None, optional
        If set, only these observation time keys are converted (after discovery).
        Use with ``lm_reference_ds`` for incremental dewarp→Zarr workflows.
    lm_reference_ds : xarray.Dataset | None, optional
        Precomputed global LM reference. When set, conversion skips the reference
        scan over ``input_dir`` and uses this grid for each time step.
    group_metadata_source : {"fits", "filename"}, optional
        How to discover observation time / subband for grouping and frequency ordering.
        ``"fits"`` (default) reads FITS headers (with filename fallbacks).
        ``"filename"`` uses only basename ``-image-`` and ``_NNNMHz_`` tokens (no header
        reads during discovery). Match the value used when building ``lm_reference_ds``.
    """

    def __init__(
        self,
        input_dir: Path,
        output_dir: Path,
        zarr_name: str = "ovro_lwa_full_lm_only.zarr",
        fixed_dir: Path | None = None,
        chunk_lm: int = 1024,
        rebuild: bool = False,
        fix_headers_on_demand: bool = True,
        resume: bool = False,
        cleanup_fixed_fits: bool = False,
        duplicate_resolver: Callable[[str, float, list[Path]], Path] | None = None,
        discovery_freq_bin_hz: float = _DISCOVERY_FREQ_BIN_HZ,
        verbose: bool = False,
        time_keys_only: Sequence[str] | None = None,
        lm_reference_ds: Any | None = None,
        group_metadata_source: Literal["fits", "filename"] = "fits",
    ) -> None:
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.zarr_name = zarr_name
        self.fixed_dir = fixed_dir or (output_dir / "fixed_fits")
        self.chunk_lm = chunk_lm
        self.rebuild = rebuild
        self.fix_headers_on_demand = fix_headers_on_demand
        self.resume = resume
        self.cleanup_fixed_fits = cleanup_fixed_fits
        self.duplicate_resolver = duplicate_resolver
        self.discovery_freq_bin_hz = discovery_freq_bin_hz
        self.verbose = verbose
        self.time_keys_only = tuple(time_keys_only) if time_keys_only is not None else None
        self.lm_reference_ds = lm_reference_ds
        self.group_metadata_source = group_metadata_source

    @property
    def zarr_path(self) -> Path:
        """Full path to the output Zarr store."""
        return self.output_dir / self.zarr_name

    def validate(self) -> None:
        """Validate configuration parameters.

        Raises
        ------
        FileNotFoundError
            If input directory doesn't exist.
        ValueError
            If parameters are invalid.
        """
        if not self.input_dir.exists():
            msg = f"Input directory does not exist: {self.input_dir}"
            raise FileNotFoundError(msg)

        if not self.input_dir.is_dir():
            msg = f"Input path is not a directory: {self.input_dir}"
            raise ValueError(msg)

        if self.chunk_lm < 0:
            msg = f"chunk_lm must be non-negative, got {self.chunk_lm}"
            raise ValueError(msg)

        if self.discovery_freq_bin_hz <= 0.0:
            msg = f"discovery_freq_bin_hz must be positive, got {self.discovery_freq_bin_hz}"
            raise ValueError(msg)
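
The "filename" mode of group_metadata_source relies on tokens in the basename instead of FITS headers. As a rough illustration of what matching the _NNNMHz_ token could look like, the sketch below extracts the subband with a regular expression. The pattern, the helper name subband_mhz, and the sample filenames are all assumptions made for this example; the library's actual parsing rules may differ.

```python
from __future__ import annotations

import re
from pathlib import Path

# Assumed pattern: one or more digits followed by "MHz", delimited by underscores.
_SUBBAND_RE = re.compile(r"_(\d+)MHz_")


def subband_mhz(path: str | Path) -> int | None:
    """Return the subband in MHz parsed from the basename, or None if absent."""
    match = _SUBBAND_RE.search(Path(path).name)
    return int(match.group(1)) if match else None


# Hypothetical filenames, made up for this example.
print(subband_mhz("/data/fits/obs_041MHz_I-image-fixed.fits"))
print(subband_mhz("/data/fits/no_band_token.fits"))
```

Only the basename is searched, so directory names that happen to contain a similar token do not affect the result.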

zarr_path property

Full path to the output Zarr store.
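
The source shown on this page derives three locations from output_dir and zarr_name: the store path, the default fixed_dir, and the hidden lock file that FITSToZarrConverter.convert() takes before writing. The sketch below reproduces those rules with pathlib alone so they can be checked without the package. derived_paths is a hypothetical helper, not part of the library.

```python
from __future__ import annotations

from pathlib import Path


def derived_paths(
    output_dir: Path,
    zarr_name: str = "ovro_lwa_full_lm_only.zarr",
    fixed_dir: Path | None = None,
) -> dict[str, Path]:
    """Mirror the path rules shown in ConversionConfig and convert()."""
    return {
        # Same expression as the zarr_path property.
        "zarr_path": output_dir / zarr_name,
        # Same fallback as ConversionConfig.__init__.
        "fixed_dir": fixed_dir or (output_dir / "fixed_fits"),
        # Same lock file name as FITSToZarrConverter.convert().
        "lock_path": output_dir / f".{zarr_name}.lock",
    }


for name, path in derived_paths(Path("/data/output")).items():
    print(f"{name}: {path}")
```

Because the lock file name includes zarr_name, two runs that target the same output_dir and zarr_name contend for the same lock, while a different zarr_name in the same directory gets its own.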

validate()

Validate configuration parameters.

Raises:

| Type | Description |
|---|---|
| FileNotFoundError | If input directory doesn't exist. |
| ValueError | If parameters are invalid. |

ProgressCallback

Bases: Protocol

Protocol for progress reporting callbacks.

Source code in src/ovro_lwa_portal/ingest/core.py
class ProgressCallback(Protocol):
    """Protocol for progress reporting callbacks."""

    def __call__(self, stage: str, current: int, total: int, message: str) -> None:
        """Report progress to the caller.

        Parameters
        ----------
        stage : str
            The current stage of conversion (e.g., 'discovery', 'fixing', 'combining').
        current : int
            Current progress count.
        total : int
            Total items to process.
        message : str
            Human-readable progress message.
        """
        ...
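
Any callable with this four-argument signature satisfies the protocol. A minimal sketch of a callback that records each event and prints a percentage follows. The class name RecordingProgress is hypothetical, and the two calls at the bottom simply replay the "start" and "complete" reports that FITSToZarrConverter.convert() makes in the source above.

```python
from __future__ import annotations


class RecordingProgress:
    """Collects progress events and prints a one-line summary for each."""

    def __init__(self) -> None:
        self.events: list[tuple[str, int, int, str]] = []

    def __call__(self, stage: str, current: int, total: int, message: str) -> None:
        self.events.append((stage, current, total, message))
        # Guard against total == 0 so the percentage never divides by zero.
        percent = 100.0 * current / total if total else 0.0
        print(f"[{stage}] {current}/{total} ({percent:.0f}%) {message}")


progress = RecordingProgress()
progress("start", 0, 1, "Starting conversion")
progress("complete", 1, 1, "Conversion complete")
```

An instance like this would be passed as FITSToZarrConverter(config, progress_callback=progress), after which progress.events holds the full history for later inspection.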

__call__(stage, current, total, message)

Report progress to the caller.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| stage | str | The current stage of conversion (e.g., 'discovery', 'fixing', 'combining'). | required |
| current | int | Current progress count. | required |
| total | int | Total items to process. | required |
| message | str | Human-readable progress message. | required |

Optional Prefect Integration

For orchestrated workflows, the package offers an optional Prefect integration that adds automatic retries, logging, and monitoring.

Installation

pip install 'ovro_lwa_portal[prefect]'

Usage

from ovro_lwa_portal.ingest.prefect_workflow import run_conversion_flow

result = run_conversion_flow(
    input_dir="/data/fits",
    output_dir="/data/output",
    rebuild=False,
)

run_conversion_flow

run_conversion_flow(input_dir, output_dir, zarr_name='ovro_lwa_full_lm_only.zarr', fixed_dir=None, chunk_lm=1024, rebuild=False, duplicate_resolver=None, verbose=False, group_metadata_source='fits')

Run the FITS to Zarr conversion using Prefect orchestration.

This is a convenience wrapper around the Prefect flow that checks for Prefect availability and provides helpful error messages.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| input_dir | str \| Path | Directory containing input FITS files. | required |
| output_dir | str \| Path | Directory where the Zarr store will be written. | required |
| zarr_name | str | Name of the output Zarr store. | 'ovro_lwa_full_lm_only.zarr' |
| fixed_dir | str \| Path \| None | Directory for storing fixed FITS files. | None |
| chunk_lm | int | Chunk size for l and m spatial dimensions. | 1024 |
| rebuild | bool | If True, overwrite existing Zarr store. | False |
| duplicate_resolver | Callable[[str, float, list[Path]], Path] \| None | Resolves duplicate FITS files in the same (time, frequency) group; if None and duplicates exist, conversion raises at discovery time. | None |
| verbose | bool | Enable verbose logging. | False |
| group_metadata_source | Literal["fits", "filename"] | How to discover observation time / subband for grouping and frequency ordering (see ConversionConfig). | 'fits' |

Returns:

| Type | Description |
|---|---|
| Path | Path to the output Zarr store. |

Raises:

| Type | Description |
|---|---|
| ImportError | If Prefect is not installed. |
| RuntimeError | If duplicate FITS files are found and duplicate_resolver is not provided. |
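
A duplicate_resolver receives three positional arguments and returns one of the candidate paths. The sketch below assumes those arguments are a time key, a frequency, and the list of competing files. That reading is inferred from the type signature and the "(time, frequency) group" wording, not stated outright. The selection rule (lexicographically last basename) is likewise an arbitrary choice for illustration.

```python
from __future__ import annotations

from pathlib import Path


def pick_last_by_name(time_key: str, freq: float, candidates: list[Path]) -> Path:
    """Resolve duplicates by choosing the lexicographically last basename.

    The parameter names are assumptions; only the types come from the signature.
    """
    if not candidates:
        raise ValueError(f"no candidates for {time_key} at {freq}")
    return max(candidates, key=lambda p: p.name)


# Hypothetical duplicate files for one (time, frequency) group.
dupes = [Path("/data/fits/a_v1.fits"), Path("/data/fits/a_v2.fits")]
print(pick_last_by_name("20240101_000000", 41.0e6, dupes))
```

The function would be supplied as duplicate_resolver=pick_last_by_name. A resolver based on modification time or file size would follow the same shape.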

Source code in src/ovro_lwa_portal/ingest/prefect_workflow.py
def run_conversion_flow(
    input_dir: str | Path,
    output_dir: str | Path,
    zarr_name: str = "ovro_lwa_full_lm_only.zarr",
    fixed_dir: str | Path | None = None,
    chunk_lm: int = 1024,
    rebuild: bool = False,
    duplicate_resolver: Callable[[str, float, list[Path]], Path] | None = None,
    verbose: bool = False,
    group_metadata_source: Literal["fits", "filename"] = "fits",
) -> Path:
    """Run the FITS to Zarr conversion using Prefect orchestration.

    This is a convenience wrapper around the Prefect flow that checks for
    Prefect availability and provides helpful error messages.

    Parameters
    ----------
    input_dir : str | Path
        Directory containing input FITS files.
    output_dir : str | Path
        Directory where the Zarr store will be written.
    zarr_name : str, optional
        Name of the output Zarr store.
    fixed_dir : str | Path | None, optional
        Directory for storing fixed FITS files.
    chunk_lm : int, optional
        Chunk size for l and m spatial dimensions.
    rebuild : bool, optional
        If True, overwrite existing Zarr store.
    duplicate_resolver : Callable[[str, float, list[Path]], Path] | None, optional
        Resolves duplicate FITS files in the same (time, frequency) group; if ``None`` and
        duplicates exist, conversion raises at discovery time.
    verbose : bool, optional
        Enable verbose logging.

    Returns
    -------
    Path
        Path to the output Zarr store.

    Raises
    ------
    ImportError
        If Prefect is not installed.
    RuntimeError
        If duplicate FITS are found and ``duplicate_resolver`` is not provided.
    """
    if not PREFECT_AVAILABLE:
        msg = (
            "Prefect is not installed. Install it with:\n"
            "  pip install 'ovro_lwa_portal[prefect]'\n"
            "or use the core conversion API directly:\n"
            "  from ovro_lwa_portal.ingest import FITSToZarrConverter"
        )
        raise ImportError(msg)

    return fits_to_zarr_flow(
        input_dir=input_dir,
        output_dir=output_dir,
        zarr_name=zarr_name,
        fixed_dir=fixed_dir,
        chunk_lm=chunk_lm,
        rebuild=rebuild,
        duplicate_resolver=duplicate_resolver,
        verbose=verbose,
        group_metadata_source=group_metadata_source,
    )

fits_to_zarr_flow

fits_to_zarr_flow is the underlying Prefect @flow-decorated function called by run_conversion_flow. It accepts the same parameters (input_dir, output_dir, zarr_name, fixed_dir, chunk_lm, rebuild, duplicate_resolver, verbose, group_metadata_source) and orchestrates three Prefect tasks in sequence: configuration validation, directory preparation, and the conversion itself (with automatic retries).

Note

fits_to_zarr_flow is conditionally defined depending on whether Prefect is installed. Use run_conversion_flow as the stable entry point — it checks for Prefect availability and provides a clear error message if it is missing.
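
Since the Prefect path is optional, a script may prefer to choose an entry point up front instead of catching ImportError. The sketch below checks for Prefect with the standard library's importlib.util.find_spec and falls back to the core API. The helper names are hypothetical, and importing ConversionConfig from ovro_lwa_portal.ingest.core is an assumption based on the source location shown on this page.

```python
from __future__ import annotations

from importlib.util import find_spec
from pathlib import Path


def prefect_installed() -> bool:
    """True if the top-level ``prefect`` package can be located."""
    return find_spec("prefect") is not None


def convert_with_optional_prefect(
    input_dir: str | Path, output_dir: str | Path
) -> Path:
    """Use the Prefect flow when available, otherwise the core converter."""
    if prefect_installed():
        from ovro_lwa_portal.ingest.prefect_workflow import run_conversion_flow

        return run_conversion_flow(input_dir=input_dir, output_dir=output_dir)

    # Assumed import path, taken from "Source code in src/ovro_lwa_portal/ingest/core.py".
    from ovro_lwa_portal.ingest.core import ConversionConfig, FITSToZarrConverter

    config = ConversionConfig(input_dir=Path(input_dir), output_dir=Path(output_dir))
    return FITSToZarrConverter(config).convert()


print("Prefect available:", prefect_installed())
```

The package imports sit inside the function, so defining the helper does not require ovro_lwa_portal or Prefect to be installed; only calling it does.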