00001 #ifndef _BULKDATA_RECEIVER_H
00002 #define _BULKDATA_RECEIVER_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include <vector>
00032
00033 #include "orbsvcs/AV/AVStreams_i.h"
00034 #include "orbsvcs/AV/Endpoint_Strategy.h"
00035 #include "orbsvcs/AV/Protocol_Factory.h"
00036 #include "orbsvcs/AV/Flows_T.h"
00037 #include "orbsvcs/AV/Transport.h"
00038 #include "orbsvcs/AV/Policy.h"
00039
00040 #include "ACSBulkDataError.h"
00041 #include "bulkDataFlowConsumer.h"
00042
00043 #include "bulkDataC.h"
00044
00048 namespace AcsBulkdata
00049 {
00068 template<class TReceiverCallback>
00069 class BulkDataReceiver
00070 {
00071 public:
00072
00076 BulkDataReceiver();
00077
00081 virtual ~BulkDataReceiver();
00082
00090 void initialize();
00091
00100 void createSingleFlow();
00101
00112 void createMultipleFlows(const char *fepsConfig);
00113
00121 bulkdata::BulkDataReceiverConfig * getReceiverConfig();
00122
00132 void getFlowCallback(ACE_CString &flowName, TReceiverCallback *&cb_p);
00133
00144 void getFlowCallback(CORBA::ULong flowNumber, TReceiverCallback *&cb_p);
00145
00153 void closeReceiver();
00154
00161 std::vector<std::string> getFlowNames();
00162
00170 void setReceiverName(ACE_CString recvName);
00171
00180 void subscribeNotification(ACS::CBvoid_ptr notifCb);
00181
00193 void fwdData2UserCB(CORBA::Boolean enable);
00194
00195
00196
00197
00198 void notifySender(const ACSErr::Completion& comp);
00199
00200
00201
00202
00203 void addHandle(ACE_CString flowName, ACE_HANDLE handle)
00204 {
00205 handleMap_m.rebind(flowName,handle);
00206 }
00207
00208 void setCbTimeout(const char * cbTimeout);
00209
00210
00211 private:
00212
00213 typedef ACE_Hash_Map_Manager<ACE_CString, BulkDataFlowConsumer<TReceiverCallback> *, ACE_Null_Mutex> FepObjects;
00214 typedef ACE_Hash_Map_Iterator<ACE_CString, BulkDataFlowConsumer<TReceiverCallback> *, ACE_Null_Mutex> FepObjectsIterator;
00215
00216 typedef ACE_Hash_Map_Manager<ACE_CString, ACE_HANDLE, ACE_Null_Mutex> HandleMap;
00217 typedef ACE_Hash_Map_Iterator<ACE_CString, ACE_HANDLE, ACE_Null_Mutex> HandleMapIterator;
00218
00225 void initPartB();
00226
00233 AVStreams::StreamEndPoint_B_ptr createSepB();
00234
00244 AVStreams::FlowConsumer_ptr createFepConsumerB(ACE_CString &flowName, AVStreams::protocolSpec protocols, ACE_CString &format);
00245
00246
00255 void addFepToSep(AVStreams::StreamEndPoint_B_ptr locSepB_p,AVStreams::FlowConsumer_ptr locFepB_p);
00256
00263 AVStreams::StreamEndPoint_B_ptr getStreamEndPointB();
00264
00271 AVStreams::flowSpec * getFepsConfig();
00272
00279 void deleteFepsB();
00280
00281
00288 void deleteSepB();
00289
00290 void deleteAcceptor();
00291
00292 void deleteHandler();
00293
00294 void closeSocket();
00295
00304 const char * createFlowSpec(ACE_CString &flowName,
00305 ACE_CString &fepProtocol);
00306
00307
00308 public:
00312 bulkdata::Connection checkFlowCallbacks();
00313
00314 bulkdata::Connection getSenderConnectionState()
00315 {
00316 return recvConfig_p->connectionState;
00317 }
00318
00319 private:
00320
00321 FepObjects fepMap_m;
00322
00323 HandleMap handleMap_m;
00324
00328 TAO_AV_Endpoint_Reactive_Strategy_B <TAO_StreamEndPoint_B,TAO_VDev,AV_Null_MediaCtrl> reactiveStrategy_m;
00329
00330 AVStreams::StreamEndPoint_B_var sepB_p;
00331
00332 struct FepsCfgB
00333 {
00334 ACE_CString fepsFlowname;
00335 ACE_CString fepsFormat;
00336 ACE_CString fepsProtocol;
00337 };
00338
00339 AVStreams::flowSpec fepsData;
00340
00341
00342 bulkdata::BulkDataReceiverConfig * recvConfig_p;
00343
00344 TAO_StreamEndPoint_B *sepRefCount_p;
00345
00346 CORBA::Boolean closeReceiverFlag;
00347
00348 ACS::CBvoid_ptr locNotifCb_p;
00349
00350 ACE_Time_Value cbTimeout_m;
00351
00355
00356 };
00357 }
00358
00359 #include "bulkDataReceiver.i"
00360
00361 #endif