Skip to content

Python examples

Below are Python examples for working with the Acquire Zarr library. Have an example you'd like to share with the community? Submit a GitHub issue and include "Example:" in the title.

Example: Basic streaming to filesystem
# Stream to filesystem without compression
import numpy as np
from acquire_zarr import (
    ArraySettings, StreamSettings, ZarrStream, Dimension, DimensionType,
    DataType
)


def make_sample_data():
    """Generate sample data with a moving diagonal pattern.

    Returns:
        A ``uint16`` array of shape ``(50, 48, 64)`` ordered ``(t, y, x)``.
        Each frame holds a diagonal up/down ramp that shifts by 8 pixels per
        frame and is scaled to 70% inside alternating concentric rings
        around the frame center.
    """
    width, height, num_frames = 64, 48, 50

    # Index grids shaped so they broadcast to (t, y, x); the whole stack is
    # then computed with a few array operations instead of a Python loop
    # over every pixel of every frame.
    t = np.arange(num_frames).reshape(-1, 1, 1)
    y = np.arange(height).reshape(1, -1, 1)
    x = np.arange(width).reshape(1, 1, -1)

    # Create a diagonal pattern that moves with time
    diagonal = (x + y + t * 8) % 32

    # Intensity ramps up over the first half of a period, down over the rest
    intensity = np.where(diagonal < 16, diagonal, 31 - diagonal) * 4096

    # Circular features: the distance to the center is the same for every
    # frame, so it is computed once on the (y, x) grid only.
    dx = x - width // 2
    dy = y - height // 2
    radius = np.sqrt(dx * dx + dy * dy).astype(np.int64)

    # Modulate with concentric circles; astype() truncates toward zero,
    # which matches int() for these non-negative values.
    dimmed = (intensity * 0.7).astype(np.int64)
    intensity = np.where(radius % 16 < 8, dimmed, intensity)

    return intensity.astype(np.uint16)


def main():
    """Stream the sample frames into an uncompressed 3D (t, y, x) array."""
    # Time axis: size 0 means unlimited, so frames can keep being appended
    time_axis = Dimension(
        name="t",
        kind=DimensionType.TIME,
        array_size_px=0,  # Unlimited
        chunk_size_px=5,
        shard_size_chunks=2,
    )
    # Frame rows (height)
    row_axis = Dimension(
        name="y",
        kind=DimensionType.SPACE,
        array_size_px=48,
        chunk_size_px=16,
        shard_size_chunks=1,
    )
    # Frame columns (width)
    column_axis = Dimension(
        name="x",
        kind=DimensionType.SPACE,
        array_size_px=64,
        chunk_size_px=16,
        shard_size_chunks=2,
    )

    # No compression settings are given for this array
    frame_array = ArraySettings(
        dimensions=[time_axis, row_axis, column_axis],
        data_type=DataType.UINT16,
    )

    settings = StreamSettings()
    settings.arrays = [frame_array]
    settings.store_path = "output_v3.zarr"

    # Create stream
    stream = ZarrStream(settings)

    # Write sample data
    frames = make_sample_data()
    stream.append(frames)


if __name__ == "__main__":
    main()

Download this example

Example: Multiscale streaming to filesystem
# Stream to filesystem with multiscale, no compression
import numpy as np
from acquire_zarr import (
    ArraySettings, StreamSettings, ZarrStream, Dimension, DimensionType,
    DownsamplingMethod
)


def make_sample_data():
    """Return random uint16 data laid out as (t, c, z, y, x)."""
    # 10 timepoints, 8 channels, 6 z-slices, 48 rows (height), 64 columns (width)
    shape = (10, 8, 6, 48, 64)

    # The upper bound of randint is exclusive, so values lie in [0, 65534]
    return np.random.randint(low=0, high=65535, size=shape, dtype=np.uint16)


