Go to the documentation of this file.00001 #ifndef _BULKDATA_SENDER_H
00002 #define _BULKDATA_SENDER_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include <iostream>
00032
00033 #include "orbsvcs/AV/AVStreams_i.h"
00034 #include "orbsvcs/AV/Endpoint_Strategy.h"
00035 #include "orbsvcs/AV/Protocol_Factory.h"
00036 #include "orbsvcs/AV/Flows_T.h"
00037 #include "orbsvcs/AV/Transport.h"
00038 #include "orbsvcs/AV/Policy.h"
00039
00040 #include "ACSBulkDataError.h"
00041 #include "bulkDataSenderDefaultCb.h"
00042 #include "bulkDataFlowProducer.h"
00043
00044 #include "bulkDataC.h"
00045
00046
00047
00048
00049
00053 namespace AcsBulkdata
00054 {
00074 template<class TSenderCallback>
00075 class BulkDataSender
00076 {
00077 public:
00078
00082 BulkDataSender();
00083
00087 virtual ~BulkDataSender();
00088
00096 void initialize();
00097
00106 void createSingleFlow();
00107
00118 void createMultipleFlows(const char *fepsConfig);
00119
00128 void connectToPeer(bulkdata::BulkDataReceiverConfig *recvConfig_p);
00129
00140 void getFlowProtocol(ACE_CString &flowname, TAO_AV_Protocol_Object *¤tProtocol_p);
00141
00152 void startSend(CORBA::ULong flownumber, ACE_Message_Block *param = 0);
00153
00164 void startSend(CORBA::ULong flownumber, const char *param, size_t len);
00165
00176 void sendData(CORBA::ULong flownumber, ACE_Message_Block *buffer);
00177
00190 void sendData(CORBA::ULong flownumber, const char *buffer, size_t len);
00191
00202 void stopSend(CORBA::ULong flownumber);
00203
00211 void disconnectPeer();
00212
00213 TAO_StreamCtrl * getStreamCtrl()
00214 {
00215 return streamctrl_p;
00216 }
00217
00218 const char *getFlowSpec(const ACE_CString & flowName);
00219
00226 std::vector<std::string> getFlowNames();
00227
00228
00229 ACE_HANDLE findHandle(ACE_CString &flowname);
00230
00231
00232
00233
00234 void startStream(CORBA::ULong flownumber);
00235
00236 void sendStream(CORBA::ULong flownumber, ACE_Message_Block *buffer);
00237
00238 void stopStream(CORBA::ULong flownumber);
00239
00240
00241
00242 private:
00243
00244 typedef ACE_Hash_Map_Manager<ACE_CString, BulkDataFlowProducer<TSenderCallback> *, ACE_Null_Mutex> FepObjects;
00245 typedef ACE_Hash_Map_Iterator<ACE_CString, BulkDataFlowProducer<TSenderCallback> *, ACE_Null_Mutex> FepObjectsIterator;
00246
00247 typedef ACE_Hash_Map_Manager<ACE_CString, ACE_HANDLE, ACE_Null_Mutex> HandleMap;
00248 typedef ACE_Hash_Map_Iterator<ACE_CString, ACE_HANDLE, ACE_Null_Mutex> HandleMapIterator;
00249
00256 void initPartA();
00257
00264 AVStreams::StreamEndPoint_A_ptr createSepA();
00265
00275 AVStreams::FlowProducer_ptr createFepProducerA(ACE_CString &flowname,
00276 AVStreams::protocolSpec protocols,
00277 ACE_CString &format,
00278 TAO_StreamCtrl *strctrl_p);
00279
00280
00289 void addFepToSep(AVStreams::StreamEndPoint_A_ptr locSepA_p, AVStreams::FlowProducer_ptr locFepA_p);
00290
00291
00298 TAO_StreamCtrl *createStreamCtrl();
00299
00300
00307
00308
00309
00323 const char * createFwdFlowSpec(ACE_CString &flowname,
00324 ACE_CString &direction,
00325 ACE_CString &formatName,
00326 ACE_CString &flowProtocol,
00327 ACE_CString &carrierProtocol,
00328 ACE_CString &localAddress,
00329 ACE_CString &remoteAddress);
00330
00331
00339 void setReceiverConfig(bulkdata::BulkDataReceiverConfig *recvConfig_p);
00340
00341
00342
00349 AVStreams::StreamEndPoint_A_ptr getStreamEndPointA();
00350
00351
00358 void deleteStreamCtrl();
00359
00360
00361
00368 void deleteFepsA();
00369
00376 void deleteSepA();
00377
00378 void deleteConnector();
00379
00380 void deleteHandler();
00381
00390 const char * createFlowSpec(ACE_CString &flowname,
00391 ACE_CString &fepProtocol);
00392
00393 void mergeFlowSpecs();
00394
00398 TAO_AV_Endpoint_Reactive_Strategy_A<TAO_StreamEndPoint_A,TAO_VDev,AV_Null_MediaCtrl> endpointStrategy_m;
00399
00400 AVStreams::StreamEndPoint_A_var sepA_p;
00401
00402 AVStreams::StreamEndPoint_B_var sepB_p;
00403
00404
00405
00406 struct FepsCfgA
00407 {
00408 ACE_CString fepsFlowname;
00409 ACE_CString fepsFormat;
00410 ACE_CString fepsProtocol;
00411 };
00412
00413 FepObjects fepMap_m;
00414
00415 HandleMap handleMap_m;
00416
00417 AVStreams::flowSpec_var recvFeps_p;
00418
00419 AVStreams::flowSpec senderFeps_m;
00420
00421 TAO_StreamEndPoint_A * sepRefCount_p;
00422
00423 CORBA::Boolean disconnectPeerFlag;
00424
00425 AVStreams::flowSpec flowSpec_m;
00426
00427 TAO_StreamCtrl *streamctrl_p;
00428
00432
00433
00434
00435
00436
00437
00438
00439
00440
00441 };
00442 }
00443
00444 #include "bulkDataSender.i"
00445
00446 #endif