///////////////////////////////////////////////////////////////////////////////
// MQ4CPP - Message queuing for C++
// Copyright (C) 2004-2007  Riccardo Pompeo (Italy)
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
//

#ifndef __MESSAGEPROXY__

#define __MESSAGEPROXY__


#include "Socket.h"

#include "MessageQueue.h"

#include "Vector.h"

#include "Timer.h"

#include "Encription.h"

#include "Compression.h"

#include "Properties.h"


#ifdef WIN32

#include <windows.h>

#else

#include <pthread.h>

#include <sys/errno.h>

#endif


#include <vector>


// String literal "MessageProxy(" exposed as a macro. Its use is not visible
// in this header; presumably a name/label prefix for proxy queues -- TODO
// confirm against the implementation file.
#define MESSAGEPROXYHEADER "MessageProxy("


// Numeric identifiers for the kinds of message handled by the proxy layer.
// Values start at 1 and are consecutive; they are written out explicitly so
// the numbering is visible at a glance.
enum NetworkMessages
{
	MQ_PROXY_MESSAGE        = 1,
	MQ_PROXY_LOOKUP_REQUEST = 2,
	MQ_PROXY_LOOKUP_REPLY   = 3,
	MQ_PROXY_PING_REQUEST   = 4,
	MQ_PROXY_PING_REPLY     = 5,
	MQ_PROXY_UNSOLICITED    = 6,
	MQ_PROXY_BROADCAST      = 7
};

// Message carrying a topic string and a payload buffer, plus routing data:
// a target handle, a remote-sender handle and a sequence number. Two flags
// mark the message as unsolicited and/or broadcast; this interface can only
// raise them, never clear them.
class NetworkMessage : public Message
{
public:
	// Fixed-size record holding the sender handle, the sequence number and
	// the lengths of topic and payload.
	// NOTE(review): presumably the serialized header layout -- confirm in the
	// implementation. The tag spelling "Netwok" is original and is kept
	// because code outside this header may refer to it.
	struct NetwokMessageStruct
	{
		MQHANDLE sender;
		unsigned short seqnum;
		unsigned short topiclen;
		unsigned short buflen;
	};
	typedef NetwokMessageStruct NetworkMessageHeader;

protected:
	string itsTopic;
	string itsBuffer;
	MQHANDLE itsTarget;
	MQHANDLE itsRemoteSender;
	unsigned short itsSeqNum;
	bool itsUnsolicitedFlag;
	bool itsBroadcastFlag;

public:
	// Constructors are defined outside this header. Note that the copying
	// constructor takes a non-const reference.
	NetworkMessage(NetworkMessage& o);
	NetworkMessage(char* theBuffer, unsigned short theLen);
	NetworkMessage(string theBuffer);

	virtual ~NetworkMessage()
	{
	}

	// Returns a heap-allocated copy of this message built with the copying
	// constructor; nothing in this class releases it.
	virtual Message* clone()
	{
		return new NetworkMessage(*this);
	}

	// --- topic ---------------------------------------------------------
	string getTopic()
	{
		return itsTopic;
	}

	void setTopic(string theTopic)
	{
		itsTopic = theTopic;
	}

	void setTopic(const char* theTopic)
	{
		itsTopic = theTopic;
	}

	// Takes exactly len characters from theTopic (no terminator needed).
	void setTopic(char* theTopic,int len)
	{
		itsTopic.assign(theTopic, len);
	}

	// --- routing -------------------------------------------------------
	void setRemoteSender(MQHANDLE theHandle)
	{
		itsRemoteSender = theHandle;
	}

	MQHANDLE getRemoteSender()
	{
		return itsRemoteSender;
	}

	void setTarget(MQHANDLE theHandle)
	{
		itsTarget = theHandle;
	}

	MQHANDLE getTarget()
	{
		return itsTarget;
	}

	void setSequenceNumber(unsigned short theSequence)
	{
		itsSeqNum = theSequence;
	}

	unsigned short getSequenceNumber()
	{
		return itsSeqNum;
	}

	// --- flags ---------------------------------------------------------
	bool isUnsolicited()
	{
		return itsUnsolicitedFlag;
	}

	void setUnsolicited()
	{
		itsUnsolicitedFlag = true;
	}

