4.4. Asynchronous channels

Because engines are based on Unixqueues, one can imagine that complex operations on file descriptors are executed by engines. In fact, there is a primitive that copies the whole byte stream arriving at one descriptor to another descriptor: the class copier. We do not discuss this class in detail here; it is explained in the reference manual. From the outside it works like every engine: one specifies the task, creates the engine, and waits until it is finished. Internally, the class has to watch both file descriptors, check when data can be read and written, and actually copy the data chunk by chunk.

Now imagine that we want to copy not only from descriptor to descriptor, but also from a descriptor into a data object. As before, the descriptor sometimes has data to be read and sometimes not; this is well known and can be handled effectively with Unixqueue means. In addition, we assume that the data object has only limited processing capacity, so it can sometimes accept data and sometimes not. This sounds like the same problem, but it is not, because this phenomenon is not bound to any descriptor. We have to develop our own interface to mimic this behaviour at a higher programming level: the asynchronous output channel.

The term channel is used by the O'Caml runtime system to refer to buffered I/O descriptors. The Ocamlnet library has extended the meaning of the term to objects that handle I/O in a configurable way. As this is what we are going to do, we adopt this meaning.

An asynchronous output channel is a class with the type:

   class type async_out_channel = object
     method output : string -> int -> int -> int
     method close_out : unit -> unit
     method pos_out : int
     method flush : unit -> unit
     method can_output : bool
     method request_notification : (unit -> bool) -> unit
   end
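The first four methods are borrowed from Ocamlnet's class type raw_out_channel:

- output s k n: writes up to n bytes of the string s, starting at position k, and returns the number of bytes actually accepted
- close_out(): closes the channel
- pos_out: returns the number of bytes accepted so far
- flush(): flushes buffered data, if any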

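These methods were originally specified for synchronous channels, which are allowed to wait until a needed resource becomes available again. This is not possible for an asynchronous channel. For example, in the original specification output guarantees to accept at least one byte, and an implementation is free to wait until this is possible. Here we must not do so, because waiting would block the whole event system. Instead, there are two additional methods that help to cope with these difficulties:

- can_output: returns true if output is currently possible, i.e. output would accept data now
- request_notification f: registers the callback f, which is invoked whenever can_output changes its value; f returns true to remain registered and false to be removed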

The point is that the user of an asynchronous channel can now defer an output operation until later when it is currently not possible. Of course, the user must be aware of this: using an asynchronous channel is not as easy as using a synchronous channel.
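The following is a minimal sketch of how a user might defer output, assuming only the async_out_channel class type (repeated, with its closing end, so that the block compiles on its own with the standard library). The function write_all and the class gated_channel are hypothetical illustrations, not part of equeue: write_all writes as much of a string as the channel currently accepts, and if data remain, it registers a callback with request_notification that resumes writing later. The callback returns true while data are still pending, so it stays registered, and false once the whole string has been accepted.

```ocaml
(* The channel type from the text, with its closing [end] *)
class type async_out_channel = object
  method output : string -> int -> int -> int
  method close_out : unit -> unit
  method pos_out : int
  method flush : unit -> unit
  method can_output : bool
  method request_notification : (unit -> bool) -> unit
end

(* Hypothetical helper: write [s] completely, deferring via notifications.
   [on_done] is called once the last byte has been accepted. *)
let write_all (ch : async_out_channel) (s : string) (on_done : unit -> unit) =
  let pos = ref 0 in
  let len = String.length s in
  (* Write as much as the channel accepts now; true once [s] is done *)
  let rec push () =
    if !pos >= len then true
    else if ch#can_output then begin
      let n = ch#output s !pos (len - !pos) in
      pos := !pos + n;
      (* n = 0 means no progress: stop and wait for a notification *)
      if n = 0 then false else push ()
    end
    else false
  in
  if push () then on_done ()
  else
    ch#request_notification (fun () ->
      if push () then begin
        on_done ();
        false          (* finished: ask to be removed *)
      end
      else true)       (* still pending: stay registered *)

(* Hypothetical test channel: accepts data only while its gate is open *)
class gated_channel = object
  val buf = Buffer.create 16
  val mutable gate_open = false
  val mutable notify_list : (unit -> bool) list = []
  method output s k n =
    if gate_open then (Buffer.add_substring buf s k n; n) else 0
  method close_out () = ()
  method pos_out = Buffer.length buf
  method flush () = ()
  method can_output = gate_open
  method request_notification f = notify_list <- f :: notify_list
  method contents = Buffer.contents buf
  (* Change the gate, then run the callbacks, keeping those returning true *)
  method set_gate b =
    gate_open <- b;
    notify_list <- List.filter (fun f -> f ()) notify_list
end
```

For example, calling write_all on a gated_channel whose gate is closed writes nothing; a later ch#set_gate true runs the callback, which writes the complete string and calls on_done. Since gated_channel has extra methods, it has to be coerced with (ch :> async_out_channel) before it is passed to write_all.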

We now show two examples. The first always accepts output and appends it to a buffer, so the two methods can_output and request_notification are trivial in this case. The second example illustrates these methods: the channel pauses for one second after one kilobyte of data has been accepted. This is of little practical use, but it is quite simple to implement and at the right level for an example.

Example 1: We just inherit from an Ocamlnet class that implements the buffer:

   class async_buffer b =
   object (self)
     inherit Netchannels.output_buffer b
     method can_output = true
     method request_notification (f : unit->bool) = ()
   end
This is a good example because it demonstrates why the class type async_out_channel is based on an Ocamlnet class type: an existing Ocamlnet implementation can be reused directly. (Note that async_buffer defines more methods than necessary. Objects of this class may have to be coerced to async_out_channel where the typing requires it, e.g. (new async_buffer b :> async_out_channel).)
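To see the coercion at work without depending on Ocamlnet, here is a sketch in which the hypothetical class plain_buffer stands in for Netchannels.output_buffer. This is an assumption for illustration: the stand-in implements only the four raw_out_channel methods plus one extra method, extra_info. Because of the extra method, async_buffer objects have a wider type than async_out_channel, so putting one into a list of async_out_channel requires the explicit :> coercion.

```ocaml
class type async_out_channel = object
  method output : string -> int -> int -> int
  method close_out : unit -> unit
  method pos_out : int
  method flush : unit -> unit
  method can_output : bool
  method request_notification : (unit -> bool) -> unit
end

(* Hypothetical stand-in for Netchannels.output_buffer: the four
   raw_out_channel methods plus an extra method [extra_info] *)
class plain_buffer (b : Buffer.t) = object
  method output s k n = (Buffer.add_substring b s k n; n)
  method close_out () = ()
  method pos_out = Buffer.length b
  method flush () = ()
  method extra_info = "not part of async_out_channel"
end

(* Example 1 on top of the stand-in: output is always accepted *)
class async_buffer b = object
  inherit plain_buffer b
  method can_output = true
  method request_notification (_f : unit -> bool) = ()
end

(* The extra method widens the object type, so an explicit coercion
   is needed to put the object into a list of async_out_channel *)
let channels : async_out_channel list =
  [ (new async_buffer (Buffer.create 16) :> async_out_channel) ]
```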

Example 2: Again we use an Ocamlnet class to implement the buffer, but we do not inherit from this class directly. Instead, we create an instance of it and store it in the instance variable real_buf. The variable barrier_enabled is true until the one-second pause that follows the first 1024 bytes is over. The variable barrier_reached becomes true as soon as 1024 bytes have been written into the buffer. The lists notify_list and notify_list_new hold the callbacks passed to request_notification.

   class funny_async_buffer b ues =
   object (self)
     val real_buf = new Netchannels.output_buffer b
     val mutable barrier_enabled = true
     val mutable barrier_reached = false
     val mutable notify_list = []
     val mutable notify_list_new = []
 
     method output s k n =
       if barrier_enabled then (
         (* Accept at most as many bytes as fit below the 1024-byte barrier *)
         let m = 1024 - real_buf#pos_out in
         let r = real_buf # output s k (min n m) in
         if m > 0 && real_buf#pos_out = 1024 then (
           barrier_reached <- true;
           self # configure_sleep_second();
           self # notify()
         );
         r
       )
       else 
         real_buf # output s k n

     method flush() = ()

     method pos_out = real_buf#pos_out

     method close_out() = real_buf#close_out()

     method can_output =
       if barrier_enabled then
         not barrier_reached
       else
         true

     method request_notification f =
       notify_list_new <- f :: notify_list_new

     method private notify() =
       notify_list <- notify_list @ notify_list_new;
       notify_list_new <- [];
       notify_list <- List.filter (fun f -> f()) notify_list

     method private configure_sleep_second() =
       let g = Unixqueue.new_group ues in
       Unixqueue.once ues g 1.0 self#wake_up

     method private wake_up() =
       barrier_enabled <- false;
       self # notify()
   end
Initially, the barrier is enabled, and can_output returns true. The logic in output ensures that no more than 1024 bytes are added to the buffer while the barrier is enabled. When the 1024th byte is written, the barrier is reached, and the sleep second begins. can_output changes to false, so we must notify the functions that have requested notification. The timer is implemented by a call of Unixqueue.once; this function performs a callback after a period of time has elapsed. Here, wake_up is called back. It disables the barrier, and because can_output is now true again, the notifications have to be done again.
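The state changes described above can be checked without an event system. The following sketch is an adaptation, not the example from the distribution: the hypothetical class funny_buffer_sim uses a plain Buffer.t instead of real_buf, and it replaces the Unixqueue.once timer with an explicit method fire_timer, which plays the role of wake_up being called back after the second has elapsed.

```ocaml
(* Adaptation of funny_async_buffer without Unixqueue: a plain Buffer.t
   replaces real_buf, and the hypothetical method [fire_timer] simulates
   the one-second timer firing (wake_up in the original). *)
class funny_buffer_sim (b : Buffer.t) = object (self)
  val mutable barrier_enabled = true
  val mutable barrier_reached = false
  val mutable timer_armed = false
  val mutable notify_list : (unit -> bool) list = []
  val mutable notify_list_new : (unit -> bool) list = []

  method output s k n =
    if barrier_enabled then begin
      (* Accept only as many bytes as fit below the 1024-byte barrier *)
      let m = 1024 - Buffer.length b in
      let r = min n m in
      Buffer.add_substring b s k r;
      if m > 0 && Buffer.length b = 1024 then begin
        barrier_reached <- true;
        timer_armed <- true;   (* the original calls Unixqueue.once here *)
        self#notify ()
      end;
      r
    end
    else begin
      Buffer.add_substring b s k n;
      n
    end

  method flush () = ()
  method pos_out = Buffer.length b
  method close_out () = ()

  method can_output =
    if barrier_enabled then not barrier_reached else true

  method request_notification f =
    notify_list_new <- f :: notify_list_new

  (* Run all callbacks, keeping only those that return true *)
  method private notify () =
    notify_list <- notify_list @ notify_list_new;
    notify_list_new <- [];
    notify_list <- List.filter (fun f -> f ()) notify_list

  (* Simulated timer expiry: lift the barrier and notify again *)
  method fire_timer () =
    if timer_armed then begin
      timer_armed <- false;
      barrier_enabled <- false;
      self#notify ()
    end
end
```

Writing 1500 bytes at once is only partially accepted: output returns 1024, can_output becomes false, and a registered callback observes false. After fire_timer, the callback observes true and the remaining bytes are accepted in full.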

The complete example can be found in the "examples/engines" directory of the equeue distribution.

A more useful asynchronous channel is output_async_descr, which outputs the channel data to a file descriptor. This class is also an engine; see the reference manual for a description.