/*
* VBI proxy daemon
*
* Copyright (C) 2002-2004 Tom Zoerner (and others)
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*
*
* Description:
*
* This is the main module of the VBI proxy daemon. Please refer to
* the manual page for information on the daemon's general purpose.
*
* When started, the daemon will at first only create a named socket in
* /tmp for each of the devices given on the command line and wait for
* client connections. When a client connects, the VBI device is opened
* and configured for the requested services. If no services are
* requested, the device is still opened, but acquisition is not started.
* When more clients connect, the daemon resets the service parameters
* and re-adds them to the slicer in order of connection time, adjusting
* VBI device parameters as required and possible (e.g. enlarging the
* VBI window).
*
* Client handling was originally derived from alevtd by Gerd Knorr, then
* adapted/extended for nxtvepg and again adapted/reduced for the VBI proxy
* by Tom Zoerner.
*
*
* $Log: proxyd.c,v $
* Revision 1.16 2006/05/22 09:01:53 mschimek
* s/vbi_asprintf/asprintf.
*
* Revision 1.15 2006/02/10 06:25:36 mschimek
* *** empty log message ***
*
* Revision 1.14 2005/01/20 01:39:15 mschimek
* gcc 4.0 char pointer signedness warnings.
*
* Revision 1.13 2004/12/30 02:26:02 mschimek
* printf ptrdiff_t fixes.
*
* Revision 1.12 2004/11/07 10:52:01 mschimek
* dprintf: s/proxyd/zvbid.
*
* Revision 1.11 2004/10/25 16:56:26 mschimek
* *** empty log message ***
*
* Revision 1.10 2004/10/24 18:15:33 tomzo
* - added handling of norm changes
* - adapted for interface change to proxy-msg.c (split socket read/write func)
* - improved debug level handling (command line option -debug)
*
*/
static const char rcsid[] = "$Id: proxyd.c,v 1.16 2006/05/22 09:01:53 mschimek Exp $";
#include "config.h"
#ifdef ENABLE_PROXY
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
#include <assert.h>
#include <pthread.h>
#include "src/vbi.h"
#include "src/io.h"
#include "src/bcd.h"
#include "src/proxy-msg.h"
#ifdef ENABLE_V4L2
#include <asm/types.h>
#include "src/videodev2k.h" /* for setting device priority */
#endif
#define DBG_MSG 1
#define DBG_QU 2
#define DBG_CLNT 4
#define DBG_SCHED 8
#define dprintf(flags, fmt, arg...) \
do { \
if (opt_debug_level & (flags)) \
fprintf (stderr, "zvbid: " fmt, ## arg); \
} while (0)
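/* note: "fmt, ## arg" relies on a GNU C preprocessor extension where "##"
** swallows the preceding comma if the variadic argument list is empty, so
** both of these forms compile (examples taken from this file):
**    dprintf(DBG_MSG, "forward_data: queue overflow\n");
**    dprintf(DBG_QU, "queue_add_free: buffer 0x%lX\n", (long)p_buf);
*/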
/* Macro to cast (void *) to (int) and backwards without compiler warning
** (note: 64-bit compilers warn when casting a pointer to an int) */
#define PVOID2INT(X) ((int)((long)(X)))
#define INT2PVOID(X) ((void *)((long)(X)))
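/* Example: this is how the device index is passed through the (void *)
** argument of pthread_create() further below:
**    pthread_create(&tid, NULL, vbi_proxyd_acq_thread, INT2PVOID(dev_idx));
** and recovered inside the thread function:
**    dev_idx = PVOID2INT(pvoid_arg);
*/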
/* ----------------------------------------------------------------------------
** This struct is one element in the slicer data queue
*/
typedef struct PROXY_QUEUE_s
{
struct PROXY_QUEUE_s * p_next;
unsigned int ref_count;
unsigned int use_count;
int max_lines;
int line_count;
double timestamp;
void * p_raw_data;
vbi_sliced lines[1];
} PROXY_QUEUE;
#define QUEUE_ELEM_SIZE(Q,C) (sizeof(PROXY_QUEUE) + (sizeof(vbi_sliced) * ((C) - 1)))
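/* Note: the trailing "lines[1]" member of PROXY_QUEUE is a pre-C99
** substitute for a flexible array member; QUEUE_ELEM_SIZE() subtracts the
** one element already included in sizeof(PROXY_QUEUE).  For example, a
** buffer for 32 sliced lines occupies
**    sizeof(PROXY_QUEUE) + 31 * sizeof(vbi_sliced)
** bytes.  (The Q parameter is currently unused.)
*/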
/* ----------------------------------------------------------------------------
** Declaration of types of internal state variables
*/
/* Note on mutex conventions:
** - mutexes are only required for v4l devices which do not support select(2),
** because only then a separate thread is started which blocks in read(2)
** - when both the client chain mutex and a slicer queue mutex are required,
** the client mutex must be acquired first; the order is important to
** prevent deadlocks
** - the master thread locks the client chain mutex only for write access,
** i.e. when a client is added or removed
*/
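/* Illustration of the locking order convention (this is the pattern used
** e.g. by vbi_proxyd_forward_data() and vbi_proxy_queue_force_free() below):
**
**    pthread_mutex_lock(&proxy.clnt_mutex);            client chain first
**    pthread_mutex_lock(&p_proxy_dev->queue_mutex);    then the queue
**    ...
**    pthread_mutex_unlock(&p_proxy_dev->queue_mutex);
**    pthread_mutex_unlock(&proxy.clnt_mutex);
*/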
typedef enum
{
REQ_TOKEN_NONE, /* this client is not allowed to switch channels */
REQ_TOKEN_RECLAIM, /* return of token will be requested */
REQ_TOKEN_RELEASE, /* waiting for client to release token */
REQ_TOKEN_GRANT, /* this client will be sent the token a.s.a.p. */
REQ_TOKEN_GRANTED, /* this client currently holds the token */
REQ_TOKEN_RETURNED /* this client has returned the token, but still 'owns' the channel */
} REQ_TOKEN_STATE;
#define REQ_CONTROLS_CHN(X) ((X) >= REQ_TOKEN_GRANTED)
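/* Sketch of the typical token life cycle; this is a simplification, the
** complete transition logic is in vbi_proxyd_token_grant() below (the
** GRANT -> GRANTED and RECLAIM -> RELEASE transitions occur when the
** respective messages are actually sent to the client):
**
**    NONE -> GRANT -> GRANTED -> RECLAIM -> RELEASE -> NONE
**
** GRANTED -> RETURNED occurs when the client voluntarily hands the token
** back while still "owning" the channel.
*/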
/* client channel control scheduler state */
typedef struct
{
REQ_TOKEN_STATE token_state;
vbi_bool is_completed;
int cycle_count;
time_t last_start;
time_t last_duration;
} VBIPROXY_CHN_STATE;
/* client connection state */
typedef enum
{
REQ_STATE_WAIT_CON_REQ,
REQ_STATE_WAIT_CLOSE,
REQ_STATE_FORWARD,
REQ_STATE_CLOSED,
} REQ_STATE;
#define SRV_MAX_DEVICES 4
#define VBI_MAX_BUFFER_COUNT 32
#define VBI_MIN_STRICT -1
#define VBI_MAX_STRICT 2
#define VBI_GET_SERVICE_P(PREQ,STRICT) ((PREQ)->services + (signed)(STRICT) - VBI_MIN_STRICT)
#define VBI_RAW_SERVICES(SRV) (((SRV) & (VBI_SLICED_VBI_625 | VBI_SLICED_VBI_525)) != 0)
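/* Example: the strictness level serves as index into the per-client
** service array, offset by VBI_MIN_STRICT:
**    *VBI_GET_SERVICE_P(req, -1)  is  req->services[0]
**    *VBI_GET_SERVICE_P(req,  2)  is  req->services[3]
*/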
/* this struct holds client-specific state and parameters */
typedef struct PROXY_CLNT_s
{
struct PROXY_CLNT_s * p_next;
REQ_STATE state;
VBIPROXY_MSG_STATE io;
vbi_bool endianSwap;
VBI_PROXY_CLIENT_FLAGS client_flags;
int dev_idx;
VBIPROXY_MSG msg_buf;
unsigned int services[VBI_MAX_STRICT - VBI_MIN_STRICT + 1];
unsigned int all_services;
int vbi_start[2];
int vbi_count[2];
int buffer_count;
vbi_bool buffer_overflow;
PROXY_QUEUE * p_sliced;
vbi_channel_profile chn_profile;
VBIPROXY_CHN_STATE chn_state;
VBI_CHN_PRIO chn_prio;
VBI_PROXY_CHN_FLAGS chn_status_ind;
} PROXY_CLNT;
/* this struct holds the state of a device */
typedef struct
{
const char * p_dev_name;
char * p_sock_path;
int pipe_fd;
vbi_capture * p_capture;
vbi_raw_decoder * p_decoder;
int vbi_fd;
VBI_DRIVER_API_REV vbi_api;
unsigned int all_services;
unsigned int scanning;
int max_lines;
PROXY_QUEUE * p_sliced;
PROXY_QUEUE * p_free;
PROXY_QUEUE * p_tmp_buf;
VBI_CHN_PRIO chn_prio;
vbi_bool use_thread;
int wr_fd;
vbi_bool wait_for_exit;
vbi_bool thread_active;
pthread_t thread_id;
pthread_cond_t start_cond;
pthread_mutex_t start_mutex;
pthread_mutex_t queue_mutex;
} PROXY_DEV;
/* this struct holds the global state of the module */
typedef struct
{
char * listen_ip;
char * listen_port;
vbi_bool do_tcp_ip;
int tcp_ip_fd;
int max_conn;
vbi_bool should_exit;
vbi_bool chn_sched_alarm;
PROXY_CLNT * p_clnts;
int clnt_count;
pthread_mutex_t clnt_mutex;
PROXY_DEV dev[4];
int dev_count;
} PROXY_SRV;
#define SRV_CONNECT_TIMEOUT 60
#define SRV_STALLED_STATS_INTV 15
#define SRV_QUEUE_BUFFER_COUNT 10
#define DEFAULT_MAX_CLIENTS 10
#define DEFAULT_VBI_DEV_PATH "/dev/vbi"
#define DEFAULT_VBI_DEVFS_PATH "/dev/v4l/vbi"
#define DEFAULT_CHN_PRIO VBI_CHN_PRIO_INTERACTIVE
#define DEFAULT_BUFFER_COUNT 8
#define MAX_DEV_ERROR_COUNT 10
/* ----------------------------------------------------------------------------
** Local variables
*/
static PROXY_SRV proxy;
static char * p_opt_log_name = NULL;
static int opt_log_level = -1;
static int opt_syslog_level = -1;
static vbi_bool opt_no_detach = FALSE;
static vbi_bool opt_kill_daemon = FALSE;
static unsigned int opt_max_clients = DEFAULT_MAX_CLIENTS;
static unsigned int opt_debug_level = 0;
static unsigned int opt_buffer_count = DEFAULT_BUFFER_COUNT;
/* ----------------------------------------------------------------------------
** Add one buffer to the tail of a queue
** - slicer queue is organized so that new data is appended to the tail,
** forwarded data is taken from the head
** - note that a buffer is not released from the slicer queue until all
** clients have processed its data; each client struct holds a pointer to
** the first buffer in the queue not yet processed by that client
*/
static void vbi_proxy_queue_add_tail( PROXY_QUEUE ** q, PROXY_QUEUE * p_buf )
{
PROXY_QUEUE * p_last;
dprintf(DBG_QU, "queue_add_tail: buffer 0x%lX\n", (long)p_buf);
p_buf->p_next = NULL;
if (*q != NULL)
{
assert(*q != p_buf);
p_last = *q;
while (p_last->p_next != NULL)
p_last = p_last->p_next;
p_last->p_next = p_buf;
}
else
*q = p_buf;
assert((*q != NULL) && ((*q)->p_next != *q));
}
/* ----------------------------------------------------------------------------
** Retrieve one buffer from the queue of unused buffers
** - checks whether the buffer size still matches the current VBI format;
** if not, the buffer is re-allocated
*/
static PROXY_QUEUE * vbi_proxy_queue_get_free( PROXY_DEV * p_proxy_dev )
{
PROXY_QUEUE * p_buf;
pthread_mutex_lock(&p_proxy_dev->queue_mutex);
p_buf = p_proxy_dev->p_free;
if (p_buf != NULL)
{
p_proxy_dev->p_tmp_buf = p_buf;
p_proxy_dev->p_free = p_buf->p_next;
pthread_mutex_unlock(&p_proxy_dev->queue_mutex);
if (p_buf->max_lines != p_proxy_dev->max_lines)
{
/* max line parameter changed -> re-alloc the buffer */
p_proxy_dev->p_tmp_buf = NULL;
if (p_buf->p_raw_data != NULL)
free(p_buf->p_raw_data);
free(p_buf);
p_buf = malloc(QUEUE_ELEM_SIZE(p_buf, p_proxy_dev->max_lines));
if (p_buf == NULL)
return NULL; /* out of memory: treated by the caller like a queue overflow */
p_buf->p_raw_data = NULL;
p_buf->max_lines = p_proxy_dev->max_lines;
p_proxy_dev->p_tmp_buf = p_buf;
}
/* add/remove "sub-buffer" for raw data */
if (VBI_RAW_SERVICES(p_proxy_dev->all_services))
{
if (p_buf->p_raw_data == NULL)
p_buf->p_raw_data = malloc(p_proxy_dev->max_lines * VBIPROXY_RAW_LINE_SIZE);
}
else
{
if (p_buf->p_raw_data != NULL)
free(p_buf->p_raw_data);
p_buf->p_raw_data = NULL;
}
p_buf->p_next = NULL;
p_buf->ref_count = 0;
p_buf->use_count = 0;
}
else
pthread_mutex_unlock(&p_proxy_dev->queue_mutex);
dprintf(DBG_QU, "queue_get_free: buffer 0x%lX\n", (long)p_buf);
return p_buf;
}
/* ----------------------------------------------------------------------------
** Add a buffer to the queue of unused buffers
** - there's no ordering between buffers in the free queue, hence we don't
** care if the buffer is inserted at head or tail of the queue
*/
static void vbi_proxy_queue_add_free( PROXY_DEV * p_proxy_dev, PROXY_QUEUE * p_buf )
{
dprintf(DBG_QU, "queue_add_free: buffer 0x%lX\n", (long)p_buf);
p_buf->p_next = p_proxy_dev->p_free;
p_proxy_dev->p_free = p_buf;
}
/* ----------------------------------------------------------------------------
** Decrease reference counter on a buffer, add back to free queue upon zero
** - called when a buffer has been processed for one client
*/
static void vbi_proxy_queue_release_sliced( PROXY_CLNT * req )
{
PROXY_QUEUE * p_buf;
PROXY_DEV * p_proxy_dev;
p_proxy_dev = proxy.dev + req->dev_idx;
p_buf = req->p_sliced;
req->p_sliced = p_buf->p_next;
if (p_buf->ref_count > 0)
p_buf->ref_count -= 1;
if (p_buf->ref_count == 0)
{
assert(p_proxy_dev->p_sliced == p_buf);
p_proxy_dev->p_sliced = p_buf->p_next;
/* add the buffer to the free queue */
p_buf->p_next = p_proxy_dev->p_free;
p_proxy_dev->p_free = p_buf;
}
}
/* ----------------------------------------------------------------------------
** Return all buffers in the sliced output queue to the free queue
** - also clears all clients' pointers into the queue
** - called when the device is flushed, e.g. after a channel change
*/
static void vbi_proxy_queue_release_all( int dev_idx )
{
PROXY_DEV * p_proxy_dev;
PROXY_CLNT * req;
PROXY_QUEUE * p_next;
p_proxy_dev = proxy.dev + dev_idx;
pthread_mutex_lock(&p_proxy_dev->queue_mutex);
while (p_proxy_dev->p_sliced != NULL)
{
p_next = p_proxy_dev->p_sliced->p_next;
vbi_proxy_queue_add_free(p_proxy_dev, p_proxy_dev->p_sliced);
p_proxy_dev->p_sliced = p_next;
}
for (req = proxy.p_clnts; req != NULL; req = req->p_next)
{
if (req->dev_idx == dev_idx)
{
req->p_sliced = NULL;
}
}
pthread_mutex_unlock(&p_proxy_dev->queue_mutex);
}
/* ----------------------------------------------------------------------------
** Free all resources of all buffers in a queue
** - called for both queues upon stop of acquisition, and to shrink the
** free queue when too many buffers are allocated
*/
static void vbi_proxy_queue_free_all( PROXY_QUEUE ** q )
{
PROXY_QUEUE * p_next;
while (*q != NULL)
{
p_next = (*q)->p_next;
if ((*q)->p_raw_data != NULL)
free((*q)->p_raw_data);
free(*q);
*q = p_next;
}
}
/* ----------------------------------------------------------------------------
** Allocate buffers
** - determines number of required buffers and adds or removes buffers from queue
** - buffer count depends on
** (i) minimum which is always allocated (>= number of raw buffers)
** (ii) max. requested buffer count of all connected clients
** (iii) number of clients (one spare for each client)
*/
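/* Worked example: with the default of 8 buffers (DEFAULT_BUFFER_COUNT) and
** two connected clients requesting 6 and 10 buffers respectively, the queue
** is sized max(8, 6, 10) + 2 = 12 buffers.
*/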
static vbi_bool vbi_proxy_queue_allocate( int dev_idx )
{
PROXY_DEV * p_proxy_dev;
PROXY_CLNT * p_walk;
PROXY_QUEUE * p_buf;
int client_count;
int buffer_count;
int buffer_free;
int buffer_used;
p_proxy_dev = proxy.dev + dev_idx;
buffer_count = opt_buffer_count;
client_count = 0;
for (p_walk = proxy.p_clnts; p_walk != NULL; p_walk = p_walk->p_next)
{
if (p_walk->dev_idx == dev_idx)
{
client_count += 1;
if (buffer_count < p_walk->buffer_count)
buffer_count = p_walk->buffer_count;
}
}
buffer_count += client_count;
pthread_mutex_lock(&p_proxy_dev->queue_mutex);
/* count buffers in sliced data output queue */
buffer_used = 0;
for (p_buf = p_proxy_dev->p_sliced; p_buf != NULL; p_buf = p_buf->p_next)
{
buffer_used += 1;
}
/* count buffers in free queue */
buffer_free = 0;
for (p_buf = p_proxy_dev->p_free; p_buf != NULL; p_buf = p_buf->p_next)
{
buffer_free += 1;
}
dprintf(DBG_MSG, "queue_allocate: need %d buffers, have %d+%d (free+used)\n", buffer_count, buffer_free, buffer_used);
if (buffer_free + buffer_used > buffer_count)
{
/* too many buffers: first reclaim from free queue (possibly too many) */
vbi_proxy_queue_free_all(&p_proxy_dev->p_free);
buffer_free = 0;
}
/* XXX we could also force-free more buffers in the output queue, but that may be overkill */
while (buffer_free + buffer_used < buffer_count)
{
p_buf = malloc(QUEUE_ELEM_SIZE(p_buf, p_proxy_dev->max_lines));
if (p_buf != NULL)
{
p_buf->p_raw_data = NULL;
p_buf->max_lines = p_proxy_dev->max_lines;
vbi_proxy_queue_add_free(p_proxy_dev, p_buf);
buffer_free += 1;
}
else
{
dprintf(DBG_MSG, "queue_allocate: failed to allocate buffer (errno %d)\n", errno);
break;
}
}
pthread_mutex_unlock(&p_proxy_dev->queue_mutex);
return ((unsigned int)(buffer_free + buffer_used)
>= opt_buffer_count + client_count);
}
/* ----------------------------------------------------------------------------
** Free the first buffer in the output queue by force
** - required if one client is blocked while others are still active
** - client(s) will lose this frame's data
*/
static PROXY_QUEUE * vbi_proxy_queue_force_free( PROXY_DEV * p_proxy_dev )
{
PROXY_CLNT * req;
pthread_mutex_lock(&proxy.clnt_mutex);
pthread_mutex_lock(&p_proxy_dev->queue_mutex);
if ((p_proxy_dev->p_free == NULL) && (p_proxy_dev->p_sliced != NULL))
{
dprintf(DBG_MSG, "queue_force_free: buffer 0x%lX\n", (long)p_proxy_dev->p_sliced);
for (req = proxy.p_clnts; req != NULL; req = req->p_next)
{
if (req->p_sliced == p_proxy_dev->p_sliced)
{
vbi_proxy_queue_release_sliced(req);
}
}
}
pthread_mutex_unlock(&p_proxy_dev->queue_mutex);
pthread_mutex_unlock(&proxy.clnt_mutex);
return vbi_proxy_queue_get_free(p_proxy_dev);
}
/* ----------------------------------------------------------------------------
** Read sliced data and forward it to all clients
*/
static void vbi_proxyd_forward_data( int dev_idx )
{
PROXY_QUEUE * p_buf;
PROXY_CLNT * req;
PROXY_DEV * p_proxy_dev;
struct timeval timeout;
int res;
p_proxy_dev = proxy.dev + dev_idx;
/* unlink a buffer from the free queue */
p_buf = vbi_proxy_queue_get_free(p_proxy_dev);
if (p_buf == NULL)
p_buf = vbi_proxy_queue_force_free(p_proxy_dev);
if (p_buf != NULL)
{
timeout.tv_sec = 0;
timeout.tv_usec = 0;
if (VBI_RAW_SERVICES(p_proxy_dev->all_services) == FALSE)
{
res = vbi_capture_read_sliced(p_proxy_dev->p_capture, p_buf->lines,
&p_buf->line_count, &p_buf->timestamp, &timeout);
}
else
{
res = vbi_capture_read(p_proxy_dev->p_capture,
p_buf->p_raw_data, p_buf->lines,
&p_buf->line_count, &p_buf->timestamp, &timeout);
}
if (res > 0)
{
assert(p_buf->line_count <= p_buf->max_lines); /* the buffer holds at most max_lines entries */
pthread_mutex_lock(&proxy.clnt_mutex);
pthread_mutex_lock(&p_proxy_dev->queue_mutex);
for (req = proxy.p_clnts; req != NULL; req = req->p_next)
{
if ( (req->dev_idx == dev_idx) &&
(req->state == REQ_STATE_FORWARD) &&
(req->all_services != 0) )
{
p_buf->ref_count += 1;
if (req->p_sliced == NULL)
req->p_sliced = p_buf;
}
}
pthread_mutex_unlock(&p_proxy_dev->queue_mutex);
pthread_mutex_unlock(&proxy.clnt_mutex);
}
else if (res < 0)
{
/* XXX abort upon error (esp. EBUSY) */
perror("VBI read");
}
pthread_mutex_lock(&p_proxy_dev->queue_mutex);
if (p_buf->ref_count > 0)
vbi_proxy_queue_add_tail(&p_proxy_dev->p_sliced, p_buf);
else
vbi_proxy_queue_add_free(p_proxy_dev, p_buf);
p_proxy_dev->p_tmp_buf = NULL;
pthread_mutex_unlock(&p_proxy_dev->queue_mutex);
}
else
dprintf(DBG_MSG, "forward_data: queue overflow\n");
}
/* ----------------------------------------------------------------------------
** Process a norm change notification
** - query the driver for the new norm: if successful, this overrides the
** information provided by the client (the client may also pass 0)
** - trigger sending of norm change indication to all clients if scanning changes:
** -> clients must re-apply for their services; note norm changes which don't
** affect the scanning (e.g. PAL<->SECAM) are ignored
*/
static void vbi_proxyd_update_scanning( int dev_idx, PROXY_CLNT * req, int scanning )
{
PROXY_DEV * p_proxy_dev;
PROXY_CLNT * p_walk;
unsigned int new_scanning;
p_proxy_dev = proxy.dev + dev_idx;
if (p_proxy_dev->p_capture != NULL)
{
/* if the info is coming from a client verify it */
if (req != NULL)
{
new_scanning = vbi_capture_get_scanning(p_proxy_dev->p_capture);
if (new_scanning <= 0)
{
if ((scanning == 525) || (scanning == 625))
new_scanning = scanning;
}
}
else
new_scanning = scanning;
if (new_scanning != p_proxy_dev->scanning)
{
dprintf(DBG_MSG, "update_scanning: changed from %d to %d\n", p_proxy_dev->scanning, new_scanning);
p_proxy_dev->scanning = new_scanning;
/* trigger sending of change indication to all clients except the caller */
for (p_walk = proxy.p_clnts; p_walk != NULL; p_walk = p_walk->p_next)
{
if ( (p_walk->dev_idx == dev_idx) &&
((p_walk->client_flags & VBI_PROXY_CLIENT_NO_STATUS_IND) == 0) )
{
p_walk->chn_status_ind |= VBI_PROXY_CHN_NORM;
}
}
}
}
}
/* ----------------------------------------------------------------------------
** Helper function: calculate an absolute timespec the given number of
** milliseconds in the future (pthread_cond_timedwait() expects absolute time)
*/
static void vbi_proxyd_calc_timeout_ms( struct timespec * p_tsp, int msecs )
{
struct timeval tv;
gettimeofday(&tv, NULL);
tv.tv_usec += msecs * 1000L;
if (tv.tv_usec >= 1000 * 1000L)
{
tv.tv_sec += 1;
tv.tv_usec -= 1000 * 1000;
}
p_tsp->tv_sec = tv.tv_sec;
p_tsp->tv_nsec = tv.tv_usec * 1000;
}
/* ----------------------------------------------------------------------------
** Clean up after thread cancellation: signal waiting master thread
*/
static void vbi_proxyd_acq_thread_cleanup( void * pvoid_arg )
{
PROXY_DEV * p_proxy_dev;
int dev_idx;
dev_idx = PVOID2INT(pvoid_arg);
p_proxy_dev = proxy.dev + dev_idx;
dprintf(DBG_QU, "acq thread cleanup: signaling master (%d)\n", p_proxy_dev->wait_for_exit);
pthread_mutex_lock(&p_proxy_dev->start_mutex);
if (p_proxy_dev->wait_for_exit)
{
pthread_cond_signal(&p_proxy_dev->start_cond);
}
if (p_proxy_dev->p_tmp_buf != NULL)
{
vbi_proxy_queue_add_free(p_proxy_dev, p_proxy_dev->p_tmp_buf);
p_proxy_dev->p_tmp_buf = NULL;
}
p_proxy_dev->thread_active = FALSE;
pthread_mutex_unlock(&p_proxy_dev->start_mutex);
}
/* ----------------------------------------------------------------------------
** Main loop for acquisition thread for devices that don't support select(2)
*/
static void * vbi_proxyd_acq_thread( void * pvoid_arg )
{
PROXY_DEV * p_proxy_dev;
int dev_idx;
int ret;
char byte_buf[1];
sigset_t sigmask;
dev_idx = PVOID2INT(pvoid_arg);
p_proxy_dev = proxy.dev + dev_idx;
/* block signals which are handled by main thread */
sigemptyset(&sigmask);
sigaddset(&sigmask, SIGHUP);
sigaddset(&sigmask, SIGINT);
sigaddset(&sigmask, SIGTERM);
pthread_sigmask(SIG_BLOCK, &sigmask, NULL);
pthread_cleanup_push(vbi_proxyd_acq_thread_cleanup, pvoid_arg);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
p_proxy_dev->thread_active = TRUE;
pthread_mutex_lock(&p_proxy_dev->start_mutex);
pthread_cond_signal(&p_proxy_dev->start_cond);
pthread_mutex_unlock(&p_proxy_dev->start_mutex);
while (p_proxy_dev->wait_for_exit == FALSE)
{
/* read data from the VBI device and append the buffer to all client queues
** note: this function blocks in read(2) until data is available */
vbi_proxyd_forward_data(dev_idx);
/* wake up the master thread to process client queues */
ret = write(p_proxy_dev->wr_fd, byte_buf, 1);
if ((ret < 0) && (errno != EAGAIN))
{
dprintf(DBG_MSG, "acq_thread: write error to pipe: %d\n", errno);
break;
}
else if (ret != 1)
dprintf(DBG_MSG, "acq_thread: pipe overflow\n");
}
pthread_cleanup_pop(1);
pthread_exit(0);
return NULL;
}
/* ----------------------------------------------------------------------------
** Stop acquisition thread
*/
static void vbi_proxyd_stop_acq_thread( PROXY_DEV * p_proxy_dev )
{
struct timespec tsp;
int ret;
int vbi_fd;
assert(p_proxy_dev->use_thread);
pthread_mutex_lock(&p_proxy_dev->start_mutex);
if (p_proxy_dev->thread_active)
{
p_proxy_dev->wait_for_exit = TRUE;
pthread_cancel(p_proxy_dev->thread_id);
vbi_proxyd_calc_timeout_ms(&tsp, 50);
ret = pthread_cond_timedwait(&p_proxy_dev->start_cond, &p_proxy_dev->start_mutex, &tsp);
if (ret != 0)
{ /* thread did not stop within 50ms: probably blocked in read with no incoming data */
/* dirty hack: force to wake up by closing the file handle */
vbi_fd = vbi_capture_fd(p_proxy_dev->p_capture);
close(vbi_fd);
dprintf(DBG_MSG, "stop_acq_thread: thread did not exit (%d): closed VBI filehandle %d\n", ret, vbi_fd);
vbi_proxyd_calc_timeout_ms(&tsp, 50);
ret = pthread_cond_timedwait(&p_proxy_dev->start_cond, &p_proxy_dev->start_mutex, &tsp);
}
if (ret == 0)
{
ret = pthread_join(p_proxy_dev->thread_id, NULL);
if (ret == 0)
dprintf(DBG_MSG, "stop_acq_thread: acq thread killed sucessfully\n");
else
dprintf(DBG_MSG, "stop_acq_thread: pthread_join failed: %d (%s)\n", errno, strerror(errno));
}
}
close(p_proxy_dev->vbi_fd);
close(p_proxy_dev->wr_fd);
p_proxy_dev->vbi_fd = -1;
p_proxy_dev->wr_fd = -1;
p_proxy_dev->use_thread = FALSE;
pthread_mutex_unlock(&p_proxy_dev->start_mutex);
}
/* ----------------------------------------------------------------------------
** Start a thread to block in read(2) for devices that don't support select(2)
*/
static vbi_bool vbi_proxyd_start_acq_thread( int dev_idx )
{
PROXY_DEV * p_proxy_dev;
int pipe_fds[2];
vbi_bool result = FALSE;
p_proxy_dev = proxy.dev + dev_idx;
p_proxy_dev->use_thread = TRUE;
p_proxy_dev->wait_for_exit = FALSE;
p_proxy_dev->thread_active = FALSE;
if (pipe(pipe_fds) == 0)
{
p_proxy_dev->vbi_fd = pipe_fds[0];
p_proxy_dev->wr_fd = pipe_fds[1];
fcntl(p_proxy_dev->vbi_fd, F_SETFL, O_NONBLOCK);
fcntl(p_proxy_dev->wr_fd, F_SETFL, O_NONBLOCK);
/* start thread */
pthread_mutex_lock(&p_proxy_dev->start_mutex);
if (pthread_create(&p_proxy_dev->thread_id, NULL,
vbi_proxyd_acq_thread, INT2PVOID(dev_idx)) == 0)
{
dprintf(DBG_MSG, "acquisiton thread started: "
"id %ld, device %ld, pipe rd/wr %d/%d\n",
(long)p_proxy_dev->thread_id,
(long)(p_proxy_dev - proxy.dev),
p_proxy_dev->vbi_fd, p_proxy_dev->wr_fd);
/* wait for the slave to report the initialization result */
pthread_cond_wait(&p_proxy_dev->start_cond, &p_proxy_dev->start_mutex);
pthread_mutex_unlock(&p_proxy_dev->start_mutex);
result = p_proxy_dev->thread_active;
}
else
dprintf(DBG_MSG, "start_acq_thread: pthread_create: %d (%s)\n", errno, strerror(errno));
}
else
dprintf(DBG_MSG, "start_acq_thread: create pipe: %d (%s)\n", errno, strerror(errno));
return result;
}
/* ----------------------------------------------------------------------------
** Stop VBI acquisition (after the last client quit)
*/
static void vbi_proxy_stop_acquisition( PROXY_DEV * p_proxy_dev )
{
if (p_proxy_dev->p_capture != NULL)
{
dprintf(DBG_MSG, "stop_acquisition: stopping (prev. services 0x%X)\n", p_proxy_dev->all_services);
if (p_proxy_dev->use_thread)
vbi_proxyd_stop_acq_thread(p_proxy_dev);
vbi_capture_delete(p_proxy_dev->p_capture);
p_proxy_dev->p_capture = NULL;
p_proxy_dev->p_decoder = NULL;
p_proxy_dev->vbi_fd = -1;
vbi_proxy_queue_free_all(&p_proxy_dev->p_free);
vbi_proxy_queue_free_all(&p_proxy_dev->p_sliced);
}
}
/* ----------------------------------------------------------------------------
** Open capture device (for the first client)
** - does not enable any services yet
*/
static vbi_bool vbi_proxy_start_acquisition( int dev_idx, char ** pp_errorstr )
{
PROXY_DEV * p_proxy_dev;
char * p_errorstr;
vbi_bool result;
p_proxy_dev = proxy.dev + dev_idx;
result = FALSE;
/* assign dummy error string if necessary */
p_errorstr = NULL;
if (pp_errorstr == NULL)
pp_errorstr = &p_errorstr;
p_proxy_dev->vbi_api = VBI_API_V4L2;
p_proxy_dev->p_capture = vbi_capture_v4l2_new(p_proxy_dev->p_dev_name, opt_buffer_count,
NULL, -1, pp_errorstr, opt_debug_level);
if (p_proxy_dev->p_capture == NULL)
{
p_proxy_dev->vbi_api = VBI_API_V4L1;
p_proxy_dev->p_capture = vbi_capture_v4l_new(p_proxy_dev->p_dev_name, p_proxy_dev->scanning,
NULL, -1, pp_errorstr, opt_debug_level);
}
if (p_proxy_dev->p_capture != NULL)
{
p_proxy_dev->p_decoder = vbi_capture_parameters(p_proxy_dev->p_capture);
if (p_proxy_dev->p_decoder != NULL)
{
/* allocate buffer queue for sliced output data */
vbi_proxy_queue_allocate(dev_idx);
p_proxy_dev->chn_prio = VBI_CHN_PRIO_INTERACTIVE;
/* get file handle for select() to wait for VBI data */
if ((vbi_capture_get_fd_flags(p_proxy_dev->p_capture) & VBI_FD_HAS_SELECT) != 0)
{
p_proxy_dev->vbi_fd = vbi_capture_fd(p_proxy_dev->p_capture);
result = (p_proxy_dev->vbi_fd != -1);
}
else
result = vbi_proxyd_start_acq_thread(dev_idx);
}
else
dprintf(DBG_MSG, "start_acquisition: capture device has no slicer!?\n");
}
if (result == FALSE)
{
vbi_proxy_stop_acquisition(p_proxy_dev);
}
if ((pp_errorstr == &p_errorstr) && (p_errorstr != NULL))
free(p_errorstr);
return result;
}
/* ----------------------------------------------------------------------------
** Update service mask after a client was added or closed
** - TODO: update buffer_count
*/
static vbi_bool vbi_proxyd_update_services( int dev_idx, PROXY_CLNT * p_new_req,
int new_req_strict, char ** pp_errorstr )
{
PROXY_CLNT * req;
PROXY_CLNT * p_walk;
PROXY_DEV * p_proxy_dev;
unsigned int dev_services;
unsigned int tmp_services;
unsigned int next_srv;
int strict;
int strict2;
vbi_bool is_first;
vbi_bool result;
p_proxy_dev = proxy.dev + dev_idx;
if (p_proxy_dev->p_capture == NULL)
{
/* capture device not opened yet */
/* check if other clients have any services enabled */
next_srv = 0;
for (req = proxy.p_clnts; req != NULL; req = req->p_next)
for (strict = VBI_MIN_STRICT; strict <= VBI_MAX_STRICT; strict++)
next_srv |= *VBI_GET_SERVICE_P(req, strict);
if (next_srv != 0)
{
result = vbi_proxy_start_acquisition(dev_idx, pp_errorstr);
}
else
{
/* XXX FIXME: must open the device at least once to query the API
** XXX this must be changed since opening the device may fail with EBUSY:
** better keep the device open while clients are connected */
if (p_proxy_dev->vbi_api == VBI_API_UNKNOWN)
{
vbi_proxy_start_acquisition(dev_idx, NULL);
vbi_proxy_stop_acquisition(p_proxy_dev);
}
result = TRUE;
}
}
else
result = FALSE;
if (p_proxy_dev->p_capture != NULL)
{
/* terminate acq thread because we're about to suspend capturing */
if (p_proxy_dev->use_thread)
vbi_proxyd_stop_acq_thread(p_proxy_dev);
/* XXX TODO: possible optimization: reduce number of update_service calls:
** (1) collect all services first; (2) add services at 3 strict levels; (3) update all_services for all clients */
is_first = TRUE;
dev_services = 0;
for (req = proxy.p_clnts; req != NULL; req = req->p_next)
{
if ( (req->dev_idx == dev_idx) &&
(req->state == REQ_STATE_FORWARD) )
{
req->all_services = 0;
for (strict = VBI_MIN_STRICT; strict <= VBI_MAX_STRICT; strict++)
{
tmp_services = *VBI_GET_SERVICE_P(req, strict);
if (tmp_services != 0)
{
next_srv = 0;
for (strict2 = strict + 1; strict2 <= VBI_MAX_STRICT; strict2++)
if ((next_srv |= *VBI_GET_SERVICE_P(req, strict2)) != 0)
break;
/* also check all following clients whether more services will be added */
if (next_srv == 0)
for (p_walk = req->p_next; p_walk != NULL; p_walk = p_walk->p_next)
for (strict2 = VBI_MIN_STRICT; strict2 <= VBI_MAX_STRICT; strict2++)
if ((next_srv |= *VBI_GET_SERVICE_P(p_walk, strict2)) != 0)
goto next_srv_found; /* break out of both loops */
next_srv_found:
dprintf(DBG_MSG, "service_update: fd %d: add services=0x%X strict=%d final=%d\n", req->io.sock_fd, tmp_services, strict, (next_srv == 0));
tmp_services =
vbi_capture_update_services( p_proxy_dev->p_capture,
is_first, (next_srv == 0),
tmp_services, strict,
/* return error strings only for the new client */
(((req == p_new_req) &&
(strict == new_req_strict)) ? pp_errorstr : NULL) );
dev_services |= tmp_services;
req->all_services |= tmp_services;
is_first = FALSE;
/* mask out unsupported client service bits only upon a new request; afterwards
** the requested services must be kept and re-applied, e.g. in case the norm changes back */
if (req == p_new_req)
*VBI_GET_SERVICE_P(req, strict) &= tmp_services;
}
}
}
}
/* check if scanning changed
** (even if all clients suspended: some might be waiting for scanning change) */
vbi_proxyd_update_scanning(dev_idx, NULL, p_proxy_dev->p_decoder->scanning);
if (dev_services != 0)
{
p_proxy_dev->all_services = dev_services;
p_proxy_dev->max_lines = p_proxy_dev->p_decoder->count[0]
+ p_proxy_dev->p_decoder->count[1];
/* grow/shrink buffer queue for sliced output data */
vbi_proxy_queue_allocate(dev_idx);
dprintf(DBG_MSG, "service_update: new service mask 0x%X, max.lines=%d, scanning=%d\n", dev_services, p_proxy_dev->max_lines, p_proxy_dev->scanning);
if ((vbi_capture_get_fd_flags(p_proxy_dev->p_capture) & VBI_FD_HAS_SELECT) != 0)
{
result = TRUE;
}
else
result = vbi_proxyd_start_acq_thread(dev_idx);
}
else
{ /* no services enabled: not an error if the client didn't request any */
result = is_first;
}
if ((dev_services == 0) || (result == FALSE))
{
/* no clients remaining or acq start failed -> stop acquisition */
vbi_proxy_stop_acquisition(p_proxy_dev);
}
}
return result;
}
/* ----------------------------------------------------------------------------
** Process a client's service request
** - either during connect request or later upon service request
** - note: requests after the first one may use a different "strictness";
** the strictness of each service must be remembered so that the same
** service mask can be re-applied to the decoder later
*/
static vbi_bool vbi_proxyd_take_service_req( PROXY_CLNT * req,
unsigned int new_services, int new_strict,
char * errormsg )
{
char * p_errorstr;
PROXY_DEV * p_proxy_dev;
int strict;
vbi_bool result;
p_proxy_dev = proxy.dev + req->dev_idx;
p_errorstr = NULL;
/* remove new services from all strict levels */
for (strict = VBI_MIN_STRICT; strict <= VBI_MAX_STRICT; strict++)
*VBI_GET_SERVICE_P(req, strict) &= ~ new_services;
/* add new services at the given level of strictness */
*VBI_GET_SERVICE_P(req, new_strict) |= new_services;
/* merge with other client's requests and pass to the device */
result = vbi_proxyd_update_services(req->dev_idx, req, new_strict, &p_errorstr);
if ( (result == FALSE) ||
( ((req->all_services & new_services) == 0) && (new_services != 0) ))
{
if (p_errorstr != NULL)
{
strncpy(errormsg, p_errorstr, VBIPROXY_ERROR_STR_MAX_LENGTH);
errormsg[VBIPROXY_ERROR_STR_MAX_LENGTH - 1] = 0;
}
else if ( ((*VBI_GET_SERVICE_P(req, new_strict) & new_services) == 0) &&
(new_services != 0) )
{
strncpy(errormsg, "Sorry, proxy cannot capture any of the requested data services.",
VBIPROXY_ERROR_STR_MAX_LENGTH);
errormsg[VBIPROXY_ERROR_STR_MAX_LENGTH - 1] = 0;
}
else
{
strncpy(errormsg, "Internal error in service update.", VBIPROXY_ERROR_STR_MAX_LENGTH);
errormsg[VBIPROXY_ERROR_STR_MAX_LENGTH - 1] = 0;
}
result = FALSE;
}
if (p_proxy_dev->p_decoder != NULL)
{
/* keep a copy of the VBI line ranges: they are used as upper/lower bounds
** for sliced data messages and MUST NOT be changed later (at least not
** increased), even if services change, to avoid overflowing clients' buffers */
req->vbi_start[0] = p_proxy_dev->p_decoder->start[0];
req->vbi_count[0] = p_proxy_dev->p_decoder->count[0];
req->vbi_start[1] = p_proxy_dev->p_decoder->start[1];
req->vbi_count[1] = p_proxy_dev->p_decoder->count[1];
}
if (p_errorstr != NULL)
free(p_errorstr);
return result;
}
/* ----------------------------------------------------------------------------
** Search for client which owns the token
*/
static PROXY_CLNT * vbi_proxyd_get_token_owner( int dev_idx )
{
PROXY_DEV * p_proxy_dev;
PROXY_CLNT * p_walk;
PROXY_CLNT * p_owner;
p_proxy_dev = proxy.dev + dev_idx;
p_owner = NULL;
for (p_walk = proxy.p_clnts; p_walk != NULL; p_walk = p_walk->p_next)
{
if (p_walk->dev_idx == dev_idx)
{
switch (p_walk->chn_state.token_state)
{
case REQ_TOKEN_NONE:
break;
case REQ_TOKEN_GRANT:
case REQ_TOKEN_RETURNED:
case REQ_TOKEN_RECLAIM:
case REQ_TOKEN_RELEASE:
case REQ_TOKEN_GRANTED:
assert(p_owner == NULL);
p_owner = p_walk;
break;
default:
assert(FALSE); /* invalid state */
break;
}
}
}
return p_owner;
}
/* ----------------------------------------------------------------------------
** Grant the token to a given client
** - essentially implements a matrix of all possible token states of the
** current and future token owner; note at most one client per device may
** be in a state other than "NONE"
** - if the token is still in possession of a different client, the request
** fails for now, but the token is reclaimed from the other client
*/
static vbi_bool vbi_proxyd_token_grant( PROXY_CLNT * req )
{
PROXY_CLNT * p_owner;
vbi_bool token_free = TRUE;
switch (req->chn_state.token_state)
{
case REQ_TOKEN_NONE:
p_owner = vbi_proxyd_get_token_owner(req->dev_idx);
if ( (p_owner == NULL) ||
( (p_owner->chn_state.token_state == REQ_TOKEN_GRANT) ||
(p_owner->chn_state.token_state == REQ_TOKEN_RETURNED) ))
{
/* token is free or grant message not yet sent -> immediately grant to new client */
req->chn_state.token_state = REQ_TOKEN_GRANT;
if (p_owner != NULL)
p_owner->chn_state.token_state = REQ_TOKEN_NONE;
}
else
{ /* have to reclaim token from previous owner first */
if (p_owner->chn_state.token_state != REQ_TOKEN_RELEASE)
p_owner->chn_state.token_state = REQ_TOKEN_RECLAIM;
token_free = FALSE;
}
break;
case REQ_TOKEN_GRANT:
/* client is already about to be granted the token -> nothing to do */
break;
case REQ_TOKEN_RECLAIM:
/* reclaim message not yet sent -> just return to GRANTED state */
req->chn_state.token_state = REQ_TOKEN_GRANTED;
break;
case REQ_TOKEN_RELEASE:
/* reclaim already sent -> must re-assign token */
req->chn_state.token_state = REQ_TOKEN_GRANT;
break;
case REQ_TOKEN_GRANTED:
case REQ_TOKEN_RETURNED:
/* client is still in control of the channel: nothing to do */
break;
default:
assert(FALSE); /* invalid state */
break;
}
return token_free;
}
/* ----------------------------------------------------------------------------
** Mark a client's channel reservation as completed (min. duration elapsed)
*/
static void vbi_proxyd_channel_completed( PROXY_CLNT * req, time_t whence )
{
PROXY_CLNT * p_walk;
assert(REQ_CONTROLS_CHN(req->chn_state.token_state));
req->chn_state.last_duration = whence - req->chn_state.last_start;
req->chn_state.is_completed = TRUE;
req->chn_state.cycle_count += 1;
dprintf(DBG_MSG, "channel_schedule: fd %d terminated (duration %d, cycle #%d)\n", req->io.sock_fd, (int)req->chn_state.last_duration, req->chn_state.cycle_count);
if (req->chn_state.cycle_count > 2)
{
/* cycle counter overflow: only values 1, 2 allowed (plus 0 for new requests)
** -> reduce all counters by one */
dprintf(DBG_MSG, "channel_schedule: dev #%d: leveling cycle counters\n", req->dev_idx);
for (p_walk = proxy.p_clnts; p_walk != NULL; p_walk = p_walk->p_next)
if (p_walk->dev_idx == req->dev_idx)
if (p_walk->chn_state.cycle_count > 0)
p_walk->chn_state.cycle_count -= 1;
}
else if (req->chn_state.cycle_count == 1)
{
/* the cycle counter always jumps to the maximum, i.e. from 0 straight to 2, so
** that a new request immediately has highest priority, but is scheduled only
** once before the others */
for (p_walk = proxy.p_clnts; p_walk != NULL; p_walk = p_walk->p_next)
if (p_walk->dev_idx == req->dev_idx)
if (p_walk->chn_state.cycle_count >= 2)
break;
if (p_walk != NULL)
req->chn_state.cycle_count = 2;
}
}
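/* Illustration of the cycle counting above: with three clients at cycle
** counts (2, 2, 0), the newly added client at count 0 wins the next
** scheduling round (all else being equal).  After its first completed
** cycle its counter jumps from 0 straight to 2, so it cannot monopolize
** the channel.  As soon as any counter would exceed 2, all non-zero
** counters are reduced by one.
*/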
/* ----------------------------------------------------------------------------
** Adapt channel scheduler state when switching away from a channel
*/
static void vbi_proxyd_channel_stopped( PROXY_CLNT * req )
{
time_t now = time(NULL);
assert(REQ_CONTROLS_CHN(req->chn_state.token_state));
if ( (req->chn_state.is_completed == FALSE) &&
(now - req->chn_state.last_start >= req->chn_profile.min_duration) )
{
vbi_proxyd_channel_completed(req, now);
}
req->chn_state.is_completed = FALSE;
if (req->chn_state.token_state == REQ_TOKEN_GRANTED)
req->chn_state.token_state = REQ_TOKEN_RECLAIM;
else
req->chn_state.token_state = REQ_TOKEN_NONE;
}
/* ----------------------------------------------------------------------------
** Calculate next timer for scheduler
** - since there is only one alarm signal, the nearest timeout across all devices is used
*/
static void vbi_proxyd_channel_timer_update( void )
{
PROXY_CLNT * p_walk;
PROXY_DEV * p_proxy_dev;
time_t rest;
time_t next_sched;
time_t now;
now = time(NULL);
next_sched = 0;
for (p_walk = proxy.p_clnts; p_walk != NULL; p_walk = p_walk->p_next)
{
p_proxy_dev = proxy.dev + p_walk->dev_idx;
if ( (p_proxy_dev->chn_prio == VBI_CHN_PRIO_BACKGROUND) &&
REQ_CONTROLS_CHN(p_walk->chn_state.token_state) &&
(p_walk->chn_state.is_completed == FALSE) )
{
rest = p_walk->chn_profile.min_duration - (now - p_walk->chn_state.last_start);
if ((rest > 0) && ((rest < next_sched) || (next_sched == 0)))
next_sched = rest;
else if (rest < 0)
next_sched = 1;
}
/* XXX TODO: set timer to supervise TOKEN RELEASE */
}
if (next_sched != 0)
dprintf(DBG_MSG, "channel_timer_update: set alarm timer in %d secs\n", (int)next_sched);
alarm(next_sched);
proxy.chn_sched_alarm = FALSE;
}
/* ----------------------------------------------------------------------------
** Determine which client's channel request is granted
*/
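/* The selection below applies the following criteria, in decreasing
** precedence:
** 1. fewest completed cycles wins (new clients start at cycle count 0)
** 2. higher sub-priority in the channel profile wins
** 3. on equal sub-priority, an active and not yet completed reservation
**    is kept; a completed one is handed over
** 4. otherwise first come, first served: the earlier last_start wins,
**    with ties broken by the shorter min_duration
*/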
static PROXY_CLNT *
vbi_proxyd_channel_schedule( int dev_idx )
{
PROXY_CLNT * p_walk;
PROXY_CLNT * p_sched;
PROXY_CLNT * p_active;
PROXY_DEV * p_proxy_dev;
time_t now;
p_proxy_dev = proxy.dev + dev_idx;
p_sched = NULL;
p_active = NULL;
now = time(NULL);
for (p_walk = proxy.p_clnts; p_walk != NULL; p_walk = p_walk->p_next)
{
if ( (p_walk->dev_idx == dev_idx) &&
(p_walk->chn_profile.is_valid) &&
(p_walk->chn_prio == VBI_CHN_PRIO_BACKGROUND) )
{
/* if this client's channel is currently active, check if the reservation has expired */
if (REQ_CONTROLS_CHN(p_walk->chn_state.token_state))
{
if ( (now - p_walk->chn_state.last_start >= p_walk->chn_profile.min_duration) &&
(p_walk->chn_state.is_completed == FALSE) )
{
vbi_proxyd_channel_completed(p_walk, now);
}
p_active = p_walk;
}
dprintf(DBG_MSG, "channel_schedule: fd %d: active=%d compl=%d sub-prio=0x%02X cycles#%d min-dur=%d\n", p_walk->io.sock_fd, REQ_CONTROLS_CHN(p_walk->chn_state.token_state), p_walk->chn_state.is_completed, p_walk->chn_profile.sub_prio, p_walk->chn_state.cycle_count, (int)p_walk->chn_profile.min_duration);
if (p_sched != NULL)
{
if ( p_walk->chn_state.cycle_count
+ ((REQ_CONTROLS_CHN(p_walk->chn_state.token_state) && p_walk->chn_state.is_completed) ? 1 : 0)
< p_sched->chn_state.cycle_count
+ ((REQ_CONTROLS_CHN(p_sched->chn_state.token_state) && p_sched->chn_state.is_completed) ? 1 : 0) )
{ /* this one is already done (more often) */
dprintf(DBG_SCHED, "channel_schedule: fd %d wins by cycle count\n", p_walk->io.sock_fd);
p_sched = p_walk;
}
else if (p_walk->chn_profile.sub_prio > p_sched->chn_profile.sub_prio)
{ /* higher priority found */
dprintf(DBG_SCHED, "channel_schedule: fd %d wins by sub-prio\n", p_walk->io.sock_fd);
p_sched = p_walk;
}
else if (p_walk->chn_profile.sub_prio == p_sched->chn_profile.sub_prio)
{ /* same priority */
if ( REQ_CONTROLS_CHN(p_walk->chn_state.token_state) &&
!p_walk->chn_state.is_completed )
{ /* this one is still active */
dprintf(DBG_SCHED, "channel_schedule: fd %d wins by being already active and non-complete\n", p_walk->io.sock_fd);
p_sched = p_walk;
}
else if ( REQ_CONTROLS_CHN(p_sched->chn_state.token_state) &&
p_sched->chn_state.is_completed )
{ /* prev. selected one was completed -> choose next */
dprintf(DBG_SCHED, "channel_schedule: fd %d wins because active one is completed\n", p_walk->io.sock_fd);
p_sched = p_walk;
}
else if ( !REQ_CONTROLS_CHN(p_walk->chn_state.token_state) &&
!REQ_CONTROLS_CHN(p_sched->chn_state.token_state) )
{ /* none active -> first come first serve */
if ( (p_walk->chn_state.last_start < p_sched->chn_state.last_start) ||
( (p_walk->chn_state.last_start == p_sched->chn_state.last_start) &&
(p_walk->chn_profile.min_duration < p_sched->chn_profile.min_duration) ))
{
dprintf(DBG_SCHED, "channel_schedule: fd %d wins because longer non-active\n", p_walk->io.sock_fd);
p_sched = p_walk;
}
}
}
}
else
p_sched = p_walk;
}
}
if ((p_sched != p_active) && (p_active != NULL))
{
vbi_proxyd_channel_stopped(p_active);
}
return p_sched;
}
/* ----------------------------------------------------------------------------
** Update channel, after channel change request or connection release
** - if only background-prio clients are connected, the scheduler decides;
** else the channel is switched (if the client matches the daemon's max prio)
*/
static vbi_bool
vbi_proxyd_channel_update( int dev_idx, PROXY_CLNT * req, vbi_bool forced_switch )
{
PROXY_CLNT * p_walk;
PROXY_CLNT * p_sched;
PROXY_DEV * p_proxy_dev;
VBI_CHN_PRIO max_chn_prio;
vbi_bool result;
p_proxy_dev = proxy.dev + dev_idx;
result = FALSE;
/* determine new max. channel priority */
max_chn_prio = VBI_CHN_PRIO_BACKGROUND;
for (p_walk = proxy.p_clnts; p_walk != NULL; p_walk = p_walk->p_next)
if ((p_walk->dev_idx == dev_idx) && (p_walk->chn_prio > max_chn_prio))
max_chn_prio = p_walk->chn_prio;
if (p_proxy_dev->chn_prio != max_chn_prio)
{
#if defined(ENABLE_V4L2) && defined(VIDIOC_S_PRIORITY)
if (p_proxy_dev->vbi_api == VBI_API_V4L2)
{
enum v4l2_priority v4l2_prio = max_chn_prio;
int fd = vbi_capture_fd(p_proxy_dev->p_capture);
if (fd != -1)
{
if (ioctl(fd, VIDIOC_S_PRIORITY, &v4l2_prio) != 0)
{
dprintf(DBG_MSG, "Failed to set register v4l2 channel prio to %d: %d (%s)\n", p_proxy_dev->chn_prio, errno, strerror(errno));
}
else
{
ioctl(fd, VIDIOC_G_PRIORITY, &v4l2_prio);
dprintf(DBG_MSG, "channel_update: dev #%d: setting v4l2 channel prio to %d (was %d) (dev prio is %d)\n", dev_idx, max_chn_prio, p_proxy_dev->chn_prio, v4l2_prio);
}
}
}
#endif
/* save the priority registered with the device */
p_proxy_dev->chn_prio = max_chn_prio;
}
/* non-bg prio OR channel has already been switched -> clear scheduler active flag */
if ( (max_chn_prio > VBI_CHN_PRIO_BACKGROUND) || forced_switch )
{
for (p_walk = proxy.p_clnts; p_walk != NULL; p_walk = p_walk->p_next)
if ((p_walk->dev_idx == dev_idx) && REQ_CONTROLS_CHN(p_walk->chn_state.token_state))
vbi_proxyd_channel_stopped(p_walk);
}
if (max_chn_prio == VBI_CHN_PRIO_BACKGROUND)
{ /* background -> let scheduler decide */
p_sched = vbi_proxyd_channel_schedule(dev_idx);
}
else if ( (req != NULL) &&
(req->chn_prio == max_chn_prio) )
{ /* non-background prio -> latest request wins */
p_sched = req;
}
else
{ /* reject switch by priority */
p_sched = NULL;
}
if ( (p_sched != NULL) &&
(max_chn_prio == VBI_CHN_PRIO_BACKGROUND) &&
(REQ_CONTROLS_CHN(p_sched->chn_state.token_state) == FALSE) )
{
if ( vbi_proxyd_token_grant(p_sched) )
{
p_sched->chn_state.is_completed = FALSE;
p_sched->chn_state.last_duration = 0;
p_sched->chn_state.last_start = time(NULL);
/* return TRUE if requested channel control can be granted immediately */
result = (p_sched == req);
}
}
else
{ /* no channel change is allowed or required */
/* flush-only flag: assume client has already done the switch -> must flush VBI buffers */
if (forced_switch)
{
vbi_capture_flush(p_proxy_dev->p_capture);
}
}
if (max_chn_prio == VBI_CHN_PRIO_BACKGROUND)
vbi_proxyd_channel_timer_update();
return result;
}
/* ----------------------------------------------------------------------------
** Flush after channel change
*/
static void vbi_proxyd_channel_flush( int dev_idx, PROXY_CLNT * req )
{
PROXY_CLNT * p_walk;
PROXY_DEV * p_proxy_dev;
req = req; /* suppress compiler warning about unused parameter */
p_proxy_dev = proxy.dev + dev_idx;
if (p_proxy_dev->p_capture != NULL)
{
/* flush capture buffers */
vbi_capture_flush(p_proxy_dev->p_capture);
/* flush slicer output buffer queues */
vbi_proxy_queue_release_all(dev_idx);
}
/* trigger sending of change indication to all clients except the caller */
for (p_walk = proxy.p_clnts; p_walk != NULL; p_walk = p_walk->p_next)
{
if ( (p_walk->dev_idx == dev_idx) &&
((p_walk->client_flags & VBI_PROXY_CLIENT_NO_STATUS_IND) == 0) )
{
p_walk->chn_status_ind |= VBI_PROXY_CHN_FLUSH;
}
}
}
/* ----------------------------------------------------------------------------
** Check channel scheduling on all devices for expired timers
*/
static void vbi_proxyd_channel_timer( void )
{
PROXY_CLNT * p_walk;
PROXY_DEV * p_proxy_dev;
int dev_idx;
time_t now;
vbi_bool do_schedule;
int user_count;
now = time(NULL);
for (dev_idx = 0; dev_idx < proxy.dev_count; dev_idx++)
{
p_proxy_dev = proxy.dev + dev_idx;
do_schedule = FALSE;
user_count = 0;
if (p_proxy_dev->chn_prio == VBI_CHN_PRIO_BACKGROUND)
{
for (p_walk = proxy.p_clnts; p_walk != NULL; p_walk = p_walk->p_next)
{
if (p_walk->dev_idx == dev_idx)
{
if ( REQ_CONTROLS_CHN(p_walk->chn_state.token_state) &&
(p_walk->chn_state.is_completed == FALSE) &&
(now - p_walk->chn_state.last_start >= p_walk->chn_profile.min_duration) )
{
do_schedule = TRUE;
}
user_count += 1;
}
}
if (do_schedule && (user_count > 1))
{
dprintf(DBG_MSG, "schedule_timer: schedule device #%d\n", dev_idx);
vbi_proxyd_channel_update(dev_idx, NULL, FALSE);
}
}
}
}
/* ----------------------------------------------------------------------------
** Process client ioctl request
*/
static vbi_bool
vbi_proxyd_take_ioctl_req( PROXY_CLNT * req, int request, void * p_arg_data,
unsigned int arg_size, int * p_result, int * p_errcode )
{
PROXY_DEV * p_proxy_dev;
vbi_bool req_perm;
int size;
int vbi_fd;
vbi_bool opened = FALSE;
vbi_bool result = FALSE;
p_proxy_dev = proxy.dev + req->dev_idx;
if (p_proxy_dev->p_capture == NULL)
{
vbi_proxy_start_acquisition(req->dev_idx, NULL);
opened = TRUE;
}
if (p_proxy_dev->p_capture != NULL)
vbi_fd = vbi_capture_fd(p_proxy_dev->p_capture);
else
vbi_fd = -1; /* device could not be opened above */
if (vbi_fd != -1)
{
size = vbi_proxy_msg_check_ioctl(p_proxy_dev->vbi_api, request, p_arg_data, &req_perm);
if ((size >= 0) && (size == (int) arg_size))
{
/* FIXME */
if ( (req_perm == FALSE) ||
(req->chn_prio >= p_proxy_dev->chn_prio) ||
REQ_CONTROLS_CHN(req->chn_state.token_state) )
{
/* TODO: possibly update norm, flush channel */
errno = 0;
/* do the actual ioctl */
*p_result = ioctl(vbi_capture_fd(p_proxy_dev->p_capture), request, p_arg_data);
*p_errcode = errno;
result = TRUE;
}
else
dprintf(DBG_MSG, "take_ioctl_req: no permission\n");
}
else
dprintf(DBG_MSG, "take_ioctl_req: invalid ioctl 0x%X or size %d\n", request, arg_size);
}
if (opened)
{
vbi_proxy_stop_acquisition(p_proxy_dev);
}
return result;
}
/* ----------------------------------------------------------------------------
** Close the connection to the client
** - frees all allocated resources
*/
static void vbi_proxyd_close( PROXY_CLNT * req, vbi_bool close_all )
{
close_all = close_all; /* suppress compiler warning about unused parameter */
if (req->state != REQ_STATE_CLOSED)
{
dprintf(DBG_MSG, "close: fd %d\n", req->io.sock_fd);
vbi_proxy_msg_logger(LOG_INFO, req->io.sock_fd, 0, "closing connection", NULL);
vbi_proxy_msg_close_io(&req->io);
pthread_mutex_lock(&proxy.dev[req->dev_idx].queue_mutex);
while (req->p_sliced != NULL)
{
vbi_proxy_queue_release_sliced(req);
}
pthread_mutex_unlock(&proxy.dev[req->dev_idx].queue_mutex);
req->state = REQ_STATE_CLOSED;
}
}
/* ----------------------------------------------------------------------------
** Initialize a request structure for a new client and add it to the list
*/
static void vbi_proxyd_add_connection( int listen_fd, int dev_idx, vbi_bool isLocal )
{
PROXY_CLNT * req;
PROXY_CLNT * p_walk;
int sock_fd;
isLocal = isLocal; /* suppress compiler warning about unused parameter */
sock_fd = vbi_proxy_msg_accept_connection(listen_fd);
if (sock_fd != -1)
{
req = calloc(1, sizeof(*req));
if (req != NULL)
{
dprintf(DBG_MSG, "add_connection: fd %d\n", sock_fd);
req->state = REQ_STATE_WAIT_CON_REQ;
req->io.lastIoTime = time(NULL);
req->io.sock_fd = sock_fd;
req->dev_idx = dev_idx;
req->chn_prio = DEFAULT_CHN_PRIO;
pthread_mutex_lock(&proxy.clnt_mutex);
/* append request to the end of the chain
** note: order is significant for priority in adding services */
if (proxy.p_clnts != NULL)
{
p_walk = proxy.p_clnts;
while (p_walk->p_next != NULL)
p_walk = p_walk->p_next;
p_walk->p_next = req;
}
else
proxy.p_clnts = req;
proxy.clnt_count += 1;
pthread_mutex_unlock(&proxy.clnt_mutex);
}
else
{
dprintf(DBG_MSG, "add_connection: fd %d: virtual memory exhausted, dropping connection\n", sock_fd);
close(sock_fd);
}
}
}
/* ----------------------------------------------------------------------------
** Initialize state for a new device
*/
static void vbi_proxyd_add_device( const char * p_dev_name )
{
PROXY_DEV * p_proxy_dev;
if (proxy.dev_count < SRV_MAX_DEVICES)
{
p_proxy_dev = proxy.dev + proxy.dev_count;
p_proxy_dev->p_dev_name = p_dev_name;
p_proxy_dev->p_sock_path = vbi_proxy_msg_get_socket_name(p_dev_name);
p_proxy_dev->pipe_fd = -1;
p_proxy_dev->vbi_fd = -1;
p_proxy_dev->wr_fd = -1;
/* initialize synchronization facilities */
pthread_cond_init(&p_proxy_dev->start_cond, NULL);
pthread_mutex_init(&p_proxy_dev->start_mutex, NULL);
pthread_mutex_init(&p_proxy_dev->queue_mutex, NULL);
proxy.dev_count += 1;
}
}
/* ----------------------------------------------------------------------------
** Transmit one buffer of sliced data
** - returns FALSE upon I/O error
** - also returns a "blocked" flag which is TRUE if not all data could be
** written; the caller can use it to "stuff" the pipe, i.e. write a series
** of messages until the pipe is full
** - XXX optimization required: don't copy the block (required however if client
** doesn't want all services)
*/
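/* note: msg_size is first calculated for the worst case (i.e. all captured
** lines pass the service filter, or the full raw buffer); after filtering,
** the actual size is re-computed from the copied line counts and only that
** many bytes are passed to vbi_proxy_msg_write()
*/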
static vbi_bool vbi_proxyd_send_sliced( PROXY_CLNT * req, vbi_bool * p_blocked )
{
VBIPROXY_MSG * p_msg;
uint32_t msg_size;
vbi_bool result = FALSE;
int max_lines;
int idx;
if ((req != NULL) && (p_blocked != NULL) && (req->p_sliced != NULL))
{
if (VBI_RAW_SERVICES(req->all_services))
msg_size = VBIPROXY_SLICED_IND_SIZE(0, req->p_sliced->max_lines);
else
msg_size = VBIPROXY_SLICED_IND_SIZE(req->p_sliced->line_count, 0);
msg_size += sizeof(VBIPROXY_MSG_HEADER);
p_msg = malloc(msg_size);
if (p_msg == NULL)
return FALSE; /* out of memory */
/* filter for services requested by this client */
max_lines = req->vbi_count[0] + req->vbi_count[1];
p_msg->body.sliced_ind.timestamp = req->p_sliced->timestamp;
p_msg->body.sliced_ind.sliced_lines = 0;
p_msg->body.sliced_ind.raw_lines = 0;
/* XXX TODO allow both raw and sliced in the same message */
if (VBI_RAW_SERVICES(req->all_services) == FALSE)
{
for (idx = 0; (idx < req->p_sliced->line_count) && (idx < max_lines); idx++)
{
if ((req->p_sliced->lines[idx].id & req->all_services) != 0)
{
memcpy(p_msg->body.sliced_ind.u.sliced + p_msg->body.sliced_ind.sliced_lines,
req->p_sliced->lines + idx, sizeof(vbi_sliced));
p_msg->body.sliced_ind.sliced_lines += 1;
}
}
}
else
{
if (req->p_sliced->p_raw_data != NULL)
{
memcpy(p_msg->body.sliced_ind.u.raw,
req->p_sliced->p_raw_data,
VBIPROXY_RAW_LINE_SIZE * req->p_sliced->max_lines);
p_msg->body.sliced_ind.raw_lines = req->p_sliced->max_lines;
}
}
msg_size = VBIPROXY_SLICED_IND_SIZE(p_msg->body.sliced_ind.sliced_lines,
p_msg->body.sliced_ind.raw_lines);
vbi_proxy_msg_write(&req->io, MSG_TYPE_SLICED_IND, msg_size, p_msg, TRUE);
if (vbi_proxy_msg_handle_write(&req->io, p_blocked))
{
/* if the last block could not be transmitted fully, quit the loop */
if (req->io.writeLen > 0)
{
dprintf(DBG_CLNT, "send_sliced: socket blocked\n");
*p_blocked = TRUE;
}
result = TRUE;
}
}
else
dprintf(DBG_MSG, "send_sliced: illegal NULL ptr params\n");
return result;
}
/* ----------------------------------------------------------------------------
** Checks the size of a message from client to server
*/
static vbi_bool vbi_proxyd_check_msg( VBIPROXY_MSG * pMsg, vbi_bool * pEndianSwap )
{
VBIPROXY_MSG_HEADER * pHead = &pMsg->head;
VBIPROXY_MSG_BODY * pBody = &pMsg->body;
unsigned int len = pMsg->head.len;
vbi_bool result = FALSE;
switch (pHead->type)
{
case MSG_TYPE_CONNECT_REQ:
if ( (len == sizeof(VBIPROXY_MSG_HEADER) + sizeof(pBody->connect_req)) &&
(memcmp(pBody->connect_req.magics.protocol_magic, VBIPROXY_MAGIC_STR, VBIPROXY_MAGIC_LEN) == 0) )
{
if (pBody->connect_req.magics.endian_magic == VBIPROXY_ENDIAN_MAGIC)
{
*pEndianSwap = FALSE;
result = TRUE;
}
else if (pBody->connect_req.magics.endian_magic == VBIPROXY_ENDIAN_MISMATCH)
{
*pEndianSwap = TRUE;
result = TRUE;
}
}
break;
case MSG_TYPE_SERVICE_REQ:
result = (len == sizeof(VBIPROXY_MSG_HEADER) + sizeof(pBody->service_req));
break;
case MSG_TYPE_CHN_TOKEN_REQ:
result = (len == sizeof(VBIPROXY_MSG_HEADER) + sizeof(pBody->chn_token_req));
break;
case MSG_TYPE_CHN_NOTIFY_REQ:
result = (len == sizeof(VBIPROXY_MSG_HEADER) + sizeof(pBody->chn_notify_req));
break;
case MSG_TYPE_CHN_SUSPEND_REQ:
result = (len == sizeof(VBIPROXY_MSG_HEADER) + sizeof(pBody->chn_notify_req));
break;
case MSG_TYPE_CHN_IOCTL_REQ:
result = (len == sizeof(VBIPROXY_MSG_HEADER) +
VBIPROXY_CHN_IOCTL_REQ_SIZE(pBody->chn_ioctl_req.arg_size));
break;
case MSG_TYPE_CHN_RECLAIM_CNF:
result = (len == sizeof(VBIPROXY_MSG_HEADER) + sizeof(pBody->chn_reclaim_cnf));
break;
case MSG_TYPE_CLOSE_REQ:
result = (len == sizeof(VBIPROXY_MSG_HEADER));
break;
case MSG_TYPE_DAEMON_PID_REQ:
result = ( (len == sizeof(VBIPROXY_MSG_HEADER) + sizeof(pBody->daemon_pid_req)) &&
(memcmp(pBody->daemon_pid_req.magics.protocol_magic, VBIPROXY_MAGIC_STR, VBIPROXY_MAGIC_LEN) == 0) &&
(pBody->daemon_pid_req.magics.endian_magic == VBIPROXY_ENDIAN_MAGIC) );
break;
case MSG_TYPE_DAEMON_PID_CNF:
/* note: this is a daemon reply but accepted here since the daemon sends it to itself */
result = (len == sizeof(VBIPROXY_MSG_HEADER) + sizeof(pBody->daemon_pid_cnf));
break;
case MSG_TYPE_CONNECT_CNF:
case MSG_TYPE_CONNECT_REJ:
case MSG_TYPE_SERVICE_CNF:
case MSG_TYPE_SERVICE_REJ:
case MSG_TYPE_SLICED_IND:
case MSG_TYPE_CHN_TOKEN_CNF:
case MSG_TYPE_CHN_TOKEN_IND:
case MSG_TYPE_CHN_NOTIFY_CNF:
case MSG_TYPE_CHN_SUSPEND_CNF:
case MSG_TYPE_CHN_SUSPEND_REJ:
case MSG_TYPE_CHN_IOCTL_CNF:
case MSG_TYPE_CHN_IOCTL_REJ:
case MSG_TYPE_CHN_RECLAIM_REQ:
case MSG_TYPE_CHN_CHANGE_IND:
dprintf(DBG_MSG, "check_msg: recv client msg %d (%s) at server side\n", pHead->type, vbi_proxy_msg_debug_get_type_str(pHead->type));
result = FALSE;
break;
default:
dprintf(DBG_MSG, "check_msg: unknown msg #%d\n", pHead->type);
result = FALSE;
break;
}
if (result == FALSE)
dprintf(DBG_MSG, "check_msg: illegal msg: len=%d, type=%d (%s)\n", len, pHead->type, vbi_proxy_msg_debug_get_type_str(pHead->type));
return result;
}
/* ----------------------------------------------------------------------------
** Handle message from client
** - note: consistency checks were already done by the I/O handler,
** except for higher-level messages (which must be checked by the acqctl module)
** - implemented as a matrix: "switch" over the message type, and "if"
** cascades over the connection state
** - XXX warning: inbound messages use the same buffer as outbound!
** must have finished evaluating the message before assembling the reply
*/
static vbi_bool vbi_proxyd_take_message( PROXY_CLNT *req, VBIPROXY_MSG * pMsg )
{
VBIPROXY_MSG_BODY * pBody = &pMsg->body;
vbi_bool result = FALSE;
dprintf(DBG_CLNT, "take_message: fd %d: recv msg type %d (%s)\n", req->io.sock_fd, pMsg->head.type, vbi_proxy_msg_debug_get_type_str(pMsg->head.type));
switch (pMsg->head.type)
{
case MSG_TYPE_CONNECT_REQ:
if (req->state == REQ_STATE_WAIT_CON_REQ)
{
if (pBody->connect_req.magics.protocol_compat_version == VBIPROXY_COMPAT_VERSION)
{
dprintf(DBG_MSG, "New client: fd %d: '%s' pid=%d services=0x%X\n", req->io.sock_fd, pBody->connect_req.client_name, pBody->connect_req.pid, pBody->connect_req.services);
/* if provided, update norm hint (used for first client on ancient v4l1 drivers only) */
if (pBody->connect_req.scanning != 0)
proxy.dev[req->dev_idx].scanning = pBody->connect_req.scanning;
/* enable forwarding of captured data (must be set before processing request!) */
req->state = REQ_STATE_FORWARD;
req->buffer_count = pBody->connect_req.buffer_count;
req->client_flags = pBody->connect_req.client_flags; /* XXX TODO (timeout suppression) */
/* must make very sure strict is within bounds, because it's used as array index */
if (pBody->connect_req.strict < VBI_MIN_STRICT)
pBody->connect_req.strict = VBI_MIN_STRICT;
else if (pBody->connect_req.strict > VBI_MAX_STRICT)
pBody->connect_req.strict = VBI_MAX_STRICT;
if ( vbi_proxyd_take_service_req(req, pBody->connect_req.services,
pBody->connect_req.strict,
req->msg_buf.body.connect_rej.errorstr) )
{
/* open & service initialization succeeded -> reply with confirm */
vbi_proxy_msg_fill_magics(&req->msg_buf.body.connect_cnf.magics);
strncpy((char *) req->msg_buf.body.connect_cnf.dev_vbi_name,
proxy.dev[req->dev_idx].p_dev_name, VBIPROXY_DEV_NAME_MAX_LENGTH);
req->msg_buf.body.connect_cnf.dev_vbi_name[VBIPROXY_DEV_NAME_MAX_LENGTH - 1] = 0;
req->msg_buf.body.connect_cnf.pid = getpid();
req->msg_buf.body.connect_cnf.vbi_api_revision = proxy.dev[req->dev_idx].vbi_api;
req->msg_buf.body.connect_cnf.daemon_flags = ((opt_debug_level > 0) ? VBI_PROXY_DAEMON_NO_TIMEOUTS : 0);
req->msg_buf.body.connect_cnf.services = req->all_services;
if (proxy.dev[req->dev_idx].p_decoder != NULL)
{
req->msg_buf.body.connect_cnf.dec = *proxy.dev[req->dev_idx].p_decoder;
req->msg_buf.body.connect_cnf.dec.pattern = NULL;
}
else
{ /* acquisition not running: if the request is still considered successful,
** this is possible only if no services were requested */
memset(&req->msg_buf.body.connect_cnf.dec, 0,
sizeof(req->msg_buf.body.connect_cnf.dec));
req->msg_buf.body.connect_cnf.dec.start[0] = -1;
req->msg_buf.body.connect_cnf.dec.start[1] = -1;
}
vbi_proxy_msg_write(&req->io, MSG_TYPE_CONNECT_CNF,
sizeof(req->msg_buf.body.connect_cnf),
&req->msg_buf, FALSE);
}
else
{
vbi_proxy_msg_fill_magics(&req->msg_buf.body.connect_rej.magics);
vbi_proxy_msg_write(&req->io, MSG_TYPE_CONNECT_REJ,
sizeof(req->msg_buf.body.connect_rej),
&req->msg_buf, FALSE);
/* drop the connection after sending the reject message */
req->state = REQ_STATE_WAIT_CLOSE;
}
}
else
{ /* client uses incompatible protocol version */
vbi_proxy_msg_fill_magics(&req->msg_buf.body.connect_rej.magics);
strncpy((char *) req->msg_buf.body.connect_rej.errorstr,
"Incompatible proxy protocol version", VBIPROXY_ERROR_STR_MAX_LENGTH);
req->msg_buf.body.connect_rej.errorstr[VBIPROXY_ERROR_STR_MAX_LENGTH - 1] = 0;
vbi_proxy_msg_write(&req->io, MSG_TYPE_CONNECT_REJ,
sizeof(req->msg_buf.body.connect_rej),
&req->msg_buf, FALSE);
/* drop the connection */
req->state = REQ_STATE_WAIT_CLOSE;
}
result = TRUE;
}
break;
case MSG_TYPE_DAEMON_PID_REQ:
if (req->state == REQ_STATE_WAIT_CON_REQ)
{ /* this message can be sent instead of a connect request */
vbi_proxy_msg_fill_magics(&req->msg_buf.body.daemon_pid_cnf.magics);
req->msg_buf.body.daemon_pid_cnf.pid = getpid();
vbi_proxy_msg_write(&req->io, MSG_TYPE_DAEMON_PID_CNF,
sizeof(req->msg_buf.body.daemon_pid_cnf),
&req->msg_buf, FALSE);
req->state = REQ_STATE_WAIT_CLOSE;
result = TRUE;
}
break;
case MSG_TYPE_SERVICE_REQ:
if (req->state == REQ_STATE_FORWARD)
{
if (pBody->service_req.reset)
memset(req->services, 0, sizeof(req->services));
dprintf(DBG_MSG, "Update client: fd %d services: 0x%X (was %X)\n", req->io.sock_fd, pBody->service_req.services, req->all_services);
/* flush all buffers in this client's queue */
pthread_mutex_lock(&proxy.dev[req->dev_idx].queue_mutex);
while (req->p_sliced != NULL)
{
vbi_proxy_queue_release_sliced(req);
}
pthread_mutex_unlock(&proxy.dev[req->dev_idx].queue_mutex);
if ( vbi_proxyd_take_service_req(req, pBody->service_req.services,
pBody->service_req.strict,
req->msg_buf.body.service_rej.errorstr) )
{
if (proxy.dev[req->dev_idx].p_decoder != NULL)
{
req->msg_buf.body.service_cnf.dec = *proxy.dev[req->dev_idx].p_decoder;
req->msg_buf.body.service_cnf.dec.pattern = NULL;
}
else
{ /* acquisition not running: if the request is still considered successful,
** this is possible only if no services were requested */
memset(&req->msg_buf.body.service_cnf.dec, 0,
sizeof(req->msg_buf.body.service_cnf.dec));
req->msg_buf.body.service_cnf.dec.start[0] = -1;
req->msg_buf.body.service_cnf.dec.start[1] = -1;
}
req->msg_buf.body.service_cnf.services = req->all_services;
vbi_proxy_msg_write(&req->io, MSG_TYPE_SERVICE_CNF,
sizeof(req->msg_buf.body.service_cnf),
&req->msg_buf, FALSE);
}
else
{
vbi_proxy_msg_write(&req->io, MSG_TYPE_SERVICE_REJ,
sizeof(req->msg_buf.body.service_rej),
&req->msg_buf, FALSE);
}
result = TRUE;
}
break;
case MSG_TYPE_CHN_TOKEN_REQ:
if (req->state == REQ_STATE_FORWARD)
{
dprintf(DBG_MSG, "channel token request: fd %d: prio=%d sub-prio=0x%02X\n", req->io.sock_fd, pBody->chn_token_req.chn_prio, pBody->chn_token_req.chn_profile.sub_prio);
/* update channel description and profile */
req->chn_prio = pBody->chn_token_req.chn_prio;
req->chn_profile = pBody->chn_token_req.chn_profile;
memset(&req->chn_state, 0, sizeof(req->chn_state));
/* XXX TODO: return elements: permitted, non_excl */
memset(&req->msg_buf.body.chn_token_cnf, 0, sizeof(req->msg_buf.body.chn_token_cnf));
vbi_proxyd_channel_update(req->dev_idx, req, FALSE);
if (req->chn_state.token_state == REQ_TOKEN_GRANT)
{
req->chn_state.token_state = REQ_TOKEN_GRANTED;
req->msg_buf.body.chn_token_cnf.token_ind = TRUE;
}
else
{
req->msg_buf.body.chn_token_cnf.token_ind = FALSE;
}
vbi_proxy_msg_write(&req->io, MSG_TYPE_CHN_TOKEN_CNF,
sizeof(req->msg_buf.body.chn_token_cnf),
&req->msg_buf, FALSE);
result = TRUE;
}
break;
case MSG_TYPE_CHN_NOTIFY_REQ:
if (req->state == REQ_STATE_FORWARD)
{
vbi_bool chn_upd = FALSE;
vbi_bool chn_forced = FALSE;
dprintf(DBG_MSG, "channel notify: fd %d: flags=0x%X scanning=%d\n", req->io.sock_fd, pBody->chn_notify_req.notify_flags, pBody->chn_notify_req.scanning);
if (pBody->chn_notify_req.notify_flags & VBI_PROXY_CHN_NORM)
{
/* query (verify) new scanning -> inform all clients (line count changes) */
vbi_proxyd_update_scanning(req->dev_idx, req, pBody->chn_notify_req.scanning);
}
if (pBody->chn_notify_req.notify_flags & VBI_PROXY_CHN_FAIL)
{
/* XXX TODO: ignore if the client doesn't hold the token,
** else inform the scheduler */
}
if (pBody->chn_notify_req.notify_flags & VBI_PROXY_CHN_FLUSH)
{
vbi_proxyd_channel_flush(req->dev_idx, req);
chn_upd = TRUE;
chn_forced = ! REQ_CONTROLS_CHN(req->chn_state.token_state);
}
if (pBody->chn_notify_req.notify_flags & VBI_PROXY_CHN_RELEASE)
{
if (req->chn_state.token_state != REQ_TOKEN_NONE)
{
req->chn_state.token_state = REQ_TOKEN_NONE;
chn_upd = TRUE;
}
req->chn_profile.is_valid = FALSE;
}
else if (pBody->chn_notify_req.notify_flags & VBI_PROXY_CHN_TOKEN)
{
req->chn_state.token_state = REQ_TOKEN_RETURNED;
chn_upd = TRUE;
}
if (chn_upd)
vbi_proxyd_channel_update(req->dev_idx, req, chn_forced);
memset(&req->msg_buf.body.chn_notify_cnf, 0, sizeof(req->msg_buf.body.chn_notify_cnf));
req->msg_buf.body.chn_notify_cnf.scanning = proxy.dev[req->dev_idx].scanning;
vbi_proxy_msg_write(&req->io, MSG_TYPE_CHN_NOTIFY_CNF,
sizeof(req->msg_buf.body.chn_notify_cnf),
&req->msg_buf, FALSE);
req->chn_status_ind = VBI_PROXY_CHN_NONE;
result = TRUE;
}
break;
case MSG_TYPE_CHN_SUSPEND_REQ:
/* XXX TODO */
vbi_proxy_msg_write(&req->io, MSG_TYPE_CHN_SUSPEND_REJ,
sizeof(req->msg_buf.body.chn_suspend_rej),
&req->msg_buf, FALSE);
result = TRUE;
break;
case MSG_TYPE_CHN_IOCTL_REQ:
if (req->state == REQ_STATE_FORWARD)
{
/* XXX TODO: message may be longer than pre-allocated message buffer */
if ( vbi_proxyd_take_ioctl_req(req, req->msg_buf.body.chn_ioctl_req.request,
req->msg_buf.body.chn_ioctl_req.arg_data,
req->msg_buf.body.chn_ioctl_req.arg_size,
&req->msg_buf.body.chn_ioctl_cnf.result,
&req->msg_buf.body.chn_ioctl_cnf.errcode) )
{
/* note: arg_size and arg_data are passed back unchanged from the request message */
dprintf(DBG_MSG, "channel control ioctl: fd %d: request=0x%X result=%d errno=%d\n", req->io.sock_fd, req->msg_buf.body.chn_ioctl_req.request, req->msg_buf.body.chn_ioctl_cnf.result, req->msg_buf.body.chn_ioctl_cnf.errcode);
vbi_proxy_msg_write(&req->io, MSG_TYPE_CHN_IOCTL_CNF,
VBIPROXY_CHN_IOCTL_CNF_SIZE(req->msg_buf.body.chn_ioctl_req.arg_size),
&req->msg_buf, FALSE);
}
else
{
vbi_proxy_msg_write(&req->io, MSG_TYPE_CHN_IOCTL_REJ,
sizeof(req->msg_buf.body.chn_ioctl_rej),
&req->msg_buf, FALSE);
}
result = TRUE;
}
break;
case MSG_TYPE_CHN_RECLAIM_CNF:
if (req->chn_state.token_state == REQ_TOKEN_RELEASE)
{
dprintf(DBG_MSG, "channel token reclain confirm: fd %d\n", req->io.sock_fd);
req->chn_state.token_state = REQ_TOKEN_NONE;
vbi_proxyd_channel_update(req->dev_idx, NULL, FALSE);
}
result = TRUE;
break;
case MSG_TYPE_CLOSE_REQ:
/* close the connection */
vbi_proxyd_close(req, FALSE);
result = TRUE;
break;
default:
/* unknown message or client-only message */
dprintf(DBG_MSG, "take_message: protocol error: unexpected message type %d (%s)\n", pMsg->head.type, vbi_proxy_msg_debug_get_type_str(pMsg->head.type));
break;
}
if (result == FALSE)
dprintf(DBG_MSG, "take_message: message type %d (%s, len %d) not expected in state %d\n", pMsg->head.type, vbi_proxy_msg_debug_get_type_str(pMsg->head.type), pMsg->head.len, req->state);
return result;
}
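/* ----------------------------------------------------------------------------
** For reference, the connect handshake from the client's point of view
** (an illustrative sketch using the message API seen above; the actual
** client implementation lives in the proxy client module):
**
**   VBIPROXY_MSG msg;
**   memset(&msg, 0, sizeof(msg));
**   vbi_proxy_msg_fill_magics(&msg.body.connect_req.magics);
**   msg.body.connect_req.pid      = getpid();
**   msg.body.connect_req.services = requested_services;
**   msg.body.connect_req.strict   = 0;
**   vbi_proxy_msg_write(&io, MSG_TYPE_CONNECT_REQ,
**                       sizeof(msg.body.connect_req), &msg, FALSE);
**
** The daemon then replies with MSG_TYPE_CONNECT_CNF on success or with
** MSG_TYPE_CONNECT_REJ (carrying an error string) on failure, as
** implemented in vbi_proxyd_take_message() above.
*/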
/* ----------------------------------------------------------------------------
** Set bits for all active sockets in fd_set for select syscall
*/
static int vbi_proxyd_get_fd_set( fd_set * rd, fd_set * wr )
{
PROXY_CLNT * req;
PROXY_DEV * p_proxy_dev;
int dev_idx;
int max_fd;
max_fd = 0;
/* add TCP/IP and UNIX-domain listening sockets */
if ((proxy.max_conn == 0) || (proxy.clnt_count < proxy.max_conn))
{
if (proxy.tcp_ip_fd != -1)
{
FD_SET(proxy.tcp_ip_fd, rd);
if (proxy.tcp_ip_fd > max_fd)
max_fd = proxy.tcp_ip_fd;
}
}
/* add listening sockets and VBI devices, if currently opened */
p_proxy_dev = proxy.dev;
for (dev_idx = 0; dev_idx < proxy.dev_count; dev_idx++, p_proxy_dev++)
{
if (p_proxy_dev->pipe_fd != -1)
{
FD_SET(p_proxy_dev->pipe_fd, rd);
if (p_proxy_dev->pipe_fd > max_fd)
max_fd = p_proxy_dev->pipe_fd;
}
if (p_proxy_dev->vbi_fd != -1)
{
FD_SET(p_proxy_dev->vbi_fd, rd);
if (p_proxy_dev->vbi_fd > max_fd)
max_fd = p_proxy_dev->vbi_fd;
}
}
/* add client connection sockets */
for (req = proxy.p_clnts; req != NULL; req = req->p_next)
{
/* read and write are exclusive and write takes precedence over read
** (i.e. read only if no write is pending or if a read operation has already been started)
*/
if ( vbi_proxy_msg_read_idle(&req->io) == FALSE )
{
FD_SET(req->io.sock_fd, rd);
}
else if ( (vbi_proxy_msg_write_idle(&req->io) == FALSE) ||
(req->p_sliced != NULL) || (req->chn_status_ind != VBI_PROXY_CHN_NONE) )
{
FD_SET(req->io.sock_fd, wr);
}
else
FD_SET(req->io.sock_fd, rd);
if (req->io.sock_fd > max_fd)
max_fd = req->io.sock_fd;
}
return max_fd;
}
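/* Usage note: the returned value is the highest descriptor number only, so
** the caller must pass max_fd + 1 as the nfds argument of select(), as done
** in vbi_proxyd_main_loop() below.
*/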
/* ----------------------------------------------------------------------------
** Proxy daemon central connection handling
*/
static void vbi_proxyd_handle_client_sockets( fd_set * rd, fd_set * wr )
{
PROXY_CLNT *req;
PROXY_CLNT *prev, *tmp;
vbi_bool io_blocked;
time_t now = time(NULL);
/* handle active connections */
for (req = proxy.p_clnts, prev = NULL; req != NULL; )
{
io_blocked = FALSE;
if ( FD_ISSET(req->io.sock_fd, rd) &&
vbi_proxy_msg_write_idle(&req->io) )
{
/* incoming data -> start reading */
dprintf(DBG_CLNT, "handle_client_sockets: fd %d: receiving data\n", req->io.sock_fd);
if (vbi_proxy_msg_handle_read(&req->io, &io_blocked, TRUE, &req->msg_buf, sizeof(req->msg_buf)))
{
/* check for finished read -> process request */
if ( (req->io.readOff != 0) && (req->io.readOff == req->io.readLen) )
{
if (vbi_proxyd_check_msg(&req->msg_buf, &req->endianSwap))
{
vbi_proxy_msg_close_read(&req->io);
if (vbi_proxyd_take_message(req, &req->msg_buf) == FALSE)
{ /* message not accepted (e.g. wrong state) */
vbi_proxyd_close(req, FALSE);
}
}
else
{ /* message has illegal size or content */
vbi_proxyd_close(req, FALSE);
}
}
}
else
vbi_proxyd_close(req, FALSE);
}
else if ( FD_ISSET(req->io.sock_fd, wr) &&
!vbi_proxy_msg_write_idle(&req->io) )
{
if (vbi_proxy_msg_handle_write(&req->io, &io_blocked) == FALSE)
{
vbi_proxyd_close(req, FALSE);
}
}
if (req->state == REQ_STATE_WAIT_CLOSE)
{ /* close was pending after last write */
vbi_proxyd_close(req, FALSE);
}
else if (vbi_proxy_msg_is_idle(&req->io))
{ /* currently no I/O in progress */
if (req->chn_state.token_state == REQ_TOKEN_RECLAIM)
{
dprintf(DBG_MSG, "channel token reclaim: fd %d\n", req->io.sock_fd);
/* XXX TODO: supervise return of token by timer */
memset(&req->msg_buf, 0, sizeof(req->msg_buf));
vbi_proxy_msg_write(&req->io, MSG_TYPE_CHN_RECLAIM_REQ,
sizeof(req->msg_buf.body.chn_reclaim_req), &req->msg_buf, FALSE);
req->chn_state.token_state = REQ_TOKEN_RELEASE;
}
else if (req->chn_state.token_state == REQ_TOKEN_GRANT)
{
dprintf(DBG_MSG, "channel token grant: fd %d\n", req->io.sock_fd);
memset(&req->msg_buf, 0, sizeof(req->msg_buf));
vbi_proxy_msg_write(&req->io, MSG_TYPE_CHN_TOKEN_IND,
sizeof(req->msg_buf.body.chn_token_ind), &req->msg_buf, FALSE);
req->chn_state.token_state = REQ_TOKEN_GRANTED;
}
else if (req->chn_status_ind)
{ /* send channel change indication */
memset(&req->msg_buf, 0, sizeof(req->msg_buf));
req->msg_buf.body.chn_change_ind.notify_flags = req->chn_status_ind;
req->msg_buf.body.chn_change_ind.scanning = proxy.dev[req->dev_idx].scanning;
vbi_proxy_msg_write(&req->io, MSG_TYPE_CHN_CHANGE_IND,
sizeof(req->msg_buf.body.chn_change_ind), &req->msg_buf, FALSE);
req->chn_status_ind = VBI_PROXY_CHN_NONE;
}
else
{
/* forward data from slicer out queue */
while ((req->p_sliced != NULL) && (io_blocked == FALSE))
{
dprintf(DBG_QU, "handle_sockets: fd %d: forward sliced frame with %d lines (of max %d)\n", req->io.sock_fd, req->p_sliced->line_count, req->p_sliced->max_lines);
if (vbi_proxyd_send_sliced(req, &io_blocked) )
{ /* only in success case because close releases all buffers */
pthread_mutex_lock(&proxy.dev[req->dev_idx].queue_mutex);
vbi_proxy_queue_release_sliced(req);
pthread_mutex_unlock(&proxy.dev[req->dev_idx].queue_mutex);
}
else
{ /* I/O error */
vbi_proxyd_close(req, FALSE);
io_blocked = TRUE;
}
}
}
}
if (req->io.sock_fd == -1)
{ /* free resources (should be redundant, but does no harm) */
vbi_proxyd_close(req, FALSE);
}
else if ( (req->state == REQ_STATE_WAIT_CON_REQ) &&
((req->client_flags & VBI_PROXY_CLIENT_NO_TIMEOUTS) == 0) &&
vbi_proxy_msg_check_timeout(&req->io, now) )
{
dprintf(DBG_MSG, "handle_sockets: fd %d: i/o timeout in state %d (writeLen=%d, readLen=%d, readOff=%d, read msg type=%d: %s)\n", req->io.sock_fd, req->state, req->io.writeLen, req->io.readLen, req->io.readOff, req->msg_buf.head.type, vbi_proxy_msg_debug_get_type_str(req->msg_buf.head.type));
vbi_proxyd_close(req, FALSE);
}
/* check for protocol timeout while waiting for the connect request */
else if ( (req->state == REQ_STATE_WAIT_CON_REQ) &&
(now > req->io.lastIoTime + SRV_CONNECT_TIMEOUT) )
{
dprintf(DBG_MSG, "handle_sockets: fd %d: protocol timeout in state %d\n", req->io.sock_fd, req->state);
vbi_proxyd_close(req, FALSE);
}
if (req->state == REQ_STATE_CLOSED)
{ /* connection was closed after network error */
unsigned int clnt_services = req->all_services;
int dev_idx = req->dev_idx;
if (proxy.clnt_count > 0)
proxy.clnt_count -= 1;
dprintf(DBG_MSG, "handle_sockets: closed conn, %d remain\n", proxy.clnt_count);
pthread_mutex_lock(&proxy.clnt_mutex);
/* unlink from list */
tmp = req;
if (prev == NULL)
{
proxy.p_clnts = req->p_next;
req = proxy.p_clnts;
}
else
{
prev->p_next = req->p_next;
req = req->p_next;
}
pthread_mutex_unlock(&proxy.clnt_mutex);
if (clnt_services != 0)
vbi_proxyd_update_services(dev_idx, NULL, 0, NULL);
if (proxy.dev[dev_idx].p_capture != NULL)
vbi_proxyd_channel_update(dev_idx, NULL, FALSE);
free(tmp);
}
else
{
prev = req;
req = req->p_next;
}
}
}
/* ----------------------------------------------------------------------------
** Set maximum number of open client connections
** - note: does not close connections if max count is already exceeded
*/
static void vbi_proxyd_set_max_conn( unsigned int max_conn )
{
proxy.max_conn = max_conn;
}
/* ----------------------------------------------------------------------------
** Set server IP address
** - must be called before the listening sockets are created
*/
static void vbi_proxyd_set_address( vbi_bool do_tcp_ip, const char * pIpStr, const char * pPort )
{
/* free the memory allocated for the old config strings */
if (proxy.listen_ip != NULL)
{
free(proxy.listen_ip);
proxy.listen_ip = NULL;
}
if (proxy.listen_port != NULL)
{
free(proxy.listen_port);
proxy.listen_port = NULL;
}
/* make a copy of the new config strings */
if (pIpStr != NULL)
{
proxy.listen_ip = malloc(strlen(pIpStr) + 1);
strcpy(proxy.listen_ip, pIpStr);
}
if (pPort != NULL)
{
proxy.listen_port = malloc(strlen(pPort) + 1);
strcpy(proxy.listen_port, pPort);
}
proxy.do_tcp_ip = do_tcp_ip;
}
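/* Usage sketch (hypothetical values): to accept remote clients on TCP port
** 54321 on all interfaces, one would call
**
**   vbi_proxyd_set_address(TRUE, NULL, "54321");
**
** before vbi_proxyd_listen(); calling it with (FALSE, NULL, NULL), as
** vbi_proxyd_destroy() does, merely frees the previously copied strings.
*/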
/* ----------------------------------------------------------------------------
** Emulate device permissions on the socket file
*/
static void vbi_proxyd_set_socket_perm( PROXY_DEV * p_proxy_dev )
{
struct stat st;
if (stat(p_proxy_dev->p_dev_name, &st) != -1)
{
if ( (chown(p_proxy_dev->p_sock_path, st.st_uid, st.st_gid) != 0) &&
(chown(p_proxy_dev->p_sock_path, geteuid(), st.st_gid) != 0) )
dprintf(DBG_MSG, "set_perm: failed to set socket owner %d.%d: %s\n", st.st_uid, st.st_gid, strerror(errno));
if (chmod(p_proxy_dev->p_sock_path, st.st_mode) != 0)
dprintf(DBG_MSG, "set_perm: failed to set socket permission %o: %s\n", st.st_mode, strerror(errno));
}
else
dprintf(DBG_MSG, "set_perm: failed to stat VBI device %s\n", p_proxy_dev->p_dev_name);
}
/* ----------------------------------------------------------------------------
** Stop the server, close all connections, free resources
*/
static void vbi_proxyd_destroy( void )
{
PROXY_CLNT *req, *p_next;
int dev_idx;
/* close all devices */
for (dev_idx = 0; dev_idx < proxy.dev_count; dev_idx++)
{
vbi_proxy_stop_acquisition(proxy.dev + dev_idx);
}
/* shutdown all client connections & free resources */
req = proxy.p_clnts;
while (req != NULL)
{
p_next = req->p_next;
vbi_proxyd_close(req, TRUE);
free(req);
req = p_next;
}
proxy.p_clnts = NULL;
proxy.clnt_count = 0;
/* close listening sockets */
for (dev_idx = 0; dev_idx < proxy.dev_count; dev_idx++)
{
if (proxy.dev[dev_idx].pipe_fd != -1)
{
vbi_proxy_msg_stop_listen(FALSE, proxy.dev[dev_idx].pipe_fd, proxy.dev[dev_idx].p_sock_path);
}
if (proxy.dev[dev_idx].p_sock_path != NULL)
free(proxy.dev[dev_idx].p_sock_path);
pthread_cond_destroy(&proxy.dev[dev_idx].start_cond);
pthread_mutex_destroy(&proxy.dev[dev_idx].start_mutex);
pthread_mutex_destroy(&proxy.dev[dev_idx].queue_mutex);
}
if (proxy.tcp_ip_fd != -1)
{
vbi_proxy_msg_stop_listen(TRUE, proxy.tcp_ip_fd, NULL);
}
vbi_proxy_msg_logger(LOG_NOTICE, -1, 0, "shutting down", NULL);
/* free the memory allocated for the config strings */
vbi_proxyd_set_address(FALSE, NULL, NULL);
vbi_proxy_msg_set_logging(FALSE, 0, 0, NULL);
}
/* ---------------------------------------------------------------------------
** Signal handler to process alarm
*/
static void vbi_proxyd_alarm_handler( int sigval )
{
sigval = sigval;
proxy.chn_sched_alarm = TRUE;
}
/* ---------------------------------------------------------------------------
** Signal handler to catch deadly signals
*/
static void vbi_proxyd_signal_handler( int sigval )
{
char str_buf[10];
sprintf(str_buf, "%d", sigval);
vbi_proxy_msg_logger(LOG_NOTICE, -1, 0, "terminated by signal", str_buf, NULL);
proxy.should_exit = TRUE;
}
/* ----------------------------------------------------------------------------
** Initialize DB server
*/
static void vbi_proxyd_init( void )
{
struct sigaction act;
if (opt_no_detach == FALSE)
{
if (fork() > 0)
exit(0);
close(0);
open("/dev/null", O_RDONLY, 0);
if (opt_debug_level == 0)
{
close(1);
open("/dev/null", O_WRONLY, 0);
close(2);
dup(1);
setsid();
}
}
/* ignore broken pipes (handled by select/read) */
memset(&act, 0, sizeof(act));
act.sa_handler = SIG_IGN;
sigaction(SIGPIPE, &act, NULL);
/* handle alarm timers (for channel change scheduling) */
memset(&act, 0, sizeof(act));
act.sa_handler = vbi_proxyd_alarm_handler;
sigaction(SIGALRM, &act, NULL);
/* catch deadly signals for a clean shutdown (remove socket file) */
memset(&act, 0, sizeof(act));
sigemptyset(&act.sa_mask);
sigaddset(&act.sa_mask, SIGINT);
sigaddset(&act.sa_mask, SIGTERM);
sigaddset(&act.sa_mask, SIGHUP);
act.sa_handler = vbi_proxyd_signal_handler;
act.sa_flags = SA_ONESHOT;
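/* note: SA_ONESHOT is the older Linux name for the POSIX flag SA_RESETHAND,
** i.e. the handler is reset to SIG_DFL after the first signal delivery */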
sigaction(SIGINT, &act, NULL);
sigaction(SIGTERM, &act, NULL);
sigaction(SIGHUP, &act, NULL);
}
/* ----------------------------------------------------------------------------
** Set up sockets for listening to client requests
*/
static vbi_bool vbi_proxyd_listen( void )
{
PROXY_DEV * p_proxy_dev;
int dev_idx;
vbi_bool result;
result = TRUE;
p_proxy_dev = proxy.dev;
for (dev_idx = 0; (dev_idx < proxy.dev_count) && result; dev_idx++, p_proxy_dev++)
{
if (vbi_proxy_msg_check_connect(p_proxy_dev->p_sock_path) == FALSE)
{
/* create named socket in /tmp for listening to local clients */
p_proxy_dev->pipe_fd = vbi_proxy_msg_listen_socket(FALSE, NULL, p_proxy_dev->p_sock_path);
if (p_proxy_dev->pipe_fd != -1)
{
/* copy VBI device permissions to the listening socket */
vbi_proxyd_set_socket_perm(p_proxy_dev);
vbi_proxy_msg_logger(LOG_NOTICE, -1, 0, "started listening on local socket for ", p_proxy_dev->p_dev_name, NULL);
}
else
result = FALSE;
}
else
{
vbi_proxy_msg_logger(LOG_ERR, -1, 0, "a proxy daemon is already running for ", p_proxy_dev->p_dev_name, NULL);
result = FALSE;
}
}
if (proxy.do_tcp_ip && result)
{
/* create TCP/IP socket */
proxy.tcp_ip_fd = vbi_proxy_msg_listen_socket(TRUE, proxy.listen_ip, proxy.listen_port);
if (proxy.tcp_ip_fd != -1)
{
vbi_proxy_msg_logger(LOG_NOTICE, -1, 0, "started listening on TCP/IP socket", NULL);
}
else
result = FALSE;
}
return result;
}
/* ---------------------------------------------------------------------------
** Proxy daemon main loop
*/
static void vbi_proxyd_main_loop( void )
{
fd_set rd, wr;
int max_fd;
int sel_cnt;
int dev_idx;
while (proxy.should_exit == FALSE)
{
FD_ZERO(&rd);
FD_ZERO(&wr);
max_fd = vbi_proxyd_get_fd_set(&rd, &wr);
/* wait for new clients, client messages or VBI device data (indefinitely) */
sel_cnt = select(((max_fd > 0) ? (max_fd + 1) : 0), &rd, &wr, NULL, NULL);
if (sel_cnt != -1)
{
if (sel_cnt > 0)
dprintf(DBG_CLNT, "main_loop: select: events on %d sockets\n", sel_cnt);
for (dev_idx = 0; dev_idx < proxy.dev_count; dev_idx++)
{
/* accept new client connections on device socket */
if ((proxy.dev[dev_idx].pipe_fd != -1) && (FD_ISSET(proxy.dev[dev_idx].pipe_fd, &rd)))
{
vbi_proxyd_add_connection(proxy.dev[dev_idx].pipe_fd, dev_idx, TRUE);
}
/* check for incoming data on VBI device */
if ((proxy.dev[dev_idx].vbi_fd != -1) && (FD_ISSET(proxy.dev[dev_idx].vbi_fd, &rd)))
{
if (proxy.dev[dev_idx].use_thread == FALSE)
{
vbi_proxyd_forward_data(dev_idx);
}
else
{ /* message from acq thread slave:
** sent data is only a trigger to wake up from select() above -> discard it */
char dummy_buf[100];
int rd_count;
do {
rd_count = read(proxy.dev[dev_idx].vbi_fd, dummy_buf, sizeof(dummy_buf));
dprintf(DBG_QU, "main_loop: read from acq thread dev #%d pipe fd %d: %d errno=%d\n", dev_idx, proxy.dev[dev_idx].vbi_fd, rd_count, errno);
} while (rd_count == sizeof(dummy_buf));
}
}
}
/* accept new TCP/IP connections */
if ((proxy.tcp_ip_fd != -1) && (FD_ISSET(proxy.tcp_ip_fd, &rd)))
{
vbi_proxyd_add_connection(proxy.tcp_ip_fd, 0, FALSE);
}
/* send queued data or process incoming messages from clients */
vbi_proxyd_handle_client_sockets(&rd, &wr);
if (proxy.chn_sched_alarm)
{
proxy.chn_sched_alarm = FALSE;
vbi_proxyd_channel_timer();
}
}
else
{
if (errno != EINTR)
{ /* select syscall failed */
dprintf(DBG_MSG, "main_loop: select with max. fd %d: %s\n", max_fd, strerror(errno));
sleep(1);
}
}
}
}
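/* ----------------------------------------------------------------------------
** Note on the wake-up path above: when acquisition runs in a slave thread,
** the thread uses the classic "self-pipe" pattern: it writes a dummy byte
** into a pipe whose read end is registered in the select() set, so the
** main loop wakes up and merely drains the pipe; the captured sliced data
** itself travels through the mutex-protected output queue.
*/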
/* ---------------------------------------------------------------------------
** Kill-daemon only: exit upon timeout in I/O to daemon
*/
static void vbi_proxyd_kill_timeout( int sigval )
{
sigval = sigval;
/* note: cannot use printf in signal handler without risking deadlock */
exit(2);
}
/* ---------------------------------------------------------------------------
** Connect to running daemon, query its pid and kill it, exit.
*/
static void vbi_proxyd_kill_daemon( void )
{
struct sigaction act;
char * p_errorstr;
char * p_srv_port;
vbi_bool io_blocked;
VBIPROXY_MSG msg_buf;
VBIPROXY_MSG_STATE io;
memset(&io, 0, sizeof(io));
io.sock_fd = -1;
p_errorstr = NULL;
p_srv_port = vbi_proxy_msg_get_socket_name(proxy.dev[0].p_dev_name);
if (p_srv_port == NULL)
goto failure;
io.sock_fd = vbi_proxy_msg_connect_to_server(FALSE, NULL, p_srv_port, &p_errorstr);
if (io.sock_fd == -1)
goto failure;
memset(&act, 0, sizeof(act));
act.sa_handler = vbi_proxyd_kill_timeout;
sigaction(SIGALRM, &act, NULL);
/* switch the socket to blocking I/O and use an alarm timer for timeout handling (simpler than select) */
alarm(4);
fcntl(io.sock_fd, F_SETFL, 0);
/* wait for socket to reach connected state */
if (vbi_proxy_msg_finish_connect(io.sock_fd, &p_errorstr) == FALSE)
goto failure;
/* fill in the magic values for the PID request */
vbi_proxy_msg_fill_magics(&msg_buf.body.daemon_pid_req.magics);
/* send the pid request message to the proxy server */
vbi_proxy_msg_write(&io, MSG_TYPE_DAEMON_PID_REQ, sizeof(msg_buf.body.daemon_pid_req),
&msg_buf, FALSE);
if (vbi_proxy_msg_handle_write(&io, &io_blocked) == FALSE)
goto io_error;
if (vbi_proxy_msg_handle_read(&io, &io_blocked, TRUE, &msg_buf, sizeof(msg_buf)) == FALSE)
goto io_error;
if ( (vbi_proxyd_check_msg(&msg_buf, FALSE) == FALSE) ||
(msg_buf.head.type != MSG_TYPE_DAEMON_PID_CNF) )
{
asprintf(&p_errorstr, "%s", "Proxy protocol error");
goto failure;
}
if (kill(msg_buf.body.daemon_pid_cnf.pid, SIGTERM) != 0)
{
asprintf(&p_errorstr, "Failed to kill the daemon process (pid %d): %s",
msg_buf.body.daemon_pid_cnf.pid, strerror(errno));
goto failure;
}
dprintf(DBG_MSG, "Killed daemon process %d.\n", msg_buf.body.daemon_pid_cnf.pid);
close(io.sock_fd);
exit(0);
io_error:
if (p_errorstr == NULL)
asprintf(&p_errorstr, "Lost connection to proxy (I/O error)");
failure:
/* failed to establish a connection to the server */
if (io.sock_fd != -1)
close(io.sock_fd);
if (p_errorstr != NULL)
{
fprintf(stderr, "%s\n", p_errorstr);
free(p_errorstr);
}
exit(1);
}
/* ---------------------------------------------------------------------------
** Print usage and exit
*/
static void proxy_usage_exit( const char *argv0, const char *argvn, const char * reason )
{
fprintf(stderr, "%s: %s: %s\n"
"Options:\n"
" -dev <path> : VBI device path (allowed repeatedly)\n"
" -buffers <count> : number of raw capture buffers (v4l2 only)\n"
" -nodetach : process remains connected to tty\n"
" -kill : kill running daemon process, then exit\n"
" -debug <level> : enable debug output: 1=warnings, 2=all\n"
" -syslog <level> : enable syslog output\n"
" -loglevel <level> : log file level\n"
" -logfile <path> : log file name\n"
" -maxclients <count> : max. number of clients\n"
" -help : this message\n",
argv0, reason, argvn);
exit(1);
}
/* ---------------------------------------------------------------------------
** Parse numeric value in command line options
*/
static vbi_bool proxy_parse_argv_numeric( char * p_number, int * p_value )
{
char * p_num_end;
if (*p_number != 0)
{
*p_value = strtol(p_number, &p_num_end, 0);
return (*p_num_end == 0);
}
else
return FALSE;
}
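/* Examples: proxy_parse_argv_numeric("16", &val) and
** proxy_parse_argv_numeric("0x10", &val) both succeed with val == 16,
** since strtol() is called with base 0 and auto-detects decimal, octal
** and hexadecimal; "12abc" and "" are rejected because the entire string
** must be consumed by the conversion.
*/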
/* ---------------------------------------------------------------------------
** Parse command line options
*/
static void vbi_proxyd_parse_argv( int argc, char * argv[] )
{
struct stat stb;
int arg_val;
int arg_idx = 1;
while (arg_idx < argc)
{
if (strcasecmp(argv[arg_idx], "-dev") == 0)
{
if (arg_idx + 1 < argc)
{
if (proxy.dev_count >= SRV_MAX_DEVICES)
proxy_usage_exit(argv[0], argv[arg_idx], "too many device paths");
if (stat(argv[arg_idx + 1], &stb) == -1)
proxy_usage_exit(argv[0], argv[arg_idx + 1], strerror(errno));
if (!S_ISCHR(stb.st_mode))
proxy_usage_exit(argv[0], argv[arg_idx + 1], "not a character device");
if (access(argv[arg_idx + 1], R_OK | W_OK) == -1)
proxy_usage_exit(argv[0], argv[arg_idx + 1], "failed to access device");
vbi_proxyd_add_device(argv[arg_idx + 1]);
arg_idx += 2;
}
else
proxy_usage_exit(argv[0], argv[arg_idx], "missing mode keyword after");
}
else if (strcasecmp(argv[arg_idx], "-buffers") == 0)
{
if ((arg_idx + 1 < argc) && proxy_parse_argv_numeric(argv[arg_idx + 1], &arg_val))
{
opt_buffer_count = arg_val;
if ((opt_buffer_count < 1) || (opt_buffer_count > VBI_MAX_BUFFER_COUNT))
proxy_usage_exit(argv[0], argv[arg_idx], "buffer count unsupported");
arg_idx += 2;
}
else
proxy_usage_exit(argv[0], argv[arg_idx], "missing buffer count after");
}
else if (strcasecmp(argv[arg_idx], "-debug") == 0)
{
if ((arg_idx + 1 < argc) && proxy_parse_argv_numeric(argv[arg_idx + 1], &arg_val))
{
opt_debug_level = arg_val;
if (opt_debug_level > 0)
opt_debug_level |= DBG_MSG;
arg_idx += 2;
}
else
proxy_usage_exit(argv[0], argv[arg_idx], "missing debug level after");
}
else if (strcasecmp(argv[arg_idx], "-nodetach") == 0)
{
opt_no_detach = TRUE;
arg_idx += 1;
}
else if (strcasecmp(argv[arg_idx], "-kill") == 0)
{
opt_kill_daemon = TRUE;
arg_idx += 1;
}
else if (strcasecmp(argv[arg_idx], "-syslog") == 0)
{
if ((arg_idx + 1 < argc) && proxy_parse_argv_numeric(argv[arg_idx + 1], &arg_val))
{
opt_syslog_level = arg_val;
arg_idx += 2;
}
else
proxy_usage_exit(argv[0], argv[arg_idx], "missing log level after");
}
else if (strcasecmp(argv[arg_idx], "-loglevel") == 0)
{
if ((arg_idx + 1 < argc) && proxy_parse_argv_numeric(argv[arg_idx + 1], &arg_val))
{
opt_log_level = arg_val;
arg_idx += 2;
}
else
proxy_usage_exit(argv[0], argv[arg_idx], "missing log level after");
}
else if (strcasecmp(argv[arg_idx], "-logfile") == 0)
{
if (arg_idx + 1 < argc)
{
p_opt_log_name = argv[arg_idx + 1];
arg_idx += 2;
}
else
proxy_usage_exit(argv[0], argv[arg_idx], "missing mode keyword after");
}
else if (strcasecmp(argv[arg_idx], "-maxclients") == 0)
{
if ((arg_idx + 1 < argc) && proxy_parse_argv_numeric(argv[arg_idx + 1], &arg_val))
{
opt_max_clients = arg_val;
arg_idx += 2;
}
else
proxy_usage_exit(argv[0], argv[arg_idx], "missing log level after");
}
else if (strcasecmp(argv[arg_idx], "-help") == 0)
{
char versbuf[50];
sprintf(versbuf, "(version %d.%d.%d)", VBIPROXY_VERSION>>16, (VBIPROXY_VERSION>>8)&0xff, VBIPROXY_VERSION&0xff);
proxy_usage_exit(argv[0], versbuf, "the following options are available");
}
else
proxy_usage_exit(argv[0], argv[arg_idx], "unknown option or argument");
}
/* if no device was given, use default path */
if (proxy.dev_count == 0)
{
/* use devfs path if subdirectory exists */
if (access(DEFAULT_VBI_DEVFS_PATH, R_OK | W_OK) == 0)
vbi_proxyd_add_device(DEFAULT_VBI_DEVFS_PATH);
else
vbi_proxyd_add_device(DEFAULT_VBI_DEV_PATH);
}
}
/* ----------------------------------------------------------------------------
** Proxy daemon entry point
*/
int main( int argc, char ** argv )
{
/* initialize state struct */
memset(&proxy, 0, sizeof(proxy));
proxy.tcp_ip_fd = -1;
pthread_mutex_init(&proxy.clnt_mutex, NULL);
vbi_proxyd_parse_argv(argc, argv);
vbi_proxy_msg_set_debug_level( (opt_debug_level == 0) ? 0 : ((opt_debug_level & DBG_CLNT) ? 2 : 1) );
if (opt_kill_daemon)
{
vbi_proxyd_kill_daemon();
exit(0);
}
dprintf(DBG_MSG, "proxy daemon starting, rev.\n%s\n", rcsid);
vbi_proxyd_init();
vbi_proxyd_set_max_conn(opt_max_clients);
vbi_proxyd_set_address(FALSE, NULL, NULL);
vbi_proxy_msg_set_logging(opt_debug_level > 0, opt_syslog_level, opt_log_level, p_opt_log_name);
/* start listening for client connections */
if (vbi_proxyd_listen())
{
vbi_proxyd_main_loop();
}
vbi_proxyd_destroy();
pthread_mutex_destroy(&proxy.clnt_mutex);
exit(0);
return 0;
}
#endif /* ENABLE_PROXY */