This notebook will show how to use xsar with dask to process many SAFEs on datarmor@ifremer.
ssh datarmor
qsub -I -q ftp -l mem=16g,walltime=4:00:00
Install xsar. More info at https://cyclobs.ifremer.fr/static/sarwing_datarmor/xsar/installing.html
conda create -n xsar
conda activate xsar
conda install -c conda-forge xsar rioxarray jupyterlab geoviews cartopy_offlinedata holoviews datashader 'shapely<1.8.0' nbsphinx pandoc jq docutils pip git lxml 'python<3.10'
pip install git+https://github.com/umr-lops/xsar.git
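Optionally, check that the environment imports xsar correctly before going further (a quick sanity check, assuming xsar exposes `__version__`):
python -c 'import xsar; print(xsar.__version__)'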
This is needed for coastlines, because datarmor nodes don't have internet access
cartopy_feature_download.py --output `python -c 'import cartopy ; print(cartopy.config["pre_existing_data_dir"])'` physical
This is needed for https://datarmor-jupyterhub.ifremer.fr/
conda install -c conda-forge jupyterhub
Install https://dask-hpcconfig.readthedocs.io/en/latest/
pip install git+https://github.com/umr-lops/dask-hpcconfig.git#egg=dask-hpcconfig
Enable dask-labextension, so the dask dashboard can be reached
pip install dask-labextension
pip install ipywidgets
Edit ~/.config/dask/distributed.yaml and add:
distributed:
  dashboard:
    link: "/user/{JUPYTERHUB_USER}/proxy/{port}/status"
Now, go to https://datarmor-jupyterhub.ifremer.fr and choose 'jupyter lab'. Put 'xsar' in the optional field, to use the previously created env. We don't need much memory, because the workers will do the computation. However, the 2h lifetime might be an issue for long processings.
You are now ready to execute this notebook.
import xsar
import distributed
import dask_hpcconfig
import glob
import pandas as pd
import os
import dask.dataframe as dd
import time
from tqdm.auto import tqdm, trange
import numpy as np
import traceback
import cartopy
# get input SAFEs, as a pandas dataframe
df_safes = pd.DataFrame(glob.glob('/home/datawork-cersat-public/cache/project/mpc-sentinel1/data/esa/sentinel-1a/L1/IW/S1A_IW_GRDH_1S/2021/12*/*.SAFE'), columns=['safe'])
# we add an invalid SAFE, to demonstrate error handling later
df_safes.loc[-1,'safe'] = 'error.SAFE'
df_safes
| | safe |
|---|---|
| 0 | /home/datawork-cersat-public/cache/project/mpc... |
| 1 | /home/datawork-cersat-public/cache/project/mpc... |
| 2 | /home/datawork-cersat-public/cache/project/mpc... |
| 3 | /home/datawork-cersat-public/cache/project/mpc... |
| 4 | /home/datawork-cersat-public/cache/project/mpc... |
| ... | ... |
| 252 | /home/datawork-cersat-public/cache/project/mpc... |
| 253 | /home/datawork-cersat-public/cache/project/mpc... |
| 254 | /home/datawork-cersat-public/cache/project/mpc... |
| 255 | /home/datawork-cersat-public/cache/project/mpc... |
| -1 | error.SAFE |

257 rows × 1 columns
# compute out_path
out_path_prefix = '%s/xsar_dask_demo' % os.environ['SCRATCH']
os.makedirs(out_path_prefix, exist_ok=True)
df_safes['out_path'] = df_safes['safe'].apply(lambda f: '%s/%s.nc' % (out_path_prefix, os.path.splitext(os.path.basename(f))[0]))
df_safes
| | safe | out_path |
|---|---|---|
| 0 | /home/datawork-cersat-public/cache/project/mpc... | /home1/scratch/oarcher/xsar_dask_demo/S1A_IW_G... |
| 1 | /home/datawork-cersat-public/cache/project/mpc... | /home1/scratch/oarcher/xsar_dask_demo/S1A_IW_G... |
| 2 | /home/datawork-cersat-public/cache/project/mpc... | /home1/scratch/oarcher/xsar_dask_demo/S1A_IW_G... |
| 3 | /home/datawork-cersat-public/cache/project/mpc... | /home1/scratch/oarcher/xsar_dask_demo/S1A_IW_G... |
| 4 | /home/datawork-cersat-public/cache/project/mpc... | /home1/scratch/oarcher/xsar_dask_demo/S1A_IW_G... |
| ... | ... | ... |
| 252 | /home/datawork-cersat-public/cache/project/mpc... | /home1/scratch/oarcher/xsar_dask_demo/S1A_IW_G... |
| 253 | /home/datawork-cersat-public/cache/project/mpc... | /home1/scratch/oarcher/xsar_dask_demo/S1A_IW_G... |
| 254 | /home/datawork-cersat-public/cache/project/mpc... | /home1/scratch/oarcher/xsar_dask_demo/S1A_IW_G... |
| 255 | /home/datawork-cersat-public/cache/project/mpc... | /home1/scratch/oarcher/xsar_dask_demo/S1A_IW_G... |
| -1 | error.SAFE | /home1/scratch/oarcher/xsar_dask_demo/error.nc |

257 rows × 2 columns
# filter out the out_path files that already exist
df_safes = df_safes[df_safes['out_path'].apply(lambda f: not os.path.exists(f))]
df_safes
| | safe | out_path |
|---|---|---|
| 0 | /home/datawork-cersat-public/cache/project/mpc... | /home1/scratch/oarcher/xsar_dask_demo/S1A_IW_G... |
| 1 | /home/datawork-cersat-public/cache/project/mpc... | /home1/scratch/oarcher/xsar_dask_demo/S1A_IW_G... |
| 2 | /home/datawork-cersat-public/cache/project/mpc... | /home1/scratch/oarcher/xsar_dask_demo/S1A_IW_G... |
| 3 | /home/datawork-cersat-public/cache/project/mpc... | /home1/scratch/oarcher/xsar_dask_demo/S1A_IW_G... |
| 4 | /home/datawork-cersat-public/cache/project/mpc... | /home1/scratch/oarcher/xsar_dask_demo/S1A_IW_G... |
| ... | ... | ... |
| 252 | /home/datawork-cersat-public/cache/project/mpc... | /home1/scratch/oarcher/xsar_dask_demo/S1A_IW_G... |
| 253 | /home/datawork-cersat-public/cache/project/mpc... | /home1/scratch/oarcher/xsar_dask_demo/S1A_IW_G... |
| 254 | /home/datawork-cersat-public/cache/project/mpc... | /home1/scratch/oarcher/xsar_dask_demo/S1A_IW_G... |
| 255 | /home/datawork-cersat-public/cache/project/mpc... | /home1/scratch/oarcher/xsar_dask_demo/S1A_IW_G... |
| -1 | error.SAFE | /home1/scratch/oarcher/xsar_dask_demo/error.nc |

257 rows × 2 columns
The `l1b` function is the main processing function. Its arguments should be small, like an input file and an output file path. Do not use large arguments here (like xarray objects or other complex objects).
In this example, we just open the SAFE at 1000m resolution and save it as a netCDF file.
This is the function you will need to change for your own processing.
def l1b(safe, outfile):
ds = xsar.open_dataset(safe, resolution='1000m')
# convert some attributes to str, so they can be written to the netCDF file
to_str = ['start_date', 'stop_date', 'footprint']
for attr in to_str:
ds.attrs[attr] = str(ds.attrs[attr])
ds.to_netcdf(outfile)
return outfile
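Before launching the full batch, you may want to sanity-check `l1b` on a single SAFE directly in the notebook (optional; it writes the first output file, which the batch will then skip):
# optional: validate l1b on the first SAFE before the batch run
test_safe, test_out = df_safes.iloc[0][['safe', 'out_path']]
print(l1b(test_safe, test_out))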
What we don't want to do is use a sequential loop like
for idx, safe in df_safes.iterrows():
print(l1b(*safe))
because the processing would be sequential (one SAFE processed at a time), and it could take a very long time.
So what we want is to use dask to execute many `l1b` calls in parallel.
#import dask
#nanny_env = dask.config.get("distributed.nanny.environ")
#nanny_env['PYTHONPROFILEIMPORTTIME'] = 1
#dask.config.set({"distributed.nanny.environ": nanny_env})
#from dask_jobqueue import PBSCluster
#cluster = PBSCluster(
# cores=28,
# memory='120Gb',
# project='xsar',
# queue='mpi_1',
# processes=28,
# resource_spec='select=1:ncpus=28:mem=120GB',
# local_directory=os.path.expandvars("$TMPDIR"),
# #interface='ib1', # workers interface (routable to queue ftp)
# walltime='12:00:00',
# #scheduler_options={'interface': 'ib0'}, # if scheduler is on queue 'ftp'
# log_directory='/home1/scratch/oarcher/dask-logs',
# job_extra=['-v DASK_DISTRIBUTED__WORKER__RESOURCES__count=1', '-m n', '-v PYTHONPROFILEIMPORTTIME=1'],
# #extra=["--lifetime", "10m", "--lifetime-stagger", "8m" ],
#)
#cluster.scale(2)
#client = distributed.Client(cluster)
#client
We use dask-hpcconfig. See the doc to set up a different cluster.
# see https://dask-hpcconfig.readthedocs.io/en/latest/ if you want to change the cluster config
cluster = dask_hpcconfig.cluster('datarmor')
n_workers = 14
cluster.scale(n_workers)
client = distributed.Client(cluster)
client
/home1/datahome/oarcher/conda-env/xsar/lib/python3.9/site-packages/dask_jobqueue/core.py:20: FutureWarning: tmpfile is deprecated and will be removed in a future release. Please use dask.utils.tmpfile instead. from distributed.utils import tmpfile

Client-a6ce34c2-a7ee-11ec-9f8d-0cc47a3f75e7
- Connection method: Cluster object
- Cluster type: dask_jobqueue.PBSCluster
- Dashboard: /user/oarcher/proxy/8787/status

dask-worker-datarmor
- Dashboard: /user/oarcher/proxy/8787/status
- Workers: 0
- Total threads: 0
- Total memory: 0 B

Scheduler-16a6bad8-5242-4f20-9491-2fb2eb4250a3
- Comm: tcp://10.148.1.88:39004
- Workers: 0
- Dashboard: /user/oarcher/proxy/8787/status
- Total threads: 0
- Started: Just now
- Total memory: 0 B
If you click the dashboard link above, you should reach the dask status page.
From now on, the interesting parts are the 'workers' tab and the 'info' tab.
If the workers tab is empty, the cluster is not yet instantiated (execute the cell below to wait for it).
What's important in the info tab is the 'last seen' column: it should stay at about '1s'.
The next cell should probably not be needed, but we want to be sure that the cluster is OK.
The main problem is that the `check` function, which just does `import xsar`, can take very long (up to 8 minutes). The issue is partially solved by https://github.com/umr-lops/xsar/issues/65 , but it's probably due to a datarmor IO problem.
The cluster should be up in 15-60s, and the `import xsar` should take 30-400s.
The `import xsar` freezes the worker (probably because the GIL is not released). This shows up in the 'last seen' column of the 'info' tab, as seen above (if it's greater than 60s, the tab should turn red).
while len(client.scheduler_info()['workers']) == 0:
print('waiting for cluster')
time.sleep(5)
print('cluster is running. checking import xsar')
def check():
import xsar
return True
t0 = time.time()
client.run(check)
print('client checked in %d s' % (time.time() - t0 ))
waiting for cluster
waiting for cluster
...
waiting for cluster
cluster is running. checking import xsar
client checked in 258 s
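`client.run` executes a function on every worker and returns a dict keyed by worker address, so a small variant of `check` can show which workers are slow to import (an optional sketch; run it on a freshly started cluster, since a second `import xsar` is cached and near-instant):
# optional: time 'import xsar' on each worker
def timed_import():
    import time
    t0 = time.time()
    import xsar
    return round(time.time() - t0, 1)

# returns something like {'tcp://10.148.1.88:xxxxx': 250.3, ...}
client.run(timed_import)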
So we have `df_safes`, a `pandas.DataFrame` object with all SAFEs to be processed.
We want to split this dataframe into smaller parts, and process those parts in parallel on different workers (each part being processed sequentially on its worker).
To do so, we use `dask.dataframe`.
# we build a dask dataframe with npartitions, so npartitions SAFEs will be processed in parallel
# npartitions depends on your processing and the worker count
# a good starting value is the worker count itself
# with heavy full-res processing you will have to reduce it
npartitions = n_workers
ddf_safes = dd.from_pandas(df_safes, npartitions=npartitions)
ddf_safes
| | safe | out_path |
|---|---|---|
| npartitions=14 | | |
| -1 | object | object |
| 18 | ... | ... |
| ... | ... | ... |
| 246 | ... | ... |
| 255 | ... | ... |
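If you want to check how the SAFEs are spread over the partitions, you can count the rows per partition (optional; returns one value per partition):
# number of SAFEs in each partition
ddf_safes.map_partitions(len).compute()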
If an error is raised in the `l1b` function, we won't be able to see it, unless we use the dask dashboard to look at the worker logs.
So we use a `distributed.Queue`, used by the `batch_processing` function below to communicate various info (like error messages) to the notebook.
# we set up a dask queue, so the batch_processing function will be able to send processing info to the notebook
messages_queue = distributed.Queue('batch_processing')
The `batch_processing` function is the one that will run on the workers (in parallel).
It takes `df_safes_part` as argument: a `pandas.DataFrame` object, smaller than the original `df_safes` (it's one partition of the `ddf_safes` `dask.dataframe` object).
Basically, this function is just a sequential loop over the rows, calling the `l1b` function.
We retry the `l1b` call once on failure, and wrap it in `try/except` so an error doesn't kill the worker. We use `msg_queue` to communicate with the notebook.
Note also that we check whether `outfile` already exists, so the SAFE isn't processed again. This is needed because if a worker restarts, it shouldn't redo what's already done.
def batch_processing(df_safes_part, msg_queue=messages_queue):
res = []
for idx, safe_row in df_safes_part.iterrows():
safe, outfile = safe_row[['safe', 'out_path']]
# we need to re-check if outfile already exists, because dask might have restarted the worker
# and we don't want the whole df_safes_part to be reprocessed
if os.path.exists(outfile):
res.append(outfile)
continue
# we set up a dict that we will send to msg_queue
# it contains general processing info
message = {
'status': False,
'args': (safe, outfile),
'time': 0,
'error': ""
}
# if the processing fails, we will retry once
for retry in range(2):
# we enclose the processing in a try/except, so the worker won't be killed on error
try:
t1 = time.time()
# this is the real call to our processing function
out = l1b(safe, outfile)
elapsed = time.time() - t1
message['status'] = True
message['time'] = elapsed
break # process ok, exit the loop
except Exception as e:
# error while processing.
# we get the error message that will be sent to the queue
message['error'] = traceback.format_exc()
msg_queue.put(message)
res.append(outfile)
return res
Now we want to apply the `batch_processing` function to each partition of `ddf_safes`.
We use the `map_partitions` method, with a dummy `meta` keyword. What's important here is `str` (the output type of the `l1b` function).
res = ddf_safes.map_partitions(batch_processing, meta=('foo', str))
At this stage, the computation has not started yet.
One way to trigger it would be:
res.compute()
But we wouldn't be able to see messages from `messages_queue`, and we wouldn't see any progress information.
Instead, we will use
res.persist()
and build a progress bar with `tqdm` that waits for messages from `messages_queue` and displays status information.
res = res.persist(retries=2)  # keep a reference, so the background computation isn't released
count = len(ddf_safes)
pbar = trange(count,smoothing=0)
elapsed = np.array([],dtype=float)
for _ in pbar:
message = messages_queue.get()
if message['status']:
elapsed = np.append(elapsed,message['time'])
pbar.set_description('%03.0fs' % elapsed.mean())
else:
tqdm.write('ERROR: "\n%s\n" on args %s' % ( message['error'] , message['args']))
0%| | 0/257 [00:00<?, ?it/s]
ERROR: " Traceback (most recent call last): File "/dev/shm/pbs.9532383.datarmor0/ipykernel_24461/72104825.py", line 26, in batch_processing File "/dev/shm/pbs.9532383.datarmor0/ipykernel_24461/873178010.py", line 2, in l1b File "/home1/datahome/oarcher/gitlab/xsar/src/xsar/utils.py", line 65, in wrapper result = f(*args, **kwargs) File "/home1/datahome/oarcher/gitlab/xsar/src/xsar/xsar.py", line 87, in open_dataset sar_obj = Sentinel1Dataset(*args, **kwargs) File "/home1/datahome/oarcher/gitlab/xsar/src/xsar/sentinel1_dataset.py", line 140, in __init__ self.s1meta = BlockingActorProxy(Sentinel1Meta, dataset_id) File "/home1/datahome/oarcher/gitlab/xsar/src/xsar/utils.py", line 369, in __init__ self._actor = self._actor_future.result() File "/home1/datahome/oarcher/conda-env/xsar/lib/python3.9/site-packages/distributed/client.py", line 279, in result raise exc.with_traceback(tb) File "/home1/datahome/oarcher/gitlab/xsar/src/xsar/utils.py", line 65, in wrapper result = f(*args, **kwargs) File "/home1/datahome/oarcher/gitlab/xsar/src/xsar/sentinel1_meta.py", line 95, in __init__ self.product = os.path.basename(self.path).split('_')[2] IndexError: list index out of range " on args ('error.SAFE', '/home1/scratch/oarcher/xsar_dask_demo/error.nc')
While processing occurs, you should see a progress bar like the one above: 18s is the mean time to process one SAFE, per worker, but as we have many workers, a new SAFE is processed every 1.39s.
We have inserted an invalid 'error.SAFE' file: its error traceback should be displayed, as shown above.
While processing, you can also follow the activity on the dask status dashboard.
The processing is finished.
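Before shutting down, you can optionally check that the expected output files were produced (a quick sanity check; 'error.SAFE' is expected to still be missing):
# count how many of the expected outputs now exist on disk
n_done = df_safes['out_path'].apply(os.path.exists).sum()
print('%d/%d outputs written' % (n_done, len(df_safes)))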
We close the `cluster` and the `client`.
cluster.close()
client.close()