Because engines are based on Unixqueues, one can imagine that complex operations on file descriptors are executed by engines. Actually, there is a primitive that copies the whole byte stream arriving at one descriptor to another descriptor: The class copier. We do not discuss this class in detail, it is explained in the reference manual. From the outside it works like every engine: One specifies the task, creates the engine, and waits until it is finished. Internally, the class has to watch both file descriptors, check when data can be read and written, and to actually copy chunk by chunk.
Now imagine we do not only want to copy from descriptor to descriptor, but to copy from a descriptor into a data object. Of course, we have the phenomenon that the descriptor sometimes has data to be read and sometimes not, this is well-known and can be effectively handled by Unixqueue means. In addition to this, we assume that there is only limited processing capacity in the data object, so it can sometimes accept data and sometimes not. This sounds the same, but it is not, because there is no descriptor to which this phenomenon is bound. We have to develop our own interface to mimick this behaviour on a higher programming level: The asynchronous output channel.
The term channel is used by the O'Caml runtime system to refer to buffered I/O descriptors. The Ocamlnet library has extended the meaning of the term to objects that handle I/O in a configurable way. As this is what we are going to do, we adopt this meaning.
An asynchronous output channel is a class with the type:
class type async_out_channel = object method output : string -> int -> int -> int method close_out : unit -> unit method pos_out : int method flush : unit -> unit method can_output : bool method request_notification : (unit -> bool) -> unitThe first four methods are borrowed from Ocamlnet's class type raw_out_channel:
output s k n prints into the channel n bytes that can be found at position k of string s. The method returns the number of bytes that have been accepted
close_out() closes the channel
flush() causes that bytes found in internal buffers are immediately processed. Note that it is questionable what this means in an asynchronous programming environment, and because of this, we ignore this method.
pos_out returns the number of bytes that have been written into the channel since its creation (as object)
can_output returns true when output accepts at least one byte, and false otherwise
request_notification f requests that the function f is called back whenever can_output changes its value
We show now two examples: The first always accepts output and appends it to a buffer. Of course, the two methods can_output and request_notification are trivial in this case. The second example illustrates these methods: The channel pauses for one second after one kilobyte of data have been accepted. This is of little practical use, but quite simple to implement, and has the right niveau for an example.
Example 1: We just inherit from an Ocamlnet class that implements the buffer:
class async_buffer b = object (self) inherit Netchannels.output_buffer b method can_output = true method request_notification (f : unit->bool) = () endI insist that this is a good example because it demonstrates why the class type async_out_channel bases on an Ocamlnet class type. (Note that async_buffer defines more methods than necessary. It might be necessary to coerce objects of this class to async_out_channel if required by typing.)
Example 2: Again we use an Ocamlnet class to implement the buffer, but we do not directly inherit from this class. Instead we instantiate it as an instance variable real_buf. The variable barrier_enabled is true as long as no more than 1024 bytes have been written into the buffer, and the sleep second is not yet over. The variable barrier_reached is true if at least 1024 bytes have been written into the buffer.
class funny_async_buffer b ues = object (self) val real_buf = new Netchannels.output_buffer b val mutable barrier_enabled = true val mutable barrier_reached = false val mutable notify_list = [] val mutable notify_list_new = [] method output s k n = if barrier_enabled then ( let m = 1024 - real_buf#pos_out in let r = real_buf # output s k (min n m) in if m > 0 && real_buf#pos_out = 1024 then ( barrier_reached <- true; self # configure_sleep_second(); self # notify() ); r ) else real_buf # output s k n method flush() = () method pos_out = real_buf#pos_out method close_out() = real_buf#close_out() method can_output = if barrier_enabled then not barrier_reached else true method request_notification f = notify_list_new <- f :: notify_list_new method private notify() = notify_list <- notify_list @ notify_list_new; notify_list_new <- []; notify_list <- List.filter (fun f -> f()) notify_list method private configure_sleep_second() = let g = Unixqueue.new_group ues in Unixqueue.once ues g 1.0 self#wake_up method private wake_up() = barrier_enabled <- false; self # notify() endInitially, the barrier is enabled, and can_output returns true. The logic in output ensures that no more than 1024 bytes are added to the buffer. When the 1024th byte is printed, the barrier is reached, and the sleep second begins. can_output changes to false, and because of this, we must notify the functions that have requested that. The timer is implemented by a call of Unixqueue.once; this function performs a callback after a period of time has elapsed. Here, wake_up is called back. It disables the barrier, and because can_output is now again true, the notifications have to be done again.
The complete example can be found in the "examples/engines" directory of the equeue distribution.
An implementation of a useful asynchronous channel is output_async_descr that outputs the channel data to a file descriptor. This class is also an engine. See the reference manual for a description.