#ifndef _BULKDATA_SENDER_H
#define _BULKDATA_SENDER_H

#include <iostream>
#include <string>
#include <vector>

#include "orbsvcs/AV/AVStreams_i.h"
#include "orbsvcs/AV/Endpoint_Strategy.h"
#include "orbsvcs/AV/Protocol_Factory.h"
#include "orbsvcs/AV/Flows_T.h"
#include "orbsvcs/AV/Transport.h"
#include "orbsvcs/AV/Policy.h"

#include "ACSBulkDataError.h"
#include "bulkDataSenderDefaultCb.h"
#include "bulkDataFlowProducer.h"

#include "bulkDataC.h"

00053 namespace AcsBulkdata
00054 {  
00074     template<class TSenderCallback>
00075     class BulkDataSender
00076     {
00077       public:
00078       
00082         BulkDataSender();
00083       
00087         virtual ~BulkDataSender();
00088       
00096         void initialize();
00097 
00106         void createSingleFlow();
00107 
00118         void createMultipleFlows(const char *fepsConfig);
00119 
00128         void connectToPeer(bulkdata::BulkDataReceiverConfig *recvConfig_p);
00129 
00140         void getFlowProtocol(ACE_CString &flowname, TAO_AV_Protocol_Object *¤tProtocol_p);
00141 
00152         void startSend(CORBA::ULong flownumber, ACE_Message_Block *param = 0);
00153 
00164         void startSend(CORBA::ULong flownumber, const char *param, size_t len);
00165 
00176         void sendData(CORBA::ULong flownumber, ACE_Message_Block *buffer);
00177            
00190         void sendData(CORBA::ULong flownumber, const char *buffer, size_t len);
00191 
00202         void stopSend(CORBA::ULong flownumber);
00203 
00211         void disconnectPeer();
00212  
00213         TAO_StreamCtrl * getStreamCtrl() 
00214             {
00215                 return streamctrl_p;
00216             }
00217 
00218         const char *getFlowSpec(const ACE_CString & flowName);
00219 
00226         std::vector<std::string> getFlowNames();
00227 
00228         
00229         ACE_HANDLE findHandle(ACE_CString &flowname);
00230 
00231         
00232         
00233 
00234         void startStream(CORBA::ULong flownumber);
00235         
00236         void sendStream(CORBA::ULong flownumber, ACE_Message_Block *buffer);
00237 
00238         void stopStream(CORBA::ULong flownumber);
00239 
00240         
00241 
00242       private:
00243 
00244         typedef ACE_Hash_Map_Manager<ACE_CString, BulkDataFlowProducer<TSenderCallback> *, ACE_Null_Mutex> FepObjects;
00245         typedef ACE_Hash_Map_Iterator<ACE_CString, BulkDataFlowProducer<TSenderCallback> *, ACE_Null_Mutex>  FepObjectsIterator;
00246 
00247         typedef ACE_Hash_Map_Manager<ACE_CString, ACE_HANDLE, ACE_Null_Mutex> HandleMap;
00248         typedef ACE_Hash_Map_Iterator<ACE_CString, ACE_HANDLE, ACE_Null_Mutex>  HandleMapIterator;
00249 
00256         void initPartA();
00257 
00264         AVStreams::StreamEndPoint_A_ptr createSepA();
00265 
00275         AVStreams::FlowProducer_ptr createFepProducerA(ACE_CString &flowname,
00276                                                        AVStreams::protocolSpec protocols,
00277                                                        ACE_CString &format,
00278                                                        TAO_StreamCtrl *strctrl_p);
00279 
00280 
00289         void addFepToSep(AVStreams::StreamEndPoint_A_ptr locSepA_p, AVStreams::FlowProducer_ptr locFepA_p);
00290 
00291 
00298         TAO_StreamCtrl *createStreamCtrl();     
00299 
00300 
00307         
00308 
00309 
00323         const char * createFwdFlowSpec(ACE_CString &flowname,
00324                                        ACE_CString &direction,
00325                                        ACE_CString &formatName,
00326                                        ACE_CString &flowProtocol,
00327                                        ACE_CString &carrierProtocol,
00328                                        ACE_CString &localAddress,
00329                                        ACE_CString &remoteAddress);
00330 
00331 
00339         void setReceiverConfig(bulkdata::BulkDataReceiverConfig *recvConfig_p);
00340 
00341                     
00342 
00349         AVStreams::StreamEndPoint_A_ptr getStreamEndPointA();
00350 
00351 
00358         void deleteStreamCtrl();
00359 
00360 
00361 
00368         void deleteFepsA();
00369 
00376         void deleteSepA();
00377 
00378         void deleteConnector();
00379 
00380         void deleteHandler();
00381 
00390         const char * createFlowSpec(ACE_CString &flowname,
00391                                     ACE_CString &fepProtocol);
00392 
00393         void mergeFlowSpecs();
00394 
00398         TAO_AV_Endpoint_Reactive_Strategy_A<TAO_StreamEndPoint_A,TAO_VDev,AV_Null_MediaCtrl> endpointStrategy_m;
00399 
00400         AVStreams::StreamEndPoint_A_var sepA_p;
00401 
00402         AVStreams::StreamEndPoint_B_var sepB_p;
00403 
00404 
00405         
00406         struct FepsCfgA
00407         {
00408             ACE_CString fepsFlowname;
00409             ACE_CString fepsFormat;
00410             ACE_CString fepsProtocol;
00411         };
00412 
00413         FepObjects fepMap_m;
00414 
00415         HandleMap handleMap_m;
00416 
00417         AVStreams::flowSpec_var recvFeps_p;
00418 
00419         AVStreams::flowSpec senderFeps_m;
00420 
00421         TAO_StreamEndPoint_A * sepRefCount_p;
00422 
00423         CORBA::Boolean disconnectPeerFlag;
00424     
00425         AVStreams::flowSpec flowSpec_m;
00426 
00427         TAO_StreamCtrl *streamctrl_p;
00428 
00432         
00433 
00434 
00435 
00436 
00437 
00438 
00439 
00440 
00441     };
00442 }

#include "bulkDataSender.i"

#endif