We present here a function that adds a file copy engine to an event system. The engine can easily be added several times to the same event system in order to copy several files in parallel.
```ocaml
open Unixqueue

type copy_state =
  { copy_ues : Unixqueue.event_system;
    copy_group : Unixqueue.group;
    copy_infd : Unix.file_descr;
    copy_outfd : Unix.file_descr;
    copy_size : int;
    copy_inbuf : bytes;
    copy_outbuf : bytes;
    mutable copy_outlen : int;
    mutable copy_eof : bool;
    mutable copy_have_inres : bool;
    mutable copy_have_outres : bool;
    mutable copy_cleared : bool;
  }
```

This record type contains the state of the engine:
copy_ues: The event system to which the engine is attached
copy_group: The group to which all the entities belong
copy_infd: The file descriptor of the source file
copy_outfd: The file descriptor of the copy file
copy_size: The size of copy_inbuf and copy_outbuf
copy_inbuf: The buffer used to read the bytes of the source file
copy_outbuf: The buffer used to write the bytes to the copy file
copy_outlen: The portion of copy_outbuf that is actually used
copy_eof: Whether the EOF marker has been read or not
copy_have_inres: Whether there is currently an input resource for the input file
copy_have_outres: Whether there is currently an output resource for the output file
copy_cleared: Whether the copy is over or not
Now the core function begins:
```ocaml
let copy_file ues old_name new_name =
  (* Adds the necessary handlers and actions to the Unixqueue.event_system
   * ues that copy the file 'old_name' to 'new_name'.
   *)
```
Several inner functions are defined now. First, update_resources adds or removes the resources involved in copying. The record components copy_have_inres and copy_have_outres store whether there is currently a resource for input and for output, respectively. The function computes whether an input or an output resource is wanted, and then adds or removes the resource as needed. If both resources are removed, the file descriptors are closed and everything belonging to the group is removed from the event system.
We want input if there is space in the output buffer and the end of the input file has not yet been reached. In this case, update_resources ensures that an input resource is defined for the input file, so that input events are generated.
We want output if there is something in the output buffer. In the same manner, update_resources ensures that an output resource is defined for the output file.
Note that normally the input and output resources are added and removed several times until the complete file is copied.
```ocaml
  let update_resources state ues =
    let want_input_resource =
      not state.copy_eof && state.copy_outlen < state.copy_size in
    let want_output_resource =
      state.copy_outlen > 0 in
    if want_input_resource && not state.copy_have_inres then
      add_resource ues state.copy_group (Wait_in state.copy_infd, -1.0);
    if not want_input_resource && state.copy_have_inres then
      remove_resource ues state.copy_group (Wait_in state.copy_infd);
    if want_output_resource && not state.copy_have_outres then
      add_resource ues state.copy_group (Wait_out state.copy_outfd, -1.0);
    if not want_output_resource && state.copy_have_outres then
      remove_resource ues state.copy_group (Wait_out state.copy_outfd);
    state.copy_have_inres <- want_input_resource;
    state.copy_have_outres <- want_output_resource;
    if not want_input_resource && not want_output_resource
       && not state.copy_cleared
    then begin
      (* Close file descriptors at end: *)
      Unix.close state.copy_infd;
      Unix.close state.copy_outfd;
      (* Remove everything: *)
      clear ues state.copy_group;
      state.copy_cleared <- true   (* avoid calling 'clear' twice *)
    end
  in
```
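To look at the decision rule of update_resources in isolation, the following standalone sketch (not part of copy_file) extracts it into a pure function with no event system involved. The names wanted_resources and finished are hypothetical, introduced only for illustration; their labeled arguments stand for copy_eof, copy_outlen and copy_size.

```ocaml
(* Hypothetical pure model of the decision made by update_resources.
   [eof], [outlen] and [size] mirror copy_eof, copy_outlen, copy_size. *)
let wanted_resources ~eof ~outlen ~size =
  let want_input = (not eof) && outlen < size in
  let want_output = outlen > 0 in
  (want_input, want_output)

(* The copy is over when neither resource is wanted any more. *)
let finished ~eof ~outlen ~size =
  wanted_resources ~eof ~outlen ~size = (false, false)

let () =
  (* Empty buffer, input not exhausted: only read. *)
  assert (wanted_resources ~eof:false ~outlen:0 ~size:1024 = (true, false));
  (* Partly filled buffer: read and write. *)
  assert (wanted_resources ~eof:false ~outlen:100 ~size:1024 = (true, true));
  (* Full buffer: stop reading until something has been written. *)
  assert (wanted_resources ~eof:false ~outlen:1024 ~size:1024 = (false, true));
  (* EOF seen and buffer drained: nothing left to do. *)
  assert (finished ~eof:true ~outlen:0 ~size:1024)
```

Keeping the rule pure makes the transitions easy to check; the add_resource and remove_resource calls then only react to changes of the two booleans.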
The input handler is called only for input events belonging to our own group. It is very similar to the example in the introductory chapter.
The input handler calls update_resources after the work is done. It is now possible that the output buffer contains data after it was previously empty, and update_resources will then add the output resource. Or it is possible that the output buffer is now full, and update_resources will then remove the input resource so that no more input data will be accepted. Of course, both conditions can happen at the same time.
```ocaml
  let handle_input state ues esys e =
    (* There is data on the input file descriptor. *)
    (* Calculate the available space in the output buffer: *)
    let n = state.copy_size - state.copy_outlen in
    assert (n > 0);
    (* Read the data: *)
    let n' = Unix.read state.copy_infd state.copy_inbuf 0 n in
    (* End of stream reached? *)
    state.copy_eof <- n' = 0;
    (* Append the read data to the output buffer: *)
    Bytes.blit state.copy_inbuf 0 state.copy_outbuf state.copy_outlen n';
    state.copy_outlen <- state.copy_outlen + n';
    (* Add or remove resources: *)
    update_resources state ues
  in
```
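The buffer arithmetic of handle_input can be exercised without files or Unixqueue. In this standalone sketch, which simplifies the engine under stated assumptions, an in-memory string stands in for the source file and the data is copied directly into the output buffer, skipping the intermediate copy_inbuf. The type sim and the function sim_input are hypothetical.

```ocaml
(* Hypothetical in-memory model of the input side of the engine:
   [src] stands in for the source file, [pos] for its file position.
   Unlike handle_input, data is copied straight into [outbuf]. *)
type sim = {
  src : string;
  mutable pos : int;
  outbuf : Bytes.t;          (* plays the role of copy_outbuf *)
  mutable outlen : int;      (* plays the role of copy_outlen *)
  mutable eof : bool;        (* plays the role of copy_eof *)
}

(* Mirrors handle_input: read at most the free space of the output
   buffer, treat a zero-length read as EOF, append what was read. *)
let sim_input s =
  let n = Bytes.length s.outbuf - s.outlen in
  assert (n > 0);
  let n' = min n (String.length s.src - s.pos) in
  Bytes.blit_string s.src s.pos s.outbuf s.outlen n';
  s.pos <- s.pos + n';
  s.eof <- n' = 0;
  s.outlen <- s.outlen + n'

let () =
  (* A 4-byte buffer is full after the first read. *)
  let s = { src = "hello world"; pos = 0; outbuf = Bytes.create 4;
            outlen = 0; eof = false } in
  sim_input s;
  assert (Bytes.sub_string s.outbuf 0 s.outlen = "hell");
  assert (s.outlen = 4 && not s.eof)
```

Once the buffer is full, update_resources removes the input resource, which is why the input handler may assume that some space is left.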
The output handler, too, is called only for output events of our own group.
The output handler calls update_resources after the work is done. It is now possible that the output buffer has space again, and update_resources will add the input resource again. Or the output buffer may even be empty, and update_resources will then also remove the output resource.
```ocaml
  let handle_output state ues esys e =
    (* The file descriptor is ready to output data. *)
    (* Write as much as possible: *)
    let n' = Unix.write state.copy_outfd state.copy_outbuf 0 state.copy_outlen in
    (* Remove the written bytes from the output buffer: *)
    Bytes.blit state.copy_outbuf n' state.copy_outbuf 0 (state.copy_outlen - n');
    state.copy_outlen <- state.copy_outlen - n';
    (* Add or remove resources: *)
    update_resources state ues
  in
```
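The compaction step of handle_output, which moves the not-yet-written tail of the buffer to its front, can likewise be checked in isolation. drop_written is a hypothetical helper, not part of Equeue; a partial write is simulated by passing a byte count smaller than the buffered length.

```ocaml
(* Hypothetical helper mirroring handle_output's bookkeeping: after a
   write of [written] bytes, move the unwritten tail to the front of
   [buf] and return the new fill level. *)
let drop_written buf outlen written =
  assert (written >= 0 && written <= outlen);
  (* Bytes.blit is safe when source and destination overlap. *)
  Bytes.blit buf written buf 0 (outlen - written);
  outlen - written

let () =
  let buf = Bytes.of_string "abcdef" in
  (* Pretend the write accepted only 2 of the 6 buffered bytes. *)
  let outlen = drop_written buf 6 2 in
  assert (outlen = 4);
  assert (Bytes.sub_string buf 0 outlen = "cdef")
```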
This is the main event handler. It accepts only Input_arrived and Output_readiness events belonging to our own group. All other events are rejected.
```ocaml
  let handle state ues esys e =
    (* add_handler passes only events of our own group to this handler;
     * of these, accept only input and output events. *)
    match e with
      | Input_arrived (_, _) -> handle_input state ues esys e
      | Output_readiness (_, _) -> handle_output state ues esys e
      | _ -> raise Equeue.Reject
  in
```
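The dispatch pattern of handle (route the two event kinds, reject everything else) can be modeled without ocamlnet. In the following standalone sketch, the event type and the Reject exception are local stand-ins for Unixqueue's event type and Equeue.Reject, defined here only so that the sketch compiles on its own; the Timeout constructor is a simplified assumption, not Unixqueue's real signature.

```ocaml
(* Local stand-ins, NOT the ocamlnet definitions. *)
type group = int
type event =
  | Input_arrived of group * int
  | Output_readiness of group * int
  | Timeout of group

exception Reject

(* Same shape as [handle]: two event kinds are routed, the rest rejected. *)
let handle ~on_input ~on_output e =
  match e with
  | Input_arrived (_, _) -> on_input e
  | Output_readiness (_, _) -> on_output e
  | _ -> raise Reject

let () =
  let log = ref [] in
  let on_input _ = log := "in" :: !log in
  let on_output _ = log := "out" :: !log in
  handle ~on_input ~on_output (Input_arrived (1, 3));
  handle ~on_input ~on_output (Output_readiness (1, 4));
  (try handle ~on_input ~on_output (Timeout 1); assert false
   with Reject -> log := "rejected" :: !log);
  assert (List.rev !log = ["in"; "out"; "rejected"])
```

Raising an exception instead of silently ignoring the event tells the event system that this handler did not consume the event, so another handler may still get it.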
Now the body of the copy_file function follows. It contains only initializations.
```ocaml
  let g = new_group ues in
  let infd = Unix.openfile old_name [ Unix.O_RDONLY; Unix.O_NONBLOCK ] 0 in
  let outfd =
    Unix.openfile new_name
      [ Unix.O_WRONLY; Unix.O_NONBLOCK; Unix.O_CREAT; Unix.O_TRUNC ] 0o666 in
  Unix.clear_nonblock infd;
  Unix.clear_nonblock outfd;
  let size = 1024 in
  let state =
    { copy_ues = ues;
      copy_group = g;
      copy_infd = infd;
      copy_outfd = outfd;
      copy_size = size;
      copy_inbuf = Bytes.create size;
      copy_outbuf = Bytes.create size;
      copy_outlen = 0;
      copy_eof = false;
      copy_have_inres = false;
      copy_have_outres = false;
      copy_cleared = false;
    } in
  update_resources state ues;
  add_handler ues g (handle state)
;;
```

Note that the files are opened in non-blocking mode. This ensures that the Unix.openfile system call itself does not block. After the files have been opened, the non-blocking flag is cleared again; the event system already guarantees that I/O will not block.
Now we can add our copy engine to an event system, for example:
```ocaml
let ues = create_unix_event_system () in
copy_file ues "a.old" "a.new";
copy_file ues "b.old" "b.new";
run ues
;;
```

This piece of code copies both files in parallel. Note that the concept of "groups" is very helpful here: it prevents several instances of the same engine from interfering with each other.
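Why several engines can share one loop without mixing their data can be illustrated with a toy model. This standalone sketch only mimics the "one state record per engine" aspect of copy_file: the engine type, make, step and the round-robin run loop are hypothetical and are not how Unixqueue schedules events (it waits on file descriptors instead).

```ocaml
(* Hypothetical per-engine state, in the spirit of copy_state. *)
type engine = {
  name : string;
  src : string;          (* stands in for the source file *)
  mutable pos : int;
  dst : Buffer.t;        (* stands in for the copy file *)
  chunk : int;           (* bytes moved per step *)
}

let make name src chunk = { name; src; pos = 0; dst = Buffer.create 16; chunk }

let finished e = e.pos >= String.length e.src

(* One step of one engine: copy at most [chunk] bytes. *)
let step e =
  let n = min e.chunk (String.length e.src - e.pos) in
  Buffer.add_string e.dst (String.sub e.src e.pos n);
  e.pos <- e.pos + n

(* Serve every unfinished engine in turn until all are done;
   returns the order in which the engines were served. *)
let run engines =
  let trace = ref [] in
  while not (List.for_all finished engines) do
    List.iter
      (fun e ->
        if not (finished e) then begin
          step e;
          trace := e.name :: !trace
        end)
      engines
  done;
  List.rev !trace

let () =
  let a = make "a" "0123456789" 4 and b = make "b" "abcdef" 4 in
  let trace = run [a; b] in
  (* The copies interleave, yet each ends up with its own data. *)
  assert (trace = ["a"; "b"; "a"; "b"; "a"]);
  assert (Buffer.contents a.dst = "0123456789");
  assert (Buffer.contents b.dst = "abcdef")
```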