Python examples¶
Below are examples in Python for working with the Acquire Zarr library. Have an example you'd like to share with the community? Submit a GitHub issue and include "Example:" in your title.
Example: Basic streaming to filesystem
# Stream to filesystem without compression
import numpy as np
from acquire_zarr import (
ArraySettings, StreamSettings, ZarrStream, Dimension, DimensionType,
DataType
)
def make_sample_data():
"""Generate sample data with moving diagonal pattern"""
width, height = 64, 48
frames = []
for t in range(50):
frame = np.zeros((height, width), dtype=np.uint16)
for y in range(height):
for x in range(width):
# Create a diagonal pattern that moves with time
diagonal = (x + y + t * 8) % 32
# Create intensity variation
if diagonal < 16:
intensity = diagonal * 4096 # Ramp up
else:
intensity = (31 - diagonal) * 4096 # Ramp down
# Add circular features
center_x, center_y = width // 2, height // 2
dx, dy = x - center_x, y - center_y
radius = int(np.sqrt(dx*dx + dy*dy))
# Modulate with concentric circles
if radius % 16 < 8:
intensity = int(intensity * 0.7)
frame[y, x] = intensity
frames.append(frame)
return np.array(frames)
def main():
settings = StreamSettings()
# Configure 3D array (t, y, x) - no compression
settings.arrays = [
ArraySettings(
dimensions=[
Dimension(
name="t",
kind=DimensionType.TIME,
array_size_px=0, # Unlimited
chunk_size_px=5,
shard_size_chunks=2,
),
Dimension(
name="y",
kind=DimensionType.SPACE,
array_size_px=48,
chunk_size_px=16,
shard_size_chunks=1,
),
Dimension(
name="x",
kind=DimensionType.SPACE,
array_size_px=64,
chunk_size_px=16,
shard_size_chunks=2,
),
],
data_type=DataType.UINT16,
)
]
settings.store_path = "output_v3.zarr"
# Create stream
stream = ZarrStream(settings)
# Write sample data
sample_data = make_sample_data()
stream.append(sample_data)
if __name__ == "__main__":
main()
Example: Multiscale streaming to filesystem
# Stream to filesystem with multiscale, no compression
import numpy as np
from acquire_zarr import (
ArraySettings, StreamSettings, ZarrStream, Dimension, DimensionType,
DownsamplingMethod
)
def make_sample_data():
"""Generate sample data matching the 5D structure (t, c, z, y, x)"""
# Shape: (10 timepoints, 8 channels, 6 z-slices, 48 height, 64 width)
return np.random.randint(
0, 65535,
(10, 8, 6, 48, 64),
dtype=np.uint16
)
def main():
settings = StreamSettings()
# Configure 5D array (t, c, z, y, x) with multiscale, no compression
settings.arrays = [
ArraySettings(
dimensions=[
Dimension(
name="t",
kind=DimensionType.TIME,
array_size_px=10,
chunk_size_px=5,
shard_size_chunks=2,
),
Dimension(
name="c",
kind=DimensionType.CHANNEL,
array_size_px=8,
chunk_size_px=4,
shard_size_chunks=2,
),
Dimension(
name="z",
kind=DimensionType.SPACE,
array_size_px=6,
chunk_size_px=2,
shard_size_chunks=1,
),
Dimension(
name="y",
kind=DimensionType.SPACE,
array_size_px=48,
chunk_size_px=16,
shard_size_chunks=1,
),
Dimension(
name="x",
kind=DimensionType.SPACE,
array_size_px=64,
chunk_size_px=16,
shard_size_chunks=2,
),
],
data_type=np.uint16,
downsampling_method=DownsamplingMethod.MEAN
)
]
settings.store_path = "output_v3_multiscale.zarr"
# Create stream
stream = ZarrStream(settings)
# Write sample data
stream.append(make_sample_data())
if __name__ == "__main__":
main()
Example: Compressed streaming to filesystem
# Stream to filesystem with LZ4 compression
import numpy as np
from acquire_zarr import (
ArraySettings, StreamSettings, ZarrStream, Dimension, DimensionType,
DataType, Compressor, CompressionCodec, CompressionSettings
)
def make_sample_data():
return np.random.randint(
0, 65535,
(5, 4, 2, 48, 64), # Shape matches chunk sizes
dtype=np.uint16
)
def main():
settings = StreamSettings()
# Configure a 5D compressed output array
settings.arrays = [
ArraySettings(
compression=CompressionSettings(
compressor=Compressor.BLOSC1,
codec=CompressionCodec.BLOSC_LZ4,
level=1,
shuffle=1,
),
dimensions=[
Dimension(
name="t",
kind=DimensionType.TIME,
array_size_px=10,
chunk_size_px=5,
shard_size_chunks=2,
),
Dimension(
name="c",
kind=DimensionType.CHANNEL,
array_size_px=8,
chunk_size_px=4,
shard_size_chunks=2,
),
Dimension(
name="z",
kind=DimensionType.SPACE,
array_size_px=6,
chunk_size_px=2,
shard_size_chunks=1,
),
Dimension(
name="y",
kind=DimensionType.SPACE,
array_size_px=48,
chunk_size_px=16,
shard_size_chunks=1,
),
Dimension(
name="x",
kind=DimensionType.SPACE,
array_size_px=64,
chunk_size_px=16,
shard_size_chunks=2,
),
],
data_type=DataType.UINT16,
)
]
settings.store_path = "output_compressed.zarr"
# Create stream
stream = ZarrStream(settings)
# Write sample data
stream.append(make_sample_data())
if __name__ == "__main__":
main()
Example: Basic streaming to S3
# Stream to S3
import numpy as np
# Ensure that you have set your S3 credentials in the environment variables
# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and optionally AWS_SESSION_TOKEN
# BEFORE importing acquire_zarr
from acquire_zarr import (
ArraySettings, StreamSettings, ZarrStream, Dimension, DimensionType,
DataType, S3Settings
)
def make_sample_data():
return np.random.randint(
0, 65535,
(5, 2, 48, 64), # Shape matches chunk sizes
dtype=np.uint16
)
def main():
settings = StreamSettings()
# Configure S3
settings.s3 = S3Settings(
endpoint="http://localhost:9000",
bucket_name="my-bucket",
region="us-east-2"
)
# Configure 4D array (t, z, y, x)
settings.arrays = [
ArraySettings(
dimensions=[
Dimension(
name="t",
kind=DimensionType.TIME,
array_size_px=0, # Unlimited
chunk_size_px=5,
shard_size_chunks=2,
),
Dimension(
name="z",
kind=DimensionType.SPACE,
array_size_px=10,
chunk_size_px=2,
shard_size_chunks=1,
),
Dimension(
name="y",
kind=DimensionType.SPACE,
array_size_px=48,
chunk_size_px=16,
shard_size_chunks=1,
),
Dimension(
name="x",
kind=DimensionType.SPACE,
array_size_px=64,
chunk_size_px=16,
shard_size_chunks=2,
),
],
data_type= DataType.UINT16,
)
]
settings.store_path = "output_s3.zarr"
# Create stream
stream = ZarrStream(settings)
# Write sample data
stream.append(make_sample_data())
if __name__ == "__main__":
main()
Example: Compressed streaming to S3
# Stream to S3 with Zstd compression
import numpy as np
# Ensure that you have set your S3 credentials in the environment variables
# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and optionally AWS_SESSION_TOKEN
# BEFORE importing acquire_zarr
from acquire_zarr import (
ArraySettings, StreamSettings, ZarrStream, Dimension, DimensionType,
DataType, S3Settings, Compressor, CompressionCodec, CompressionSettings
)
def make_sample_data():
"""Generate sample data with moving diagonal pattern"""
width, height = 64, 48
frames = []
for t in range(50):
frame = np.zeros((height, width), dtype=np.uint16)
for y in range(height):
for x in range(width):
# Create a diagonal pattern that moves with time
diagonal = (x + y + t * 8) % 32
# Create intensity variation
if diagonal < 16:
intensity = diagonal * 4096 # Ramp up
else:
intensity = (31 - diagonal) * 4096 # Ramp down
# Add circular features
center_x, center_y = width // 2, height // 2
dx, dy = x - center_x, y - center_y
radius = int(np.sqrt(dx*dx + dy*dy))
# Modulate with concentric circles
if radius % 16 < 8:
intensity = int(intensity * 0.7)
frame[y, x] = intensity
frames.append(frame)
return np.array(frames)
def main():
settings = StreamSettings()
# Configure S3
settings.s3 = S3Settings(
endpoint="http://localhost:9000",
bucket_name="my-bucket"
)
# Configure 3D array (t, y, x) with Zstd compression
settings.arrays = [
ArraySettings(
compression=CompressionSettings(
compressor=Compressor.BLOSC1,
codec=CompressionCodec.BLOSC_ZSTD,
level=1,
shuffle=1,
),
dimensions=[
Dimension(
name="t",
kind=DimensionType.TIME,
array_size_px=0, # Unlimited
chunk_size_px=5,
shard_size_chunks=2,
),
Dimension(
name="y",
kind=DimensionType.SPACE,
array_size_px=48,
chunk_size_px=16,
shard_size_chunks=1,
),
Dimension(
name="x",
kind=DimensionType.SPACE,
array_size_px=64,
chunk_size_px=16,
shard_size_chunks=2,
),
],
data_type=DataType.UINT16,
)
]
settings.store_path = "output_v3_compressed_s3.zarr"
# Create stream
stream = ZarrStream(settings)
# Write sample data
sample_data = make_sample_data()
stream.append(sample_data)
if __name__ == "__main__":
main()
Example: Multiscale compressed streaming to S3
# Stream to S3 with multiscale and Zstd compression
import numpy as np
# Ensure that you have set your S3 credentials in the environment variables
# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and optionally AWS_SESSION_TOKEN
# BEFORE importing acquire_zarr
from acquire_zarr import (
ArraySettings, StreamSettings, ZarrStream, Dimension, DimensionType,
DownsamplingMethod, S3Settings, Compressor, CompressionCodec, CompressionSettings
)
def make_sample_data():
"""Generate sample data with moving diagonal pattern for 4D array (t, z, y, x)"""
width, height, depth = 64, 48, 10
frames = []
for t in range(10):
volume = np.zeros((depth, height, width), dtype=np.uint16)
for z in range(depth):
for y in range(height):
for x in range(width):
# Create a diagonal pattern that moves with time
diagonal = (x + y + t * 8) % 32
# Create intensity variation
if diagonal < 16:
intensity = diagonal * 4096 # Ramp up
else:
intensity = (31 - diagonal) * 4096 # Ramp down
# Add circular features
center_x, center_y = width // 2, height // 2
dx, dy = x - center_x, y - center_y
radius = int(np.sqrt(dx * dx + dy * dy))
# Modulate with concentric circles
if radius % 16 < 8:
intensity = int(intensity * 0.7)
volume[z, y, x] = intensity
frames.append(volume)
return np.array(frames)
def main():
settings = StreamSettings()
# Configure S3
settings.s3 = S3Settings(
endpoint="http://127.0.0.1:9000",
bucket_name="my-bucket"
)
# Configure 4D array (t, z, y, x) with multiscale and Zstd compression
settings.arrays = [
ArraySettings(
compression=CompressionSettings(
compressor=Compressor.BLOSC1,
codec=CompressionCodec.BLOSC_ZSTD,
level=1,
shuffle=1,
),
dimensions=[
Dimension(
name="t",
kind=DimensionType.TIME,
array_size_px=0, # Unlimited
chunk_size_px=5,
shard_size_chunks=2,
),
Dimension(
name="z",
kind=DimensionType.SPACE,
array_size_px=10,
chunk_size_px=2,
shard_size_chunks=1,
),
Dimension(
name="y",
kind=DimensionType.SPACE,
array_size_px=48,
chunk_size_px=16,
shard_size_chunks=1,
),
Dimension(
name="x",
kind=DimensionType.SPACE,
array_size_px=64,
chunk_size_px=16,
shard_size_chunks=2,
),
],
data_type=np.uint16,
downsampling_method=DownsamplingMethod.MEAN,
)
]
settings.store_path = "output_v3_compressed_multiscale_s3.zarr"
# Create stream
stream = ZarrStream(settings)
# Write sample data
stream.append(make_sample_data())
if __name__ == "__main__":
main()
Example: Streaming multiple arrays to a Zarr store on the filesystem
# Stream multiple arrays to filesystem
import numpy as np
from acquire_zarr import (
ArraySettings, StreamSettings, ZarrStream, Dimension, DimensionType,
DownsamplingMethod, Compressor, CompressionCodec, CompressionSettings
)
from typing import Tuple
def make_sample_data(shape: Tuple[int, ...], dtype: np.dtype) -> np.ndarray:
is_int = np.issubdtype(dtype, np.integer)
typemin = np.iinfo(dtype).min if is_int else np.finfo(dtype).min
typemax = np.iinfo(dtype).max if is_int else np.finfo(dtype).max
if is_int:
return np.random.randint(
typemin, typemax,
shape,
dtype=dtype
)
elif np.issubdtype(dtype, np.floating):
return np.random.uniform(
typemin, typemax,
shape
).astype(dtype)
else:
raise ValueError(f"Unsupported data type: {dtype}")
def main():
settings = StreamSettings(
arrays=[
ArraySettings(
output_key="path/to/uint16_array",
compression=CompressionSettings(
compressor=Compressor.BLOSC1,
codec=CompressionCodec.BLOSC_LZ4,
level=1,
shuffle=1,
),
dimensions=[
Dimension(
name="t",
kind=DimensionType.TIME,
array_size_px=0,
chunk_size_px=5,
shard_size_chunks=2,
),
Dimension(
name="c",
kind=DimensionType.CHANNEL,
array_size_px=8,
chunk_size_px=4,
shard_size_chunks=2,
),
Dimension(
name="z",
kind=DimensionType.SPACE,
array_size_px=6,
chunk_size_px=2,
shard_size_chunks=1,
),
Dimension(
name="y",
kind=DimensionType.SPACE,
array_size_px=48,
chunk_size_px=16,
shard_size_chunks=1,
),
Dimension(
name="x",
kind=DimensionType.SPACE,
array_size_px=64,
chunk_size_px=16,
shard_size_chunks=2,
),
],
data_type=np.uint16,
),
ArraySettings(
output_key="a/float32/array",
compression=CompressionSettings(
compressor=Compressor.BLOSC1,
codec=CompressionCodec.BLOSC_ZSTD,
level=3,
shuffle=2,
),
dimensions=[
Dimension(
name="z",
kind=DimensionType.SPACE,
array_size_px=6,
chunk_size_px=2,
shard_size_chunks=1,
),
Dimension(
name="y",
kind=DimensionType.SPACE,
array_size_px=48,
chunk_size_px=16,
shard_size_chunks=1,
),
Dimension(
name="x",
kind=DimensionType.SPACE,
array_size_px=64,
chunk_size_px=16,
shard_size_chunks=2,
),
],
data_type=np.float32,
downsampling_method=DownsamplingMethod.MEAN
),
ArraySettings(
output_key="labels",
dimensions=[
Dimension(
name="z",
kind=DimensionType.SPACE,
array_size_px=6,
chunk_size_px=2,
shard_size_chunks=1,
),
Dimension(
name="y",
kind=DimensionType.SPACE,
array_size_px=48,
chunk_size_px=16,
shard_size_chunks=1,
),
Dimension(
name="x",
kind=DimensionType.SPACE,
array_size_px=64,
chunk_size_px=16,
shard_size_chunks=2,
),
],
data_type=np.uint8,
downsampling_method=DownsamplingMethod.MAX
)
],
store_path="output_multiarray.zarr",
overwrite=True
)
# Create stream
stream = ZarrStream(settings)
# Write sample data to each array
stream.append(make_sample_data((10, 8, 6, 48, 64), np.uint16), "path/to/uint16_array")
stream.append(make_sample_data((6, 48, 64), np.float32), "a/float32/array")
stream.append(make_sample_data((6, 48, 64), np.uint8), "labels")
if __name__ == "__main__":
main()