Download this notebook from github.


xsar batch processing on datarmor (ifremer)

Unlike the other examples, this notebook is not executed by the documentation module, so there are no output cells.

This notebook will show how to use xsar with dask to process many SAFEs, on datarmor.

We will see how to convert some L1 SAFE files to L1B netcdf files at 1000m resolution.

Basically, to use your own processing function, you will have to adapt the l1b function defined below.

Prerequisite

This example is fully documented, and is intended for new users who have never used xsar or dask in a jupyter notebook on datarmor.

Set up environment

We will assume that xsar is not installed, and dask is not configured for datarmor.

Get a node with internet access

This is needed because conda needs internet access, and more cpu/ram than is available on the submit node.

ssh datarmor
qsub -I -q ftp -l mem=16g,walltime=4:00:00

Install environment

Install (and update) xsar. More information at https://cyclobs.ifremer.fr/static/sarwing_datarmor/xsar/installing.html

conda create -n xsar
conda activate xsar
conda install -c conda-forge xsar 'python<3.10'
pip install git+https://github.com/umr-lops/xsar.git
pip install -r https://raw.githubusercontent.com/umr-lops/xsar/develop/requirements.txt
pip install git+https://github.com/umr-lops/xsarsea.git

This is needed for coastlines, because datarmor nodes don’t have internet access

cartopy_feature_download.py --output `python -c 'import cartopy ; print(cartopy.config["data_dir"])'` physical

This is needed for https://datarmor-jupyterhub.ifremer.fr/

conda install -c conda-forge jupyterhub
python -m ipykernel install --user

Install https://dask-hpcconfig.readthedocs.io/en/latest/

pip install git+https://github.com/umr-lops/dask-hpcconfig.git#egg=dask-hpcconfig

Enable dask-labextension, so the dask dashboard can be reached

pip install dask-labextension
pip install ipywidgets

download and execute this notebook

Now, go to https://datarmor-jupyterhub.ifremer.fr, and choose ‘jupyter lab’. Put ‘xsar’ in the optional field, to use the previously created env.

It is better to use a notebook with 8 GB of RAM or more, to handle the dask scheduler.

image.png

Download this notebook as ipynb, and open it in jupyterhub. You are now ready to execute it.

[ ]:
import xsar
import distributed
import dask_hpcconfig
import glob
import pandas as pd
import os
import dask.dataframe as dd
import time
from tqdm.auto import tqdm, trange
import numpy as np
import traceback

Get input and outputs as rows of a pandas dataframe

The first step is to build a pandas.DataFrame with input SAFEs and output paths. Basically, each row will be fed to our processing function, in parallel.

Element sizes should be small, like filenames or very small python objects. Do not use complex datatypes here, like xsar.Sentinel1Meta objects or numpy arrays. (However, it’s possible to use dask futures).

It’s important to be able to know if the processing is already done, because we want to be able to skip what’s already done.

It’s good practice to do that before calling the processing function, and bad practice to do it inside the processing function, because a processing slot would then be used on a worker to do nothing.

[ ]:
# get input SAFEs, as a pandas dataframe
df_safes = pd.DataFrame(glob.glob('/home/datawork-cersat-public/cache/project/mpc-sentinel1/data/esa/sentinel-1a/L1/IW/S1A_IW_GRDH_1S/2021/12*/*.SAFE'), columns=['safe'])
# we just add an invalid SAFE, to be able to handle errors
df_safes.loc[-1,'safe'] = 'error.SAFE'
df_safes
[ ]:
# compute out_path
out_path_prefix = '%s/xsar_dask_demo' % os.environ['SCRATCH']
os.makedirs(out_path_prefix, exist_ok=True)
df_safes['out_path'] = df_safes['safe'].apply(lambda f: '%s/%s.nc' % (out_path_prefix, os.path.splitext(os.path.basename(f))[0]))
df_safes
[ ]:
# keep only the rows whose out_path does not exist yet
df_safes = df_safes[df_safes['out_path'].apply(lambda f: not os.path.exists(f))]
df_safes

Main processing function

The l1b function is the main processing function. Its arguments should be very small, like an input file name and an output file name. Do not pass large arguments here (like xarray or other complex objects).

In this example, we just open the safe at 1000m resolution, and save it as a netcdf file.

This is the function you will need to change for your own processing.

[ ]:
def l1b(safe, outfile):
    ds = xsar.open_dataset(safe, resolution='1000m')['measurement'].ds
    # make attributes to be str, so writable to file
    to_str = ['start_date', 'stop_date', 'footprint']
    for attr in to_str:
        ds.attrs[attr] = str(ds.attrs[attr])
    ds.to_netcdf(outfile)
    return outfile
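
Because the rest of this notebook decides what to skip by testing whether the output file exists, a file left half-written by a crashed worker could be mistaken for a finished one. One possible safeguard, shown here as a minimal sketch and not as part of xsar, is to write to a temporary name and rename it once the write has succeeded. The names write_atomically, writer and demo_writer are hypothetical; in l1b, writer would be something like `lambda p: ds.to_netcdf(p)`.

```python
import os
import tempfile


def write_atomically(outfile, writer):
    """Call writer(tmp_path), then rename tmp_path to outfile on success.

    `writer` is any callable that creates a file at the path it receives
    (hypothetical helper, not part of xsar).
    """
    tmp_path = outfile + '.tmp'
    try:
        writer(tmp_path)
        # os.replace is atomic when both paths are on the same filesystem
        os.replace(tmp_path, outfile)
    finally:
        # clean up the temporary file if the writer failed half-way
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
    return outfile


def demo_writer(path):
    # stand-in for ds.to_netcdf(path)
    with open(path, 'w') as f:
        f.write('fake netcdf content')


demo_dir = tempfile.mkdtemp()
demo_out = write_atomically(os.path.join(demo_dir, 'demo.nc'), demo_writer)
print(os.path.exists(demo_out), os.path.exists(demo_out + '.tmp'))
```

With this approach, os.path.exists(outfile) is only true for outputs that were written completely.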

To process all SAFEs from df_safes, what we don’t want to do is to use a sequential loop like

for idx, safe in df_safes.iterrows():
    print(l1b(*safe))

Such a loop is sequential (one SAFE is processed at a time), so it can take a long time.

So what we want to do is to use dask to execute many l1b calls in parallel.
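
The difference between the two approaches can be tried locally, without a cluster. The sketch below is only an illustration: it uses a thread pool from the standard library as a stand-in for dask workers (which are separate processes on cluster nodes), and fake_l1b is a hypothetical function that just sleeps.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_l1b(safe, outfile):
    # stand-in for the real l1b: pretend the processing takes 0.1 s
    time.sleep(0.1)
    return outfile


jobs = [('in_%d.SAFE' % i, 'out_%d.nc' % i) for i in range(8)]

# one job at a time
t0 = time.time()
sequential = [fake_l1b(*job) for job in jobs]
t_sequential = time.time() - t0

# up to 4 jobs at the same time
t0 = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    # map keeps the input order, like the sequential loop
    parallel = list(pool.map(lambda job: fake_l1b(*job), jobs))
t_parallel = time.time() - t0

print('sequential: %.2fs, 4 workers: %.2fs' % (t_sequential, t_parallel))
```

Both runs return the same list of output names; only the elapsed time differs.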

set up the dask cluster

We use dask-hpcconfig. See the doc to set up a different cluster.

[ ]:
# see https://dask-hpcconfig.readthedocs.io/en/latest/ if you want to change the cluster config
override = {
    "cluster.n_workers": 28,
    "cluster.cores": 1,
    "cluster.extra": ["--lifetime", "30m", "--lifetime-stagger", "10m", "--lifetime-restart"]  # restart workers periodically to clean their state
}
cluster = dask_hpcconfig.cluster('datarmor' , **override)
cluster.scale(4)
client = distributed.Client(cluster)
client

If you click the above dashboard link, you should reach the dask status page that looks like this:

image.png

From now on, the interesting parts are the ‘workers’ tab and the ‘info’ tab.

workers tab

The workers tab should look something like

image1

If it’s empty, the cluster is not yet instantiated (execute the cell below to wait for it).

info tab

The info tab should be something like

image2

What’s important here is the ‘last seen’ column. It should stay around ‘1s’.

waiting for the cluster to be operational

This is probably not needed, but we want to be sure that the cluster is ok.

The main problem is that the check function, which just does import xsar, can take a very long time (up to 8 minutes). The issue is partially solved by https://github.com/umr-lops/xsar/issues/65 , but it’s probably due to a datarmor IO problem.

The cluster should be up in 15s-60s, and the import xsar should take 30s-400s.

The import xsar sometimes freezes the worker (probably because the GIL is not released). This can be seen in the ‘last seen’ column of the ‘info’ tab, as shown above. (If it’s greater than 60s, the tab should turn red).

image.png

[ ]:
while len(client.scheduler_info()['workers']) == 0:
    print('waiting for cluster')
    time.sleep(5)
print('cluster is up. checking `import xsar`')

def check():
    import xsar
    return True
t0 = time.time()
client.run(check)
print('client checked in %d s' % (time.time() - t0 ))
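
The waiting loop above never gives up if the cluster fails to start. A possible variant, sketched here under the assumption that a bounded wait is preferable, polls a condition until a timeout is reached. wait_until and fake_cluster_is_up are hypothetical names; on datarmor the condition would be something like `lambda: len(client.scheduler_info()['workers']) > 0`.

```python
import time


def wait_until(predicate, timeout=120, interval=5):
    """Poll predicate() every `interval` seconds.

    Return True as soon as predicate() is true, or False once
    `timeout` seconds have elapsed (hypothetical helper).
    """
    deadline = time.monotonic() + timeout
    while not predicate():
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
    return True


# stand-in for a real cluster: it reports a worker on the third poll
polls = []


def fake_cluster_is_up():
    polls.append(1)
    return len(polls) >= 3


print(wait_until(fake_cluster_is_up, timeout=5, interval=0.01))
```

A False return value can then be used to stop the notebook early instead of waiting forever.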

Split df_safes into smaller parts

So we have df_safes that is a pandas.DataFrame object, with all SAFEs to be processed.

We want to split this dataframe in smaller parts, and process those parts in parallel on different workers.

To do so, we use dask.dataframe.

[ ]:
# we build a dask dataframe with npartitions partitions, so npartitions safes will be processed in parallel
# npartitions depends on your processing and on the workers count
# a good starting value is the workers count
# with heavy full resolution processing, you will have to reduce it
ddf_safes = dd.from_pandas(df_safes, npartitions=14)
ddf_safes
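
To get an intuition of what a partition is, the sketch below splits a plain list of rows into contiguous chunks of similar size. It is only an illustration in pure python, not dask's actual algorithm, and split_rows is a hypothetical helper. Like dd.from_pandas, it may return fewer partitions than requested.

```python
def split_rows(rows, npartitions):
    """Split `rows` into at most `npartitions` contiguous chunks of similar size."""
    # ceiling division, with a minimum of 1 to handle an empty list
    chunk_size = max(1, -(-len(rows) // npartitions))
    return [rows[i:i + chunk_size] for i in range(0, len(rows), chunk_size)]


rows = ['safe_%02d' % i for i in range(30)]
parts = split_rows(rows, 14)
print(len(parts), [len(p) for p in parts])
```

Each chunk plays the role of the small dataframe that one call of the batch function receives.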

the batch processing function

The batch_processing function will be executed on the workers. There will be as many batch_processing functions running in parallel as npartitions (provided enough workers are available).

Each function will be fed with a partition of ddf_safes. A partition is a regular pandas.DataFrame object like df_safes, but with fewer rows.

Basically, a minimal batch_processing function should look like this:

def batch_processing(df_safes_part):
    for _, safe_row in df_safes_part.iterrows():
        safe, outfile = safe_row[['safe', 'out_path']]
        l1b(safe, outfile)

The disadvantages of this method are:

  • If the worker restarts, and the function is retried, already processed files will be reprocessed.

So we need to test if the outfile file exists before calling l1b.

  • Errors are not caught. If an l1b processing fails, the remaining rows will not be processed.

We just need to surround the l1b call with a try/except block.

  • Error messages are hidden in the workers logs.

If an l1b processing fails, we need to know the cause of the error, and send it to the notebook.

To be able to do that, we will use a distributed.Queue object to send processing infos and error messages, as a dict like

{
    'status': False,
    'args': (safe, outfile),
    'time': 0,
    'error': ""
}
[ ]:
# we set up a dask queue, so the batch_processing function will be able to communicate processing infos to the notebook
messages_queue = distributed.Queue('batch_processing')

def batch_processing(df_safes_part, msg_queue=messages_queue):
    res = []
    for idx, safe_row in df_safes_part.iterrows():
        safe, outfile = safe_row[['safe', 'out_path']]
        # we need to re-check if outfile already exists, because dask might have restarted the worker
        # and we don't want the whole df_safes to be reprocessed
        if os.path.exists(outfile):
            res.append(outfile)
            continue

        # we set up a dict that we will send to msg_queue
        # it contains general processing info
        message = {
            'status': False,
            'args': (safe, outfile),
            'time': 0,
            'error': ""
        }

        # if a processing fail, we will retry
        for retry in range(2):
            # we enclose the processing in a try/except, so the worker won't be killed if error
            try:
                t1 = time.time()
                # this is the real call to our processing function
                out = l1b(safe, outfile)
                elapsed = time.time() - t1
                message['status'] = True
                message['time'] = elapsed
                break # process ok, exit the loop
            except Exception as e:
                # error while processing.
                # we get the error message that will be sent to the queue
                message['error'] = traceback.format_exc()
        msg_queue.put(message)
        res.append(outfile)
    return res
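
The try/except, retry and report pattern can be tried locally before going to the cluster. The sketch below is a stand-in under stated assumptions: it uses queue.Queue and a thread from the standard library instead of distributed.Queue and a dask worker, and process, batch and messages_q are hypothetical names.

```python
import queue
import threading
import time
import traceback


def process(name):
    # stand-in for l1b: fails for any name starting with 'error'
    if name.startswith('error'):
        raise ValueError('cannot open %s' % name)
    return name + '.nc'


def batch(names, msg_queue, retries=2):
    for name in names:
        message = {'status': False, 'args': (name,), 'time': 0, 'error': ''}
        for _ in range(retries):
            try:
                t1 = time.time()
                process(name)
                message['status'] = True
                message['time'] = time.time() - t1
                break  # success, no need to retry
            except Exception:
                # keep the traceback so it can be shown to the user
                message['error'] = traceback.format_exc()
        # exactly one message per item, success or failure
        msg_queue.put(message)


names = ['a.SAFE', 'error.SAFE', 'b.SAFE']
messages_q = queue.Queue()  # local stand-in for distributed.Queue
worker = threading.Thread(target=batch, args=(names, messages_q))
worker.start()

# the consumer knows how many messages to expect: one per item
received = [messages_q.get() for _ in names]
worker.join()
for m in received:
    print(m['args'], 'ok' if m['status'] else 'FAILED')
```

The key point is that the producer sends exactly one message per item, so the consumer can wait for a known number of messages.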

launch the distributed computation

Now, we want to apply the batch_processing function to each partition of ddf_safes.

We use the map_partitions method, with a dummy meta keyword. What’s important here is str (the output type of the l1b function).

[ ]:
res = ddf_safes.map_partitions(batch_processing, meta=('foo', str))

At this stage, the computation has not yet started.

One way to do it would be:

res.compute()

But then we wouldn’t see the messages from messages_queue, nor any progress information.

We will instead use

res.persist()

and build a progress bar with tqdm that will wait for messages from messages_queue, and display status information.

[ ]:

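# start the computation in the background
# (we keep a reference to the persisted collection, so its futures are not released)
res = res.persist(retries=1)
count = len(ddf_safes)
pbar = trange(count, smoothing=0)
elapsed = np.array([], dtype=float)
for _ in pbar:
    # wait for the next message sent by batch_processing
    message = messages_queue.get()
    if message['status']:
        elapsed = np.append(elapsed, message['time'])
        # show the mean processing time in the progress bar
        pbar.set_description('%03.0fs' % elapsed.mean())
    else:
        tqdm.write('ERROR: "\n%s\n" on args %s' % (message['error'], message['args']))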

While processing occurs, you should see a progress bar like this:

image.png

18s is the mean time to process one SAFE, per worker. But as we have many workers, a new SAFE is processed every 1.39s.
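
The two figures are linked: if one SAFE takes about 18s and a result arrives every 1.39s, then roughly 18 / 1.39 SAFEs are being processed at the same time. The short sketch below does this arithmetic; concurrent_tasks is a hypothetical helper, and the estimate assumes a steady processing rate.

```python
def concurrent_tasks(mean_task_time, interval_between_results):
    """Estimate how many tasks run at once from the two timings."""
    return mean_task_time / interval_between_results


# figures quoted in the text: 18 s per SAFE, one SAFE finished every 1.39 s
n = concurrent_tasks(18, 1.39)
print('about %d SAFEs processed at the same time' % round(n))
```

Comparing this estimate with npartitions and with the workers count is a quick way to check that the cluster is used as expected.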

We have inserted an invalid ‘error.SAFE’ file. The error traceback should be displayed.

While processing, the dask status dashboard should look like this: image1

End

Once the processing is finished, processed files are in the output directory.

We can now close the cluster.

[ ]:
cluster.close()
client.close()