18 #include "threadlocal.h"
19 #include "atomic_smart_ptr.h"
22 #include "allocator.h"
25 namespace Transactional {
85 template <
class T,
typename... Args>
86 static T *create(Args&&... args);
90 using NodeNotFoundError = std::domain_error;
97 bool insert(
Transaction<XN> &tr,
const shared_ptr<XN> &var,
bool online_after_insertion =
false);
101 void insert(
const shared_ptr<XN> &var);
108 void release(
const shared_ptr<XN> &var);
115 bool swap(
Transaction<XN> &tr,
const shared_ptr<XN> &x,
const shared_ptr<XN> &y);
118 void swap(
const shared_ptr<XN> &x,
const shared_ptr<XN> &y);
125 template <
typename Closure>
129 template <
typename Closure>
133 template <
typename Closure>
134 inline void iterate_commit_while(Closure);
140 Payload() noexcept : m_serial(-1), m_tr(
nullptr) {}
144 XN &
node() noexcept {
return *m_node;}
146 const XN &
node() const noexcept {
return *m_node;}
147 int64_t serial() const noexcept {
return this->m_serial;}
150 virtual void catchEvent(
const shared_ptr<XN>&,
int) {}
151 virtual void releaseEvent(
const shared_ptr<XN>&,
int) {}
152 virtual void moveEvent(
unsigned int ,
unsigned int ) {}
153 virtual void listChangeEvent() {}
170 using iterator =
typename NodeList::iterator;
171 using const_iterator =
typename NodeList::const_iterator;
174 Node &operator=(
const Node &) =
delete;
180 shared_ptr<NodeList> m_subnodes;
189 virtual typename PayloadWrapper::rettype_clone clone(
Transaction<XN> &tr, int64_t serial) {
191 auto p = make_local_shared<PayloadWrapper>( *this);
193 p->m_serial = serial;
198 PayloadWrapper(XN &node) noexcept : P::Payload(){ this->m_node = &node;}
211 Packet() noexcept : m_missing(
false) {}
212 int size()
const noexcept {
return subpackets() ? subpackets()->size() : 0;}
215 shared_ptr<NodeList> &subnodes() noexcept {
return subpackets()->m_subnodes;}
216 shared_ptr<PacketList> &subpackets() noexcept {
return m_subpackets;}
217 const shared_ptr<NodeList> &subnodes()
const noexcept {
return subpackets()->m_subnodes;}
218 const shared_ptr<PacketList> &subpackets()
const noexcept {
return m_subpackets;}
223 const Node &
node() const noexcept {
return payload()->node();}
226 bool missing() const noexcept {
return m_missing;}
234 shared_ptr<PacketList> m_subpackets;
239 using cnt_t = int64_t;
240 static cnt_t now() noexcept {
241 auto tm = XTime::now();
242 return (cnt_t)tm.sec() * 1000000uL + tm.usec();
247 using cnt_t = uint16_t;
249 operator cnt_t()
const noexcept {
return m_var;}
260 enum : int64_t {SERIAL_NULL = 0};
263 m_var = (int64_t)*stl_processID * 1000000000000uLL;
265 operator int64_t()
const noexcept {
return m_var;}
267 cnt_t &operator++(
int) noexcept {
268 m_var = ((m_var + 1) & 0xffffffffffffuLL)
269 | ((m_var) & 0xffff000000000000uLL);
274 static int64_t current() noexcept {
277 static int64_t gen() noexcept {
278 auto &v = *stl_serial;
292 PacketWrapper(
const shared_ptr<Linkage> &bp,
int reverse_index, int64_t bundle_serial) noexcept;
294 bool hasPriority()
const noexcept {
return m_ridx == (int)PACKET_STATE::PACKET_HAS_PRIORITY; }
299 shared_ptr<Linkage>
bundledBy() const noexcept {
return m_bundledBy.lock();}
302 void setReverseIndex(
int i) noexcept {m_ridx = i;}
305 weak_ptr<Linkage>
const m_bundledBy;
308 int64_t m_bundle_serial;
309 enum class PACKET_STATE : int { PACKET_HAS_PRIORITY = -1};
311 PacketWrapper(
const PacketWrapper &) =
delete;
318 void negotiate(
typename NegotiationCounter::cnt_t &started_time,
float mult_wait = 6.0f) noexcept {
319 auto transaction_started_time = m_transaction_started_time;
320 if( !transaction_started_time)
322 negotiate_internal(started_time, mult_wait);
324 void negotiate_internal(
typename NegotiationCounter::cnt_t &started_time,
float mult_wait) noexcept;
335 void snapshot(
Snapshot<XN> &target,
bool multi_nodal,
typename NegotiationCounter::cnt_t started_time)
const;
337 m_link->negotiate(target.m_started_time, 4.0f);
338 snapshot(
static_cast<Snapshot<XN> &
>(target), multi_nodal, target.m_started_time);
339 target.m_oldpacket = target.
m_packet;
341 enum class SnapshotStatus {SNAPSHOT_SUCCESS = 0, SNAPSHOT_DISTURBED = 1,
342 SNAPSHOT_VOID_PACKET = 2, SNAPSHOT_NODE_MISSING = 4,
343 SNAPSHOT_COLLIDED = 8, SNAPSHOT_NODE_MISSING_AND_COLLIDED = 12};
348 shared_ptr<Linkage> linkage;
352 enum class SnapshotMode {SNAPSHOT_FOR_UNBUNDLE, SNAPSHOT_FOR_BUNDLE};
353 static inline SnapshotStatus snapshotSupernode(
const shared_ptr<Linkage> &linkage, shared_ptr<Linkage> &linkage_super,
356 int64_t serial = SerialGenerator::SERIAL_NULL, CASInfoList *cas_infos =
nullptr);
364 enum class BundledStatus {BUNDLE_SUCCESS, BUNDLE_DISTURBED};
371 typename NegotiationCounter::cnt_t &started_time, int64_t bundle_serial,
bool is_bundle_root);
374 typename NegotiationCounter::cnt_t &started_time, int64_t bundle_serial);
375 enum class UnbundledStatus {UNBUNDLE_W_NEW_SUBVALUE,
376 UNBUNDLE_SUBVALUE_HAS_CHANGED,
377 UNBUNDLE_COLLIDED, UNBUNDLE_DISTURBED};
386 static UnbundledStatus unbundle(
const int64_t *bundle_serial,
typename NegotiationCounter::cnt_t &time_started,
404 bool copy_branch, int64_t tr_serial,
bool set_missing, XN** uppernode);
406 bool copy_branch, int64_t tr_serial = 0,
bool set_missing =
false);
413 bool copy_branch, int64_t tr_serial,
bool set_missing,
422 using FuncPayloadCreator = Payload *(*)(XN &);
424 void lookupFailure()
const;
426 bool copy_branch, int64_t tr_serial,
bool set_missing, XN **uppernode);
430 template <
class T,
typename... Args>
432 *T::stl_funcPayloadCreator = [](XN &node)->Payload *{
return new PayloadWrapper<T>(node);};
433 return new T(std::forward<Args>(args)...);
454 const typename T::Payload &
operator[](
const shared_ptr<T> &node)
const noexcept {
455 return operator[](const_cast<const T&>( *node));
459 const typename T::Payload &
operator[](
const T &node)
const noexcept {
462 return *
static_cast<const typename T::Payload*
>(payload.get());
467 const shared_ptr<const typename Node<XN>::NodeList>
list() const noexcept {
469 return shared_ptr<typename Node<XN>::NodeList>();
477 shared_ptr<const typename Node<XN>::NodeList>
list(
const shared_ptr<
Node<XN> > &node) const noexcept {
479 if( !packet->size() )
481 return packet->subnodes();
485 const shared_ptr<const typename Node<XN>::NodeList> lx(
list());
489 if(x.get() == &lower)
500 template <
typename T,
typename tArgRef>
501 void talk(T &talker, tArgRef arg)
const { talker.talk( *
this, std::forward<tArgRef>(arg)); }
503 friend class Node<XN>;
512 template <
class XN,
typename T>
520 return static_cast<const typename T::Payload *
>(this->
m_packet->payload().get());
530 #include "transaction_signal.h"
531 namespace Transactional {
549 Snapshot<XN>(), m_oldpacket(), m_multi_nodal(multi_nodal) {
551 node.snapshot( *
this, multi_nodal);
552 assert( &this->
m_packet->node() == &node);
553 assert( &this->m_oldpacket->node() == &node);
558 m_oldpacket(this->
m_packet), m_multi_nodal(multi_nodal) {
563 if(m_started_time && this->
m_packet) {
565 if(node.m_link->m_transaction_started_time == m_started_time) {
566 node.
m_link->m_transaction_started_time = 0;
574 typename T::Payload &operator[](const shared_ptr<T> &node) {
580 assert(isMultiNodal() || ( &node == &this->
m_packet->node()));
582 node.reverseLookup(this->m_packet,
true, this->m_serial)->payload());
583 if(payload->m_serial != this->m_serial) {
584 payload = payload->clone( *
this, this->m_serial);
585 auto &p( *static_cast<typename T::Payload *>(payload.get()));
588 auto &p( *static_cast<typename T::Payload *>(payload.get()));
591 bool isMultiNodal() const noexcept {
return m_multi_nodal;}
594 template <
typename T,
typename ...Args>
595 void mark(T &talker, Args&&...args) {
596 if(
auto m = talker.createMessage(this->m_serial, std::forward<Args>(args)...)) {
599 m_messages->push_back(m);
604 int unmark(
const shared_ptr<XListener> &x) {
607 for(
auto &&msg: *m_messages)
608 canceled += msg->unmark(x);
612 bool isModified() const noexcept {
613 return (this->
m_packet != this->m_oldpacket);
618 if( !isModified() || node.commit( *
this)) {
619 finalizeCommitment(node);
637 supernode.snapshot( *
this,
true);
647 friend class Node<XN>;
659 auto time(node.m_link->m_transaction_started_time);
660 if( !time || (time > m_started_time))
661 node.m_link->m_transaction_started_time = m_started_time;
664 this->
m_packet->node().snapshot( *
this, m_multi_nodal);
668 void finalizeCommitment(
Node<XN> &node);
671 const bool m_multi_nodal;
672 typename Node<XN>::NegotiationCounter::cnt_t m_started_time;
674 unique_ptr<MessageList> m_messages;
680 template <
class XN,
typename T>
687 return ( *
this)[
static_cast<T &
>(this->
m_packet->node())];
699 if(node.m_link->m_transaction_started_time == m_started_time) {
700 node.m_link->m_transaction_started_time = 0;
707 for(
auto &&msg: *m_messages)
714 template <
typename Closure>
719 return std::move(tr);
723 template <
typename Closure>
730 return std::move(tr);
734 template <
typename Closure>
747 bool copy_branch, int64_t tr_serial,
bool set_missing, XN **uppernode) {
749 if( &superpacket->node() ==
this)
750 foundpacket = &superpacket;
752 foundpacket = lookupFromChild(superpacket,
753 copy_branch, tr_serial, set_missing, uppernode);
755 if(copy_branch && (( *foundpacket)->payload()->m_serial != tr_serial)) {
766 bool copy_branch, int64_t tr_serial,
bool set_missing) {