Ingest Module
The ovro_lwa_portal.ingest package provides tools for converting OVRO-LWA FITS
image files to cloud-optimized Zarr format, with support for incremental
processing, WCS coordinate preservation, and concurrent write protection.
CLI Reference
The ovro-ingest command-line tool is installed automatically with the package.
ovro-ingest convert
Convert FITS files to a single Zarr store:
| Option | Short | Default | Description |
|---|---|---|---|
| `--zarr-name` | `-z` | `ovro_lwa_full_lm_only.zarr` | Name of output Zarr store |
| `--fixed-dir` | `-f` | `OUTPUT_DIR/fixed_fits` | Directory for fixed FITS files |
| `--chunk-lm` | `-c` | `1024` | Chunk size for l/m dimensions (0 to disable) |
| `--rebuild` | `-r` | `False` | Overwrite existing store instead of appending |
| `--skip-header-fixing` | `-s` | `False` | Skip header fixing (assume pre-fixed) |
| `--log-level` | `-l` | `info` | Logging level (debug/info/warning/error) |
ovro-ingest fix-headers
Fix FITS headers as a separate step before conversion:
| Option | Short | Default | Description |
|---|---|---|---|
| `--skip-existing/--overwrite` | | `--skip-existing` | Skip files with existing fixed versions |
| `--log-level` | `-l` | `info` | Logging level |
Examples
# Basic conversion (fixes headers on-demand)
ovro-ingest convert /data/fits /data/output
# Rebuild with verbose logging
ovro-ingest convert /data/fits /data/output --rebuild --log-level debug
# Custom Zarr name and chunk size
ovro-ingest convert /data/fits /data/output \
--zarr-name my_data.zarr --chunk-lm 2048
# Two-step workflow: fix headers first, then convert
ovro-ingest fix-headers /data/fits /data/fixed_fits
ovro-ingest convert /data/fits /data/output \
--fixed-dir /data/fixed_fits --skip-header-fixing
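When the CLI is driven from a script, the same invocations can be assembled programmatically. The following is a minimal sketch that uses only the flags listed in the tables above; the helper names `build_convert_command` and `run_convert` are hypothetical and not part of the package.

```python
import subprocess


def build_convert_command(input_dir, output_dir, *, zarr_name=None,
                          chunk_lm=None, rebuild=False, log_level=None):
    """Assemble an `ovro-ingest convert` argument list (hypothetical helper)."""
    cmd = ["ovro-ingest", "convert", str(input_dir), str(output_dir)]
    if zarr_name is not None:
        cmd += ["--zarr-name", zarr_name]
    if chunk_lm is not None:
        cmd += ["--chunk-lm", str(chunk_lm)]
    if rebuild:
        cmd.append("--rebuild")
    if log_level is not None:
        cmd += ["--log-level", log_level]
    return cmd


def run_convert(input_dir, output_dir, **options):
    """Run the command; check=True raises CalledProcessError on a non-zero exit."""
    return subprocess.run(
        build_convert_command(input_dir, output_dir, **options), check=True
    )
```

Passing the arguments as a list avoids shell quoting issues with paths that contain spaces.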
Python API
FITSToZarrConverter
Orchestrates FITS to Zarr conversion with progress tracking and locking.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `ConversionConfig` | Conversion configuration. | required |
| `progress_callback` | `ProgressCallback \| None` | Optional callback for progress reporting. | `None` |
Source code in src/ovro_lwa_portal/ingest/core.py
convert()
Execute the FITS to Zarr conversion.
Returns:
| Type | Description |
|---|---|
| `Path` | Path to the output Zarr store. |
Raises:
| Type | Description |
|---|---|
| `FileNotFoundError` | If no matching FITS files are found. |
| `RuntimeError` | If conversion fails or another process is writing to the same output. |
Source code in src/ovro_lwa_portal/ingest/core.py
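A typical call sequence might look like the sketch below. It assumes that `ConversionConfig` accepts its documented parameters as keyword arguments and that both classes can be imported from `ovro_lwa_portal.ingest.core` (inferred from the source location above; they may also be re-exported elsewhere). The wrapper name `convert_directory` is hypothetical.

```python
from pathlib import Path


def convert_directory(input_dir, output_dir, rebuild=False):
    """Convert a directory of FITS files and return the Zarr store path."""
    # Imported lazily so this module loads even if the package is absent.
    from ovro_lwa_portal.ingest.core import ConversionConfig, FITSToZarrConverter

    config = ConversionConfig(
        input_dir=Path(input_dir),
        output_dir=Path(output_dir),
        rebuild=rebuild,
    )
    # Assumption: calling validate() up front is harmless even if
    # convert() performs its own validation.
    config.validate()

    converter = FITSToZarrConverter(config)
    try:
        return converter.convert()
    except FileNotFoundError:
        # Documented: raised when no matching FITS files are found.
        print(f"No FITS files found under {input_dir}")
        return None
    except RuntimeError as exc:
        # Documented: conversion failure or a concurrent writer on the same output.
        print(f"Conversion failed: {exc}")
        raise
```

Re-raising `RuntimeError` keeps a failed or contended write visible to the caller rather than silently returning.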
ConversionConfig
Configuration for FITS to Zarr conversion.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `input_dir` | `Path` | Directory containing input FITS files. | required |
| `output_dir` | `Path` | Directory where the Zarr store will be written. | required |
| `zarr_name` | `str` | Name of the output Zarr store. Defaults to "ovro_lwa_full_lm_only.zarr". | `'ovro_lwa_full_lm_only.zarr'` |
| `fixed_dir` | `Path \| None` | Directory for storing fixed FITS files. If None, creates a "fixed_fits" subdirectory in output_dir. | `None` |
| `chunk_lm` | `int` | Chunk size for l and m spatial dimensions. Defaults to 1024. | `1024` |
| `rebuild` | `bool` | If True, overwrite existing Zarr store. If False, append new data. Defaults to False. | `False` |
| `fix_headers_on_demand` | `bool` | If True, fix headers during conversion if they don't exist. If False, assume headers are already fixed. Defaults to True. | `True` |
| `resume` | `bool` | If True, skip discovered FITS time steps that already exist in the output Zarr time coordinate. Defaults to False. | `False` |
| `cleanup_fixed_fits` | `bool` | If True, delete temporary | `False` |
| `discovery_freq_bin_hz` | `float` | Bin width in Hz when grouping FITS by header frequency during discovery. Defaults to the library default (23 kHz). | `_DISCOVERY_FREQ_BIN_HZ` |
| `verbose` | `bool` | Enable verbose logging. Defaults to False. | `False` |
| `time_keys_only` | sequence of `str` \| `None` | If set, only these observation time keys are converted (after discovery). Use with | `None` |
| `lm_reference_ds` | `Dataset \| None` | Precomputed global LM reference. When set, conversion skips the reference scan over | `None` |
| `group_metadata_source` | `('fits', 'filename')` | How to discover observation time / subband for grouping and frequency ordering. | `"fits"` |
Source code in src/ovro_lwa_portal/ingest/core.py
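To illustrate what grouping by a frequency bin width means for `discovery_freq_bin_hz`, here is a small sketch. It assumes frequencies are assigned to the nearest multiple of the bin width; the library's actual binning rule is not shown in this reference and may differ. The names `frequency_bin` and `group_by_frequency` are hypothetical.

```python
from collections import defaultdict

BIN_HZ = 23_000.0  # documented default bin width (23 kHz)


def frequency_bin(freq_hz, bin_hz=BIN_HZ):
    """Return an integer bin index (assumption: nearest-bin rounding)."""
    return round(freq_hz / bin_hz)


def group_by_frequency(files, bin_hz=BIN_HZ):
    """Group (filename, header frequency in Hz) pairs by bin index."""
    groups = defaultdict(list)
    for name, freq_hz in files:
        groups[frequency_bin(freq_hz, bin_hz)].append(name)
    return dict(groups)


files = [
    ("a.fits", 41.000e6),
    ("b.fits", 41.005e6),  # 5 kHz away from a.fits: lands in the same bin
    ("c.fits", 41.100e6),  # 100 kHz away: lands in a different bin
]
groups = group_by_frequency(files)
```

Under this rule, files whose header frequencies differ by much less than the bin width are treated as the same subband.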
zarr_path (property)
Full path to the output Zarr store.
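The path-related fields can be pictured with the stand-in below. It assumes `zarr_path` is simply `output_dir / zarr_name`, which this reference does not state explicitly; the `fixed_fits` default follows the parameter table. `PathsSketch` and `effective_fixed_dir` are hypothetical names, not part of `ConversionConfig`.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Optional


@dataclass
class PathsSketch:
    """Hypothetical stand-in for the path fields of ConversionConfig."""

    output_dir: Path
    zarr_name: str = "ovro_lwa_full_lm_only.zarr"
    fixed_dir: Optional[Path] = None

    @property
    def zarr_path(self) -> Path:
        # Assumption: the store lives directly inside output_dir.
        return self.output_dir / self.zarr_name

    @property
    def effective_fixed_dir(self) -> Path:
        # Documented default: a "fixed_fits" subdirectory of output_dir.
        if self.fixed_dir is not None:
            return self.fixed_dir
        return self.output_dir / "fixed_fits"


paths = PathsSketch(output_dir=Path("/data/output"))
```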
validate()
Validate configuration parameters.
Raises:
| Type | Description |
|---|---|
| `FileNotFoundError` | If input directory doesn't exist. |
| `ValueError` | If parameters are invalid. |
Source code in src/ovro_lwa_portal/ingest/core.py
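The two documented exception types suggest checks along the following lines. This is an illustrative sketch only, not the library's code: the function `check_inputs` is hypothetical, and treating a negative `chunk_lm` as invalid is an assumption based on the CLI table, where 0 disables chunking.

```python
from pathlib import Path


def check_inputs(input_dir, chunk_lm=1024):
    """Mimic the documented failure modes of validate() (hypothetical helper)."""
    input_dir = Path(input_dir)
    if not input_dir.is_dir():
        # Mirrors the documented FileNotFoundError for a missing input directory.
        raise FileNotFoundError(f"Input directory does not exist: {input_dir}")
    if chunk_lm < 0:
        # Assumption: negative chunk sizes count as invalid parameters.
        raise ValueError(f"chunk_lm must be >= 0, got {chunk_lm}")
    return input_dir
```

Validating before any files are written keeps a misconfigured run from leaving a partial store behind.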
ProgressCallback
Bases: Protocol
Protocol for progress reporting callbacks.
Source code in src/ovro_lwa_portal/ingest/core.py
__call__(stage, current, total, message)
Report progress to the caller.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `stage` | `str` | The current stage of conversion (e.g., 'discovery', 'fixing', 'combining'). | required |
| `current` | `int` | Current progress count. | required |
| `total` | `int` | Total items to process. | required |
| `message` | `str` | Human-readable progress message. | required |
Source code in src/ovro_lwa_portal/ingest/core.py
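Because `ProgressCallback` is a protocol, any callable with the `(stage, current, total, message)` signature should satisfy it. The class below is a minimal sketch of such a callback; the name `RecordingProgress` is hypothetical.

```python
class RecordingProgress:
    """Callable that prints progress and keeps a history of events."""

    def __init__(self):
        self.events = []

    def __call__(self, stage: str, current: int, total: int, message: str) -> None:
        # Guard against total == 0 so an empty stage does not divide by zero.
        percent = 100.0 * current / total if total else 0.0
        self.events.append((stage, current, total))
        print(f"[{stage}] {current}/{total} ({percent:.0f}%) {message}")


callback = RecordingProgress()
callback("discovery", 1, 4, "scanning FITS headers")
```

An instance like this would be passed as `progress_callback` when constructing `FITSToZarrConverter`.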
Optional Prefect Integration
For orchestrated workflows, the package includes optional Prefect-based workflow orchestration with automatic retries, logging, and monitoring.
Installation
Usage
from ovro_lwa_portal.ingest.prefect_workflow import run_conversion_flow
result = run_conversion_flow(
input_dir="/data/fits",
output_dir="/data/output",
rebuild=False,
)
run_conversion_flow
run_conversion_flow(input_dir, output_dir, zarr_name='ovro_lwa_full_lm_only.zarr', fixed_dir=None, chunk_lm=1024, rebuild=False, duplicate_resolver=None, verbose=False, group_metadata_source='fits')
Run the FITS to Zarr conversion using Prefect orchestration.
This is a convenience wrapper around the Prefect flow that checks for Prefect availability and provides helpful error messages.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `input_dir` | `str \| Path` | Directory containing input FITS files. | required |
| `output_dir` | `str \| Path` | Directory where the Zarr store will be written. | required |
| `zarr_name` | `str` | Name of the output Zarr store. | `'ovro_lwa_full_lm_only.zarr'` |
| `fixed_dir` | `str \| Path \| None` | Directory for storing fixed FITS files. | `None` |
| `chunk_lm` | `int` | Chunk size for l and m spatial dimensions. | `1024` |
| `rebuild` | `bool` | If True, overwrite existing Zarr store. | `False` |
| `duplicate_resolver` | `Callable[[str, float, list[Path]], Path] \| None` | Resolves duplicate FITS files in the same (time, frequency) group; if | `None` |
| `verbose` | `bool` | Enable verbose logging. | `False` |
Returns:
| Type | Description |
|---|---|
| `Path` | Path to the output Zarr store. |
Raises:
| Type | Description |
|---|---|
| `ImportError` | If Prefect is not installed. |
| `RuntimeError` | If duplicate FITS are found and |
Source code in src/ovro_lwa_portal/ingest/prefect_workflow.py
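A `duplicate_resolver` matching the documented type `Callable[[str, float, list[Path]], Path]` could look like the sketch below. The meaning of the three arguments (observation time key, frequency, candidate files) is an assumption drawn from the "(time, frequency) group" wording, and the selection rule (last file by sorted name) is only one possible policy. The function name `pick_last_by_name` is hypothetical.

```python
from pathlib import Path


def pick_last_by_name(time_key: str, freq_hz: float, candidates: list[Path]) -> Path:
    """Choose one file from a duplicate group: the last path in sorted order."""
    if not candidates:
        raise ValueError(f"No candidates for time={time_key!r}, freq={freq_hz}")
    # Sorting makes the choice deterministic regardless of discovery order.
    return sorted(candidates)[-1]


chosen = pick_last_by_name(
    "20240101_120000", 41.0e6, [Path("obs_v2.fits"), Path("obs_v1.fits")]
)
```

A resolver of this shape would be passed as `duplicate_resolver=pick_last_by_name` to `run_conversion_flow`.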
fits_to_zarr_flow
fits_to_zarr_flow is the underlying Prefect @flow-decorated function called
by run_conversion_flow. It accepts the same parameters (input_dir,
output_dir, zarr_name, fixed_dir, chunk_lm, rebuild, verbose) and
orchestrates three Prefect tasks in sequence: configuration validation,
directory preparation, and the conversion itself (with automatic retries).
Note
fits_to_zarr_flow is conditionally defined depending on whether Prefect is
installed. Use run_conversion_flow as the stable entry point — it checks
for Prefect availability and provides a clear error message if it is missing.
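One way to keep a script working with or without Prefect is to check for it first and fall back to the plain converter. The sketch below assumes the import paths implied by the source locations in this reference; `prefect_available` and `convert_with_fallback` are hypothetical names.

```python
from importlib.util import find_spec
from pathlib import Path


def prefect_available() -> bool:
    """Return True if the top-level `prefect` package can be located."""
    return find_spec("prefect") is not None


def convert_with_fallback(input_dir, output_dir):
    """Use the Prefect flow when possible, otherwise the plain converter."""
    if prefect_available():
        from ovro_lwa_portal.ingest.prefect_workflow import run_conversion_flow

        return run_conversion_flow(input_dir=input_dir, output_dir=output_dir)

    # Fallback path: no retries or monitoring, same conversion.
    from ovro_lwa_portal.ingest.core import ConversionConfig, FITSToZarrConverter

    config = ConversionConfig(
        input_dir=Path(input_dir),
        output_dir=Path(output_dir),
    )
    return FITSToZarrConverter(config).convert()
```

Both branches return the path to the output Zarr store, so callers do not need to know which one ran.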