3.2. Example: Copying several files in parallel

We present here a function which adds a file copy engine to an event system. It is simple to add the engine several times to the event system to copy several files in parallel.


open Unixqueue

(** State of one running copy engine. One value of this type is created per
    call of [copy_file] and shared by all handlers of that engine. *)
type copy_state =
    { copy_ues    : Unixqueue.event_system;      
      (** The event system the engine is attached to. *)
      copy_group  : Unixqueue.group;             
      (** The group owning the engine's resources and handler. *)
      copy_infd   : Unix.file_descr;             
      (** Descriptor of the file being read. *)
      copy_outfd  : Unix.file_descr;
      (** Descriptor of the file being written. *)
      copy_size   : int;
      (** Capacity in bytes of [copy_inbuf] and [copy_outbuf]. *)
      copy_inbuf  : string;
      (** Scratch buffer that [Unix.read] fills. *)
      copy_outbuf : string;
      (** Data read but not yet written; valid bytes are [0 .. copy_outlen-1]. *)
      mutable copy_outlen      : int;
      (** Number of pending bytes in [copy_outbuf]. *)
      mutable copy_eof         : bool;
      (** Set when the last read returned 0 bytes. *)
      mutable copy_have_inres  : bool;
      (** Whether a [Wait_in] resource is currently registered. *)
      mutable copy_have_outres : bool;
      (** Whether a [Wait_out] resource is currently registered. *)
      mutable copy_cleared     : bool;
      (** Whether the descriptors were closed and the group cleared. *)
    }
This record type contains the state of the engine.

Now the core function begins:

let copy_file ues old_name new_name =
  (* Adds the necessary handlers and actions to the Unixqueue.event_system
   * ues that copy the file 'old_name' to 'new_name'.
   *
   * Both files are opened immediately, so a Unix.Unix_error from
   * Unix.openfile is raised by this call. The data itself is only
   * transferred by the handlers, i.e. while the event system is running.
   * Every call creates its own group, so the function may be applied
   * several times to the same event system.
   *)

Several inner functions are defined now. First, update_resources adds or removes the resources involved in copying. The record components copy_have_inres and copy_have_outres store whether there is currently a resource for input and for output, respectively. The function computes whether an input or output resource is wanted, and then adds or removes the resource as needed. If neither resource is wanted any longer, the file descriptors are closed, and everything belonging to the group is removed from the event system.

We want input if there is space in the output buffer, and the end of the input file has not yet been reached. If this is true, it is ensured that an input resource is defined for the input file such that input events are generated.

We want output if there is something in the output buffer. In the same manner it is ensured that an output resource is defined for the output file.

Note that normally the input and output resources are added and removed several times until the complete file is copied.

  let update_resources state ues =
    (* Brings the resources registered for this copy in line with the
     * current buffer state, and shuts the engine down once neither
     * resource is needed any more.
     *
     * Reading is wanted while the output buffer has free space and the
     * end of the input has not been seen; writing is wanted while the
     * output buffer holds data.
     *)
    let grp = state.copy_group in
    let in_op = Wait_in state.copy_infd in
    let out_op = Wait_out state.copy_outfd in
    let no_timeout = -1.0 in
    let need_in = state.copy_outlen < state.copy_size && not state.copy_eof in
    let need_out = state.copy_outlen > 0 in
    (* Touch the event system only when the wanted state differs from the
     * recorded one. *)
    let sync wanted registered op =
      match wanted, registered with
      | true, false -> add_resource ues grp (op, no_timeout)
      | false, true -> remove_resource ues grp op
      | true, true | false, false -> ()
    in
    sync need_in state.copy_have_inres in_op;
    sync need_out state.copy_have_outres out_op;
    state.copy_have_inres <- need_in;
    state.copy_have_outres <- need_out;
    (* Nothing left to read or write: release everything, but only once. *)
    if not (need_in || need_out || state.copy_cleared) then begin
      Unix.close state.copy_infd;
      Unix.close state.copy_outfd;
      clear ues grp;
      state.copy_cleared <- true
    end
  in

The input handler is called only for input events belonging to our own group. It is very similar to the example in the introductory chapter.

The input handler calls update_resources after the work is done. It is now possible that the output buffer contains data after it was previously empty, and update_resources will then add the output resource. Or, it is possible that the output buffer is now full, and update_resources will then remove the input resource such that no more input data will be accepted. Of course, both conditions can happen at the same time.

  let handle_input state ues _esys _ev =
    (* Called when the input descriptor is readable. Reads at most as many
     * bytes as still fit into the output buffer and appends them to it.
     * The last two arguments are not used.
     *)
    let dst_pos = state.copy_outlen in
    let free_space = state.copy_size - dst_pos in
    (* The input resource is only registered while there is free space. *)
    assert (free_space > 0);
    let got = Unix.read state.copy_infd state.copy_inbuf 0 free_space in
    (* A read of zero bytes means the end of the file has been reached. *)
    state.copy_eof <- (got = 0);
    String.blit state.copy_inbuf 0 state.copy_outbuf dst_pos got;
    state.copy_outlen <- dst_pos + got;
    (* The buffer fill level changed, so re-evaluate the resources. *)
    update_resources state ues
  in

The output handler is called only for output events of our own group, too.

The output handler calls update_resources after the work is done. It is now possible that the output buffer has space again, and update_resources will add the input resource again. Or, the output buffer is even empty, and update_resources will also remove the output resource.

  let handle_output state ues _esys _ev =
    (* Called when the output descriptor accepts data. Writes as much of
     * the pending data as the descriptor takes and keeps the rest.
     * The last two arguments are not used.
     *)
    let pending = state.copy_outlen in
    let written = Unix.write state.copy_outfd state.copy_outbuf 0 pending in
    let remaining = pending - written in
    (* Move the unwritten tail to the front of the buffer. *)
    String.blit state.copy_outbuf written state.copy_outbuf 0 remaining;
    state.copy_outlen <- remaining;
    (* The buffer fill level changed, so re-evaluate the resources. *)
    update_resources state ues
  in

This is the main event handler. It accepts only Input_arrived and Output_readiness events belonging to our own group. All other events are rejected.

  let handle state ues esys e =
    (* Dispatches input and output events to the matching handler and
     * rejects every other event. The group carried by the event is not
     * inspected here; the handler is registered for the engine's own
     * group when it is added to the event system.
     *)
    let dispatch =
      match e with
      | Input_arrived _ -> handle_input
      | Output_readiness _ -> handle_output
      | _ -> raise Equeue.Reject
    in
    dispatch state ues esys e
  in

Now the body of the copy_file function follows. It contains only initializations.

  (* A fresh group keeps the resources and the handler of this engine
   * apart from everything else attached to the event system. *)
  let g = new_group ues in
  (* O_NONBLOCK only keeps the open calls themselves from blocking; the
   * flag is cleared again below. *)
  let infd =
    Unix.openfile old_name [ Unix.O_RDONLY; Unix.O_NONBLOCK ] 0 in
  let outfd =
    try
      Unix.openfile
        new_name
        [ Unix.O_WRONLY; Unix.O_NONBLOCK; Unix.O_CREAT; Unix.O_TRUNC ]
        0o666
    with
      err ->
        begin
          (* Do not leak the input descriptor when the output file cannot
           * be opened. A failure of this close is ignored on purpose so
           * that the original error is the one reported. *)
          (try Unix.close infd with Unix.Unix_error (_, _, _) -> ());
          raise err
        end
  in
  Unix.clear_nonblock infd;
  Unix.clear_nonblock outfd;
  (* Capacity of both buffers, in bytes. *)
  let size = 1024 in

  (* NOTE(review): String.create and the in-place String.blit used by the
   * handlers need mutable strings (OCaml < 4.06 or -unsafe-string). A port
   * to Bytes has to change the copy_state type and the handlers together.
   *)
  let state =
    { copy_ues = ues;
      copy_group = g;
      copy_infd = infd;
      copy_outfd = outfd;
      copy_size = size;
      copy_inbuf = String.create size;
      copy_outbuf = String.create size;
      copy_outlen = 0;
      copy_eof = false;
      copy_have_inres = false;
      copy_have_outres = false;
      copy_cleared = false;
    } in

  (* The buffer is empty and EOF has not been seen, so this registers the
   * input resource. *)
  update_resources state ues;
  add_handler ues g (handle state)
;;
Note that the files are opened in "non-blocking" mode. This ensures that the Unix.openfile system call does not block itself. After the files have been opened, the non-blocking flag is reset; the event system already guarantees that I/O will not block.

Now we can add our copy engine to an event system, e.g.

(* Attach one copy engine per (source, destination) pair to a single event
 * system, then run it until all engines have finished. *)
let ues = create_unix_event_system() in
List.iter
  (fun (src, dst) -> copy_file ues src dst)
  [ "a.old", "a.new";
    "b.old", "b.new" ];
run ues
;;
This piece of code will copy both files in parallel. Note that the concept of "groups" is very helpful to prevent several instances of the same engine from interfering with each other.