14 #include "xscheduler.h"
// Global signal buffer instance; transactions are handed to it through
// g_signalBuffer->registerTransactionList().
16 shared_ptr<XSignalBuffer> g_signalBuffer;
// Lower bound for g_adaptiveDelay: the enqueue loop stops trying to decrement
// the delay once it is <= this value.
// NOTE(review): the delay appears to be in milliseconds, since it is scaled by
// 1000 before being compared with a diff_usec() result — confirm.
18 #define ADAPTIVE_DELAY_MIN 10
// Upper bound that g_adaptiveDelay is compared against in the enqueue loop
// (presumably guarding an increment that is not visible in this excerpt).
19 #define ADAPTIVE_DELAY_MAX 100
// Forwards the transaction to the global signal buffer. The enclosing
// function's signature is not visible in this excerpt.
// NOTE(review): g_signalBuffer is dereferenced with no null check visible
// here — confirm it is always assigned before this call can happen.
25 g_signalBuffer->registerTransactionList(transaction);
// Constructs the buffer and seeds m_oldest_timestamp with the current time
// (XTime::now()), so the timestamp holds a value before any transaction
// has been queued.
28 XSignalBuffer::XSignalBuffer()
29 : m_oldest_timestamp(
XTime::now()) {
// Destructor. Its body is not visible in this excerpt.
31 XSignalBuffer::~XSignalBuffer() {
// Selects the next transaction to process from either m_queue or
// m_skippedQueue, then refreshes m_oldest_timestamp from whatever remains.
// m_skippedQueue holds pairs: .first is the transaction, .second is the time
// stamp stored when the entry was put on that queue.
// NOTE(review): the return type, local declarations, else-branches and the
// return statement are not visible in this excerpt; the comments below
// describe only the lines shown.
34 XSignalBuffer::popOldest() {
// Candidate from the main queue.
37 item = m_queue.front();
38 if(m_skippedQueue.size()) {
// A positive signed difference means the main queue's front was registered
// after the time stamp stored with the front skipped entry, so the skipped
// entry is chosen instead.
39 if((
long)(m_queue.front()->registered_time - m_skippedQueue.front().second) > 0) {
40 skipped_item = m_skippedQueue.front().first;
// Take the front skipped entry whenever the skipped queue is non-empty.
// NOTE(review): presumably the path taken when m_queue is empty — the
// enclosing condition is not visible.
46 if(m_skippedQueue.size())
47 skipped_item = m_skippedQueue.front().first;
// Remove the front entry of the skipped queue
// (presumably only when skipped_item was selected — guard not visible).
52 m_skippedQueue.pop_front();
// Recompute m_oldest_timestamp using the same comparison as above:
// use the skipped entry's time stamp when the main queue's front is later,
// otherwise the main queue front's registered_time.
54 if(m_skippedQueue.size()) {
55 if((
long)(m_queue.front()->registered_time - m_skippedQueue.front().second) > 0)
56 m_oldest_timestamp = m_skippedQueue.front().second;
58 m_oldest_timestamp = m_queue.front()->registered_time;
// Path where only the main queue supplies the time stamp.
61 m_oldest_timestamp = m_queue.front()->registered_time;
// Path where only the skipped queue supplies the time stamp.
64 if(m_skippedQueue.size())
65 m_oldest_timestamp = m_skippedQueue.front().second;
// On this path no skipped entry may have been selected.
68 assert( !skipped_item);
// Enqueue path: throttles the caller according to how old the pending backlog
// is, adapts g_adaptiveDelay, then pushes the transaction onto m_queue.
// NOTE(review): the function header is not visible in this excerpt; it is
// presumably XSignalBuffer::registerTransactionList(), which the free function
// above calls — confirm.
// Local copy of the transaction's registration time.
75 XTime time(transaction->registered_time);
// At most 20 throttling rounds.
77 for(
unsigned int i = 0; i < 20; i++) {
// cost = age of the oldest pending entry, from diff_usec() (microseconds,
// going by the method name); stays 0 when m_queue is empty.
80 unsigned long cost = 0;
81 if( !m_queue.empty()) {
82 cost += XTime::now().diff_usec(m_oldest_timestamp);
// Backlog age is below (g_adaptiveDelay + 10) * 1000: try to lower the
// adaptive delay by one.
84 if(cost < (g_adaptiveDelay + 10) * 1000uL) {
86 unsigned int delay = g_adaptiveDelay;
// Never go below ADAPTIVE_DELAY_MIN; leave the loop instead.
87 if(delay <= ADAPTIVE_DELAY_MIN)
break;
// Compare-and-set so the decrement applies only if g_adaptiveDelay still
// equals the value read above.
88 if(g_adaptiveDelay.compare_set_strong(delay, delay - 1)) {
// Otherwise the delay may be adjusted while it is still below
// ADAPTIVE_DELAY_MAX (the adjustment itself is not visible here).
95 if(g_adaptiveDelay < ADAPTIVE_DELAY_MAX) {
// Sleep for cost / 1000, capped at 10
// (milliseconds, going by the name msecsleep).
99 msecsleep(std::min(cost / 1000uL, 10uL));
// If the queue was empty, this transaction becomes the oldest pending entry.
103 bool empty = m_queue.empty();
104 if(empty) m_oldest_timestamp = transaction->registered_time;
105 m_queue.push(transaction);
// Handler for Queue::nospace_error (presumably thrown by the push above when
// the queue is full); the handler body is not visible in this excerpt.
108 catch (Queue::nospace_error &) {
// Drain loop: processes queued transactions by calling talkBuffered() on
// them, moving entries onto m_skippedQueue, and stops after a time budget.
// NOTE(review): the function header, the loop statement and several branches
// are not visible in this excerpt.
// Start time used for the time-budget check at the bottom of the loop.
119 XTime time_stamp_start(XTime::now());
// Counter compared against m_skippedQueue.size() below
// (its increment is not visible here).
120 unsigned int skipped_cnt = 0;
// Main queue is empty and the skipped queue holds no more than skipped_cnt
// entries.
123 if(m_queue.empty() && (m_skippedQueue.size() <= skipped_cnt)) {
// dotalk is set when skipped entries are still pending.
124 dotalk = !m_skippedQueue.empty();
// The result of talkBuffered() is stored in skip
// (presumably true when the transaction has to be deferred — confirm).
134 skip = transaction->talkBuffered();
// Put the transaction on the skipped queue together with the current time,
// which popOldest() later compares against registered_time.
140 m_skippedQueue.emplace_back(
141 transaction, XTime::now());
// Stop once more than 30 have elapsed since time_stamp_start
// (milliseconds, going by the name diff_msec).
147 if(XTime::now().diff_msec(time_stamp_start) > 30uL)
break;