	bool isBroadcasting()
	{
		return itsBroadcastFlag;
	}

	void setBroadcasting()
	{
		itsBroadcastFlag = true;
	}

	// --- payload -------------------------------------------------------
	// Returns a copy of the payload buffer.
	string get()
	{
		return itsBuffer;
	}

	// The following are defined outside this header.
	virtual string toString();
	virtual void toStream(ostream& theStream);
	virtual void code(Encription* theEncr);
	virtual void decode(Encription* theEncr);
	virtual void inflate(Compression* theCompr);
	virtual void deflate(Compression* theCompr);
};

// Message type for a ping request; constructed from the sender's handle.
class PingRequestMessage : public Message
{
public:
	// Record holding only the sender handle.
	// NOTE(review): presumably the serialized form of the request -- confirm
	// in the implementation.
	struct PingRequestStruct
	{
		MQHANDLE sender;
	};
	typedef PingRequestStruct PingRequest;

	PingRequestMessage(MQHANDLE theSenderID);

	virtual ~PingRequestMessage()
	{
	}

	// Defined outside this header.
	virtual string toString();
};

// Message type for a ping reply; stores the handle it is addressed to.
class PingReplyMessage : public Message
{
protected:
	MQHANDLE itsTarget;

public:
	PingReplyMessage(MQHANDLE theTarget);

	virtual ~PingReplyMessage()
	{
	}

	// Handle stored in itsTarget.
	virtual MQHANDLE getTarget()
	{
		return itsTarget;
	}
};

// Message type for a lookup request; stores the name to be looked up.
class LookupRequestMessage : public Message
{
public:
	// Record holding the sender handle and the length of the name.
	// NOTE(review): presumably the serialized header that precedes the name
	// bytes -- confirm in the implementation.
	struct LookupRequestStruct
	{
		MQHANDLE sender;
		unsigned short namelen;
	};
	typedef LookupRequestStruct LookupRequest;

protected:
	string itsNameToLookup;

public:
	LookupRequestMessage(const char* theName,MQHANDLE theSenderID);

	virtual ~LookupRequestMessage()
	{
	}

	// Defined outside this header.
	virtual string toString();

	// Despite the name, this returns the name being looked up (a string),
	// not a handle.
	virtual string getTarget ()
	{
		return itsNameToLookup;
	}
};

// Message type for a lookup reply: a fail flag and a handle (kept together
// in a LookupReply record) plus the handle the reply is addressed to.
class LookupReplyMessage : public Message
{
public:
	// Result record: failure flag and the handle that was looked up.
	struct LookupStruct
	{
		bool fail;
		MQHANDLE handle;
	};
	typedef LookupStruct LookupReply;

protected:
	LookupReply itsBuffer;
	MQHANDLE itsTarget;

public:
	// All constructors are defined outside this header.
	LookupReplyMessage();
	LookupReplyMessage(MQHANDLE theTarget);
	LookupReplyMessage(MQHANDLE theTarget,MQHANDLE theHandle);
	LookupReplyMessage(LookupReply& theReply);

	virtual ~LookupReplyMessage()
	{
	}

	// Defined outside this header.
	virtual string toString();

	// Value of the fail field of the stored record.
	virtual bool isFailed()
	{
		return itsBuffer.fail;
	}

	// Value of the handle field of the stored record.
	virtual MQHANDLE getHandle()
	{
		return itsBuffer.handle;
	}

	// Handle stored in itsTarget.
	virtual MQHANDLE getTarget()
	{
		return itsTarget;
	}
};

// MessageQueue specialization that holds optional Encription/Compression
// pointers and a list of subscribed topics, and exposes overridable hooks
// (onPing, onLookup, onBroadcast, onUnsolicited, onRequest, onLocal,
// onWakeup). Every hook has an empty default body in this header, except
// onRequest, whose default returns NULL.
// NOTE(review): the hooks are presumably dispatched from onMessage(), which
// is defined outside this header -- confirm there.
class Observer : public MessageQueue
{
protected:
	Encription* itsEncription;
	Compression* itsCompression;
	vector<string> itsTopicList;
	MQHANDLE itsLastMessageProxy;
	string itsLastReceivedTopic;

public:
	Observer(const char* theName);
	virtual ~Observer();
	virtual void setEncription(Encription* theEncr);
	virtual void setCompression(Compression* theCompr);

protected:
	// Sending side (post/publish are defined outside this header).
	virtual void post(MQHANDLE theTarget,NetworkMessage* theMessage);
	virtual void publish(string theTopic,string theMessage); // ++ v1.5