def main():
    """Stream random 5D (t, c, z, y, x) data into a multiscale array."""
    # One Dimension per axis, listed in (t, c, z, y, x) order
    axes = [
        Dimension(
            name="t",
            kind=DimensionType.TIME,
            array_size_px=10,
            chunk_size_px=5,
            shard_size_chunks=2,
        ),
        Dimension(
            name="c",
            kind=DimensionType.CHANNEL,
            array_size_px=8,
            chunk_size_px=4,
            shard_size_chunks=2,
        ),
        Dimension(
            name="z",
            kind=DimensionType.SPACE,
            array_size_px=6,
            chunk_size_px=2,
            shard_size_chunks=1,
        ),
        Dimension(
            name="y",
            kind=DimensionType.SPACE,
            array_size_px=48,
            chunk_size_px=16,
            shard_size_chunks=1,
        ),
        Dimension(
            name="x",
            kind=DimensionType.SPACE,
            array_size_px=64,
            chunk_size_px=16,
            shard_size_chunks=2,
        ),
    ]

    # Multiscale output using mean downsampling; no compression settings
    multiscale_array = ArraySettings(
        dimensions=axes,
        data_type=np.uint16,
        downsampling_method=DownsamplingMethod.MEAN,
    )

    settings = StreamSettings()
    settings.arrays = [multiscale_array]
    settings.store_path = "output_v3_multiscale.zarr"

    # Create stream
    stream = ZarrStream(settings)

    # Write sample data
    volume_series = make_sample_data()
    stream.append(volume_series)


if __name__ == "__main__":
    main()

Download this example

Example: Compressed streaming to filesystem
# Stream to filesystem with LZ4 compression
import numpy as np
from acquire_zarr import (
    ArraySettings, StreamSettings, ZarrStream, Dimension, DimensionType,
    DataType, Compressor, CompressionCodec, CompressionSettings
)


def make_sample_data():
    """Return random uint16 data of shape (5, 4, 2, 48, 64)."""
    # The three leading axes match the t, c and z chunk sizes configured in
    # this example's main(); the last two are the full y and x extents.
    shape = (5, 4, 2, 48, 64)

    # The upper bound of randint is exclusive, so values lie in [0, 65534]
    return np.random.randint(low=0, high=65535, size=shape, dtype=np.uint16)


def main():
    """Stream random data into a 5D (t, c, z, y, x) LZ4-compressed array."""
    # Blosc with the LZ4 codec
    lz4_compression = CompressionSettings(
        compressor=Compressor.BLOSC1,
        codec=CompressionCodec.BLOSC_LZ4,
        level=1,
        shuffle=1,
    )

    # One Dimension per axis, listed in (t, c, z, y, x) order
    axes = [
        Dimension(
            name="t",
            kind=DimensionType.TIME,
            array_size_px=10,
            chunk_size_px=5,
            shard_size_chunks=2,
        ),
        Dimension(
            name="c",
            kind=DimensionType.CHANNEL,
            array_size_px=8,
            chunk_size_px=4,
            shard_size_chunks=2,
        ),
        Dimension(
            name="z",
            kind=DimensionType.SPACE,
            array_size_px=6,
            chunk_size_px=2,
            shard_size_chunks=1,
        ),
        Dimension(
            name="y",
            kind=DimensionType.SPACE,
            array_size_px=48,
            chunk_size_px=16,
            shard_size_chunks=1,
        ),
        Dimension(
            name="x",
            kind=DimensionType.SPACE,
            array_size_px=64,
            chunk_size_px=16,
            shard_size_chunks=2,
        ),
    ]

    compressed_array = ArraySettings(
        compression=lz4_compression,
        dimensions=axes,
        data_type=DataType.UINT16,
    )

    settings = StreamSettings()
    settings.arrays = [compressed_array]
    settings.store_path = "output_compressed.zarr"

    # Create stream
    stream = ZarrStream(settings)

    # Write sample data
    sample = make_sample_data()
    stream.append(sample)


if __name__ == "__main__":
    main()

Download this example

Example: Basic streaming to S3
# Stream to S3
import numpy as np

# Ensure that you have set your S3 credentials in the environment variables
# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and optionally AWS_SESSION_TOKEN
# BEFORE importing acquire_zarr
from acquire_zarr import (
    ArraySettings, StreamSettings, ZarrStream, Dimension, DimensionType,
    DataType, S3Settings
)


def make_sample_data():
    """Return random uint16 data of shape (5, 2, 48, 64)."""
    # The two leading axes match the t and z chunk sizes configured in this
    # example's main(); the last two are the full y and x extents.
    shape = (5, 2, 48, 64)

    # The upper bound of randint is exclusive, so values lie in [0, 65534]
    return np.random.randint(low=0, high=65535, size=shape, dtype=np.uint16)


