Description of DAG jobs

From EGEE-see WIki


This guide is part of the SEE-GRID Gridification Guide. It explains what a DAG job is and what kinds of jobs it suits, and it walks through a job workflow using an example. For a web-based graphical user interface for creating JDL files, submitting jobs, and monitoring their progress, please refer to P-GRADE Portal.

What does DAG stand for?

When we have a set of jobs that are related, in the sense that the output of some jobs is used as input for another job, we do not have to wait at the terminal for one job to finish before starting the next. We can use a DAG job (supported by glite-WMS) to do the waiting for us.

DAG stands for Directed Acyclic Graph. Translated into graph terminology, grid jobs are the graph nodes and job dependencies are the directed edges. The graph representing a DAG job has to be acyclic (no circular dependencies allowed), because jobs on a cycle would all wait for each other and none of them could ever start.
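The acyclic requirement can be checked mechanically. The following is a minimal sketch, not the way WMS does it internally: it takes a list of node names and a list of (parent, child) pairs and either returns one valid execution order or reports a circular dependency. The function name and the node names A to D are hypothetical.

```python
from collections import deque

def execution_order(nodes, dependencies):
    """Return one valid execution order, or raise ValueError on a cycle.

    dependencies is a list of (parent, child) pairs: a child may only
    start after all of its parents have finished.
    """
    children = {n: [] for n in nodes}
    waiting_for = {n: 0 for n in nodes}  # number of unfinished parents
    for parent, child in dependencies:
        children[parent].append(child)
        waiting_for[child] += 1

    # Jobs without parents can start immediately.
    ready = deque(n for n in nodes if waiting_for[n] == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for child in children[node]:
            waiting_for[child] -= 1
            if waiting_for[child] == 0:
                ready.append(child)

    # Any job left over is stuck on a cycle and can never start.
    if len(order) != len(nodes):
        raise ValueError("circular dependency detected")
    return order

nodes = ["A", "B", "C", "D"]
deps = [("A", "B"), ("A", "C"), ("B", "D"), ("C", "D")]
print(execution_order(nodes, deps))  # ['A', 'B', 'C', 'D']
```

With a circular pair such as ("A", "B") and ("B", "A"), no job ever becomes ready and the function raises ValueError.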

Just as for other types of grid jobs, we need to describe a DAG job in a JDL file. In this practical guide, we will use a simple example to illustrate how to create a JDL file for a DAG job.

Recipe for describing DAG jobs in JDL

What are we trying to make (work)

In our simple example, we have a father, two kids (twins) named Sam and Alex, and a judge. The father will decide the gender of each child, each child will be "born" at the time its job is executed, and the judge will decide which kid is older.

Ingredients

First, let's make a list of all the jobs we need for this operation, together with the input and output information they need.

  • We have a father program, father.py, which will decide the gender of Sam and Alex. It will produce two output files that contain the gender of each kid (Sam.gender and Alex.gender). It may also produce standard output and standard error.
  • Then we have the two kids, Sam and Alex. As they are twins, the same program, mother.py, will give birth to both of them. It will declare the gender of the kid (on standard output) and write the birth date (the time of job execution) to a separate output file.
  • The judge (judge.py) needs the birth dates of Alex and Sam. It will compare them and declare (on standard output) which of the two is older.

Mixing the ingredients

Now that we know what goes into the cake we're making, let's take a look at its graph representation (shown in the picture below).

Image:Dag_Graph.jpg


Explaining the recipe to grid

The structure of the JDL file for a DAG job is more complex than that of a normal job. The root JDL node contains the global attributes, which are inherited by all the individual jobs within it. In addition, there are two DAG-specific attributes that describe the graph structure. One is Nodes, which contains the list of all individual jobs and their descriptions. The other is Dependencies, which defines the dependencies between the jobs, i.e. the order in which the jobs will be executed. Note that attribute values defined in the description of an individual job override the global attributes.

We will now write the JDL file, step by step.

Global attributes (basics)

We start by declaring the job type, which is a mandatory attribute

Type = "dag";

The VO must be the same for all jobs in a DAG collection, so it is best to define it at the global level.

VirtualOrganization = "seegrid";

Rank and Requirements are mandatory attributes for normal jobs, so we may define them at the global level and then override them in a single job's description if necessary.

Requirements = other.GlueCEStateStatus == "Production";
Rank = other.GlueCEStateFreeCPUs;
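The override rule can be pictured as a dictionary merge. This is only a sketch of the rule as described here, with attribute values kept as plain JDL text. The function name is hypothetical, and the node-level Rank expression is an illustrative value that does not appear in our example.

```python
def effective_attributes(global_attrs, node_attrs):
    """Attributes a node ends up with: global values, overridden by its own."""
    merged = dict(global_attrs)   # start from the inherited global values
    merged.update(node_attrs)     # node-level values win on conflict
    return merged

global_attrs = {
    "VirtualOrganization": '"seegrid"',
    "Requirements": 'other.GlueCEStateStatus == "Production"',
    "Rank": "other.GlueCEStateFreeCPUs",
}

# Illustrative only: a node that prefers a different ranking expression.
node_attrs = {"Rank": "-other.GlueCEStateEstimatedResponseTime"}

for name, value in effective_attributes(global_attrs, node_attrs).items():
    print(name, "=", value)
```

The node keeps the global VirtualOrganization and Requirements and uses its own Rank.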

If several jobs in a DAG collection use the same file as input, it should be listed in the global InputSandbox, which avoids transferring the same file(s) between the UI and the WMS multiple times. In our case, two nodes (Sam and Alex) use mother.py.

InputSandbox = {"mother.py"};

OutputSandbox must not appear as a global attribute in DAG jobs. All output files should be declared in the OutputSandbox of the node descriptions, and the output of a DAG job is the collection of the nodes' outputs.

Nodes

The nodes are defined and described in a global attribute named Nodes. It contains a list of nodes, and it follows the syntax:

Nodes = [
    nodeA = [ nodeA description ];
    nodeB = [ nodeB description ];
    .....
];

Each node is a one-element list containing either a description or a file attribute. We can write the node description inline, using the description attribute, or in a separate file referenced with the file attribute. The syntax for a node description is:

nodeA = [
          file = "/absolute/path/to/jdl";
        ];

or

nodeA = [
          description = [ inline node JDL ];
        ];

Now we will write node JDLs.

We start with the father node. This is a regular job description, with no sign that it belongs to a DAG collection. The names Sam and Alex in Arguments are just script arguments and have nothing to do with the other nodes in the graph.

JobType = "Normal";
Executable = "/usr/bin/python";  
Arguments = "father.py Sam Alex";
InputSandbox = {"father.py"};
StdOutput = "father.out";
StdError  = "father.err";
OutputSandbox = {"father.out","father.err","Sam.gender","Alex.gender"};
ShallowRetryCount = 1;

Sam's JDL would look something like this

JobType = "Normal";
Executable = "/usr/bin/python";
InputSandbox = {root.InputSandbox, root.Nodes.father.description.OutputSandbox[2]};
Arguments = "mother.py Sam";
StdOutput = "Sam.out";
StdError = "Sam.err";
OutputSandbox = {"Sam.date", "Sam.out", "Sam.err"};
ShallowRetryCount = 1;

The only difference from how you would normally write this JDL description is in InputSandbox. The first input file is mother.py, which we put in the global InputSandbox since it is used by more than one node. We reference it hierarchically, with dots as separators and square brackets denoting the position of a file in a list. We start from root and add InputSandbox (we want to use the whole sandbox). The second input file is an output of the father node. Its reference starts from root, followed by Nodes (the DAG-specific attribute name), father (the node name), description (the DAG node attribute), OutputSandbox, and finally [2], since the wanted file is listed third in the OutputSandbox list (indexing starts from 0). For Alex's JDL this index will be [3], since his input file is listed fourth.
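To see how such a reference is read, here is a small sketch that follows a dotted path through nested Python dictionaries standing in for the JDL structure. It is an illustration of the addressing scheme only, not the WMS parser. The helper names are hypothetical, and the lookup ignores case because this guide writes both root.Nodes and root.nodes.

```python
import re

def lookup(mapping, name):
    """Find a key in a dict, ignoring upper/lower case."""
    for key, value in mapping.items():
        if key.lower() == name.lower():
            return value
    raise KeyError(name)

def resolve(reference, root):
    """Follow a reference such as
    root.Nodes.father.description.OutputSandbox[2] through nested dicts."""
    parts = reference.split(".")
    if parts[0].lower() != "root":
        raise ValueError("reference must start at root")
    value = root
    for part in parts[1:]:
        # Each step is a name, optionally followed by a list index.
        match = re.fullmatch(r"(\w+)(?:\[(\d+)\])?", part)
        if match is None:
            raise ValueError("cannot parse: " + part)
        name, index = match.groups()
        value = lookup(value, name)
        if index is not None:
            value = value[int(index)]  # indexing starts from 0
    return value

root = {
    "InputSandbox": ["mother.py"],
    "Nodes": {
        "father": {
            "description": {
                "OutputSandbox": ["father.out", "father.err",
                                  "Sam.gender", "Alex.gender"],
            },
        },
    },
}

print(resolve("root.InputSandbox", root))
print(resolve("root.Nodes.father.description.OutputSandbox[2]", root))
print(resolve("root.Nodes.father.description.OutputSandbox[3]", root))
```

The three calls yield the whole global sandbox, Sam.gender, and Alex.gender respectively.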

The judge's input files are produced by sam and alex. Sam.date and Alex.date are listed first in their OutputSandbox lists, hence the index [0]. The judge's description is:

JobType = "Normal";
Executable = "/usr/bin/python";
InputSandbox = {"judge.py", root.nodes.sam.description.OutputSandbox[0], root.nodes.alex.description.OutputSandbox[0]};
Arguments = "judge.py Sam Alex";
StdOutput = "dag.out";
StdError = "dag.err";
OutputSandbox = {"dag.out", "dag.err"};
ShallowRetryCount = 1; 

Finally, the complete Nodes attribute in our example is:

       Nodes = [
               father = [
                       description = [
                               JobType = "Normal";
                               Executable = "/usr/bin/python";
                               Arguments = "father.py Sam Alex";
                               InputSandbox = {"father.py"};
                               StdOutput = "father.out";
                               StdError = "father.err";
                               OutputSandbox = {"father.out", "father.err", "Sam.gender", "Alex.gender"};
                               ShallowRetryCount = 1;
                       ];
               ];
               sam = [
                       description = [
                               JobType = "Normal";
                               Executable = "/usr/bin/python";
                               InputSandbox = {root.InputSandbox, root.Nodes.father.description.OutputSandbox[2]};
                               Arguments = "mother.py Sam";
                               StdOutput = "Sam.out";
                               StdError = "Sam.err";
                               OutputSandbox = {"Sam.date", "Sam.out", "Sam.err"};
                               ShallowRetryCount = 1;
                       ];
               ];
               alex = [
                       description = [
                               JobType = "Normal";
                               Executable = "/usr/bin/python";
                               InputSandbox = {root.InputSandbox, root.Nodes.father.description.OutputSandbox[3]};
                               Arguments = "mother.py Alex";
                               StdOutput = "Alex.out";
                               StdError = "Alex.err";
                               OutputSandbox = {"Alex.date", "Alex.out", "Alex.err"};
                               ShallowRetryCount = 1;
                       ];
               ];
               judge = [
                       description = [
                               JobType = "Normal";
                               Executable = "/usr/bin/python";
                               InputSandbox = {"judge.py", root.nodes.sam.description.OutputSandbox[0], root.nodes.alex.description.OutputSandbox[0]};
                               Arguments = "judge.py Sam Alex";
                               StdOutput = "dag.out";
                               StdError = "dag.err";
                               OutputSandbox = {"dag.out", "dag.err"};
                               ShallowRetryCount = 1;
                       ];
               ];
       ]; 

Dependencies

The dependencies between jobs are defined as a list of ordered pairs. In our example, there are 4 dependencies: sam and alex depend on father, and judge depends on sam and alex. We can write this in several ways. The expanded version looks like

Dependencies = {{father, alex}, {father, sam}, {alex, judge}, {sam, judge}};

while the compact would be

Dependencies = {  {father, {alex, sam}}, { {alex,sam}, judge} };

or we can combine:

Dependencies = {  {father, {alex, sam}}, {alex, judge}, {sam, judge}};
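That the three notations describe the same graph can be verified by expanding each of them into plain (parent, child) pairs. The sketch below models a JDL pair as a Python tuple whose sides are either a single node name or a list of names. The function name and this Python representation are assumptions made for illustration.

```python
def expand(dependencies):
    """Expand compact dependencies into a set of (parent, child) pairs."""
    def as_list(side):
        # A side is either one node name or a list of node names.
        return [side] if isinstance(side, str) else list(side)

    pairs = set()
    for parents, children in dependencies:
        for parent in as_list(parents):
            for child in as_list(children):
                pairs.add((parent, child))
    return pairs

expanded = [("father", "alex"), ("father", "sam"),
            ("alex", "judge"), ("sam", "judge")]
compact = [("father", ["alex", "sam"]), (["alex", "sam"], "judge")]
combined = [("father", ["alex", "sam"]), ("alex", "judge"), ("sam", "judge")]

print(expand(expanded) == expand(compact) == expand(combined))  # True
```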

Useful tips

  • The JobType of node jobs cannot be Parametric or Partitionable.
  • For input and output data files, network files can be used just as in regular jobs. For more details about using network files in your jobs, take a look at Advanced Sandbox Management.
  • The names parent and child are forbidden for Nodes elements, although the user guide says that those names are free. A DAG job that uses them will fail with the exit status Done (Exit Code !=0). We didn't run into any other forbidden names, but if you do, please share them.
  • You need to declare every job dependency in the Dependencies attribute list. If you use the output of nodeA as an input of nodeB but do not put {nodeA, nodeB} in Dependencies, nodeB will not wait for nodeA to finish, and it may fail if it starts before nodeA has produced its output.
  • The root job must NOT contain an OutputSandbox attribute.
  • You can limit the number of node jobs running at the same time with the max_running_nodes attribute at the global level.
  • You can force all node jobs to be executed on the same CE by setting NodeCollocation = "true" at the global level.
  • If InputSandbox is not specified in a node description, it is inherited entirely from the global level.
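Two of these tips (forbidden node names and undeclared dependencies) are easy to check before submitting. The following is a rough sketch of such a check, assuming the InputSandbox entries of each node are available as plain strings and the dependencies as expanded (parent, child) pairs. The function name and the data layout are hypothetical, and comparing node names without regard to case is also an assumption.

```python
import re

FORBIDDEN_NAMES = {"parent", "child"}
# Matches references of the form root.Nodes.<node>.description...
NODE_REFERENCE = re.compile(r"root\.nodes\.(\w+)\.", re.IGNORECASE)

def check_dag(node_inputs, dependencies):
    """Return a list of problems found in a DAG description.

    node_inputs:  node name -> list of InputSandbox entries (strings)
    dependencies: iterable of (parent, child) pairs
    """
    problems = []
    declared = set(dependencies)
    for node, entries in node_inputs.items():
        if node.lower() in FORBIDDEN_NAMES:
            problems.append(f"node name '{node}' is forbidden")
        for entry in entries:
            match = NODE_REFERENCE.match(entry)
            if match is None:
                continue  # a plain file or a reference to the global sandbox
            producer = match.group(1)
            if (producer, node) not in declared:
                problems.append(
                    f"{node} uses output of {producer}, "
                    f"but {{{producer}, {node}}} is missing from Dependencies"
                )
    return problems

node_inputs = {
    "father": ["father.py"],
    "sam": ["root.InputSandbox",
            "root.Nodes.father.description.OutputSandbox[2]"],
    "alex": ["root.InputSandbox",
             "root.Nodes.father.description.OutputSandbox[3]"],
    "judge": ["judge.py",
              "root.nodes.sam.description.OutputSandbox[0]",
              "root.nodes.alex.description.OutputSandbox[0]"],
}

# {sam, judge} has been left out on purpose.
incomplete = [("father", "alex"), ("father", "sam"), ("alex", "judge")]
for problem in check_dag(node_inputs, incomplete):
    print(problem)
```

With the full list of four dependencies from our example, the function returns an empty list.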

Baking the cake (a.k.a. Submitting the DAG job)

Putting together everything we discussed in the previous section, we get a complete JDL file:

[ 
       Type="dag";
       InputSandbox={"mother.py"};
       VirtualOrganization="seegrid";
       Requirements=other.GlueCEStateStatus=="Production";
       Rank=other.GlueCEStateFreeCPUs;
       Nodes = [
               father = [
                       description = [
                               JobType = "Normal";
                               Executable = "/usr/bin/python";
                               Arguments = "father.py Sam Alex";
                               InputSandbox = {"father.py"};
                               StdOutput = "father.out";
                               StdError = "father.err";
                               OutputSandbox = {"father.out", "father.err", "Sam.gender", "Alex.gender"};
                               ShallowRetryCount = 1;
                       ];
               ];
               sam = [
                       description = [
                               JobType = "Normal";
                               Executable = "/usr/bin/python";
                               InputSandbox = {root.InputSandbox, root.Nodes.father.description.OutputSandbox[2]};
                               Arguments = "mother.py Sam";
                               StdOutput = "Sam.out";
                               StdError = "Sam.err";
                               OutputSandbox = {"Sam.date", "Sam.out", "Sam.err"};
                               ShallowRetryCount = 1;
                       ];
               ];
               alex = [
                       description = [
                               JobType = "Normal";
                               Executable = "/usr/bin/python";
                               InputSandbox = {root.InputSandbox, root.Nodes.father.description.OutputSandbox[3]};
                               Arguments = "mother.py Alex";
                               StdOutput = "Alex.out";
                               StdError = "Alex.err";
                               OutputSandbox = {"Alex.date", "Alex.out", "Alex.err"};
                               ShallowRetryCount = 1;
                       ];
               ];
               judge = [
                       description = [
                               JobType = "Normal";
                               Executable = "/usr/bin/python";
                               InputSandbox = {"judge.py", root.nodes.sam.description.OutputSandbox[0], root.nodes.alex.description.OutputSandbox[0]};
                               Arguments = "judge.py Sam Alex";
                               StdOutput = "dag.out";
                               StdError = "dag.err";
                               OutputSandbox = {"dag.out", "dag.err"};
                               ShallowRetryCount = 1;
                       ];
               ];
       ]; 
       Dependencies = {{father, alex}, {father, sam}, {alex, judge}, {sam, judge}};
]

We named this file dag.jdl

We can submit this from the glite-UI with the set of glite-wms-* commands. For details on how to submit jobs to the grid, take a look at Quick User Guide for Submitting Jobs.

Creating proxy:

[danica@ui ~]$ voms-proxy-init --voms  seegrid
Enter GRID pass phrase:
Your identity: /C=RS/O=AEGIS/OU=Institute of Physics Belgrade/CN=Danica Stojiljkovic
Creating temporary proxy ...................................................... Done
Contacting  voms.irb.hr:15010 [/C=HR/O=edu/OU=irb/CN=host/voms.irb.hr] "seegrid" Done
Creating proxy ............................................................. Done
Your proxy is valid until Mon Dec 14 12:39:24 2009

Delegating proxy to WMS:

[danica@ui ~]$ glite-wms-job-delegate-proxy -d DanicaID 

Connecting to the service https://wms.ipb.ac.rs:7443/glite_wms_wmproxy_server 


================== glite-wms-job-delegate-proxy Success ==================

Your proxy has been successfully delegated to the WMProxy(s):
https://wms.ipb.ac.rs:7443/glite_wms_wmproxy_server
with the delegation identifier: DanicaID 

==========================================================================

Submitting job

[danica@ui ~]$ glite-wms-job-submit -d DanicaID  -o DAG.id dag.jdl 

Connecting to the service https://wms.ipb.ac.rs:7443/glite_wms_wmproxy_server 


====================== glite-wms-job-submit Success ======================

The job has been successfully submitted to the WMProxy
Your job identifier is:

https://wms.ipb.ac.rs:9000/_iDtGRp9XcEdeFT1Q1MJrg

The job identifier has been saved in the following file:
/home/danica/DAG.id

==========================================================================

Checking the job status at the moment when the father node is Done and sam and alex are Running:

[danica@ui ~]$ glite-wms-job-status -i DAG.id


======================= glite-wms-job-status Success =====================
BOOKKEEPING INFORMATION:

Status info for the Job : https://wms.ipb.ac.rs:9000/_iDtGRp9XcEdeFT1Q1MJrg
Current Status:     Running 
Status Reason:      unavailable
Destination:        dagman
Submitted:          Mon Dec 14 00:42:23 2009 CET
==========================================================================

- Nodes information for: 
    Status info for the Job : https://wms.ipb.ac.rs:9000/ABTzqEsjpUHpvusGxTnMZw
    Current Status:     Running 
    Status Reason:      Job successfully submitted to Globus
    Destination:        cox01.grid.metu.edu.tr:2119/jobmanager-lcgpbs-seegrid
    Submitted:          Mon Dec 14 00:42:23 2009 CET
==========================================================================
    
    Status info for the Job : https://wms.ipb.ac.rs:9000/C9DbLFJxr01k3hFh4C9n2w
    Current Status:     Submitted 
    Submitted:          Mon Dec 14 00:42:23 2009 CET
==========================================================================
    
    Status info for the Job : https://wms.ipb.ac.rs:9000/gfyygqHzJXnlSSBUfWx1ew
    Current Status:     Done (Success)
    Logged Reason(s):
        - 
        - Job terminated successfully
    Exit code:          0
    Status Reason:      Job terminated successfully
    Destination:        kalkan1.ulakbim.gov.tr:2119/jobmanager-lcgpbs-seegrid
    Submitted:          Mon Dec 14 00:42:23 2009 CET
==========================================================================
    
    Status info for the Job : https://wms.ipb.ac.rs:9000/n28P8UYZdQjmqbvXAK8PEQ
    Current Status:     Running 
    Status Reason:      Job successfully submitted to Globus
    Destination:        cox01.grid.metu.edu.tr:2119/jobmanager-lcgpbs-seegrid
    Submitted:          Mon Dec 14 00:42:23 2009 CET
==========================================================================

Retrieving job outputs:

[danica@ui ~]$ glite-wms-job-output -i DAG.id 

Connecting to the service https://wms.ipb.ac.rs:7443/glite_wms_wmproxy_server


================================================================================ 

                        JOB GET OUTPUT OUTCOME

Output sandbox files for the DAG/Collection :
https://wms.ipb.ac.rs:9000/_iDtGRp9XcEdeFT1Q1MJrg
have been successfully retrieved and stored in the directory:
/tmp/jobOutput/danica__iDtGRp9XcEdeFT1Q1MJrg

================================================================================


[danica@ui ~]$ ls /tmp/jobOutput/danica__iDtGRp9XcEdeFT1Q1MJrg/
alex  father  ids_nodes.map  judge  sam
[danica@ui ~]$ cat /tmp/jobOutput/danica__iDtGRp9XcEdeFT1Q1MJrg/ids_nodes.map
Dag JobId: https://wms.ipb.ac.rs:9000/_iDtGRp9XcEdeFT1Q1MJrg
         - - -
        Node Name:      sam
        JobId:          https://wms.ipb.ac.rs:9000/ABTzqEsjpUHpvusGxTnMZw
        Dir:            /tmp/jobOutput/danica__iDtGRp9XcEdeFT1Q1MJrg/sam
         - - -
        Node Name:      judge
        JobId:          https://wms.ipb.ac.rs:9000/C9DbLFJxr01k3hFh4C9n2w
         Dir:            /tmp/jobOutput/danica__iDtGRp9XcEdeFT1Q1MJrg/judge
         - - -
        Node Name:      father
        JobId:          https://wms.ipb.ac.rs:9000/gfyygqHzJXnlSSBUfWx1ew
        Dir:            /tmp/jobOutput/danica__iDtGRp9XcEdeFT1Q1MJrg/father
         - - -
        Node Name:      alex
        JobId:          https://wms.ipb.ac.rs:9000/n28P8UYZdQjmqbvXAK8PEQ
        Dir:            /tmp/jobOutput/danica__iDtGRp9XcEdeFT1Q1MJrg/alex
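The ids_nodes.map file ties each node name to its job identifier and output directory, which is handy when a script has to find the output of a particular node. Below is a minimal sketch of reading it into a dictionary, assuming the file always follows the layout shown above. The function name is hypothetical.

```python
def parse_ids_nodes_map(text):
    """Map each node name to its JobId and Dir, as listed in ids_nodes.map."""
    nodes = {}
    current = None
    for line in text.splitlines():
        # Split at the first colon only: job identifiers contain colons too.
        key, separator, value = line.strip().partition(":")
        if not separator:
            continue  # "- - -" separators and blank lines
        value = value.strip()
        if key == "Node Name":
            current = nodes.setdefault(value, {})
        elif key in ("JobId", "Dir") and current is not None:
            current[key] = value
    return nodes

sample = """Dag JobId: https://wms.ipb.ac.rs:9000/_iDtGRp9XcEdeFT1Q1MJrg
         - - -
        Node Name:      sam
        JobId:          https://wms.ipb.ac.rs:9000/ABTzqEsjpUHpvusGxTnMZw
        Dir:            /tmp/jobOutput/danica__iDtGRp9XcEdeFT1Q1MJrg/sam
"""

print(parse_ids_nodes_map(sample)["sam"]["Dir"])
```

The "Dag JobId" line is skipped because it appears before any node name.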

[danica@ui ~]$ ls /tmp/jobOutput/danica__iDtGRp9XcEdeFT1Q1MJrg/*
/tmp/jobOutput/danica__iDtGRp9XcEdeFT1Q1MJrg/ids_nodes.map 

/tmp/jobOutput/danica__iDtGRp9XcEdeFT1Q1MJrg/alex:
Alex.date  Alex.err  Alex.out

/tmp/jobOutput/danica__iDtGRp9XcEdeFT1Q1MJrg/father:
Alex.gender  father.err  father.out  Sam.gender

/tmp/jobOutput/danica__iDtGRp9XcEdeFT1Q1MJrg/judge:
dag.err  dag.out

/tmp/jobOutput/danica__iDtGRp9XcEdeFT1Q1MJrg/sam:
Sam.date  Sam.err  Sam.out
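The command sequence used in this section can be collected in a small script. The sketch below only builds the command lines, using nothing but the options that appear in the session above. The function names are hypothetical, and actually running the commands requires a glite-UI with a valid certificate.

```python
import subprocess

def submit_commands(vo, delegation_id, jdl_file, id_file):
    """Commands for creating a proxy, delegating it and submitting the DAG."""
    return [
        ["voms-proxy-init", "--voms", vo],
        ["glite-wms-job-delegate-proxy", "-d", delegation_id],
        ["glite-wms-job-submit", "-d", delegation_id, "-o", id_file, jdl_file],
    ]

def followup_commands(id_file):
    """Commands for checking the status and retrieving the outputs."""
    return [
        ["glite-wms-job-status", "-i", id_file],
        ["glite-wms-job-output", "-i", id_file],
    ]

def run_all(commands):
    """Run the commands in order, stopping at the first failure."""
    for command in commands:
        subprocess.run(command, check=True)

# Print the commands instead of running them.
for command in submit_commands("seegrid", "DanicaID", "dag.jdl", "DAG.id"):
    print(" ".join(command))
for command in followup_commands("DAG.id"):
    print(" ".join(command))
```

The outputs should only be retrieved once the status of the DAG job shows that it has finished, so the follow-up commands are kept separate from the submission.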

Appendix: code of the python scripts used in this example

Code father.py

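#!/usr/bin/python
# father.py: decide a gender for every name given on the command line
# and write it to <name>.gender.
import random, sys
from datetime import datetime

for name in sys.argv[1:]:
  gender = random.choice(["boy", "girl"])
  f = open(name + ".gender", "w")
  f.write(gender)
  f.close()

# Report the time of the decision on standard output.
print(datetime.now())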
 

Usage father.py

[danica@ui ~]$ python father.py Sam Alex
2009-12-14 01:04:47.643682
[danica@ui ~]$ cat Sam.gender 
girl
[danica@ui ~]$ cat Alex.gender 
boy

Code mother.py

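#!/usr/bin/python
# mother.py: announce the gender of the kid named on the command line
# and record the birth date (the time this job runs) in <name>.date.
import sys
from datetime import datetime

name = sys.argv[1]

# <name>.gender was written by father.py.
f = open(name + ".gender", "r")
gender = f.read()
f.close()
print(name + " is a " + gender)

f = open(name + ".date", "w")
f.write(str(datetime.now()))
f.close()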

Usage mother.py

[danica@ui ~]$ python mother.py Sam
Sam is a girl
[danica@ui ~]$ cat Sam.date 
2009-12-14 01:07:26.554631
[danica@ui ~]$ python mother.py Alex
Alex is a boy
[danica@ui ~]$ cat Alex.date 
2009-12-14 01:07:43.402617

Code judge.py

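#!/usr/bin/python
# judge.py: compare the birth dates of two kids and report who is older.
import sys

name1 = sys.argv[1]
name2 = sys.argv[2]

# <name>.date files were written by mother.py.
f = open(name1 + ".date", "r")
date1 = f.read()
f.close()
f = open(name2 + ".date", "r")
date2 = f.read()
f.close()

# The dates are strings of the form "YYYY-MM-DD HH:MM:SS.ffffff",
# so comparing them as strings orders them chronologically.
if date1 < date2:
  print(name1 + " is older than " + name2)
else:
  print(name2 + " is older than " + name1)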

Usage judge.py

[danica@ui ~]$ python judge.py Sam Alex
Sam is older than Alex