Data Loading

The ovro_lwa_portal.io module provides a unified interface for loading OVRO-LWA datasets from local paths, remote URLs, and DOI identifiers.

Quick Reference

import ovro_lwa_portal

# Local path
ds = ovro_lwa_portal.open_dataset("/path/to/data.zarr")

# Remote URL (S3, HTTPS, GCS)
ds = ovro_lwa_portal.open_dataset("s3://bucket/data.zarr")

# DOI identifier
ds = ovro_lwa_portal.open_dataset("doi:10.5281/zenodo.1234567")

# Custom chunking
ds = ovro_lwa_portal.open_dataset(
    "path/to/data.zarr",
    chunks={"time": 100, "frequency": 50},
)

Supported Protocols

| Protocol   | Example                        | Notes                           |
|------------|--------------------------------|---------------------------------|
| Local path | /data/obs.zarr                 | Checks existence before loading |
| S3         | s3://bucket/data.zarr          | Via fsspec                      |
| HTTPS      | https://example.com/data.zarr  | Via fsspec                      |
| GCS        | gs://bucket/data.zarr          | Via fsspec                      |
| Azure      | abfs://container/data.zarr     | Via fsspec                      |
| DOI        | doi:10.5281/zenodo.1234567     | Resolves via DataCite API       |
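
To make the protocol list concrete, here is a minimal sketch of how a source string could be sorted into the three categories the module distinguishes ("local", "remote", "doi"). The helper name `classify_source`, the DOI regular expression, and the scheme set are assumptions made for illustration; the package's own `_detect_source_type` is not shown on this page and may behave differently.

```python
import re
from urllib.parse import urlparse

# Hypothetical helper for illustration only; not the package's implementation.
DOI_PATTERN = re.compile(r"^10\.\d{4,9}/\S+$")
REMOTE_SCHEMES = {"s3", "http", "https", "gs", "gcs", "abfs", "az"}


def classify_source(source):
    """Return (source_type, normalized) for a path, URL, or DOI string."""
    text = str(source)
    # Accept DOIs with or without the "doi:" prefix
    candidate = text[4:] if text.lower().startswith("doi:") else text
    if DOI_PATTERN.match(candidate):
        return "doi", candidate
    scheme = urlparse(text).scheme.lower()
    if scheme in REMOTE_SCHEMES:
        return "remote", text
    # Anything else is treated as a local filesystem path
    return "local", text
```

Checking for a DOI before parsing a URL scheme means a bare identifier such as "10.5281/zenodo.1234567" is not mistaken for a relative local path.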

Full API Reference

io

Data loading utilities for OVRO-LWA datasets.

This module provides a unified interface for loading OVRO-LWA data from multiple sources including local paths, remote URLs, and DOI identifiers.

DataSourceError

Bases: Exception

Exception raised for errors in data source handling.

Source code in src/ovro_lwa_portal/io.py
class DataSourceError(Exception):
    """Exception raised for errors in data source handling."""

    pass

resolve_source(source, production=True, storage_options=None)

Resolve a data source to its final URL without loading data.

Performs DOI resolution and URL normalization, returning the full resolution chain. Useful for debugging DOI→URL→S3 resolution without actually loading any data.

Parameters:

  • source (str or Path, required): Data source. Can be:
      - Local file path (e.g., "/path/to/data.zarr")
      - Remote URL (e.g., "s3://bucket/data.zarr", "https://...")
      - DOI string (e.g., "doi:10.xxxx/xxxxx" or "10.xxxx/xxxxx")
  • production (bool, default True): Which DataCite API to use when resolving DOI identifiers:
      - True: production API (api.datacite.org)
      - False: test API (api.test.datacite.org)
  • storage_options (dict, default None): Options passed to the filesystem backend (e.g., S3 credentials). Used to determine if OSN HTTPS→S3 conversion should be applied.

Returns:

  • dict[str, Any]: Resolution details with keys:
      - source_type: "local", "remote", or "doi"
      - original_source: the source string as provided
      - resolved_url: URL after DOI resolution (or original if not a DOI)
      - final_url: final URL after OSN conversion (if applicable)
      - s3_url: S3 URL if OSN conversion was applied, else None
      - endpoint: S3 endpoint URL if applicable, else None
      - bucket: S3 bucket name if applicable, else None
      - path: path within bucket if applicable, else None

Raises:

  • DataSourceError: If DOI resolution fails.

Examples:

Resolve a DOI to see the full chain:

>>> from ovro_lwa_portal import resolve_source
>>> info = resolve_source("10.33569/9wsys-h7b71", production=False)
>>> info["source_type"]
'doi'
>>> info["resolved_url"]
'https://caltech1.osn.mghpcc.org/...'

Resolve with S3 credentials to see OSN conversion:

>>> info = resolve_source(
...     "10.33569/9wsys-h7b71",
...     production=False,
...     storage_options={"key": "ACCESS_KEY", "secret": "SECRET_KEY"},
... )
>>> info["s3_url"]
's3://...'
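
When debugging, it can help to print the whole chain at once rather than inspecting keys one by one. The sketch below formats a dictionary shaped like the documented return value. The function name `describe_resolution` is hypothetical, and the sample values are invented placeholders, not real resolver output.

```python
def describe_resolution(info):
    """Format a resolve_source()-style dict as a readable resolution chain."""
    steps = [f"{info['source_type']}: {info['original_source']}"]
    # Only show a resolved URL when resolution actually changed something
    if info.get("resolved_url") and info["resolved_url"] != info["original_source"]:
        steps.append(f"resolved: {info['resolved_url']}")
    # s3_url is only set when OSN conversion was applied
    if info.get("s3_url"):
        steps.append(f"s3: {info['s3_url']} (endpoint {info.get('endpoint')})")
    return " -> ".join(steps)


# Placeholder values for illustration only
example = {
    "source_type": "doi",
    "original_source": "10.1234/abcd",
    "resolved_url": "https://example.org/bucket/data.zarr",
    "final_url": "s3://bucket/data.zarr",
    "s3_url": "s3://bucket/data.zarr",
    "endpoint": "https://example.org",
    "bucket": "bucket",
    "path": "data.zarr",
}
print(describe_resolution(example))
```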
Source code in src/ovro_lwa_portal/io.py
def resolve_source(
    source: str | Path,
    production: bool = True,
    storage_options: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Resolve a data source to its final URL without loading data.

    Performs DOI resolution and URL normalization, returning the full
    resolution chain. Useful for debugging DOI→URL→S3 resolution
    without actually loading any data.

    Parameters
    ----------
    source : str or Path
        Data source, can be:
        - Local file path (e.g., "/path/to/data.zarr")
        - Remote URL (e.g., "s3://bucket/data.zarr", "https://...")
        - DOI string (e.g., "doi:10.xxxx/xxxxx" or "10.xxxx/xxxxx")
    production : bool, default True
        Which DataCite API to use when resolving DOI identifiers:
        - True: production API (api.datacite.org)
        - False: test API (api.test.datacite.org)
    storage_options : dict, optional
        Options passed to the filesystem backend (e.g., S3 credentials).
        Used to determine if OSN HTTPS→S3 conversion should be applied.

    Returns
    -------
    dict[str, Any]
        Resolution details with keys:
        - ``source_type``: "local", "remote", or "doi"
        - ``original_source``: the source string as provided
        - ``resolved_url``: URL after DOI resolution (or original if not a DOI)
        - ``final_url``: final URL after OSN conversion (if applicable)
        - ``s3_url``: S3 URL if OSN conversion was applied, else None
        - ``endpoint``: S3 endpoint URL if applicable, else None
        - ``bucket``: S3 bucket name if applicable, else None
        - ``path``: path within bucket if applicable, else None

    Raises
    ------
    DataSourceError
        If DOI resolution fails.

    Examples
    --------
    Resolve a DOI to see the full chain:

    >>> from ovro_lwa_portal import resolve_source
    >>> info = resolve_source("10.33569/9wsys-h7b71", production=False)
    >>> info["source_type"]
    'doi'
    >>> info["resolved_url"]
    'https://caltech1.osn.mghpcc.org/...'

    Resolve with S3 credentials to see OSN conversion:

    >>> info = resolve_source(
    ...     "10.33569/9wsys-h7b71",
    ...     production=False,
    ...     storage_options={"key": "ACCESS_KEY", "secret": "SECRET_KEY"},
    ... )
    >>> info["s3_url"]
    's3://...'
    """
    original_source = str(source)
    source_type, normalized = _detect_source_type(source)

    result: dict[str, Any] = {
        "source_type": source_type,
        "original_source": original_source,
        "resolved_url": None,
        "final_url": None,
        "s3_url": None,
        "endpoint": None,
        "bucket": None,
        "path": None,
    }

    # Resolve DOI to actual data URL
    if source_type == "doi":
        try:
            resolved = _resolve_doi(normalized, production=production)
        except Exception as e:
            msg = f"Failed to resolve DOI {normalized}: {e}"
            raise DataSourceError(msg) from e
        result["resolved_url"] = resolved
    else:
        result["resolved_url"] = normalized
        resolved = normalized

    # Check for OSN HTTPS→S3 conversion
    if storage_options and source_type in ("doi", "remote"):
        converted_url, converted_opts = _convert_osn_https_to_s3(
            resolved, storage_options
        )
        if converted_url != resolved:
            # OSN conversion was applied
            result["s3_url"] = converted_url
            result["endpoint"] = converted_opts.get("client_kwargs", {}).get(
                "endpoint_url"
            )
            # Parse bucket and path from the S3 URL
            parsed_s3 = urlparse(converted_url)
            result["bucket"] = parsed_s3.netloc
            result["path"] = parsed_s3.path.lstrip("/") or None
            result["final_url"] = converted_url
        else:
            result["final_url"] = resolved
    else:
        result["final_url"] = resolved

    return result
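
The bucket and path fields come from a plain `urlparse` split of the S3 URL. A standalone sketch of that one step is shown below. The name `split_s3_url` is an illustrative assumption, and the scheme check is an addition that the listing above does not perform.

```python
from urllib.parse import urlparse


def split_s3_url(url):
    """Split an s3:// URL into (bucket, path), mirroring the parsing above."""
    parsed = urlparse(url)
    if parsed.scheme != "s3":
        raise ValueError(f"not an S3 URL: {url}")
    # netloc is the bucket; an empty key becomes None rather than ""
    return parsed.netloc, parsed.path.lstrip("/") or None
```

For a URL that names only a bucket, the path component is empty, so the `or None` fallback reports None instead of an empty string, matching the documented "else None" behavior.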

open_dataset(source, chunks='auto', production=True, storage_options=None, engine='zarr', validate=True, **kwargs)

Load OVRO-LWA data as an xarray Dataset.

This function provides a unified interface for loading OVRO-LWA data from multiple sources including local file paths, remote URLs, and DOI identifiers.

Parameters:

  • source (str or Path, required): Data source. Can be:
      - Local file path (e.g., "/path/to/data.zarr")
      - Remote URL (e.g., "s3://bucket/data.zarr", "https://...")
      - DOI string (e.g., "doi:10.xxxx/xxxxx" or "10.xxxx/xxxxx")
  • chunks (dict, str, or None, default "auto"): Chunking strategy for lazy loading:
      - dict: Explicit chunk sizes per dimension, e.g., {"time": 100, "frequency": 50}
      - "auto": Let xarray/dask determine optimal chunks
      - None: Load entire dataset into memory (not recommended for large data)
  • production (bool, default True): Which DataCite API to use when resolving DOI identifiers:
      - True: production API (api.datacite.org)
      - False: test API (api.test.datacite.org)
  • storage_options (dict, default None): Options passed to the filesystem backend (e.g., S3 credentials). Example: storage_options={"key": "ACCESS_KEY", "secret": "SECRET_KEY"}
  • engine (str, default "zarr"): Backend engine for loading data. Currently supports "zarr".
  • validate (bool, default True): If True, validate that loaded data conforms to OVRO-LWA data model.
  • **kwargs (Any, default {}): Additional arguments passed to the underlying loader (xr.open_zarr, etc.)
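
To illustrate what the production flag selects, the sketch below builds a DataCite REST URL for a DOI record on either host. The hosts are the ones named in the parameter description. The `/dois/<doi>` route is DataCite's public REST endpoint for a single DOI, but whether this package queries exactly that route is an assumption, and `datacite_doi_url` is a hypothetical name.

```python
from urllib.parse import quote

# Hosts as named in the parameter description above
DATACITE_HOSTS = {True: "api.datacite.org", False: "api.test.datacite.org"}


def datacite_doi_url(doi, production=True):
    """Build a DataCite REST URL for a DOI record (assumed /dois/<doi> route)."""
    # Keep the slash between DOI prefix and suffix unescaped
    return f"https://{DATACITE_HOSTS[production]}/dois/{quote(doi, safe='/')}"
```

A DOI registered only on the test service will not resolve against the production host, which is why the test-API examples on this page pass production=False.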

Returns:

  • Dataset: OVRO-LWA dataset with standardized structure.

Raises:

  • DataSourceError: If source cannot be accessed or loaded.
  • FileNotFoundError: If local file path doesn't exist.
  • ImportError: If required dependencies for remote/DOI access are not installed.
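
One way to act on these three exception types is a fallback loader that tries several sources in order. The sketch below takes the opener as an argument so it runs without the package installed; in real use `opener` would be `ovro_lwa_portal.open_dataset`. The `try_sources` helper and the local `DataSourceError` stand-in are assumptions made for illustration.

```python
class DataSourceError(Exception):
    """Stand-in for ovro_lwa_portal.io.DataSourceError so the sketch runs alone."""


def try_sources(opener, sources):
    """Return the first dataset that opens, trying each source in order."""
    failures = []
    for source in sources:
        try:
            return opener(source)
        except (FileNotFoundError, DataSourceError) as exc:
            # Recoverable: remember why and fall through to the next source
            failures.append(f"{source}: {exc}")
        # ImportError is deliberately not caught: it signals a missing
        # package to install, so it propagates to the caller.
    raise DataSourceError("all sources failed:\n  " + "\n  ".join(failures))
```

Listing a local copy first and a DOI last lets a script prefer data on disk and reach for the network only when the local path is missing.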

Examples:

Load from local zarr store:

>>> import ovro_lwa_portal
>>> ds = ovro_lwa_portal.open_dataset("/path/to/observation.zarr")

Load from S3 bucket:

>>> ds = ovro_lwa_portal.open_dataset("s3://ovro-lwa-data/obs_12345.zarr")

Load from HTTP/HTTPS URL:

>>> ds = ovro_lwa_portal.open_dataset("https://data.ovro.caltech.edu/obs_12345.zarr")

Load via DOI (with or without prefix):

>>> ds = ovro_lwa_portal.open_dataset("doi:10.5281/zenodo.1234567")
>>> ds = ovro_lwa_portal.open_dataset("10.5281/zenodo.1234567")

Load from test DataCite API with S3 credentials:

>>> ds = ovro_lwa_portal.open_dataset(
...     "10.33569/4q7nb-ahq31",
...     production=False,
...     storage_options={"key": "ACCESS_KEY", "secret": "SECRET_KEY"}
... )

Customize chunking:

>>> ds = ovro_lwa_portal.open_dataset(
...     "path/to/data.zarr",
...     chunks={"time": 100, "frequency": 50}
... )
Notes

For remote data sources (S3, GCS, Azure), authentication is handled via environment variables or configuration files specific to each cloud provider. Credentials can also be passed explicitly through storage_options.

  • AWS S3: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, or ~/.aws/credentials
  • Google Cloud Storage: GOOGLE_APPLICATION_CREDENTIALS
  • Azure: AZURE_STORAGE_ACCOUNT_NAME, AZURE_STORAGE_ACCOUNT_KEY

For large datasets, lazy loading with dask is used by default (chunks="auto"). This allows working with datasets larger than memory.
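
When credentials need to be passed explicitly, a storage_options dictionary can be assembled from the same AWS environment variables. The sketch below is a minimal example under stated assumptions: the helper name `s3_storage_options` is hypothetical, the "key"/"secret" names follow the storage_options example on this page, and the nested `client_kwargs["endpoint_url"]` layout follows the way the module reads the endpoint back.

```python
import os


def s3_storage_options(env=None, endpoint_url=None):
    """Build an s3fs-style storage_options dict from AWS environment variables."""
    env = os.environ if env is None else env
    options = {}
    key = env.get("AWS_ACCESS_KEY_ID")
    secret = env.get("AWS_SECRET_ACCESS_KEY")
    # Only include credentials when both halves are present
    if key and secret:
        options["key"] = key
        options["secret"] = secret
    if endpoint_url:
        # Same nesting the module reads via client_kwargs["endpoint_url"]
        options["client_kwargs"] = {"endpoint_url": endpoint_url}
    return options
```

The result can be passed as `open_dataset(source, storage_options=s3_storage_options())`. An empty dictionary is falsy, so with no credentials set the loader behaves as if storage_options had been omitted.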

Source code in src/ovro_lwa_portal/io.py
def open_dataset(
    source: str | Path,
    chunks: dict[str, int] | str | None = "auto",
    production: bool = True,
    storage_options: dict[str, Any] | None = None,
    engine: str = "zarr",
    validate: bool = True,
    **kwargs: Any,
) -> xr.Dataset:
    """Load OVRO-LWA data as an xarray Dataset.

    This function provides a unified interface for loading OVRO-LWA data from
    multiple sources including local file paths, remote URLs, and DOI identifiers.

    Parameters
    ----------
    source : str or Path
        Data source, can be:
        - Local file path (e.g., "/path/to/data.zarr")
        - Remote URL (e.g., "s3://bucket/data.zarr", "https://...")
        - DOI string (e.g., "doi:10.xxxx/xxxxx" or "10.xxxx/xxxxx")
    chunks : dict, str, or None, default "auto"
        Chunking strategy for lazy loading:
        - dict: Explicit chunk sizes per dimension, e.g., {"time": 100, "frequency": 50}
        - "auto": Let xarray/dask determine optimal chunks
        - None: Load entire dataset into memory (not recommended for large data)
    production : bool, default True
        Which DataCite API to use when resolving DOI identifiers:
        - True: production API (api.datacite.org)
        - False: test API (api.test.datacite.org)
    storage_options : dict, optional
        Options passed to the filesystem backend (e.g., S3 credentials).
        Example: storage_options={"key": "ACCESS_KEY", "secret": "SECRET_KEY"}
    engine : str, default "zarr"
        Backend engine for loading data. Currently supports "zarr".
    validate : bool, default True
        If True, validate that loaded data conforms to OVRO-LWA data model.
    **kwargs
        Additional arguments passed to the underlying loader (xr.open_zarr, etc.)

    Returns
    -------
    xr.Dataset
        OVRO-LWA dataset with standardized structure.

    Raises
    ------
    DataSourceError
        If source cannot be accessed or loaded.
    FileNotFoundError
        If local file path doesn't exist.
    ImportError
        If required dependencies for remote/DOI access are not installed.

    Examples
    --------
    Load from local zarr store:

    >>> import ovro_lwa_portal
    >>> ds = ovro_lwa_portal.open_dataset("/path/to/observation.zarr")

    Load from S3 bucket:

    >>> ds = ovro_lwa_portal.open_dataset("s3://ovro-lwa-data/obs_12345.zarr")

    Load from HTTP/HTTPS URL:

    >>> ds = ovro_lwa_portal.open_dataset("https://data.ovro.caltech.edu/obs_12345.zarr")

    Load via DOI (with or without prefix):

    >>> ds = ovro_lwa_portal.open_dataset("doi:10.5281/zenodo.1234567")
    >>> ds = ovro_lwa_portal.open_dataset("10.5281/zenodo.1234567")

    Load from test DataCite API with S3 credentials:

    >>> ds = ovro_lwa_portal.open_dataset(
    ...     "10.33569/4q7nb-ahq31",
    ...     production=False,
    ...     storage_options={"key": "ACCESS_KEY", "secret": "SECRET_KEY"}
    ... )

    Customize chunking:

    >>> ds = ovro_lwa_portal.open_dataset(
    ...     "path/to/data.zarr",
    ...     chunks={"time": 100, "frequency": 50}
    ... )

    Notes
    -----
    For remote data sources (S3, GCS, Azure), authentication is handled via
    environment variables or configuration files specific to each cloud provider.
    Credentials can also be passed explicitly through ``storage_options``:

    - AWS S3: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, or ~/.aws/credentials
    - Google Cloud Storage: GOOGLE_APPLICATION_CREDENTIALS
    - Azure: AZURE_STORAGE_ACCOUNT_NAME, AZURE_STORAGE_ACCOUNT_KEY

    For large datasets, lazy loading with dask is used by default (chunks="auto").
    This allows working with datasets larger than memory.
    """
    original_source = str(source)
    source_type, normalized_source = _detect_source_type(source)
    resolved_url: str | None = None  # Track DOI-resolved URL for error messages

    # Resolve DOI to actual data URL
    if source_type == "doi":
        try:
            normalized_source = _resolve_doi(normalized_source, production=production)
            resolved_url = normalized_source
            source_type = "remote"  # After resolution, treat as remote URL
        except Exception as e:
            msg = f"Failed to resolve DOI {normalized_source}: {e}"
            raise DataSourceError(msg) from e

    # Convert OSN HTTPS URLs to S3 when credentials are provided
    # OSN provides both HTTPS and S3 access to the same data
    if source_type == "remote" and storage_options:
        normalized_source, storage_options = _convert_osn_https_to_s3(
            normalized_source, storage_options
        )

    # Load data based on engine
    try:
        if engine == "zarr":
            # Use fsspec's universal pathlib for unified handling of local and remote paths
            try:
                from upath import UPath
            except ImportError as e:
                msg = (
                    "universal-pathlib is required for path handling. "
                    "Install with: pip install universal-pathlib"
                )
                raise ImportError(msg) from e

            # Create filesystem and mapper
            # When storage_options are provided, use fsspec directly for cloud storage
            parsed_url = urlparse(normalized_source)
            protocol = parsed_url.scheme if parsed_url.scheme else "file"

            if storage_options and protocol in ("s3", "gs", "gcs", "abfs", "az"):
                # Use fsspec directly for cloud storage with credentials
                try:
                    import fsspec
                except ImportError as e:
                    msg = (
                        "fsspec is required for remote storage access. "
                        "Install with: pip install fsspec"
                    )
                    raise ImportError(msg) from e

                # Create filesystem with storage options
                fs = fsspec.filesystem(protocol, **storage_options)

                # Get path without protocol (e.g., s3://bucket/path -> bucket/path)
                path = f"{parsed_url.netloc}/{parsed_url.path.lstrip('/')}"
                store = fs.get_mapper(path)

                # Early accessibility check for cloud storage
                _check_remote_access(
                    fs, path, original_source, normalized_source, storage_options
                )
            else:
                # For local files or HTTPS, use UPath without storage_options
                store_path = UPath(normalized_source)

                # Explicit local existence check
                if store_path.protocol in ("", "file") and not store_path.exists():
                    raise FileNotFoundError(f"Local path does not exist: {store_path}")

                # Build a Zarr store (fsspec mapper) from the UPath
                fs = store_path.fs
                store = fs.get_mapper(store_path.path)

            # Check if we need cloud storage backends for remote paths
            if source_type == "remote":
                parsed = urlparse(normalized_source)
                if parsed.scheme == "s3":
                    try:
                        import s3fs  # noqa: F401
                    except ImportError as e:
                        msg = (
                            "s3fs is required for S3 access. "
                            "Install with: pip install s3fs"
                        )
                        raise ImportError(msg) from e
                elif parsed.scheme in ("gs", "gcs"):
                    try:
                        import gcsfs  # noqa: F401
                    except ImportError as e:
                        msg = (
                            "gcsfs is required for Google Cloud Storage access. "
                            "Install with: pip install gcsfs"
                        )
                        raise ImportError(msg) from e

            # Open the Zarr store from the fsspec mapper built above;
            # xr.open_zarr accepts fsspec mappers directly
            with warnings.catch_warnings():
                warnings.simplefilter("default")
                ds = xr.open_zarr(store, chunks=chunks, **kwargs)

        else:
            msg = f"Unsupported engine: {engine}. Currently only 'zarr' is supported."
            raise DataSourceError(msg)

    except (FileNotFoundError, ImportError, DataSourceError):
        # Already meaningful to the caller; re-raise unchanged
        raise
    except Exception as e:
        # Build a detailed error message including the resolution chain
        parts = [f"Failed to load dataset from '{original_source}'"]
        if resolved_url and resolved_url != original_source:
            parts.append(f"resolved to: {resolved_url}")
        if normalized_source != original_source and normalized_source != resolved_url:
            parts.append(f"final URL: {normalized_source}")
        if storage_options and "client_kwargs" in storage_options:
            endpoint = storage_options["client_kwargs"].get("endpoint_url")
            if endpoint:
                parts.append(f"S3 endpoint: {endpoint}")
        parts.append(str(e))
        msg = "\n  ".join(parts)
        raise DataSourceError(msg) from e

    # Validate dataset structure if requested
    if validate:
        _validate_dataset(ds)

    return ds
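
The final except branch assembles a multi-line message that records each step of the resolution chain. The sketch below isolates that logic as a pure function so the message shape can be seen without triggering a real failure. The name `build_load_error` and its flat arguments are illustrative assumptions; the listing above builds the same parts inline from local variables.

```python
def build_load_error(original, resolved=None, final=None, endpoint=None, cause=""):
    """Assemble a load-failure message that records the resolution chain."""
    parts = [f"Failed to load dataset from '{original}'"]
    # Mention the DOI-resolved URL only if it differs from the input
    if resolved and resolved != original:
        parts.append(f"resolved to: {resolved}")
    # Mention the final URL only if it adds new information
    if final and final != original and final != resolved:
        parts.append(f"final URL: {final}")
    if endpoint:
        parts.append(f"S3 endpoint: {endpoint}")
    parts.append(str(cause))
    # Indent continuation lines under the first line
    return "\n  ".join(parts)
```

Each intermediate URL appears only when it differs from the previous step, so a failure on a plain local path yields a two-line message while a DOI failure shows the whole chain.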