def main():
    """Stream random data into an uncompressed 4D (t, z, y, x) array on S3."""
    # Where the store is written: S3 endpoint, bucket and region
    s3_target = S3Settings(
        endpoint="http://localhost:9000",
        bucket_name="my-bucket",
        region="us-east-2"
    )

    # One Dimension per axis, listed in (t, z, y, x) order
    axes = [
        Dimension(
            name="t",
            kind=DimensionType.TIME,
            array_size_px=0,  # Unlimited
            chunk_size_px=5,
            shard_size_chunks=2,
        ),
        Dimension(
            name="z",
            kind=DimensionType.SPACE,
            array_size_px=10,
            chunk_size_px=2,
            shard_size_chunks=1,
        ),
        Dimension(
            name="y",
            kind=DimensionType.SPACE,
            array_size_px=48,
            chunk_size_px=16,
            shard_size_chunks=1,
        ),
        Dimension(
            name="x",
            kind=DimensionType.SPACE,
            array_size_px=64,
            chunk_size_px=16,
            shard_size_chunks=2,
        ),
    ]

    volume_array = ArraySettings(
        dimensions=axes,
        data_type=DataType.UINT16,
    )

    settings = StreamSettings()
    settings.s3 = s3_target
    settings.arrays = [volume_array]
    settings.store_path = "output_s3.zarr"

    # Create stream
    stream = ZarrStream(settings)

    # Write sample data
    sample = make_sample_data()
    stream.append(sample)


if __name__ == "__main__":
    main()

Download this example

Example: Compressed streaming to S3
# Stream to S3 with Zstd compression
import numpy as np

# Ensure that you have set your S3 credentials in the environment variables
# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and optionally AWS_SESSION_TOKEN
# BEFORE importing acquire_zarr
from acquire_zarr import (
    ArraySettings, StreamSettings, ZarrStream, Dimension, DimensionType,
    DataType, S3Settings, Compressor, CompressionCodec, CompressionSettings
)


def make_sample_data():
    """Generate sample data with a moving diagonal pattern.

    Returns:
        A ``uint16`` array of shape ``(50, 48, 64)`` ordered ``(t, y, x)``.
        Each frame holds a diagonal up/down ramp that shifts by 8 pixels per
        frame and is scaled to 70% inside alternating concentric rings
        around the frame center.
    """
    width, height, num_frames = 64, 48, 50

    # Index grids shaped so they broadcast to (t, y, x); the whole stack is
    # then computed with a few array operations instead of a Python loop
    # over every pixel of every frame.
    t = np.arange(num_frames).reshape(-1, 1, 1)
    y = np.arange(height).reshape(1, -1, 1)
    x = np.arange(width).reshape(1, 1, -1)

    # Create a diagonal pattern that moves with time
    diagonal = (x + y + t * 8) % 32

    # Intensity ramps up over the first half of a period, down over the rest
    intensity = np.where(diagonal < 16, diagonal, 31 - diagonal) * 4096

    # Circular features: the distance to the center is the same for every
    # frame, so it is computed once on the (y, x) grid only.
    dx = x - width // 2
    dy = y - height // 2
    radius = np.sqrt(dx * dx + dy * dy).astype(np.int64)

    # Modulate with concentric circles; astype() truncates toward zero,
    # which matches int() for these non-negative values.
    dimmed = (intensity * 0.7).astype(np.int64)
    intensity = np.where(radius % 16 < 8, dimmed, intensity)

    return intensity.astype(np.uint16)


def main():
    """Stream the sample frames into a Zstd-compressed 3D (t, y, x) array on S3."""
    # Where the store is written: S3 endpoint and bucket
    s3_target = S3Settings(
        endpoint="http://localhost:9000",
        bucket_name="my-bucket"
    )

    # Blosc with the Zstd codec
    zstd_compression = CompressionSettings(
        compressor=Compressor.BLOSC1,
        codec=CompressionCodec.BLOSC_ZSTD,
        level=1,
        shuffle=1,
    )

    # One Dimension per axis, listed in (t, y, x) order
    axes = [
        Dimension(
            name="t",
            kind=DimensionType.TIME,
            array_size_px=0,  # Unlimited
            chunk_size_px=5,
            shard_size_chunks=2,
        ),
        Dimension(
            name="y",
            kind=DimensionType.SPACE,
            array_size_px=48,
            chunk_size_px=16,
            shard_size_chunks=1,
        ),
        Dimension(
            name="x",
            kind=DimensionType.SPACE,
            array_size_px=64,
            chunk_size_px=16,
            shard_size_chunks=2,
        ),
    ]

    frame_array = ArraySettings(
        compression=zstd_compression,
        dimensions=axes,
        data_type=DataType.UINT16,
    )

    settings = StreamSettings()
    settings.s3 = s3_target
    settings.arrays = [frame_array]
    settings.store_path = "output_v3_compressed_s3.zarr"

    # Create stream
    stream = ZarrStream(settings)

    # Write sample data
    frames = make_sample_data()
    stream.append(frames)


