#!/usr/bin/env python

# dask datarmor simple demo
# the scheduler (master) runs on the 'ftp' queue, so this script can also be
# turned into a python notebook reachable from the outside
# the workers run on the 'sequentiel' queue, which has no network access
#
# ssh datarmor
# qsub -I -q ftp -l walltime=01:00:00,mem=16g
# conda activate xxxx
# % ./dask_datarmor.py

from dask_jobqueue import PBSCluster
from dask.distributed import Client
import os
import time
import socket
import fcntl
import struct


def get_ip_address(ifname):
    # return the IPv4 address bound to interface ifname
    # (Linux only: SIOCGIFADDR ioctl on a throwaway UDP socket)
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    return socket.inet_ntoa(fcntl.ioctl(
        s.fileno(),
        0x8915,  # SIOCGIFADDR
        struct.pack('256s', ifname[:15].encode())
    )[20:24])


def processing(x):
    # this function is executed on the workers
    # see https://distributed.dask.org/en/latest/client.html
    # the result has to be as lightweight as possible:
    # if possible, do not return a big array, but save the array on disk on
    # the worker and return a status (see the sketch appended at the end of
    # this file)
    time.sleep(1)
    return x * x


if __name__ == "__main__":
    # see https://jobqueue.dask.org/en/latest/generated/dask_jobqueue.PBSCluster.html
    memory = "2GB"
    nprocs = 1
    cluster = PBSCluster(
        cores=1,
        memory=memory,
        project='myproject',  # newer dask_jobqueue releases name this 'account'
        queue='sequentiel',
        processes=nprocs,
        resource_spec='select=1:ncpus=%d:mem=%s' % (nprocs, memory),
        local_directory=os.path.expandvars("$TMPDIR"),
        interface='ib1',  # workers' interface (routable to the 'ftp' queue)
        walltime='01:00:00',
        scheduler_options={'interface': 'ib0'})  # if the scheduler is on queue 'ftp'
    cluster.scale(jobs=10)  # ask PBS for 10 worker jobs
    client = Client(cluster)

    # getting a working dashboard link is a little tricky on datarmor:
    # the link advertises the scheduler's ib0 address, which is not reachable
    # from a browser, so substitute the bond1 address
    print("Client dashboard: %s"
          % client.dashboard_link.replace(get_ip_address('ib0'),
                                          get_ip_address('bond1')))

    inputs = range(1000)
    # submit one task per input; returns a list of futures immediately
    outputs_futures = client.map(processing, inputs)
    # collect the results from the workers (blocks until all tasks are done)
    outputs = client.gather(outputs_futures)
    print('outputs: %s' % outputs)

    client.close()
    cluster.close()
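

# --- optional: heavyweight results (a sketch, not part of the demo above) ---
# The comments in processing() advise against returning big arrays. A common
# pattern is to write the array to disk on the worker (its local directory, or
# a filesystem shared with the post-processing step) and return only the path.
# processing_big and outdir are illustrative names, not datarmor conventions;
# this helper is not called by the demo.

def processing_big(x, outdir="."):
    import os
    import numpy as np
    arr = np.full((1000, 1000), float(x))  # stand-in for a real computation
    path = os.path.join(outdir, "result_%05d.npy" % x)
    np.save(path, arr)  # the heavy data stays on the worker side
    return path         # lightweight handle sent back to the client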
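

# --- optional: incremental gathering (a sketch, not part of the demo above) ---
# client.gather() blocks until every task is done and pulls all results back
# at once. distributed's as_completed() instead yields futures as the workers
# finish them, so the client can consume results one by one with low memory.
# This helper is illustrative and is not called by the demo.

def gather_incrementally(client, func, inputs):
    from dask.distributed import as_completed
    futures = client.map(func, inputs)
    for future in as_completed(futures):
        # results arrive in completion order, not in input order
        yield future.result()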