Go to the documentation of this file.00001 #ifndef _BULKDATA_DISTRIBUTER_H
00002 #define _BULKDATA_DISTRIBUTER_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #include "bulkDataSender.h"
00033 #include "bulkDataReceiver.h"
00034
00035 #include <Pair_T.h>
00036
00037 #include <acsQoS.h>
00038
00042 namespace AcsBulkdata
00043 {
00044
00045
00049 template <class T>
00050 class ACS_Read_Guard
00051 {
00052 public:
00053 ACS_Read_Guard(T &l, int &ret) :
00054 lock_(&l)
00055 {
00056 ret_ = this->lock_->tryacquire_read();
00057 ret = ret_;
00058 }
00059
00060 ~ACS_Read_Guard()
00061 {
00062 release();
00063 }
00064
00065 void release()
00066 {
00067 if (ret_==0) this->lock_->release();
00068 }
00069 protected:
00070 int ret_;
00071 T *lock_;
00072 };
00073
00074
00075 template <class T>
00076 class ACS_Write_Guard
00077 {
00078 public:
00079 ACS_Write_Guard(T &l, int &ret) :
00080 lock_(&l)
00081 {
00082 ret = 0;
00083 ownership_ = false;
00084 ret_ = this->lock_->acquire_write();
00085 ret = ret_;
00086 }
00087
00088 ACS_Write_Guard(T *l, int &ret) :
00089 lock_(l)
00090 {
00091 ownership_ = true;
00092 ret_ = this->lock_->acquire_write();
00093 ret = ret_;
00094 }
00095
00096 void take_ownership()
00097 {
00098 ownership_ = true;
00099 }
00100
00101 ~ACS_Write_Guard()
00102 {
00103 release();
00104 if (ownership_)
00105 {
00106 delete lock_;
00107 }
00108 }
00109
00110 void release()
00111 {
00112
00113 if (ret_==0)
00114 {
00115 this->lock_->release();
00116 }
00117 }
00118 protected:
00119 bool ownership_;
00120 int ret_;
00121 T *lock_;
00122 };
00123
00124
00125 typedef struct RecvDataStruct
00126 {
00127 bulkdata::BulkDataReceiver_ptr receiver;
00128 ACE_RW_Thread_Mutex *mutex;
00129 } RecvData;
00130
00131
00150
00151 template<class TReceiverCallback, class TSenderCallback>
00152 class BulkDataDistributerNotifCb;
00153
00154 template<class TReceiverCallback, class TSenderCallback>
00155 class BulkDataDistributer
00156 {
00157
00158 enum Flow_Status
00159 {
00160 FLOW_AVAILABLE,
00161 FLOW_NOT_AVAILABLE
00162 };
00163
00164 typedef ACE_Pair< RecvData , AcsBulkdata::BulkDataSender<TSenderCallback> *> Sender_Map_Pair;
00165
00166
00167
00168
00169
00170
00171
00172 typedef ACE_Hash_Map_Manager <ACE_CString, Sender_Map_Pair, ACE_Null_Mutex> Sender_Map;
00173 typedef ACE_Hash_Map_Entry <ACE_CString, Sender_Map_Pair > Sender_Map_Entry;
00174 typedef ACE_Hash_Map_Iterator <ACE_CString, Sender_Map_Pair ,ACE_Null_Mutex> Sender_Map_Iterator;
00175
00176 typedef ACE_Hash_Map_Manager <CORBA::ULong, Flow_Status, ACE_Null_Mutex> Flows_Status_Map;
00177 typedef ACE_Hash_Map_Entry <CORBA::ULong, Flow_Status> Flows_Status_Map_Entry;
00178 typedef ACE_Hash_Map_Iterator <CORBA::ULong, Flow_Status, ACE_Null_Mutex> Flows_Status_Map_Iterator;
00179
00180 typedef ACE_Hash_Map_Manager <ACE_CString, CORBA::ULong, ACE_Null_Mutex> Recv_Status_Map;
00181 typedef ACE_Hash_Map_Entry <ACE_CString, CORBA::ULong> Recv_Status_Map_Entry;
00182 typedef ACE_Hash_Map_Iterator <ACE_CString, CORBA::ULong, ACE_Null_Mutex> Recv_Status_Map_Iterator;
00183
00184
00185
00186 public:
00187
00191 BulkDataDistributer();
00192
00196 virtual ~BulkDataDistributer();
00197
00201 virtual void multiConnect(bulkdata::BulkDataReceiverConfig *recvConfig_p, const char *fepsConfig, const ACE_CString& receiverName);
00202
00206 virtual void multiDisconnect(const ACE_CString& receiverName);
00207
00208 virtual BulkDataReceiver<TReceiverCallback> *getReceiver()
00209 {
00210 return &receiver_m;
00211 }
00212
00213 virtual Sender_Map *getSenderMap()
00214 {
00215 return &senderMap_m;
00216 }
00217
00218 virtual bool isRecvConnected (const ACE_CString& receiverName);
00219
00220 virtual bool isSenderConnected (const ACE_CString& receiverName);
00221
00222 virtual bool isReceiverConnected (const ACE_CString& receiverName);
00223
00224 virtual void distSendStart (ACE_CString& flowName, CORBA::ULong flowNumber);
00225
00226 virtual int distSendDataHsk (ACE_CString& flowName, ACE_Message_Block * frame_p, CORBA::ULong flowNumber);
00227
00228 virtual int distSendData (ACE_CString& flowName, ACE_Message_Block * frame_p, CORBA::ULong flowNumber);
00229
00230 virtual CORBA::Boolean distSendStopTimeout (ACE_CString& flowName, CORBA::ULong flowNumber);
00231
00232 virtual void distSendStop (ACE_CString& flowName, CORBA::ULong flowNumber);
00233
00234 void setTimeout (CORBA::ULong user_timeout)
00235 { timeout_m = user_timeout; }
00236
00237 void setContSvc (maci::ContainerServices * services_p)
00238 { contSvc_p = services_p; }
00239
00243 void subscribeNotification(ACS::CBvoid_ptr notifCb);
00244
00248 void notifySender(const ACSErr::Completion& comp);
00249
00250 bulkdata::Connection getSenderConnectionState()
00251 {
00252 return getReceiver()->getSenderConnectionState();
00253 }
00254
00255 private:
00256
00257 CORBA::Boolean getFlowReceiverStatus(const ACE_CString& receiverName, CORBA::ULong flowNumber);
00258
00259 CORBA::Boolean isFlowReceiverAvailable(const ACE_CString& receiverName, CORBA::ULong flowNumber);
00260
00261 BulkDataSender<TSenderCallback> *sender_p;
00262
00263 BulkDataReceiver<TReceiverCallback> receiver_m;
00264
00265 Sender_Map senderMap_m;
00266
00267 Recv_Status_Map recvStatusMap_m;
00268 Flows_Status_Map flowsStatusMap_m;
00269
00270 CORBA::ULong timeout_m;
00271 CORBA::ULong numberOfFlows;
00272 CORBA::ULong offset;
00273
00274 maci::ContainerServices *contSvc_p;
00275
00276 BulkDataDistributerNotifCb<TReceiverCallback, TSenderCallback> *distributerNotifCb_p;
00277
00278 ACS::CBvoid_ptr locNotifCb_p;
00279 };
00280
00281
00282
00283 template<class TReceiverCallback, class TSenderCallback = BulkDataSenderDefaultCallback>
00284 class BulkDataDistributerNotifCb: public virtual POA_ACS::CBvoid
00285 {
00286
00287 public:
00288
00289 BulkDataDistributerNotifCb(BulkDataDistributer<TReceiverCallback, TSenderCallback> *distr)
00290 {
00291 ACS_TRACE("BulkDataDistributerNotifCb<>::BulkDataDistributerNotifCb");
00292
00293 distr_p = distr;
00294 }
00295
00296 ~BulkDataDistributerNotifCb()
00297 {
00298 ACS_TRACE("BulkDataDistributerNotifCb<>::~BulkDataDistributerNotifCb");
00299 }
00300
00301 void working(const Completion &comp, const ACS::CBDescOut &desc)
00302 {
00303 }
00304
00305 void done(const Completion &comp, const ACS::CBDescOut &desc)
00306 {
00307 try
00308 {
00309 distr_p->notifySender(comp);
00310 }
00311 catch(ACSErr::ACSbaseExImpl &ex)
00312 {
00313 ACS_SHORT_LOG((LM_ERROR,"BulkDataDistributerNotifCb::done error"));
00314 ex.log();
00315 }
00316 catch(...)
00317 {
00318 ACS_SHORT_LOG((LM_ERROR,"BulkDataDistributerNotifCb::done unknown error"));
00319 }
00320 }
00321
00322 CORBA::Boolean negotiate (ACS::TimeInterval timeToTransmit, const ACS::CBDescOut &desc)
00323 {
00324 return true;
00325 }
00326
00327 private:
00328
00329 BulkDataDistributer<TReceiverCallback, TSenderCallback> *distr_p;
00330 };
00331
00332
00333 }
00334
00335
00336 #include "bulkDataDistributer.i"
00337
00338 #endif