BOSS 7.0.7
BESIII Offline Software System
Loading...
Searching...
No Matches
WriterRpc.cc
Go to the documentation of this file.
1#ifndef DISTBOSS_WRITER_RPC_CC
2#define DISTBOSS_WRITER_RPC_CC
3
5#include <iostream>
6
7template<class Writer>
8WriterRpc<Writer>::WriterRpc(const std::string& svrName, const std::string& fname)
9 : DimRpc(svrName.c_str(), "I", "I"),
10 m_evtDone(0)
11{
12 m_writer = new PthrWriterBufPool<Writer>(fname);
13
14 m_svrName = svrName.substr( svrName.find_last_of('/')+1 );
15}
16
// Tear-down: flushes the backed-up events to the writer, releases the
// writer and any remaining backup buffers, and prints a summary line.
// NOTE(review): the function header (listing line 18) is missing from this
// extract; the cross-reference at the end of the page points to
// "virtual ~WriterRpc()" at WriterRpc.cc:18 - confirm against the real source.
17template<class Writer>
19{
 // Flush only when the writer reports status 0
 // (presumably "no error" - confirm in PthrWriterBufPool).
20 if ( m_writer->stat() == 0 ) {
 // Each clearBak(-1) call writes and removes the first entry of m_evtBak,
 // so this loop drains every backed-up event.
21 while ( m_evtBak.size() > 0 ) {
22 clearBak(-1);
23 }
 // Clients are still connected at this point: report it and carry on.
24 if ( dis_get_n_clients(itsIdOut) > 0 ) {
25 std::cout << "[" << m_svrName << "] PROBLEMS @ TERMINATING. FORCE TO QUIT" << std::endl;
26 }
 // NOTE(review): the declaration of `code` (listing line 27) is missing from
 // this extract; a 4-byte value is handed to the writer here.
28 m_writer->writeEvent(&code, 4);
29 }
30
31 delete m_writer;
32
 // Free whatever is still held in m_evtBak (the drain loop above is skipped
 // when stat() != 0).
33 for (std::map<int, AutoEnlargeBuffer*>::iterator it = m_evtBak.begin(); it != m_evtBak.end(); ++it) {
34 delete it->second;
35 }
36
37 std::cout << "[" << m_svrName << "] Terminated. Total events processed: " << m_evtDone << std::endl;
38}
39
// Polls the number of connected clients until it drops to zero or the
// retry budget is exhausted.
// Returns 0 when no client remains, otherwise the last client count observed.
// NOTE(review): the function header (listing line 41) is missing from this
// extract; the cross-reference at the end of the page points to
// "int wait_to_end()" at WriterRpc.cc:41 - confirm against the real source.
40template<class Writer>
42{
43 int nClients = 0;
44 int i = 0;
45
46 do {
47 // wait for all the clients to disconnect from this server
48 // and force the server to stop when the total waiting time
49 // is greater than 10s(5*2s)
50 if ( ++i > 5 ) break;
51 sleep(2);
52 nClients = dis_get_n_clients(itsIdOut);
53 }
54 while ( nClients > 0 );
55
56 return nClients;
57}
58
// Handles one incoming RPC request: a 4-byte payload is treated as a control
// code, a larger payload as event data that is backed up per client.
// Always answers the caller through setData(outCode).
// NOTE(review): the function header (listing line 60) is missing from this
// extract; presumably this is the DimRpc rpcHandler() override - confirm
// against the real source.
59template<class Writer>
61{
 // Size and address of the request payload.
62 m_size = getSize();
63 m_pEvt = getData();
64
65 int clientId = DimServer::getClientId();
66 int outCode = DistBossCode::StatusSuccess;
67
 // Exactly 4 bytes: interpret the payload as an unsigned control code.
68 if ( m_size == 4 ) {
69 unsigned int inCode = *(unsigned int*)m_pEvt;
70
71 if ( inCode == DistBossCode::StatusFinalize ) {
72 std::cout << "[" << m_svrName << "] Client [" << DimServer::getClientName() << "] finalized" << std::endl;
 // Write out and drop the event still backed up for this client.
73 clearBak(clientId);
 // At most one client is left connected: pass the finalize code on to the writer.
74 if ( dis_get_n_clients(itsIdOut) <= 1 ) {
75 m_writer->writeEvent(m_pEvt, m_size);
76 }
77 }
78 else if ( inCode == DistBossCode::ClientReady ) {
79 std::cout << "[" << m_svrName << "] Client [" << DimServer::getClientName() << "] connected" << std::endl;
80 }
81 }
 // More than 4 bytes: event data.
82 else if ( m_size > 4 ) {
83 std::map<int, AutoEnlargeBuffer*>::iterator it = m_evtBak.find( clientId );
 // First event from this client: create its backup buffer and seed it with
 // the current payload, so the comparison below finds equal leading ints
 // and writes nothing.
84 if ( it == m_evtBak.end() ) {
85 m_evtBak[clientId] = new AutoEnlargeBuffer();
86 m_evtBak[clientId]->copy(m_pEvt, m_size );
87 }
88
 // The backed-up event is written only when its first int differs from the
 // first int of the new payload (presumably an event identifier - confirm
 // against the client side).
89 void* pbak = m_evtBak[clientId]->data();
90 if ( *(int*)pbak != *(int*)m_pEvt ) {
91 m_writer->writeEvent(pbak, m_evtBak[clientId]->size());
92 ++m_evtDone;
93 }
94
 // The new payload becomes this client's backup.
95 m_evtBak[clientId]->copy(m_pEvt, m_size );
96 }
 // Any other size.
 // NOTE(review): the body of this branch (listing line 98) is missing from
 // this extract.
97 else {
99 }
100
 // Reply to the caller.
101 setData(outCode);
102}
103
104template<class Writer>
105void WriterRpc<Writer>::clearBak(int clientId)
106{
107 std::map<int, AutoEnlargeBuffer*>::iterator it = (clientId<0) ? m_evtBak.begin() : m_evtBak.find( clientId );
108 if ( it != m_evtBak.end() ) {
109 m_writer->writeEvent( it->second->data(), it->second->size() );
110 ++m_evtDone;
111
112 delete it->second;
113 m_evtBak.erase(it);
114 }
115}
116
117#endif
int wait_to_end()
Definition: WriterRpc.cc:41
virtual ~WriterRpc()
Definition: WriterRpc.cc:18