#!/usr/bin/env ruby
#
# $Source: /var/cvs/sysvipc/sysvipc/test_sysvipc,v $
#
# $Revision: 1.20 $
# $Date: 2006/12/18 14:16:23 $
#
# Copyright (c) 2006 James Steven Jenkins.
#

# Make the library in the current working directory loadable.
$:.unshift(ENV['PWD'])

require 'sysvipc'
require 'test/unit'

include SystemVIPC

# IPC key shared by all three tests, derived from this file's path.
KEY     = ftok(__FILE__, 0)
# Number of semaphores in the set created by test_sem.
NSEMS   = 16
# Number of semaphores touched by each acquire/release operation list.
NSEMOPS = 5
# Number of messages queued and drained by test_msg.
NMSGS   = 16
# Size passed to SharedMemory.new by test_shm.
SHMSIZE = 1024

# Exercises MessageQueue, Semaphore and SharedMemory from the sysvipc
# library, each within one process, across a fork, and across a thread.
#
# Every test removes the IPC object it created from an +ensure+ clause so
# that a failing assertion does not leave the object behind for the next run.
class TestCradle < Test::Unit::TestCase

  def setup
  end

  # Sends and receives messages on a freshly created queue: first in-process,
  # then from a forked child, then from a second thread.
  def test_msg
    msg = MessageQueue.new(KEY, IPC_CREAT | 0660)
    assert_instance_of(MessageQueue, msg, 'MessageQueue.new')
    assert_owned_by_current_process(msg)

    # Queue NMSGS messages tagged 1..NMSGS, then fetch them by tag in the
    # reverse order (presumably the tag is the SysV message type -- confirm
    # against the library).
    1.upto(NMSGS) do |i|
      assert_equal(msg, msg.send(i, "message #{i}"), 'MessageQueue#send')
    end
    NMSGS.downto(1) do |i|
      assert_equal("message #{i}", msg.recv(i, 100), 'MessageQueue#recv')
    end

    # The child sends after a delay, so the parent's recv is issued first.
    Process.fork do
      sleep 1
      msg.send(1, 'message 1')
    end
    assert_equal("message 1", msg.recv(1, 100), 'MessageQueue#recv')
    Process.wait

    # Same exchange with the sender running in a thread of this process.
    sender = Thread.new do
      sleep 1
      msg.send(2, 'message 2')
    end
    assert_equal('message 2', msg.recv(2, 100), 'MessageQueue#recv')
    # Join so the sender is finished (and any exception it raised surfaces)
    # before the queue is removed.
    sender.join
  ensure
    msg.remove if msg
  end

  # Checks semaphore value accessors and SemaphoreOperation attributes, then
  # verifies the ordering of acquire/release between parent and child process
  # and between two threads.
  def test_sem
    sem = Semaphore.new(KEY, NSEMS, IPC_CREAT | 0660)
    assert_kind_of(Semaphore, sem, 'Semaphore.new')
    assert_owned_by_current_process(sem)

    assert_equal(NSEMS, sem.size, 'Semaphore#size')

    NSEMS.times do |i|
      assert_equal(sem, sem.set_value(i, 2), 'Semaphore#set_value')
      assert_equal(2, sem.value(i), 'Semaphore#value')
    end

    values = Array.new(NSEMS, 1)
    assert_equal(sem, sem.set_all(values), 'Semaphore#set_all')
    assert_equal(values, sem.to_a, 'Semaphore#to_a')

    NSEMS.times do |i|
      assert_equal(0, sem.n_count(i), 'Semaphore#n_count')
      assert_equal(0, sem.z_count(i), 'Semaphore#z_count')
    end

    # Build matching operation lists: -1 on semaphores 0...NSEMOPS to
    # acquire, +1 on the same semaphores to release.
    acquire = []
    release = []
    NSEMOPS.times do |i|
      op = SemaphoreOperation.new(i, -1)
      assert_kind_of(SemaphoreOperation, op, 'SemaphoreOperation.new')
      assert_equal(i, op.pos, 'SemaphoreOperation#pos')
      assert_equal(-1, op.value, 'SemaphoreOperation#value')
      assert_equal(0, op.flags, 'SemaphoreOperation#flags')
      acquire << op

      op = SemaphoreOperation.new(i, 1)
      assert_equal(i, op.pos, 'SemaphoreOperation#pos')
      assert_equal(1, op.value, 'SemaphoreOperation#value')
      assert_equal(0, op.flags, 'SemaphoreOperation#flags')
      release << op
    end

    assert_equal(sem, sem.apply(acquire), 'Semaphore#apply')
    assert_equal(sem, sem.apply(release), 'Semaphore#apply')

    # After the apply calls above, each touched semaphore is expected to
    # report this process's pid.
    pid = Process.pid
    NSEMOPS.times do |i|
      assert_equal(pid, sem.pid(i), 'Semaphore#pid')
    end

    # Cross-process ordering. Parent and child append digits to a shared
    # pipe; the final string must read '12345', i.e. the child's acquire is
    # expected to complete only after the parent's release.
    rd, wr = IO.pipe
    wr.write '1'
    assert_equal(sem, sem.apply(acquire), 'Semaphore#apply')
    Process.fork do
      rd.close
      wr.write '2'
      sem.apply(acquire)
      wr.write '4'
      sem.apply(release)
      wr.write '5'
      wr.close
    end
    sleep 1
    wr.write '3'
    assert_equal(sem, sem.apply(release), 'Semaphore#apply')
    # Close the parent's write end before reading so rd.read can reach EOF
    # once the child has closed its copy.
    wr.close
    Process.wait
    assert_equal('12345', rd.read, 'Semaphore')
    rd.close

    # Same ordering check with a thread appending to a shared string.
    seq = '1'
    assert_equal(sem, sem.apply(acquire), 'Semaphore#apply')
    t = Thread.new do
      seq << '2'
      sem.apply(acquire)
      seq << '4'
      sem.apply(release)
      seq << '5'
    end
    sleep 1
    seq << '3'
    assert_equal(sem, sem.apply(release), 'Semaphore#apply')
    t.join
    assert_equal('12345', seq, 'Semaphore')
  ensure
    # Release the pipe ends if a failure skipped the explicit closes above.
    [rd, wr].each { |io| io.close if io && !io.closed? }
    sem.remove if sem
  end

  # Writes to and reads from a shared memory segment: in-process, from a
  # forked child, and from a second thread.
  def test_shm
    shm = SharedMemory.new(KEY, SHMSIZE, IPC_CREAT | 0660)
    assert_kind_of(SharedMemory, shm, 'SharedMemory.new')
    assert_owned_by_current_process(shm)

    assert_equal(SHMSIZE, shm.size, 'SharedMemory#size')

    assert_equal(shm, shm.attach, 'SharedMemory#attach')

    data_size = SHMSIZE - 1
    adata = 'A' * data_size
    bdata = 'B' * data_size

    assert_equal(shm, shm.write(adata), 'SharedMemory#write')
    assert_equal(adata, shm.read(data_size), 'SharedMemory#read')

    # read(len, offset) / write(data, offset): the expected values below pin
    # how the second argument positions the access within the segment.
    assert_equal(shm, shm.write('test123'), 'SharedMemory#write')
    assert_equal('123', shm.read(3, 4), 'SharedMemory#read')

    assert_equal(shm, shm.write('test', 4), 'SharedMemory#write')
    assert_equal('testtest', shm.read(8, 0), 'SharedMemory#read')

    # A write made by a forked child must be visible to the parent.
    Process.fork do
      shm.write(bdata)
    end
    Process.wait
    assert_equal(bdata, shm.read(data_size), 'SharedMemory#read')

    # A write made by another thread must be visible after joining it.
    t = Thread.new do
      shm.write(adata)
    end
    t.join
    assert_equal(adata, shm.read(data_size), 'SharedMemory#read')

    assert_equal(shm, shm.detach, 'SharedMemory#detach')
  ensure
    # NOTE(review): on a failure path the segment may still be attached when
    # remove is called -- confirm the library accepts that.
    shm.remove if shm
  end

  def teardown
  end

  private

  # Asserts that a Permission built from +ipc+ reports the current process's
  # uid and gid for uid/gid as well as cuid/cgid.
  #
  # ipc:: the MessageQueue, Semaphore or SharedMemory object just created.
  def assert_owned_by_current_process(ipc)
    uid = Process.uid
    gid = Process.gid
    perm = Permission.new(ipc)
    assert_instance_of(Permission, perm, 'Permission.new')
    assert_equal(uid, perm.uid, 'Permission#uid')
    assert_equal(gid, perm.gid, 'Permission#gid')
    assert_equal(uid, perm.cuid, 'Permission#cuid')
    assert_equal(gid, perm.cgid, 'Permission#cgid')
  end
end