• Classes
  • Modules
  • Namespaces
  • Files
  • Related Pages
  • File List
  • File Members

bulkDataDistributer.h

Go to the documentation of this file.
00001 #ifndef _BULKDATA_DISTRIBUTER_H
00002 #define _BULKDATA_DISTRIBUTER_H
00003 /*******************************************************************************
00004  *    ALMA - Atacama Large Millimiter Array
00005  *    (c) European Southern Observatory, 2002
00006  *    Copyright by ESO (in the framework of the ALMA collaboration)
00007  *    and Cosylab 2002, All rights reserved
00008  *
00009  *    This library is free software; you can redistribute it and/or
00010  *    modify it under the terms of the GNU Lesser General Public
00011  *    License as published by the Free Software Foundation; either
00012  *    version 2.1 of the License, or (at your option) any later version.
00013  *
00014  *    This library is distributed in the hope that it will be useful,
00015  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
00016  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017  *    Lesser General Public License for more details.
00018  *
00019  *    You should have received a copy of the GNU Lesser General Public
00020  *    License along with this library; if not, write to the Free Software
00021  *    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
00022  *
00023  *
00024  * "@(#)"
00025  *
00026  * who       when      what
00027  * --------  --------  ----------------------------------------------
00028  * oat       02/03/05  created 
00029  */
00030 
00031 
00032 #include "bulkDataSender.h"
00033 #include "bulkDataReceiver.h"
00034 
00035 #include <Pair_T.h>
00036 
00037 #include <acsQoS.h>
00038 
00042 namespace AcsBulkdata
00043 {  
00044 
00045 
00049 template <class T>
00050 class ACS_Read_Guard
00051 {
00052 public:
00053         ACS_Read_Guard(T &l, int &ret) :
00054                 lock_(&l)
00055         {
00056                 ret_ = this->lock_->tryacquire_read();
00057                 ret = ret_;
00058         }
00059 
00060         ~ACS_Read_Guard()
00061         {
00062                 release();
00063         }
00064 
00065         void release()
00066         {
00067                 if (ret_==0) this->lock_->release();
00068         }
00069 protected:
00070         int ret_;
00071         T *lock_;
00072 };//ACS_Read_Guard
00073 
00074 
00075 template <class T>
00076 class ACS_Write_Guard
00077 {
00078 public:
00079         ACS_Write_Guard(T &l, int &ret) :
00080                 lock_(&l)
00081         {
00082                 ret = 0;
00083                 ownership_ = false;
00084                 ret_ = this->lock_->acquire_write();
00085                 ret = ret_;
00086         }
00087 
00088         ACS_Write_Guard(T *l, int &ret) :
00089                         lock_(l)
00090                 {
00091                         ownership_ = true;
00092                         ret_ = this->lock_->acquire_write();
00093                         ret = ret_;
00094                 }
00095 // ownership means that is deleted at the nd
00096         void take_ownership()
00097         {
00098                 ownership_ = true;
00099         }
00100 
00101         ~ACS_Write_Guard()
00102         {
00103                 release();
00104                 if (ownership_)
00105                 {
00106                         delete lock_;
00107                 }
00108         }
00109 
00110         void release()
00111         {
00112 
00113                 if (ret_==0)
00114                 {
00115                         this->lock_->release();
00116                 }
00117         }
00118 protected:
00119         bool ownership_;
00120         int ret_;
00121         T *lock_;
00122 };//ACS_Write_Guard
00123 
00124 
00125 typedef struct RecvDataStruct
00126                         {
00127                         bulkdata::BulkDataReceiver_ptr receiver;
00128                         ACE_RW_Thread_Mutex *mutex;
00129                         } RecvData;
00130 
00131 
00150     //forward declaration
00151     template<class TReceiverCallback, class TSenderCallback>
00152     class BulkDataDistributerNotifCb;
00153     
00154     template<class TReceiverCallback, class TSenderCallback>
00155     class BulkDataDistributer
00156     { 
00157 
00158         enum Flow_Status
00159         {
00160             FLOW_AVAILABLE,
00161             FLOW_NOT_AVAILABLE
00162         };
00163 
00164     typedef ACE_Pair< RecvData /*bulkdata::BulkDataReceiver_ptr*/, AcsBulkdata::BulkDataSender<TSenderCallback> *> Sender_Map_Pair;
00165 
00166 //      typedef ACE_Pair< bulkdata::BulkDataReceiver_ptr, BulkDataSender<TSenderCallback> *> Sender_Map_Pair;
00167 
00168         /*typedef ACE_Hash_Map_Manager <ACE_CString, BulkDataSender<TSenderCallback> *,ACE_Null_Mutex>  Sender_Map;
00169           typedef ACE_Hash_Map_Entry <ACE_CString, BulkDataSender<TSenderCallback> * > Sender_Map_Entry;
00170           typedef ACE_Hash_Map_Iterator <ACE_CString, BulkDataSender<TSenderCallback> * ,ACE_Null_Mutex>  Sender_Map_Iterator;*/
00171 
00172         typedef ACE_Hash_Map_Manager <ACE_CString, Sender_Map_Pair, ACE_Null_Mutex>  Sender_Map;
00173         typedef ACE_Hash_Map_Entry <ACE_CString, Sender_Map_Pair > Sender_Map_Entry;
00174         typedef ACE_Hash_Map_Iterator <ACE_CString, Sender_Map_Pair ,ACE_Null_Mutex>  Sender_Map_Iterator;
00175 
00176         typedef ACE_Hash_Map_Manager <CORBA::ULong, Flow_Status, ACE_Null_Mutex> Flows_Status_Map;
00177         typedef ACE_Hash_Map_Entry <CORBA::ULong, Flow_Status> Flows_Status_Map_Entry;
00178         typedef ACE_Hash_Map_Iterator <CORBA::ULong, Flow_Status, ACE_Null_Mutex> Flows_Status_Map_Iterator;
00179 
00180         typedef ACE_Hash_Map_Manager <ACE_CString, CORBA::ULong, ACE_Null_Mutex> Recv_Status_Map;
00181         typedef ACE_Hash_Map_Entry <ACE_CString, CORBA::ULong> Recv_Status_Map_Entry;
00182         typedef ACE_Hash_Map_Iterator <ACE_CString, CORBA::ULong, ACE_Null_Mutex> Recv_Status_Map_Iterator;
00183 
00184 
00185 
00186       public:
00187       
00191         BulkDataDistributer();
00192       
00196         virtual ~BulkDataDistributer();
00197 
00201         virtual void multiConnect(bulkdata::BulkDataReceiverConfig *recvConfig_p, const char *fepsConfig, const ACE_CString& receiverName);
00202         
00206         virtual void multiDisconnect(const ACE_CString& receiverName);
00207 
00208         virtual BulkDataReceiver<TReceiverCallback> *getReceiver() 
00209             {
00210                 return &receiver_m;
00211             }
00212 
00213         virtual Sender_Map *getSenderMap() 
00214             {
00215                 return &senderMap_m;
00216             }
00217 
00218         virtual bool isRecvConnected (const ACE_CString& receiverName);
00219 
00220         virtual bool isSenderConnected (const ACE_CString& receiverName);
00221 
00222         virtual bool isReceiverConnected (const ACE_CString& receiverName);
00223 
00224         virtual void distSendStart (ACE_CString& flowName, CORBA::ULong flowNumber);
00225 
00226         virtual int distSendDataHsk (ACE_CString& flowName, ACE_Message_Block * frame_p, CORBA::ULong flowNumber);
00227 
00228         virtual int distSendData (ACE_CString& flowName, ACE_Message_Block * frame_p, CORBA::ULong flowNumber);
00229 
00230         virtual CORBA::Boolean distSendStopTimeout (ACE_CString& flowName, CORBA::ULong flowNumber);
00231 
00232         virtual void distSendStop (ACE_CString& flowName, CORBA::ULong flowNumber);
00233 
00234         void setTimeout (CORBA::ULong user_timeout) 
00235             { timeout_m = user_timeout; }
00236 
00237         void setContSvc (maci::ContainerServices * services_p)
00238             {  contSvc_p = services_p; }  
00239 
00243         void subscribeNotification(ACS::CBvoid_ptr notifCb);
00244 
00248         void notifySender(const ACSErr::Completion& comp);
00249 
00250         bulkdata::Connection getSenderConnectionState()
00251             {
00252                 return getReceiver()->getSenderConnectionState();
00253             }
00254 
00255       private:
00256 
00257         CORBA::Boolean getFlowReceiverStatus(const ACE_CString& receiverName, CORBA::ULong flowNumber);
00258 
00259         CORBA::Boolean isFlowReceiverAvailable(const ACE_CString& receiverName, CORBA::ULong flowNumber);
00260 
00261         BulkDataSender<TSenderCallback> *sender_p;
00262         
00263         BulkDataReceiver<TReceiverCallback> receiver_m;
00264 
00265         Sender_Map senderMap_m;
00266 
00267         Recv_Status_Map recvStatusMap_m;
00268         Flows_Status_Map flowsStatusMap_m;
00269 
00270         CORBA::ULong timeout_m;
00271         CORBA::ULong numberOfFlows;
00272         CORBA::ULong offset;
00273 
00274         maci::ContainerServices *contSvc_p;
00275 
00276         BulkDataDistributerNotifCb<TReceiverCallback, TSenderCallback> *distributerNotifCb_p;
00277 
00278         ACS::CBvoid_ptr locNotifCb_p;
00279     };
00280 
00281 
00282 
00283     template<class TReceiverCallback, class TSenderCallback = BulkDataSenderDefaultCallback>
00284     class BulkDataDistributerNotifCb: public virtual POA_ACS::CBvoid
00285     {
00286 
00287       public:
00288 
00289         BulkDataDistributerNotifCb(BulkDataDistributer<TReceiverCallback, TSenderCallback> *distr)
00290             {
00291                 ACS_TRACE("BulkDataDistributerNotifCb<>::BulkDataDistributerNotifCb");
00292 
00293                 distr_p = distr;
00294             }
00295 
00296         ~BulkDataDistributerNotifCb()
00297             {
00298                 ACS_TRACE("BulkDataDistributerNotifCb<>::~BulkDataDistributerNotifCb");
00299             }
00300 
00301         void working(const Completion &comp, const ACS::CBDescOut &desc) 
00302             {
00303             }
00304         
00305         void done(const Completion &comp, const ACS::CBDescOut &desc) 
00306             {
00307                 try
00308                     {
00309                     distr_p->notifySender(comp);
00310                     }
00311                 catch(ACSErr::ACSbaseExImpl &ex)
00312                     {
00313                     ACS_SHORT_LOG((LM_ERROR,"BulkDataDistributerNotifCb::done error"));
00314                     ex.log();
00315                     }
00316                 catch(...)
00317                     {
00318                     ACS_SHORT_LOG((LM_ERROR,"BulkDataDistributerNotifCb::done unknown error"));     
00319                     }
00320             }
00321 
00322         CORBA::Boolean negotiate (ACS::TimeInterval timeToTransmit, const ACS::CBDescOut &desc) 
00323             {
00324                 return true;
00325             }
00326 
00327       private:
00328 
00329         BulkDataDistributer<TReceiverCallback, TSenderCallback> *distr_p;
00330     };   
00331 
00332 
00333 }
00334 
00335 
00336 #include "bulkDataDistributer.i"
00337 
00338 #endif /* _BULKDATA_DISTRIBUTER_H */

Generated on Thu Jan 12 2012 23:13:50 for ACS-10.0 C++ API by  doxygen 1.7.0