if __name__ == "__main__":
    main()

Download this example

Example: Multiscale compressed streaming to S3
# Stream to S3 with multiscale and Zstd compression
import numpy as np

# Ensure that you have set your S3 credentials in the environment variables
# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and optionally AWS_SESSION_TOKEN
# BEFORE importing acquire_zarr
from acquire_zarr import (
    ArraySettings, StreamSettings, ZarrStream, Dimension, DimensionType,
    DownsamplingMethod, S3Settings, Compressor, CompressionCodec, CompressionSettings
)


def make_sample_data():
    """Generate sample data with a moving diagonal pattern for a 4D array.

    Returns:
        A ``uint16`` array of shape ``(10, 10, 48, 64)`` ordered
        ``(t, z, y, x)``. Every z-slice of a timepoint holds the same image:
        a diagonal up/down ramp that shifts by 8 pixels per timepoint and is
        scaled to 70% inside alternating concentric rings around the center.
    """
    width, height, depth, num_timepoints = 64, 48, 10, 10

    # Index grids shaped so they broadcast to (t, y, x). The pattern has no
    # z dependence, so one plane per timepoint is computed and then repeated
    # along z instead of looping over every voxel in Python.
    t = np.arange(num_timepoints).reshape(-1, 1, 1)
    y = np.arange(height).reshape(1, -1, 1)
    x = np.arange(width).reshape(1, 1, -1)

    # Create a diagonal pattern that moves with time
    diagonal = (x + y + t * 8) % 32

    # Intensity ramps up over the first half of a period, down over the rest
    intensity = np.where(diagonal < 16, diagonal, 31 - diagonal) * 4096

    # Circular features: the distance to the center is the same for every
    # timepoint, so it is computed once on the (y, x) grid only.
    dx = x - width // 2
    dy = y - height // 2
    radius = np.sqrt(dx * dx + dy * dy).astype(np.int64)

    # Modulate with concentric circles; astype() truncates toward zero,
    # which matches int() for these non-negative values.
    dimmed = (intensity * 0.7).astype(np.int64)
    planes = np.where(radius % 16 < 8, dimmed, intensity).astype(np.uint16)

    # Insert the z axis and copy each plane into all `depth` slices
    return np.repeat(planes[:, np.newaxis, :, :], depth, axis=1)


def main():
    """Stream the sample volumes into a multiscale, Zstd-compressed 4D array on S3."""
    # Where the store is written: S3 endpoint and bucket
    s3_target = S3Settings(
        endpoint="http://127.0.0.1:9000",
        bucket_name="my-bucket"
    )

    # Blosc with the Zstd codec
    zstd_compression = CompressionSettings(
        compressor=Compressor.BLOSC1,
        codec=CompressionCodec.BLOSC_ZSTD,
        level=1,
        shuffle=1,
    )

    # One Dimension per axis, listed in (t, z, y, x) order
    axes = [
        Dimension(
            name="t",
            kind=DimensionType.TIME,
            array_size_px=0,  # Unlimited
            chunk_size_px=5,
            shard_size_chunks=2,
        ),
        Dimension(
            name="z",
            kind=DimensionType.SPACE,
            array_size_px=10,
            chunk_size_px=2,
            shard_size_chunks=1,
        ),
        Dimension(
            name="y",
            kind=DimensionType.SPACE,
            array_size_px=48,
            chunk_size_px=16,
            shard_size_chunks=1,
        ),
        Dimension(
            name="x",
            kind=DimensionType.SPACE,
            array_size_px=64,
            chunk_size_px=16,
            shard_size_chunks=2,
        ),
    ]

    # Multiscale output using mean downsampling
    volume_array = ArraySettings(
        compression=zstd_compression,
        dimensions=axes,
        data_type=np.uint16,
        downsampling_method=DownsamplingMethod.MEAN,
    )

    settings = StreamSettings()
    settings.s3 = s3_target
    settings.arrays = [volume_array]
    settings.store_path = "output_v3_compressed_multiscale_s3.zarr"

    # Create stream
    stream = ZarrStream(settings)

    # Write sample data
    volumes = make_sample_data()
    stream.append(volumes)


if __name__ == "__main__":
    main()

Download this example

Example: Streaming multiple arrays to a Zarr store on the filesystem
# Stream multiple arrays to filesystem
import numpy as np

