00001 #ifndef CONSUMER_H 00002 #define CONSUMER_H 00003 00004 /* @(#) $Id: acsncConsumer.h,v 1.71.26.1 2011/11/24 14:48:02 rtobar Exp $ 00005 * 00006 * Consumer Abstract base class for notification channel push structured event 00007 * consumers. 00008 * ALMA - Atacama Large Millimiter Array 00009 * 00010 * (c) Associated Universities Inc., 2002 00011 * (c) European Southern Observatory, 2002 00012 * Copyright by ESO (in the framework of the ALMA collaboration) 00013 * and Cosylab 2002, All rights reserved 00014 * 00015 * This library is free software; you can redistribute it and/or 00016 * modify it under the terms of the GNU Lesser General Public 00017 * License as published by the Free Software Foundation; either 00018 * version 2.1 of the License, or (at your option) any later version. 00019 * 00020 * This library is distributed in the hope that it will be useful, 00021 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00022 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00023 * Lesser General Public License for more details. 00024 * 00025 * You should have received a copy of the GNU Lesser General Public 00026 * License along with this library; if not, write to the Free Software 00027 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00028 */ 00029 00034 #include "acsncHelper.h" 00035 #include <acstimeProfiler.h> 00036 #include "acsncCDBProperties.h" 00037 00038 #include <list> 00039 namespace nc { 00040 00053 class Consumer : 00054 protected Helper, 00055 public POA_CosNotifyComm::StructuredPushConsumer, 00056 protected virtual PortableServer::RefCountServantBase 00057 { 00058 public: 00067 Consumer(const char* channelName); 00068 00076 Consumer(const char* channelName, 00077 CORBA::ORB_ptr orb_mp); 00078 00100 Consumer(const char* channelName, 00101 int argc, 00102 char *argv[]); 00103 00114 virtual void 00115 disconnect(); 00116 00130 virtual void 00131 push_structured_event(const CosNotification::StructuredEvent &publishedEvent)= 0; 00132 00146 virtual void 00147 offer_change(const CosNotification::EventTypeSeq &added, 00148 const CosNotification::EventTypeSeq &removed); 00149 00159 virtual void 00160 disconnect_structured_push_consumer(); 00161 00172 void 00173 consumerReady(); 00174 00184 void 00185 resume(); 00186 00197 void 00198 suspend(); 00199 00211 template <class T> void 00212 addSubscription() 00213 { 00214 //Create a tempory any 00215 CORBA::Any any; 00216 //Create a temporary instance of the ALMA event. 00217 T junk; 00218 00219 //Store the ALMA event in the any to extract the name 00220 //Probably a nicer way of doing this but it's the best that 00221 //I can come up with at the moment. 00222 any <<= junk; 00223 00224 //Use the version of addSubscription which really subscribes 00225 //to events from the channel 00226 00227 if (any.type()->kind()!=CORBA::tk_sequence) 00228 { 00229 addSubscription(any.type()->name()); 00230 } 00231 else 00232 { 00233 std::string etName= acsnc::SEQUENCE_EVENT_TYPE_PREFIX; //_SequenceOf_ 00234 CORBA::Any a; 00235 a._tao_set_typecode(any.type()->content_type()); 00236 etName+=a.type()->name(); 00237 addSubscription(etName.c_str()); 00238 } 00239 } 00240 00251 template <class T> void 00252 removeSubscription() 00253 { 00254 //Create a tempory any 00255 CORBA::Any any; 00256 //Create a temporary instance of the ALMA event. 00257 T junk; 00258 00259 //Store the ALMA event in the any to extract the name 00260 //Probably a nicer way of doing this but it's the best that 00261 //I can come up with at the moment. 00262 any <<= junk; 00263 00264 //Use the version of removeSubscription which really unsubscribes 00265 //from an event type. 00266 if (any.type()->kind()!=CORBA::tk_sequence) 00267 { 00268 removeSubscription(any.type()->name()); 00269 } 00270 else 00271 { 00272 std::string etName= acsnc::SEQUENCE_EVENT_TYPE_PREFIX; //_SequenceOf_" 00273 CORBA::Any a; 00274 a._tao_set_typecode(any.type()->content_type()); 00275 etName+=a.type()->name(); 00276 removeSubscription(etName.c_str()); 00277 } 00278 } 00279 00296 int 00297 addFilter(const char* type_name, 00298 const char* filter); 00299 00310 bool 00311 removeFilter(int filter_id); 00312 00317 virtual void reconnect(::NotifyMonitoringExt::EventChannelFactory *ecf); 00318 00319 void setAntennaName(std::string antennaName); 00320 00321 protected: 00333 void 00334 init(); 00335 00339 virtual ~Consumer(){} 00341 00351 void 00352 createConsumer(); 00353 00354 00356 00360 CosNotifyChannelAdmin::ConsumerAdmin_var consumerAdmin_m; 00361 00366 CosNotifyChannelAdmin::StructuredProxyPushSupplier_var proxySupplier_m; 00367 00372 unsigned long long numEvents_m; 00373 00377 CosNotifyComm::StructuredPushConsumer_var reference_m; 00378 00388 virtual const char* 00389 getFilterLanguage(); 00390 00394 CDBProperties::EventHandlerTimeoutMap handlerTimeoutMap_m; 00395 00402 static double DEFAULT_MAX_PROCESS_TIME; 00403 00407 Profiler *profiler_mp; 00408 00409 std::string antennaName; 00410 private: 00411 00421 void 00422 init(CORBA::ORB_ptr orb); 00423 00427 CORBA::ORB_ptr orb_mp; 00428 00432 void operator=(const Consumer&); 00433 00437 Consumer(const Consumer&); 00438 00450 void 00451 addSubscription(const char* type_name); 00452 00463 void 00464 removeSubscription(const char* type_name); 00465 00466 CosNotifyChannelAdmin::AdminID adminid; 00467 CosNotifyChannelAdmin::ProxyID proxySupplierID; 00469 }; 00470 }; 00471 00472 #endif /* CONSUMER_H */