xscheduler.cpp
1 /***************************************************************************
2  Copyright (C) 2002-2015 Kentaro Kitagawa
3  kitagawa@phys.s.u-tokyo.ac.jp
4 
5  This program is free software; you can redistribute it and/or
6  modify it under the terms of the GNU Library General Public
7  License as published by the Free Software Foundation; either
8  version 2 of the License, or (at your option) any later version.
9 
10  You should have received a copy of the GNU Library General
11  Public License and a list of authors along with this program;
12  see the files COPYING and AUTHORS.
13 ***************************************************************************/
14 #include "xscheduler.h"
15 
16 shared_ptr<XSignalBuffer> g_signalBuffer;
17 
18 #define ADAPTIVE_DELAY_MIN 10
19 #define ADAPTIVE_DELAY_MAX 100
20 
21 atomic<unsigned int> g_adaptiveDelay = ADAPTIVE_DELAY_MIN;
22 
23 void
24 registerTransactionList(XTransaction_ *transaction) {
25  g_signalBuffer->registerTransactionList(transaction);
26 }
27 
28 XSignalBuffer::XSignalBuffer()
29  : m_oldest_timestamp(XTime::now()) {
30 }
31 XSignalBuffer::~XSignalBuffer() {
32 }
34 XSignalBuffer::popOldest() {
35  XTransaction_ *item = 0L, *skipped_item = 0L;
36  if(m_queue.size()) {
37  item = m_queue.front();
38  if(m_skippedQueue.size()) {
39  if((long)(m_queue.front()->registered_time - m_skippedQueue.front().second) > 0) {
40  skipped_item = m_skippedQueue.front().first;
41  item = 0L;
42  }
43  }
44  }
45  else {
46  if(m_skippedQueue.size())
47  skipped_item = m_skippedQueue.front().first;
48  }
49  if(item)
50  m_queue.pop();
51  if(skipped_item)
52  m_skippedQueue.pop_front();
53  if(m_queue.size()) {
54  if(m_skippedQueue.size()) {
55  if((long)(m_queue.front()->registered_time - m_skippedQueue.front().second) > 0)
56  m_oldest_timestamp = m_skippedQueue.front().second;
57  else
58  m_oldest_timestamp = m_queue.front()->registered_time;
59  }
60  else
61  m_oldest_timestamp = m_queue.front()->registered_time;
62  }
63  else {
64  if(m_skippedQueue.size())
65  m_oldest_timestamp = m_skippedQueue.front().second;
66  }
67  if(item) {
68  assert( !skipped_item);
69  return item;
70  }
71  return skipped_item;
72 }
73 void
75  XTime time(transaction->registered_time);
76  for(;;) {
77  for(unsigned int i = 0; i < 20; i++) {
78  if(isMainThread())
79  break;
80  unsigned long cost = 0;
81  if( !m_queue.empty()) {
82  cost += XTime::now().diff_usec(m_oldest_timestamp);
83  }
84  if(cost < (g_adaptiveDelay + 10) * 1000uL) {
85  for(;;) {
86  unsigned int delay = g_adaptiveDelay;
87  if(delay <= ADAPTIVE_DELAY_MIN) break;
88  if(g_adaptiveDelay.compare_set_strong(delay, delay - 1)) {
89  break;
90  }
91  }
92  break;
93  }
94  if(cost > 100uL) {
95  if(g_adaptiveDelay < ADAPTIVE_DELAY_MAX) {
96  ++g_adaptiveDelay;
97  }
98  }
99  msecsleep(std::min(cost / 1000uL, 10uL));
100  time = XTime::now();
101  }
102  try {
103  bool empty = m_queue.empty();
104  if(empty) m_oldest_timestamp = transaction->registered_time;
105  m_queue.push(transaction);
106  break;
107  }
108  catch (Queue::nospace_error &) {
109  if(isMainThread())
110  synchronize();
111  else
112  msecsleep(10);
113  }
114  }
115 }
116 bool
118  bool dotalk = true;
119  XTime time_stamp_start(XTime::now());
120  unsigned int skipped_cnt = 0;
121 
122  for(;;) {
123  if(m_queue.empty() && (m_skippedQueue.size() <= skipped_cnt)) {
124  dotalk = !m_skippedQueue.empty();
125  break;
126  }
127  XTransaction_ *transaction = popOldest();
128  if( !transaction) {
129  dotalk = false;
130  break;
131  }
132  bool skip = false;
133  try {
134  skip = transaction->talkBuffered();
135  }
136  catch (XKameError &e) {
137  e.print();
138  }
139  if(skip) {
140  m_skippedQueue.emplace_back(
141  transaction, XTime::now());
142  skipped_cnt++;
143  }
144  else {
145  delete transaction;
146  }
147  if(XTime::now().diff_msec(time_stamp_start) > 30uL) break;
148  }
149  return !dotalk;
150 }
151 

Generated for KAME4 by  doxygen 1.8.3