	// Appends theTopic to itsTopicList; no duplicate check is done here.
	virtual void subscribe(string theTopic) // ++ v1.5
	{
		itsTopicList.push_back(theTopic);
	}

	// Receiving side.
	virtual void onMessage(Message* theMessage);

	virtual void onWakeup(Wakeup* theMessage)
	{
	}

	virtual void onPing(PingReplyMessage* theMessage)
	{
	}

	virtual void onLookup(LookupReplyMessage* theMessage)
	{
	}

	virtual void onBroadcast(NetworkMessage* theMessage) // ++ v1.5
	{
	}

	virtual void onUnsolicited(NetworkMessage* theMessage)
	{
	}

	// Default implementation produces no reply message.
	virtual NetworkMessage* onRequest(NetworkMessage* theMessage)
	{
		return NULL;
	}

	virtual void onLocal(Message* theMessage)
	{
	}

	// Conversion helpers between a buffer and a ListProperty (defined
	// outside this header).
	virtual void decodeProperties(string& theBuffer,ListProperty& theProperties);
	virtual void decodeProperties(char* theBuffer,unsigned long theLen,ListProperty& theProperties);
	virtual void encodeProperties(ListProperty& theProperties,string& theBuffer);
};

// MessageQueue specialization that owns a Socket pointer and a
// platform-specific thread handle (m_hThreadRx).
// NOTE(review): the handle name suggests a dedicated receive thread running
// receive() -- confirm in the implementation file.
class MessageProxy : public MessageQueue
{
protected:
	Socket* itsSocket;

	// Four 16-bit fields: sync, type, target and message length.
	// NOTE(review): presumably the packet header placed before each message
	// on the socket -- confirm in the implementation.
	struct PacketHeaderStruct
	{
		unsigned short sync;
		unsigned short type;
		unsigned short target;
		unsigned short msglen;
	};
	typedef PacketHeaderStruct header;

	// Thread handle; its type depends on the platform.
#ifdef WIN32
	unsigned long* m_hThreadRx;
#else
	pthread_t m_hThreadRx;
#endif

public:
	MessageProxy(const char* theName);
	MessageProxy(const char* theName,Socket* theSocket);
	virtual ~MessageProxy();
	virtual void receive();
	virtual string getConnectionAddress(MQHANDLE theCaller,int& thePort);

protected:
	virtual void onMessage(Message* theMessage);
};

// Thread that also (protectedly) derives from SocketServer. It exposes
// static helpers for pinging a remote host, looking up a remote queue by
// name and obtaining a unique network id; all are defined outside this
// header.
// NOTE(review): the second constructor parameter is named theSocket but is a
// plain int, and the class keeps an itsPort member -- it may actually be a
// port number; confirm in the implementation.
class MessageProxyFactory : public Thread, protected SocketServer
{
protected:
	unsigned long itsCount;
	unsigned itsPort;
	static Thread itsMutex;  // ++ v1.5

public:
	MessageProxyFactory(const char* theFactoryName,int theSocket);
	~MessageProxyFactory();

	static void ping(const char* theHost, unsigned thePort,MessageQueue* theSourceQueue);
	static void lookupAt(const char* theHost,
						 unsigned thePort,
						 const char* theRemoteQueueName,
						 MessageQueue* theSourceQueue);
	static string getUniqueNetID();

protected:
	void run();
	static void post(const char* theHost, unsigned thePort,Message* theMessage,MQHANDLE theSender);

	// Hook with an empty default body.
	virtual void onNewConnection(string theAddress,unsigned short thePort)
	{
	}
};

// Thread entry routine declared with C linkage; the return type differs per
// platform (unsigned int on WIN32, void* otherwise).
// NOTE(review): presumably the start routine for MessageProxy's receive
// thread -- confirm in the implementation file.
#ifdef WIN32
extern "C" unsigned int _mp_thread_proc(void* param);
#else
extern "C" void* _mp_thread_proc(void* param);
#endif

#endif



// syntax highlighted by Code2HTML, v. 0.9.1