Performance analysis. Case studies
From EGEE-see Wiki
Performance increase of FDT controlled transfers using LISA
FDT is an application for efficient data transfers which is capable of reading and writing at disk speed over wide area networks (with standard TCP). It is written in Java, runs on all major platforms and is easy to use.
FDT is based on an asynchronous, flexible multithreaded system and uses the capabilities of the Java NIO libraries. Its main features are:
- Streams a dataset (list of files) continuously, using a managed pool of buffers through one or more TCP sockets.
- Uses independent threads to read and write on each physical device
- Transfers data in parallel on multiple TCP streams, when necessary
- Uses appropriate-sized buffers for disk I/O and for the network
- Restores the files from buffers asynchronously
- Resumes a file transfer session without loss, when needed
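The "managed pool of buffers" mentioned in the feature list can be pictured with a minimal sketch. It is not FDT's actual implementation: the class name BufferPoolSketch and its methods are hypothetical, and the only idea shown is that a fixed set of direct buffers is handed out and returned, so that disk readers cannot run ahead of the network writers indefinitely.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical fixed-size pool of direct buffers; not FDT's actual class. */
final class BufferPoolSketch {
    private final BlockingQueue<ByteBuffer> free;

    BufferPoolSketch(int bufferCount, int bufferSize) {
        free = new ArrayBlockingQueue<>(bufferCount);
        for (int i = 0; i < bufferCount; i++) {
            // Direct buffers are allocated outside the Java heap.
            free.add(ByteBuffer.allocateDirect(bufferSize));
        }
    }

    /** Blocks until a buffer is free, which throttles the producer side. */
    ByteBuffer take() {
        try {
            return free.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a buffer", e);
        }
    }

    /** Resets a buffer and returns it to the pool once its data was consumed. */
    void release(ByteBuffer buffer) {
        buffer.clear();
        free.add(buffer);
    }

    /** Number of buffers currently available. */
    int available() {
        return free.size();
    }

    public static void main(String[] args) {
        BufferPoolSketch pool = new BufferPoolSketch(4, 512 * 1024);
        ByteBuffer buffer = pool.take();
        System.out.println("free after take: " + pool.available());
        pool.release(buffer);
        System.out.println("free after release: " + pool.available());
    }
}
```

In such a design one thread per disk would call take(), fill the buffer and pass it to a socket writer, which calls release() when the data has been sent.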
FDT can be used in one of these three modes:
1. Server: java -jar fdt.jar [ OPTIONS ]
2. Client: java -jar fdt.jar [ OPTIONS ] -c <host> [file1 ...]|[-fl <fileList>] -d <destinationDirectory>
3. SCP: java -jar fdt.jar [ OPTIONS ] [[user@][host1:]]file1 [[user@][host2:]]file2
In Server mode FDT starts listening for incoming client connections. The server may or may not stop after the last client finishes its transfer. In Client mode the client connects to the specified host, where an FDT server is expected to be running. The client can either read files from or write files to the server. In SCP mode the local FDT instance uses SSH to start/stop the FDT server and/or client.
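As an illustration of the Server and Client usage lines above, the following sketch assembles the corresponding argument lists in Java. The class FdtCommandSketch, its method names and the host and file names are hypothetical; only the flags -p, -P, -c and -d come from this page, and the sketch assumes that fdt.jar is in the working directory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that builds FDT command lines; it does not run them. */
final class FdtCommandSketch {

    /** Server mode: java -jar fdt.jar -p <portNo> */
    static List<String> serverCommand(int port) {
        return new ArrayList<>(Arrays.asList(
                "java", "-jar", "fdt.jar", "-p", Integer.toString(port)));
    }

    /** Client mode: java -jar fdt.jar [OPTIONS] -c <host> file1 ... -d <dir> */
    static List<String> clientCommand(String host, String destinationDir,
                                      int streams, List<String> files) {
        List<String> cmd = new ArrayList<>(Arrays.asList("java", "-jar", "fdt.jar"));
        cmd.add("-P");
        cmd.add(Integer.toString(streams));
        cmd.add("-c");
        cmd.add(host);
        cmd.addAll(files);
        cmd.add("-d");
        cmd.add(destinationDir);
        return cmd;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" ", serverCommand(54321)));
        System.out.println(String.join(" ",
                clientCommand("host.example.org", "/data", 4,
                        Arrays.asList("a.dat", "b.dat"))));
    }
}
```

The resulting list can be passed to java.lang.ProcessBuilder to launch the transfer from another Java program.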
The OPTIONS currently supported may be server or client specific, or may be used in both modes.
Common options used for both server and client:
-p <portNo> specifies the TCP port to be used ( for the server it is the port used to listen on; for the client the port to connect to )
-bio Blocking I/O mode. In this mode every channel (socket) is configured to send/receive data synchronously and FDT uses one thread per channel. By default, non-blocking I/O is used. On some platforms/systems the throughput can be slightly higher in blocking I/O mode. The limitations of blocking mode are the maximum number of threads that can be used and, for very high numbers of streams ( thousands ), the CPU time spent by the kernel on scheduling the threads.
-iof <iof> Non-blocking I/O retry factor. In non-blocking mode every read/write operation which returns 0 is repeated up to <iof> times before waiting for I/O readiness. By default this value is 1, which means that every network read/write operation goes back to select() (which can also be poll()/epoll()) as soon as no more data can be processed by the underlying channel (socket). The default value should work fine on most systems, but values of 2 or 3 may increase the throughput on some systems. Values higher than 5 only increase the system CPU usage, without any gain in performance.
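One possible reading of the retry factor is sketched below. It is an interpretation of the description above and not FDT's code: the class RetryReadSketch, the method readWithRetry and the scripted test channel are hypothetical. The loop tolerates up to iof consecutive reads that return 0 and then hands control back to the caller, which would normally wait in select().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

/** Hypothetical illustration of a non-blocking read with a retry factor. */
final class RetryReadSketch {

    /** Returns the bytes read, or -1 if end of stream was hit before any data. */
    static int readWithRetry(ReadableByteChannel channel, ByteBuffer buffer, int iof) {
        int total = 0;
        int zeroReads = 0;
        try {
            while (buffer.hasRemaining() && zeroReads < iof) {
                int n = channel.read(buffer);
                if (n < 0) {
                    return total > 0 ? total : -1;
                }
                if (n == 0) {
                    zeroReads++;          // nothing available right now
                } else {
                    total += n;
                    zeroReads = 0;        // progress resets the retry counter
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;                     // caller goes back to select() here
    }

    /** Fake channel whose successive reads return the given byte counts. */
    static ReadableByteChannel scripted(final int... results) {
        return new ReadableByteChannel() {
            private int call = 0;
            @Override public int read(ByteBuffer dst) {
                int n = call < results.length ? results[call] : 0;
                call++;
                for (int i = 0; i < n; i++) {
                    dst.put((byte) 0);
                }
                return n;
            }
            @Override public boolean isOpen() { return true; }
            @Override public void close() { }
        };
    }

    public static void main(String[] args) {
        for (int iof = 1; iof <= 3; iof++) {
            int n = readWithRetry(scripted(0, 3, 0, 0, 5), ByteBuffer.allocate(64), iof);
            System.out.println("iof=" + iof + " read " + n + " bytes");
        }
    }
}
```

With a larger factor the loop picks up data that arrives shortly after an empty read, at the price of extra system calls when nothing arrives.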
-limit <rate> Restrict the transfer speed to the specified rate. K ( KiloBytes/s ), M ( MegaBytes/s ) or G ( GigaBytes/s ) may be used as suffixes. When this parameter is specified on the server it represents the maximum transfer rate for every FDT session. If the parameter is specified on both the server and the client, the smaller of the two values is used.
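The suffix handling and the "smaller of the two" rule can be sketched as follows. This is not FDT's parser: the class RateLimitSketch is hypothetical, and the sketch assumes that K, M and G are powers of 1024, that a value without suffix is in bytes per second, and that a negative number stands for "no limit set".

```java
/** Hypothetical parser for rate values such as 500K, 10M or 1G. */
final class RateLimitSketch {

    /** Converts a rate string into bytes per second (1024-based, an assumption). */
    static long parseRate(String text) {
        String s = text.trim().toUpperCase();
        if (s.isEmpty()) {
            throw new IllegalArgumentException("empty rate");
        }
        long factor = 1L;
        char last = s.charAt(s.length() - 1);
        if (last == 'K') {
            factor = 1024L;
        } else if (last == 'M') {
            factor = 1024L * 1024L;
        } else if (last == 'G') {
            factor = 1024L * 1024L * 1024L;
        }
        String digits = (factor == 1L) ? s : s.substring(0, s.length() - 1);
        return Long.parseLong(digits.trim()) * factor;
    }

    /** Picks the limit that applies when server and client may both set one. */
    static long effectiveRate(long serverRate, long clientRate) {
        if (serverRate < 0) {
            return clientRate;
        }
        if (clientRate < 0) {
            return serverRate;
        }
        return Math.min(serverRate, clientRate);
    }

    public static void main(String[] args) {
        long server = parseRate("1G");
        long client = parseRate("10M");
        System.out.println("effective limit: " + effectiveRate(server, client) + " bytes/s");
    }
}
```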
-printStats Print various statistics about buffer pools, sessions, etc.
-v Verbose. Multiple 'v'-s ( up to three ) may be used to increase the verbosity level. The maximum level is three ( -vvv ), which corresponds to Level.FINEST in the standard Java logging system used by FDT.
Server options:
-S disable the standalone mode; when specified the FDT Server will stop after the last client finishes. By default, the server will continue to listen for incoming clients. This option is automatically passed to the server started in "SCP" mode.
-bs <bufferSize> Size of the I/O buffers. K ( KiloBytes ) or M ( MegaBytes ) may be used as suffixes. The default value is 512K. If the number of clients or sockets is expected to be very high it is better to decrease this value. The memory used by these buffers is directly mapped in the operating system memory pages. It is limited by the JVM and can be increased by passing -XX:MaxDirectMemorySize=<X>m ( e.g. -XX:MaxDirectMemorySize=256m ) to the 'java' command.
Client options:
-c <host> Connect to the specified host. If this parameter is missing, FDT starts as a server.
-d <destinationDirectory> The destination directory used to copy files.
-fl <fileList> a list of files. Must have only one file per line.
-pull Pull mode. The client will receive the data from the server.
-N Disable Nagle's algorithm.
-ss <windowSize> Set the TCP send buffer size (the SO_SNDBUF socket option). K and M may be used as suffixes for Kilo/Mega.
-P <numberOfStreams> Number of parallel streams to use. Default is 1.
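In Java NIO terms, the -N and -ss client options correspond to two standard socket options. The sketch below shows how they can be set on a SocketChannel; it is not taken from the FDT sources, and the class SocketTuningSketch and its methods are hypothetical. The operating system may adjust the requested send buffer size, so the value read back can differ from the one requested.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.StandardSocketOptions;
import java.nio.channels.SocketChannel;

/** Hypothetical illustration of TCP_NODELAY and SO_SNDBUF on a NIO channel. */
final class SocketTuningSketch {

    /** Applies the two client-side TCP settings to an open channel. */
    static void tune(SocketChannel channel, boolean disableNagle, int sendBufferBytes)
            throws IOException {
        channel.setOption(StandardSocketOptions.TCP_NODELAY, disableNagle);
        if (sendBufferBytes > 0) {
            // The OS treats this as a hint and may round or cap the value.
            channel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferBytes);
        }
    }

    /** Opens an unconnected channel, tunes it and reports what the OS accepted. */
    static String tunedSummary(boolean disableNagle, int sendBufferBytes) {
        try (SocketChannel channel = SocketChannel.open()) {
            tune(channel, disableNagle, sendBufferBytes);
            return "nodelay=" + channel.getOption(StandardSocketOptions.TCP_NODELAY)
                    + " sndbuf=" + channel.getOption(StandardSocketOptions.SO_SNDBUF);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(tunedSummary(true, 1 << 20));
    }
}
```

Setting the buffer size before connecting matters on long, fast links, because the TCP window scale is negotiated when the connection is established.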
LISA can be used to provide the control plane for FDT data transfers. Its networking module can continuously supervise and diagnose the network state of all the servers involved in an FDT transfer. The module can detect various problems related to the TCP/IP stack and can report the state of various parameters. The network monitoring module can also be used to adjust important configuration parameters dynamically, remotely and from a single central point of command. The deployed LISA agents are able to collect the global state of the distributed computing nodes and report the status of the system in real time to the connected clients.
In addition, the parameters reported by the host monitoring module provide good insight into the internal state of the computing nodes between which the transfers take place. The shell module can be used to send commands to the remote nodes in parallel in a simple and intuitive way.
LISA also provides one module that is used in close relation with the FDT application. This module reports the state of the application and information about the network transfers in progress, so LISA gives real-time feedback on the status of the controlled system. LISA can be used to dynamically adjust the running parameters of the FDT application, to restart running network transfers or to change the files being transferred. Operations such as kernel parameter retrieval or asymmetric MTU detection can also be performed by LISA.
The distributed system based on LISA agents can be used for dynamic discovery of resources and to monitor, configure, control, and orchestrate efficient data transfers between several hundred computers using hybrid networks. In addition, a web-based administration interface can be easily deployed (see example) and used to activate and control the transfer flows. In this way, the LISA agents are able to start transfers and to change parameters of the active ones.
Performance analysis for DIOGENES agents platform using ApMON and MonALISA
A schematic view of the DIOGENES system is presented in the figure below. Users submit Scheduling requests. A near-optimal schedule is computed by the Scheduler based on the Scheduling requests and the Monitoring data provided by the Grid Monitoring Service (MonALISA). The schedule is then sent as a Request for task execution to the Execution Service. The user receives feedback related to the solution determined by the scheduler, as well as to the status of the executed jobs, in the form of the Schedule and task information. Furthermore, the system can easily integrate new hosts in the scheduling process, or overcome failure situations, by means of the Discovery Service.
The Scheduling request contains a description (in XML) of the tasks to be scheduled. This way, a user may ask for the scheduling of more than one task at a time. Various parameters have been taken into account for task description:
- resource requirements (CPU Power, Free Memory, Free Swap)
- restrictions (deadlines), and
- priorities.
The assignment of a task on a given computing node is conditioned by meeting the resource requirements.
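The condition on resource requirements can be expressed as a simple predicate. The sketch below is an illustration, not the DIOGENES code: the class RequirementCheckSketch, its nested types and the units are hypothetical, and it assumes that a node qualifies when every monitored value is at least as large as the requested one.

```java
/** Hypothetical eligibility check for placing a task on a computing node. */
final class RequirementCheckSketch {

    /** Resource requirements taken from a task description (units assumed). */
    static final class Requirements {
        final double cpuPower;
        final double freeMemory;
        final double freeSwap;

        Requirements(double cpuPower, double freeMemory, double freeSwap) {
            this.cpuPower = cpuPower;
            this.freeMemory = freeMemory;
            this.freeSwap = freeSwap;
        }
    }

    /** Current state of a node as reported by the monitoring service. */
    static final class NodeState {
        final double cpuPower;
        final double freeMemory;
        final double freeSwap;

        NodeState(double cpuPower, double freeMemory, double freeSwap) {
            this.cpuPower = cpuPower;
            this.freeMemory = freeMemory;
            this.freeSwap = freeSwap;
        }
    }

    /** A node can host a task only if it meets every requirement. */
    static boolean canHost(Requirements task, NodeState node) {
        return node.cpuPower >= task.cpuPower
                && node.freeMemory >= task.freeMemory
                && node.freeSwap >= task.freeSwap;
    }

    public static void main(String[] args) {
        Requirements task = new Requirements(2000.0, 512.0, 256.0);
        NodeState node = new NodeState(2400.0, 1024.0, 128.0);
        // The node has too little free swap, so the task cannot be placed there.
        System.out.println("can host: " + canHost(task, node));
    }
}
```

A scheduler would apply such a filter before evaluating deadlines and priorities, so that infeasible assignments never enter the search.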
Grid Monitoring Service. The purpose of the Grid Monitoring Service is to obtain real-time information in a heterogeneous and dynamic environment such as a Grid. It uses the MonALISA distributed service system in conjunction with ApMON, a library that can be used to send any status information in the form of UDP datagrams to MonALISA services. MonALISA provides system information for computer nodes and clusters, network information for WAN and LAN, and monitoring information about the performance of applications, jobs or services. It has proved its reliability and scalability in several large-scale distributed systems. We have deployed the existing implementation of the MonALISA Web Service Client to connect to the monitoring service via proxy servers and obtain data for the genetic algorithm. Task monitoring is achieved by means of a daemon application based on ApMON. This daemon provides information regarding task status parameters on each node (amount of memory, disk and CPU time used by the tasks). The up-to-date information offered by the Grid Monitoring Service leads to realistic execution times for assigned tasks.
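The transport idea behind this kind of monitoring, status information sent as UDP datagrams, can be shown with plain JDK classes. The sketch below does not use ApMON and does not reproduce its wire format: the class UdpStatusSketch, the plain-text payload and the loopback receiver are assumptions made for the illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

/** Hypothetical fire-and-forget status sender over UDP (not ApMON's format). */
final class UdpStatusSketch {

    /** Sends one status line as a single UDP datagram. */
    static void send(InetAddress host, int port, String status) {
        byte[] payload = status.getBytes(StandardCharsets.UTF_8);
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.send(new DatagramPacket(payload, payload.length, host, port));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Loopback demo: opens a receiver, sends one datagram, returns what arrived. */
    static String loopbackDemo(String status) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket receiver = new DatagramSocket(0, loopback)) {
            receiver.setSoTimeout(2000);   // do not wait forever for a lost datagram
            send(loopback, receiver.getLocalPort(), status);
            byte[] buffer = new byte[1024];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            receiver.receive(packet);
            return new String(packet.getData(), 0, packet.getLength(),
                    StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(loopbackDemo("TaskID=7 status=SCHEDULED"));
    }
}
```

Because UDP gives no delivery guarantee, the sender never blocks on the monitoring service, which keeps the monitored application unaffected when the service is slow or unreachable.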
Execution Service. Given its capability to dynamically load modules that interface existing job monitoring with batch queuing applications and tools (e.g. Condor, PBS, SGE, LSF), the Execution Service can send execution requests to a batch queuing system already installed on the computing node to which a particular group of tasks was assigned. Sets of tasks are dynamically sent to computing nodes in the form of a specific command. The time ordering policy established in the genetic algorithm for tasks assigned to the same processor is preserved at execution time.
Discovery Service. Lookup processes are triggered by the Discovery Service and determine the possibility of achieving a decentralized schedule by increasing the number of hosts involved in the genetic scheduling. The appearance or failure of agents in the system can easily be detected, resulting in a scalable and highly reliable optimization tool. If one agent ceases to function, the system as a whole is not compromised, but the probability of reaching a less optimal solution for the same number of generations increases.
The functionality of the scheduler relies on a communication model in which two different entities are involved: Brokers and Agents. The Brokers collect user requests for task allocations (Task Description Files), parse them and create an object, batch of tasks, which contains the tasks to be scheduled. Then the Brokers forward the requests to Agents. Each Agent executes the scheduling algorithm.
ApMON is used in both the Broker and the Agent to send feedback information to the monitoring service.
//other imports
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Vector;
import javax.security.auth.Subject;
import javax.security.auth.login.LoginContext;
import javax.swing.DefaultListModel;
import apmon.*;

public class Agent implements ServiceDiscoveryListener, RemoteEventListener{
    static Configuration config;
    ServiceDiscoveryManager serviceDiscovery;
    ServiceItemFilter filter;
    static Vector masters = new Vector();
    static TaskQueue taskQueue = new TaskQueue();
    /* export an object of this class */
    RemoteEventListener proxy = null;
    /* wait on this object until the chromosome is full or the time elapsed */
    static Object InitializeGA = new Object();
    /* SIMULATION = 0: use real monitoring data, = 1: use simulation */
    public static int SIMULATION = 1;
    /* store schedule exchanged */
    private Vector currentBestSchedules = new Vector();
    /* store grid configuration */
    private static HashMap hmConf = new HashMap();
    private static Vector pDaemon;
    private static int GAInProgress = 0;
    private static int step = 0;
    private static int NUMBER_OF_WORKERS;
    public static int HOST_ID = 1;
    private static Scheduler localBroker;
    private static Vector processors;
    private static GeneticAlgorithm ga;
    private static ConfigurationDaemon cd;
    private Chromosome c;
    static DefaultListModel listModel = new DefaultListModel();
    static Vector processingTimes = new Vector();
    private static Processor next;
    private static String node;
    //...
    /* ApMon sender and the list of its configuration locations */
    ApMon apm = null;
    Vector destList = new Vector();
    public Agent() throws Exception {
        LoginContext loginContext = (LoginContext) config.getEntry(
            "com.sun.jini.saga.Agent", "loginContext", LoginContext.class, null);
        if (loginContext == null) {
            mainAsSubject(config);
        } else {
            /* run the agent with the credentials of the logged-in subject */
            loginContext.login();
            Subject.doAsPrivileged( loginContext.getSubject(),
                new PrivilegedExceptionAction() {
                    public Object run() throws Exception {
                        mainAsSubject(config);
                        return null;
                    }
                }, null);
        }
        /* initialize ApMon from its configuration URL */
        destList.add("http://diogenes.grid.pub.ro/apmon_config");
        try {
            apm = new ApMon(destList);
        } catch (Exception e) {
            logger.severe("Error initializing ApMon: " + e);
            System.exit(-1);
        }
    }
    /* submit scheduled tasks to execution */
    public void executeSchedule(Chromosome c){
        Iterator i;
        Gene g;
        Processor p;
        Task t;
        String command;
        i = c.iterator();
        while(i.hasNext()){
            g = (Gene)i.next();
            t = g.getTask();
            p = g.getProc();
            /* one batch submission per (task, processor) pair */
            command = "qsub -l nodes="+p.getNodeName()+" "+t.getPath();
            //System.out.println(command);
            try {
                /* Execute the command; success here only means that qsub
                   was started, its exit status is not checked */
                Runtime.getRuntime().exec(command);
                sendFeedback(t, true);
            } catch (Exception e) {
                sendFeedback(t, false);
            }
        }
    }
    /* report the outcome of a task submission to MonALISA through ApMon */
    public void sendFeedback(Task t, boolean status){
        Vector paramNames = new Vector();
        Vector paramValues = new Vector();
        String nodeName;
        paramNames.add("TaskID");
        paramValues.add(Double.valueOf(t.getId()));
        paramNames.add("MemoryRequired");
        paramValues.add(Double.valueOf(t.getMemory()));
        paramNames.add("CPURequired");
        paramValues.add(Double.valueOf(t.getCPU()));
        /* the submission outcome is published as the ApMon node name */
        if(status){
            nodeName="SCHEDULED";
        }else {
            nodeName="FAILED";
        }
        try {
            apm.sendParameters("DIOGENES",
                               nodeName,
                               paramNames.size(),
                               paramNames,
                               paramValues);
        } catch(Exception e) {
            System.err.println("Send operation failed: ");
            e.printStackTrace();
        }
    }
// ...
}



