
Commit

support writing OME ZARR without a client by adding asynchronous parameter
Karl5766 committed Nov 22, 2024
1 parent 001a186 commit 65e4ed2
Showing 6 changed files with 36 additions and 14 deletions.
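For orientation, the new asynchronous parameter (default False) is threaded from cvpl_tools.ome_zarr.io.write_ome_zarr_image down into the patched ome-zarr writer: when it is True, the delayed zarr writes are awaited on the active Dask client; when it is False, they are computed locally with dask.compute, so no client is needed. A minimal sketch of the two call styles, assuming the import path cvpl_tools.ome_zarr.io from this repository's layout; the array and output path are made up:

    import asyncio
    import dask.array as da
    import cvpl_tools.ome_zarr.io as cvpl_ome_zarr_io

    async def write_without_client(arr, out_path):
        # Default asynchronous=False: delayed writes go through dask.compute,
        # so this works in a plain script with no distributed client running.
        await cvpl_ome_zarr_io.write_ome_zarr_image(out_path, da_arr=arr)

    async def write_with_client(arr, out_path):
        # asynchronous=True: computes are awaited on the running Dask client,
        # matching the call sites updated in ndblock.py and dask_utils.py below.
        await cvpl_ome_zarr_io.write_ome_zarr_image(out_path, da_arr=arr, asynchronous=True)

    if __name__ == '__main__':
        im = da.zeros((32, 64, 64), chunks=(16, 32, 32))
        asyncio.run(write_without_client(im, '/tmp/example.ome.zarr'))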
5 changes: 4 additions & 1 deletion .gitignore
@@ -2,4 +2,7 @@
 /.idea/
 *__pycache__/
 /docs/_build/
-/src/experiment/
+/src/experiment/
+
+*/log_stderr.txt
+*/log_stdout.txt
11 changes: 7 additions & 4 deletions examples/test_coiled.py
@@ -1,5 +1,8 @@
-LOCAL_TESTING = False
-USE_GCS = True
+import numpy as np
+
+LOCAL_TESTING = True
+USE_GCS = False
+COMP_SLI = np.index_exp[:, :256, :256]


 async def main(dask_worker):
@@ -282,7 +285,7 @@ async def forward(self, im, cptr, viewer_args: dict = None):

 cur_im = da.from_zarr(cvpl_ome_zarr_io.load_zarr_group_from_path(
     path=ORIG_IM_PATH, mode='r', level=0
-)) / 1000
+))[COMP_SLI] / 1000
 assert cur_im.ndim == 3
 print(f'imshape={cur_im.shape}')
 cur_im = cur_im.rechunk(chunks=(64, 64, 64))
@@ -306,7 +309,7 @@ async def compute_masking():
 neg_mask = tifffile.imread(infile)
 neg_mask = da.from_array(neg_mask, chunks=(64, 64, 64))
 neg_mask = dask_ndinterp.scale_nearest(neg_mask,
-    scale=1, output_shape=cur_im.shape, output_chunks=(64, 64, 64))
+    scale=1, output_shape=cur_im.shape, output_chunks=(64, 64, 64))[COMP_SLI]
 neg_mask = await temp_directory.cache_im(fn=lambda: neg_mask,
     cid='neg_mask_upsampling',
     viewer_args=viewer_args | dict(is_label=True))
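As an aside, the COMP_SLI constant introduced above is just a tuple of slice objects built with np.index_exp; applying it crops the test volumes so the example runs quickly in local mode. A standalone illustration (the array shape here is invented):

    import numpy as np
    import dask.array as da

    COMP_SLI = np.index_exp[:, :256, :256]   # == (slice(None), slice(None, 256), slice(None, 256))

    im = da.zeros((128, 1024, 1024), chunks=(64, 64, 64))
    cropped = im[COMP_SLI]                   # keep all of axis 0, first 256 of axes 1 and 2
    print(cropped.shape)                     # (128, 256, 256)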
2 changes: 1 addition & 1 deletion src/cvpl_tools/im/ndblock.py
@@ -175,7 +175,7 @@ async def save(file: str, ndblock: NDBlock, storage_options: dict | None = None)
 MAX_LAYER = storage_options.get('multiscale') or 0
 await cvpl_ome_zarr_io.write_ome_zarr_image(f'{file}/dask_im', da_arr=ndblock.arr,
     make_zip=False, MAX_LAYER=MAX_LAYER,
-    storage_options=storage_options)
+    storage_options=storage_options, asynchronous=True)
 else:
     assert fmt == ReprFormat.DICT_BLOCK_INDEX_SLICES, fmt
     raise ValueError(f'NDBlock type to be saved at path {file} is of '
15 changes: 10 additions & 5 deletions src/cvpl_tools/ome_zarr/io.py
@@ -167,7 +167,8 @@ async def write_ome_zarr_image_direct(group_url: str,
     lbl_name: str | None = None,
     MAX_LAYER: int = 3,
     storage_options: dict = None,
-    lbl_storage_options: dict = None):
+    lbl_storage_options: dict = None,
+    asynchronous: bool = False):
 """Direct write of dask array to target ome zarr group (can not be a zip)
 Args:
@@ -196,7 +197,8 @@ async def write_ome_zarr_image_direct(group_url: str,
     scaler=scaler,
     coordinate_transformations=_get_coord_transform_yx_for_write(da_arr.ndim, MAX_LAYER),
     storage_options=storage_options,
-    axes=_get_axes_for_write(da_arr.ndim))
+    axes=_get_axes_for_write(da_arr.ndim),
+    asynchronous=asynchronous)

 if lbl_arr is not None:
     assert lbl_name is not None, ('ERROR: Please provide lbl_name along when writing labels '
@@ -215,7 +217,8 @@ async def write_ome_zarr_image_direct(group_url: str,
     name=lbl_name,
     coordinate_transformations=_get_coord_transform_yx_for_write(lbl_arr.ndim, MAX_LAYER),
     storage_options=lbl_storage_options,
-    axes=lbl_axes)
+    axes=lbl_axes,
+    asynchronous=asynchronous)
 # ome_zarr.writer.write_label_metadata(group=g,
 #     name=f'/labels/{lbl_name}',
 #     properties=properties)
@@ -230,7 +233,8 @@ async def write_ome_zarr_image(out_ome_zarr_path: str,
     MAX_LAYER: int = 0,
     logging=False,
     storage_options: dict = None,
-    lbl_storage_options: dict = None):
+    lbl_storage_options: dict = None,
+    asynchronous: bool = False):
 """Write dask array as an ome zarr
 For writing to zip file: due to dask does not directly support write to zip file, we instead create a temp ome zarr
@@ -268,7 +272,8 @@ async def write_ome_zarr_image(out_ome_zarr_path: str,
 fs.makedirs_cur()
 await write_ome_zarr_image_direct(folder_out_ome_zarr_path, da_arr, lbl_arr, lbl_name, MAX_LAYER=MAX_LAYER,
     storage_options=storage_options,
-    lbl_storage_options=lbl_storage_options)
+    lbl_storage_options=lbl_storage_options,
+    asynchronous=asynchronous)
 if logging:
     print('Folder is written.')

15 changes: 13 additions & 2 deletions src/cvpl_tools/ome_zarr/ome_zarr_writer_patched.py
@@ -361,6 +361,7 @@ async def write_image(
     coordinate_transformations: Optional[List[List[Dict[str, Any]]]] = None,
     storage_options: Optional[Union[JSONDict, List[JSONDict]]] = None,
     compute: Optional[bool] = True,
+    asynchronous: bool = False,
     **metadata: Union[str, JSONDict, List[JSONDict]],
 ) -> List:
 """Writes an image to the zarr store according to ome-zarr specification
@@ -422,6 +423,7 @@
     storage_options=storage_options,
     name=None,
     compute=compute,
+    asynchronous=asynchronous,
     **metadata,
 )
 return dask_delayed_jobs
@@ -453,6 +455,7 @@ async def _write_dask_image(
     storage_options: Optional[Union[JSONDict, List[JSONDict]]] = None,
     name: Optional[str] = None,
     compute: Optional[bool] = True,
+    asynchronous: bool = False,
     **metadata: Union[str, JSONDict, List[JSONDict]],
 ) -> List:
 assert compute, 'ERROR: PATCH ONLY ALLOWS COMPUTE=TRUE!'
@@ -516,12 +519,18 @@
 )
 if str(path) == '0' and max_layer > 0:
     # for the second image onward, we need to read the written image and then write new images
-    await dask_compute(get_dask_client(), delayed)
+    if asynchronous:
+        await dask_compute(get_dask_client(), delayed)
+    else:
+        dask.compute(delayed)
     delayed = []
     image = da.from_zarr(group['0'])
 datasets.append({"path": str(path)})

-await dask_compute(get_dask_client(), delayed)
+if asynchronous:
+    await dask_compute(get_dask_client(), delayed)
+else:
+    dask.compute(delayed)
 delayed = []

 if coordinate_transformations is None:
@@ -599,6 +608,7 @@ async def write_labels(
     storage_options: Optional[Union[JSONDict, List[JSONDict]]] = None,
     label_metadata: Optional[JSONDict] = None,
     compute: Optional[bool] = True,
+    asynchronous: bool = False,
     **metadata: JSONDict,
 ) -> List:
 """
@@ -673,6 +683,7 @@
     storage_options=storage_options,
     name=name,
     compute=compute,
+    asynchronous=asynchronous,
     **metadata,
 )
 else:
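The behavioural core of the patch is the dispatch inside _write_dask_image: each batch of delayed zarr writes is either awaited on the client via dask_compute(get_dask_client(), delayed) or evaluated locally with dask.compute. Roughly, as a standalone sketch (dask_compute and get_dask_client stand in for the cvpl_tools helpers used above and are only reachable on the asynchronous path; the demo below exercises the no-client path):

    import asyncio
    import dask
    from dask import delayed as make_delayed

    async def compute_batch(delayed_jobs, asynchronous: bool = False):
        # Mirrors the pattern added above: with a client, await the client-side
        # compute helper; without one, fall back to the local scheduler.
        if asynchronous:
            await dask_compute(get_dask_client(), delayed_jobs)  # assumed cvpl_tools helpers
        else:
            dask.compute(delayed_jobs)  # no client needed

    # No-client path, runnable as-is:
    jobs = [make_delayed(sum)([i, i + 1]) for i in range(3)]
    asyncio.run(compute_batch(jobs, asynchronous=False))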
2 changes: 1 addition & 1 deletion src/cvpl_tools/tools/dask_utils.py
@@ -112,7 +112,7 @@ def log(client: dask.distributed.Client, msg, end='\n'):

 omefs = RDirFileSystem(f'{TMP_DIR}/testim2.ome.zarr')
 log(client, f'Test writing dask array as ome zarr image at path {omefs.url}:', end='')
-await ome_io.write_ome_zarr_image(out_ome_zarr_path=omefs.url, da_arr=dask_arr)
+await ome_io.write_ome_zarr_image(out_ome_zarr_path=omefs.url, da_arr=dask_arr, asynchronous=True)
 log(client, f'ok.')
 log(client, f'Test reading dask array from ome zarr image just written:', end='')
 read_arr = await ome_io.load_dask_array_from_path(f'{omefs.url}/0', mode='r')
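For completeness, a hedged sketch of how a round-trip test like the one above might be driven when asynchronous=True is wanted: an in-process dask.distributed client created in asynchronous mode. Note that Client's own asynchronous=True flag is distributed's parameter and is separate from the new writer parameter; the paths, shape, and driver structure here are assumptions, not code from this repository.

    import asyncio
    import dask.array as da
    from dask.distributed import Client
    import cvpl_tools.ome_zarr.io as ome_io

    async def ome_zarr_write_roundtrip(tmp_dir: str):
        async with Client(processes=False, asynchronous=True):
            dask_arr = da.zeros((64, 128, 128), chunks=(32, 64, 64))
            await ome_io.write_ome_zarr_image(out_ome_zarr_path=f'{tmp_dir}/testim2.ome.zarr',
                                              da_arr=dask_arr, asynchronous=True)
            read_arr = await ome_io.load_dask_array_from_path(f'{tmp_dir}/testim2.ome.zarr/0', mode='r')
            assert read_arr.shape == dask_arr.shape

    if __name__ == '__main__':
        asyncio.run(ome_zarr_write_roundtrip('/tmp'))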