from acquire_zarr import (
    ArraySettings, StreamSettings, ZarrStream, Dimension, DimensionType,
    DownsamplingMethod, Compressor, CompressionCodec, CompressionSettings
)

from typing import Tuple


def make_sample_data(shape: Tuple[int, ...], dtype: np.dtype) -> np.ndarray:
    """Return random data of the given shape spanning the range of *dtype*.

    Args:
        shape: Shape of the array to generate.
        dtype: Integer or floating-point NumPy data type of the result.

    Returns:
        An array of ``dtype`` filled with uniformly distributed random
        values. Integers are drawn from ``[min, max)`` of the type; floats
        from ``[-max, max)``.

    Raises:
        ValueError: If ``dtype`` is neither an integer nor a floating-point
            type.
    """
    if np.issubdtype(dtype, np.integer):
        limits = np.iinfo(dtype)
        # randint's upper bound is exclusive, so the maximum is never drawn
        return np.random.randint(limits.min, limits.max, shape, dtype=dtype)

    if np.issubdtype(dtype, np.floating):
        # Draw in [-1, 1) and scale by the largest finite value. Asking
        # uniform() for the full range directly fails for float64 because
        # max - min is not finite.
        unit = np.random.uniform(-1.0, 1.0, shape)
        return (unit * np.finfo(dtype).max).astype(dtype)

    # Type limits are only looked up inside the branches above, so an
    # unsupported type reaches this error instead of failing in np.finfo.
    raise ValueError(f"Unsupported data type: {dtype}")


def main():
    """Stream three differently typed arrays into a single Zarr store."""

    def spatial_axes():
        # All three arrays use the same z/y/x layout; build fresh Dimension
        # objects on every call so no instance is shared between arrays.
        return [
            Dimension(
                name="z",
                kind=DimensionType.SPACE,
                array_size_px=6,
                chunk_size_px=2,
                shard_size_chunks=1,
            ),
            Dimension(
                name="y",
                kind=DimensionType.SPACE,
                array_size_px=48,
                chunk_size_px=16,
                shard_size_chunks=1,
            ),
            Dimension(
                name="x",
                kind=DimensionType.SPACE,
                array_size_px=64,
                chunk_size_px=16,
                shard_size_chunks=2,
            ),
        ]

    # 5D (t, c, z, y, x) uint16 array, LZ4-compressed
    uint16_array = ArraySettings(
        output_key="path/to/uint16_array",
        compression=CompressionSettings(
            compressor=Compressor.BLOSC1,
            codec=CompressionCodec.BLOSC_LZ4,
            level=1,
            shuffle=1,
        ),
        dimensions=[
            Dimension(
                name="t",
                kind=DimensionType.TIME,
                array_size_px=0,
                chunk_size_px=5,
                shard_size_chunks=2,
            ),
            Dimension(
                name="c",
                kind=DimensionType.CHANNEL,
                array_size_px=8,
                chunk_size_px=4,
                shard_size_chunks=2,
            ),
            *spatial_axes(),
        ],
        data_type=np.uint16,
    )

    # 3D (z, y, x) float32 array, Zstd-compressed, mean downsampling
    float32_array = ArraySettings(
        output_key="a/float32/array",
        compression=CompressionSettings(
            compressor=Compressor.BLOSC1,
            codec=CompressionCodec.BLOSC_ZSTD,
            level=3,
            shuffle=2,
        ),
        dimensions=spatial_axes(),
        data_type=np.float32,
        downsampling_method=DownsamplingMethod.MEAN
    )

    # 3D (z, y, x) uint8 array, no compression settings, max downsampling
    labels_array = ArraySettings(
        output_key="labels",
        dimensions=spatial_axes(),
        data_type=np.uint8,
        downsampling_method=DownsamplingMethod.MAX
    )

    settings = StreamSettings(
        arrays=[uint16_array, float32_array, labels_array],
        store_path="output_multiarray.zarr",
        overwrite=True
    )

    # Create stream
    stream = ZarrStream(settings)

    # Write sample data to each array, addressed by its output key
    writes = [
        ((10, 8, 6, 48, 64), np.uint16, "path/to/uint16_array"),
        ((6, 48, 64), np.float32, "a/float32/array"),
        ((6, 48, 64), np.uint8, "labels"),
    ]
    for data_shape, data_type, key in writes:
        stream.append(make_sample_data(data_shape, data_type), key)


if __name__ == "__main__":
    main()

Download this example