Go to the documentation of this file.00001 #ifndef _BULKDATA_DISTRIBUTER_H
00002 #define _BULKDATA_DISTRIBUTER_H
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 #include "bulkDataSender.h"
00033 #include "bulkDataReceiver.h"
00034 
00035 #include <Pair_T.h>
00036 
00037 #include <acsQoS.h>
00038 
00042 namespace AcsBulkdata
00043 {  
00044 
00045 
00049 template <class T>
00050 class ACS_Read_Guard
00051 {
00052 public:
00053         ACS_Read_Guard(T &l, int &ret) :
00054                 lock_(&l)
00055         {
00056                 ret_ = this->lock_->tryacquire_read();
00057                 ret = ret_;
00058         }
00059 
00060         ~ACS_Read_Guard()
00061         {
00062                 release();
00063         }
00064 
00065         void release()
00066         {
00067                 if (ret_==0) this->lock_->release();
00068         }
00069 protected:
00070         int ret_;
00071         T *lock_;
00072 };
00073 
00074 
00075 template <class T>
00076 class ACS_Write_Guard
00077 {
00078 public:
00079         ACS_Write_Guard(T &l, int &ret) :
00080                 lock_(&l)
00081         {
00082                 ret = 0;
00083                 ownership_ = false;
00084                 ret_ = this->lock_->acquire_write();
00085                 ret = ret_;
00086         }
00087 
00088         ACS_Write_Guard(T *l, int &ret) :
00089                         lock_(l)
00090                 {
00091                         ownership_ = true;
00092                         ret_ = this->lock_->acquire_write();
00093                         ret = ret_;
00094                 }
00095 
00096         void take_ownership()
00097         {
00098                 ownership_ = true;
00099         }
00100 
00101         ~ACS_Write_Guard()
00102         {
00103                 release();
00104                 if (ownership_)
00105                 {
00106                         delete lock_;
00107                 }
00108         }
00109 
00110         void release()
00111         {
00112 
00113                 if (ret_==0)
00114                 {
00115                         this->lock_->release();
00116                 }
00117         }
00118 protected:
00119         bool ownership_;
00120         int ret_;
00121         T *lock_;
00122 };
00123 
00124 
00125 typedef struct RecvDataStruct
00126                         {
00127                         bulkdata::BulkDataReceiver_ptr receiver;
00128                         ACE_RW_Thread_Mutex *mutex;
00129                         } RecvData;
00130 
00131 
00150     
00151     template<class TReceiverCallback, class TSenderCallback>
00152     class BulkDataDistributerNotifCb;
00153     
00154     template<class TReceiverCallback, class TSenderCallback>
00155     class BulkDataDistributer
00156     { 
00157 
00158         enum Flow_Status
00159         {
00160             FLOW_AVAILABLE,
00161             FLOW_NOT_AVAILABLE
00162         };
00163 
00164     typedef ACE_Pair< RecvData , AcsBulkdata::BulkDataSender<TSenderCallback> *> Sender_Map_Pair;
00165 
00166 
00167 
00168         
00169 
00170 
00171 
00172         typedef ACE_Hash_Map_Manager <ACE_CString, Sender_Map_Pair, ACE_Null_Mutex>  Sender_Map;
00173         typedef ACE_Hash_Map_Entry <ACE_CString, Sender_Map_Pair > Sender_Map_Entry;
00174         typedef ACE_Hash_Map_Iterator <ACE_CString, Sender_Map_Pair ,ACE_Null_Mutex>  Sender_Map_Iterator;
00175 
00176         typedef ACE_Hash_Map_Manager <CORBA::ULong, Flow_Status, ACE_Null_Mutex> Flows_Status_Map;
00177         typedef ACE_Hash_Map_Entry <CORBA::ULong, Flow_Status> Flows_Status_Map_Entry;
00178         typedef ACE_Hash_Map_Iterator <CORBA::ULong, Flow_Status, ACE_Null_Mutex> Flows_Status_Map_Iterator;
00179 
00180         typedef ACE_Hash_Map_Manager <ACE_CString, CORBA::ULong, ACE_Null_Mutex> Recv_Status_Map;
00181         typedef ACE_Hash_Map_Entry <ACE_CString, CORBA::ULong> Recv_Status_Map_Entry;
00182         typedef ACE_Hash_Map_Iterator <ACE_CString, CORBA::ULong, ACE_Null_Mutex> Recv_Status_Map_Iterator;
00183 
00184 
00185 
00186       public:
00187       
00191         BulkDataDistributer();
00192       
00196         virtual ~BulkDataDistributer();
00197 
00201         virtual void multiConnect(bulkdata::BulkDataReceiverConfig *recvConfig_p, const char *fepsConfig, const ACE_CString& receiverName);
00202         
00206         virtual void multiDisconnect(const ACE_CString& receiverName);
00207 
00208         virtual BulkDataReceiver<TReceiverCallback> *getReceiver() 
00209             {
00210                 return &receiver_m;
00211             }
00212 
00213         virtual Sender_Map *getSenderMap() 
00214             {
00215                 return &senderMap_m;
00216             }
00217 
00218         virtual bool isRecvConnected (const ACE_CString& receiverName);
00219 
00220         virtual bool isSenderConnected (const ACE_CString& receiverName);
00221 
00222         virtual bool isReceiverConnected (const ACE_CString& receiverName);
00223 
00224         virtual void distSendStart (ACE_CString& flowName, CORBA::ULong flowNumber);
00225 
00226         virtual int distSendDataHsk (ACE_CString& flowName, ACE_Message_Block * frame_p, CORBA::ULong flowNumber);
00227 
00228         virtual int distSendData (ACE_CString& flowName, ACE_Message_Block * frame_p, CORBA::ULong flowNumber);
00229 
00230         virtual CORBA::Boolean distSendStopTimeout (ACE_CString& flowName, CORBA::ULong flowNumber);
00231 
00232         virtual void distSendStop (ACE_CString& flowName, CORBA::ULong flowNumber);
00233 
00234         void setTimeout (CORBA::ULong user_timeout) 
00235             { timeout_m = user_timeout; }
00236 
00237         void setContSvc (maci::ContainerServices * services_p)
00238             {  contSvc_p = services_p; }  
00239 
00243         void subscribeNotification(ACS::CBvoid_ptr notifCb);
00244 
00248         void notifySender(const ACSErr::Completion& comp);
00249 
00250         bulkdata::Connection getSenderConnectionState()
00251             {
00252                 return getReceiver()->getSenderConnectionState();
00253             }
00254 
00255       private:
00256 
00257         CORBA::Boolean getFlowReceiverStatus(const ACE_CString& receiverName, CORBA::ULong flowNumber);
00258 
00259         CORBA::Boolean isFlowReceiverAvailable(const ACE_CString& receiverName, CORBA::ULong flowNumber);
00260 
00261         BulkDataSender<TSenderCallback> *sender_p;
00262         
00263         BulkDataReceiver<TReceiverCallback> receiver_m;
00264 
00265         Sender_Map senderMap_m;
00266 
00267         Recv_Status_Map recvStatusMap_m;
00268         Flows_Status_Map flowsStatusMap_m;
00269 
00270         CORBA::ULong timeout_m;
00271         CORBA::ULong numberOfFlows;
00272         CORBA::ULong offset;
00273 
00274         maci::ContainerServices *contSvc_p;
00275 
00276         BulkDataDistributerNotifCb<TReceiverCallback, TSenderCallback> *distributerNotifCb_p;
00277 
00278         ACS::CBvoid_ptr locNotifCb_p;
00279     };
00280 
00281 
00282 
00283     template<class TReceiverCallback, class TSenderCallback = BulkDataSenderDefaultCallback>
00284     class BulkDataDistributerNotifCb: public virtual POA_ACS::CBvoid
00285     {
00286 
00287       public:
00288 
00289         BulkDataDistributerNotifCb(BulkDataDistributer<TReceiverCallback, TSenderCallback> *distr)
00290             {
00291                 ACS_TRACE("BulkDataDistributerNotifCb<>::BulkDataDistributerNotifCb");
00292 
00293                 distr_p = distr;
00294             }
00295 
00296         ~BulkDataDistributerNotifCb()
00297             {
00298                 ACS_TRACE("BulkDataDistributerNotifCb<>::~BulkDataDistributerNotifCb");
00299             }
00300 
00301         void working(const Completion &comp, const ACS::CBDescOut &desc) 
00302             {
00303             }
00304         
00305         void done(const Completion &comp, const ACS::CBDescOut &desc) 
00306             {
00307                 try
00308                     {
00309                     distr_p->notifySender(comp);
00310                     }
00311                 catch(ACSErr::ACSbaseExImpl &ex)
00312                     {
00313                     ACS_SHORT_LOG((LM_ERROR,"BulkDataDistributerNotifCb::done error"));
00314                     ex.log();
00315                     }
00316                 catch(...)
00317                     {
00318                     ACS_SHORT_LOG((LM_ERROR,"BulkDataDistributerNotifCb::done unknown error"));     
00319                     }
00320             }
00321 
00322         CORBA::Boolean negotiate (ACS::TimeInterval timeToTransmit, const ACS::CBDescOut &desc) 
00323             {
00324                 return true;
00325             }
00326 
00327       private:
00328 
00329         BulkDataDistributer<TReceiverCallback, TSenderCallback> *distr_p;
00330     };   
00331 
00332 
00333 }
00334 
00335 
00336 #include "bulkDataDistributer.i"
00337 
00338 #endif