eRPC
rpc.h
1 #pragma once
2 
3 #include <map>
4 #include <set>
5 #include "cc/timing_wheel.h"
6 #include "common.h"
7 #include "msg_buffer.h"
8 #include "nexus.h"
9 #include "pkthdr.h"
10 #include "req_handle.h"
11 #include "rpc_types.h"
12 #include "session.h"
13 #include "transport.h"
14 #include "util/buffer.h"
15 #include "util/fixed_queue.h"
16 #include "util/huge_alloc.h"
17 #include "util/logger.h"
18 #include "util/mt_queue.h"
19 #include "util/rand.h"
20 #include "util/timer.h"
21 #include "util/udp_client.h"
22 
23 namespace erpc {
24 
73 template <class TTr>
74 class Rpc {
75  friend class RpcTest;
76 
77  private:
79  static constexpr size_t kInitialHugeAllocSize = MB(8);
80 
82  static constexpr size_t kSMTimeoutMs = kTesting ? 10 : 100;
83 
84  public:
86  static constexpr size_t kMaxMsgSize =
87  HugeAlloc::k_max_class_size -
88  ((HugeAlloc::k_max_class_size / TTr::kMaxDataPerPkt) * sizeof(pkthdr_t));
89  static_assert((1 << kMsgSizeBits) >= kMaxMsgSize, "");
90  static_assert((1 << kPktNumBits) * TTr::kMaxDataPerPkt > 2 * kMaxMsgSize, "");
91 
111  Rpc(Nexus *nexus, void *context, uint8_t rpc_id, sm_handler_t sm_handler,
112  uint8_t phy_port = 0);
113 
115  ~Rpc();
116 
137  inline MsgBuffer alloc_msg_buffer(size_t max_data_size) {
138  assert(max_data_size > 0); // Doesn't work for max_data_size = 0
139 
140  // This function avoids division for small data sizes
141  size_t max_num_pkts = data_size_to_num_pkts(max_data_size);
142 
143  lock_cond(&huge_alloc_lock_);
144  Buffer buffer =
145  huge_alloc_->alloc(max_data_size + (max_num_pkts * sizeof(pkthdr_t)));
146  unlock_cond(&huge_alloc_lock_);
147 
148  if (unlikely(buffer.buf_ == nullptr)) {
149  MsgBuffer msg_buffer;
150  msg_buffer.buf_ = nullptr;
151  return msg_buffer;
152  }
153 
154  MsgBuffer msg_buffer(buffer, max_data_size, max_num_pkts);
155  return msg_buffer;
156  }
157 
170  static inline void resize_msg_buffer(MsgBuffer *msg_buffer,
171  size_t new_data_size) {
172  assert(new_data_size <= msg_buffer->max_data_size_);
173 
174  // Avoid division for single-packet data sizes
175  size_t new_num_pkts = data_size_to_num_pkts(new_data_size);
176  msg_buffer->resize(new_data_size, new_num_pkts);
177  }
178 
181  inline void free_msg_buffer(MsgBuffer msg_buffer) {
182  lock_cond(&huge_alloc_lock_);
183  huge_alloc_->free_buf(msg_buffer.buffer_);
184  unlock_cond(&huge_alloc_lock_);
185  }
186 
208  int create_session(std::string remote_uri, uint8_t rem_rpc_id) {
209  return create_session_st(remote_uri, rem_rpc_id);
210  }
211 
223  int destroy_session(int session_num) {
224  return destroy_session_st(session_num);
225  }
226 
255  void enqueue_request(int session_num, uint8_t req_type, MsgBuffer *req_msgbuf,
256  MsgBuffer *resp_msgbuf, erpc_cont_func_t cont_func,
257  void *tag, size_t cont_etid = kInvalidBgETid);
258 
283  void enqueue_response(ReqHandle *req_handle, MsgBuffer *resp_msgbuf);
284 
286  inline void run_event_loop(size_t timeout_ms) {
287  run_event_loop_timeout_st(timeout_ms);
288  }
289 
291  inline void run_event_loop_once() { run_event_loop_do_one_st(); }
292 
294  inline MsgBuffer alloc_msg_buffer_or_die(size_t max_data_size) {
295  MsgBuffer m = alloc_msg_buffer(max_data_size);
296  rt_assert(m.buf_ != nullptr);
297  return m;
298  }
299 
302  size_t num_active_sessions() { return num_active_sessions_st(); }
303 
306  bool is_connected(int session_num) const {
307  return session_vec_[static_cast<size_t>(session_num)]->is_connected();
308  }
309 
311  size_t get_bandwidth() const { return transport_->get_bandwidth(); }
312 
314  size_t get_num_re_tx(int session_num) const {
315  Session *session = session_vec_[static_cast<size_t>(session_num)];
316  return session->client_info_.num_re_tx_;
317  }
318 
320  void reset_num_re_tx(int session_num) {
321  Session *session = session_vec_[static_cast<size_t>(session_num)];
322  session->client_info_.num_re_tx_ = 0;
323  }
324 
326  inline size_t get_stat_user_alloc_tot() {
327  lock_cond(&huge_alloc_lock_);
328  size_t ret = huge_alloc_->get_stat_user_alloc_tot();
329  unlock_cond(&huge_alloc_lock_);
330  return ret;
331  }
332 
334  Timely *get_timely(int session_num) {
335  Session *session = session_vec_[static_cast<size_t>(session_num)];
336  return &session->client_info_.cc_.timely_;
337  }
338 
340  TimingWheel *get_wheel() { return wheel_; }
341 
344  inline void set_context(void *_context) {
345  rt_assert(context_ == nullptr, "Cannot reset non-null Rpc context");
346  context_ = _context;
347  }
348 
350  inline void set_pre_resp_msgbuf_size(size_t new_pre_resp_msgbuf_size) {
351  pre_resp_msgbuf_size_ = new_pre_resp_msgbuf_size;
352  }
353 
355  inline HugeAlloc *get_huge_alloc() const {
356  rt_assert(nexus_->num_bg_threads_ == 0,
357  "Cannot extract allocator because background threads exist.");
358  return huge_alloc_;
359  }
360 
362  static inline constexpr size_t get_max_data_per_pkt() {
363  return TTr::kMaxDataPerPkt;
364  }
365 
367  std::string get_remote_hostname(int session_num) const {
368  return session_vec_[static_cast<size_t>(session_num)]
369  ->get_remote_hostname();
370  }
371 
373  static inline constexpr size_t get_max_num_sessions() {
374  return Transport::kNumRxRingEntries / kSessionCredits;
375  }
376 
378  static inline size_t get_max_msg_size() { return kMaxMsgSize; }
379 
381  inline uint8_t get_rpc_id() const { return rpc_id_; }
382 
384  inline bool in_background() const { return !in_dispatch(); }
385 
387  inline size_t get_etid() const { return tls_registry_->get_etid(); }
388 
390  inline double get_freq_ghz() const { return freq_ghz_; }
391 
393  double sec_since_creation() {
394  return to_sec(rdtsc() - creation_tsc_, freq_ghz_);
395  }
396 
398  double get_avg_rx_batch() {
399  if (!kDatapathStats || dpath_stats_.rx_burst_calls_ == 0) return -1.0;
400  return dpath_stats_.pkts_rx_ * 1.0 / dpath_stats_.rx_burst_calls_;
401  }
402 
404  double get_avg_tx_batch() {
405  if (!kDatapathStats || dpath_stats_.tx_burst_calls_ == 0) return -1.0;
406  return dpath_stats_.pkts_tx_ * 1.0 / dpath_stats_.tx_burst_calls_;
407  }
408 
410  void reset_dpath_stats() {
411  memset(reinterpret_cast<void *>(&dpath_stats_), 0, sizeof(dpath_stats_));
412  }
413 
418  void fault_inject_fail_resolve_rinfo_st();
419 
424  void fault_inject_set_pkt_drop_prob_st(double pkt_drop_prob);
425 
426  private:
427  int create_session_st(std::string remote_uri, uint8_t rem_rpc_id);
428  int destroy_session_st(int session_num);
429  size_t num_active_sessions_st();
430 
431  //
432  // Session management helper functions
433  //
434 
436  void handle_sm_rx_st();
437 
441  void bury_session_st(Session *);
442 
445  void sm_pkt_udp_tx_st(const SmPkt &);
446 
450  void send_sm_req_st(Session *);
451 
452  //
453  // Session management packet handlers
454  //
455  void handle_connect_req_st(const SmPkt &);
456  void handle_connect_resp_st(const SmPkt &);
457 
458  void handle_disconnect_req_st(const SmPkt &);
459  void handle_disconnect_resp_st(const SmPkt &);
460 
463  bool handle_reset_client_st(Session *session);
464 
467  bool handle_reset_server_st(Session *session);
468 
469  //
470  // Methods to bury server-side request and response MsgBuffers. Client-side
471  // request and response MsgBuffers are owned by user apps, so eRPC doesn't
472  // free their backing memory.
473  //
474 
483  inline void bury_resp_msgbuf_server_st(SSlot *sslot) {
484  assert(in_dispatch());
485 
486  // Free the response MsgBuffer iff it's the dynamically allocated response.
487  // This high-specificity check prevents freeing a null tx_msgbuf.
488  if (sslot->tx_msgbuf_ == &sslot->dyn_resp_msgbuf_) {
489  MsgBuffer *tx_msgbuf = sslot->tx_msgbuf_;
490  free_msg_buffer(*tx_msgbuf);
491  // Need not nullify tx_msgbuf->buffer.buf: we'll just nullify tx_msgbuf
492  }
493 
494  sslot->tx_msgbuf_ = nullptr;
495  }
496 
504  inline void bury_req_msgbuf_server_st(SSlot *sslot) {
505  MsgBuffer &req_msgbuf = sslot->server_info_.req_msgbuf_;
506  if (unlikely(req_msgbuf.is_dynamic())) {
507  free_msg_buffer(req_msgbuf);
508  req_msgbuf.buffer_.buf_ = nullptr; // Mark invalid for future
509  }
510 
511  req_msgbuf.buf_ = nullptr;
512  }
513 
514  //
515  // Handle available ring entries
516  //
517 
519  bool have_ring_entries() const {
520  return ring_entries_available_ >= kSessionCredits;
521  }
522 
524  void alloc_ring_entries() {
525  assert(have_ring_entries());
526  ring_entries_available_ -= kSessionCredits;
527  }
528 
530  void free_ring_entries() {
531  ring_entries_available_ += kSessionCredits;
532  assert(ring_entries_available_ <= Transport::kNumRxRingEntries);
533  }
534 
535  //
536  // Datapath helpers
537  //
538 
541  static inline size_t resp_ntoi(size_t pkt_num, size_t num_req_pkts) {
542  return pkt_num - (num_req_pkts - 1);
543  }
544 
547  inline bool in_order_client(const SSlot *sslot, const pkthdr_t *pkthdr) {
548  // Counters for pkthdr's request number are valid only if req numbers match
549  if (unlikely(pkthdr->req_num_ != sslot->cur_req_num_)) return false;
550 
551  const auto &ci = sslot->client_info_;
552  if (unlikely(pkthdr->pkt_num_ != ci.num_rx_)) return false;
553 
554  // Ignore spurious packets received as a consequence of rollback:
555  // 1. We've only sent pkts up to (ci.num_tx - 1). Ignore later packets.
556  // 2. Ignore if the corresponding client packet for pkthdr is still in wheel
557  if (unlikely(pkthdr->pkt_num_ >= ci.num_tx_)) return false;
558 
559  if (kCcPacing &&
560  unlikely(ci.in_wheel_[pkthdr->pkt_num_ % kSessionCredits])) {
561  pkt_loss_stats_.still_in_wheel_during_retx_++;
562  return false;
563  }
564 
565  return true;
566  }
567 
574  static size_t data_size_to_num_pkts(size_t data_size) {
575  if (data_size <= TTr::kMaxDataPerPkt) return 1;
576  return (data_size + TTr::kMaxDataPerPkt - 1) / TTr::kMaxDataPerPkt;
577  }
578 
581  static inline size_t wire_pkts(MsgBuffer *req_msgbuf,
582  MsgBuffer *resp_msgbuf) {
583  return req_msgbuf->num_pkts_ + resp_msgbuf->num_pkts_ - 1;
584  }
585 
587  static inline bool req_pkts_pending(SSlot *sslot) {
588  return sslot->client_info_.num_tx_ < sslot->tx_msgbuf_->num_pkts_;
589  }
590 
592  inline bool can_bypass_wheel(SSlot *sslot) const {
593  if (!kCcPacing) return true;
594  if (kTesting) return faults_.hard_wheel_bypass_;
595  if (kCcOptWheelBypass) {
596  // To prevent reordering, do not bypass the wheel if it contains packets
597  // for this session.
598  return sslot->client_info_.wheel_count_ == 0 &&
599  sslot->session_->is_uncongested();
600  }
601  return false;
602  }
603 
606  void drain_tx_batch_and_dma_queue() {
607  if (tx_batch_i_ > 0) do_tx_burst_st();
608  transport_->tx_flush();
609  }
610 
612  inline void add_to_active_rpc_list(SSlot &sslot) {
613  SSlot *prev_tail = active_rpcs_tail_sentinel_.client_info_.prev_;
614 
615  prev_tail->client_info_.next_ = &sslot;
616  sslot.client_info_.prev_ = prev_tail;
617 
618  sslot.client_info_.next_ = &active_rpcs_tail_sentinel_;
619  active_rpcs_tail_sentinel_.client_info_.prev_ = &sslot;
620  }
621 
623  inline void delete_from_active_rpc_list(SSlot &sslot) {
624  sslot.client_info_.prev_->client_info_.next_ = sslot.client_info_.next_;
625  sslot.client_info_.next_->client_info_.prev_ = sslot.client_info_.prev_;
626  }
627 
628  //
629  // Datapath processing
630  //
631 
633  void run_event_loop_timeout_st(size_t timeout_ms);
634 
636  void run_event_loop_do_one_st();
637 
641  void kick_req_st(SSlot *);
642 
646  void kick_rfr_st(SSlot *);
647 
650  void process_small_req_st(SSlot *, pkthdr_t *);
651 
653  void process_large_req_one_st(SSlot *, const pkthdr_t *);
654 
659  void process_resp_one_st(SSlot *, const pkthdr_t *, size_t rx_tsc);
660 
668  void enqueue_cr_st(SSlot *sslot, const pkthdr_t *req_pkthdr);
669 
674  void process_expl_cr_st(SSlot *, const pkthdr_t *, size_t rx_tsc);
675 
685  void enqueue_rfr_st(SSlot *sslot, const pkthdr_t *resp_pkthdr);
686 
688  void process_rfr_st(SSlot *, const pkthdr_t *);
689 
694  inline void enqueue_pkt_tx_burst_st(SSlot *sslot, size_t pkt_idx,
695  size_t *tx_ts) {
696  assert(in_dispatch());
697  const MsgBuffer *tx_msgbuf = sslot->tx_msgbuf_;
698 
699  Transport::tx_burst_item_t &item = tx_burst_arr_[tx_batch_i_];
700  item.routing_info_ = sslot->session_->remote_routing_info_;
701  item.msg_buffer_ = const_cast<MsgBuffer *>(tx_msgbuf);
702  item.pkt_idx_ = pkt_idx;
703  if (kCcRTT) item.tx_ts_ = tx_ts;
704 
705  if (kTesting) {
706  item.drop_ = roll_pkt_drop();
707  testing_.pkthdr_tx_queue_.push(*tx_msgbuf->get_pkthdr_n(pkt_idx));
708  }
709 
710  ERPC_TRACE("Rpc %u, lsn %u (%s): TX %s. Slot %s.%s\n", rpc_id_,
711  sslot->session_->local_session_num_,
712  sslot->session_->get_remote_hostname().c_str(),
713  tx_msgbuf->get_pkthdr_str(pkt_idx).c_str(),
714  sslot->progress_str().c_str(), item.drop_ ? " Drop." : "");
715 
716  tx_batch_i_++;
717  if (tx_batch_i_ == TTr::kPostlist) do_tx_burst_st();
718  }
719 
722  inline void enqueue_hdr_tx_burst_st(SSlot *sslot, MsgBuffer *ctrl_msgbuf,
723  size_t *tx_ts) {
724  assert(in_dispatch());
725 
726  Transport::tx_burst_item_t &item = tx_burst_arr_[tx_batch_i_];
727  item.routing_info_ = sslot->session_->remote_routing_info_;
728  item.msg_buffer_ = ctrl_msgbuf;
729  item.pkt_idx_ = 0;
730  if (kCcRTT) item.tx_ts_ = tx_ts;
731 
732  if (kTesting) {
733  item.drop_ = roll_pkt_drop();
734  testing_.pkthdr_tx_queue_.push(*ctrl_msgbuf->get_pkthdr_0());
735  }
736 
737  ERPC_TRACE("Rpc %u, lsn %u (%s): TX %s. Slot %s.%s.\n", rpc_id_,
738  sslot->session_->local_session_num_,
739  sslot->session_->get_remote_hostname().c_str(),
740  ctrl_msgbuf->get_pkthdr_str(0).c_str(),
741  sslot->progress_str().c_str(), item.drop_ ? " Drop." : "");
742 
743  tx_batch_i_++;
744  if (tx_batch_i_ == TTr::kPostlist) do_tx_burst_st();
745  }
746 
748  inline void enqueue_wheel_req_st(SSlot *sslot, size_t pkt_num) {
749  const size_t pkt_idx = pkt_num;
750  size_t pktsz =
751  sslot->tx_msgbuf_->get_pkt_size<TTr::kMaxDataPerPkt>(pkt_idx);
752  size_t ref_tsc = dpath_rdtsc();
753  size_t desired_tx_tsc =
754  sslot->session_->cc_getupdate_tx_tsc(ref_tsc, pktsz);
755 
756  ERPC_CC("Rpc %u: lsn/req/pkt %u/%zu/%zu, REQ wheeled for %.3f us.\n",
757  rpc_id_, sslot->session_->local_session_num_, sslot->cur_req_num_,
758  pkt_num, to_usec(desired_tx_tsc - creation_tsc_, freq_ghz_));
759 
760  wheel_->insert(wheel_ent_t(sslot, pkt_num), ref_tsc, desired_tx_tsc);
761  sslot->client_info_.in_wheel_[pkt_num % kSessionCredits] = true;
762  sslot->client_info_.wheel_count_++;
763  }
764 
766  inline void enqueue_wheel_rfr_st(SSlot *sslot, size_t pkt_num) {
767  const size_t pkt_idx = resp_ntoi(pkt_num, sslot->tx_msgbuf_->num_pkts_);
768  const MsgBuffer *resp_msgbuf = sslot->client_info_.resp_msgbuf_;
769  size_t pktsz = resp_msgbuf->get_pkt_size<TTr::kMaxDataPerPkt>(pkt_idx);
770  size_t ref_tsc = dpath_rdtsc();
771  size_t desired_tx_tsc =
772  sslot->session_->cc_getupdate_tx_tsc(ref_tsc, pktsz);
773 
774  ERPC_CC("Rpc %u: lsn/req/pkt %u/%zu/%zu, RFR wheeled for %.3f us.\n",
775  rpc_id_, sslot->session_->local_session_num_, sslot->cur_req_num_,
776  pkt_num, to_usec(desired_tx_tsc - creation_tsc_, freq_ghz_));
777 
778  wheel_->insert(wheel_ent_t(sslot, pkt_num), ref_tsc, desired_tx_tsc);
779  sslot->client_info_.in_wheel_[pkt_num % kSessionCredits] = true;
780  sslot->client_info_.wheel_count_++;
781  }
782 
784  inline void do_tx_burst_st() {
785  assert(in_dispatch());
786  assert(tx_batch_i_ > 0);
787 
788  // Measure TX burst size
789  dpath_stat_inc(dpath_stats_.tx_burst_calls_, 1);
790  dpath_stat_inc(dpath_stats_.pkts_tx_, tx_batch_i_);
791 
792  if (kCcRTT) {
793  size_t batch_tsc = 0;
794  if (kCcOptBatchTsc) batch_tsc = dpath_rdtsc();
795 
796  for (size_t i = 0; i < tx_batch_i_; i++) {
797  if (tx_burst_arr_[i].tx_ts_ != nullptr) {
798  *tx_burst_arr_[i].tx_ts_ = kCcOptBatchTsc ? batch_tsc : dpath_rdtsc();
799  }
800  }
801  }
802 
803  transport_->tx_burst(tx_burst_arr_, tx_batch_i_);
804  tx_batch_i_ = 0;
805  }
806 
808  static inline void bump_credits(Session *session) {
809  assert(session->is_client());
810  assert(session->client_info_.credits_ < kSessionCredits);
811  session->client_info_.credits_++;
812  }
813 
815  static inline void copy_data_to_msgbuf(MsgBuffer *msgbuf, size_t pkt_idx,
816  const pkthdr_t *pkthdr) {
817  size_t offset = pkt_idx * TTr::kMaxDataPerPkt;
818  size_t to_copy =
819  (std::min)(TTr::kMaxDataPerPkt, pkthdr->msg_size_ - offset);
820  memcpy(&msgbuf->buf_[offset], pkthdr + 1, to_copy); // From end of pkthdr
821  }
822 
832  void process_comps_st();
833 
840  void submit_bg_req_st(SSlot *sslot);
841 
852  void submit_bg_resp_st(erpc_cont_func_t cont_func, void *tag, size_t bg_etid);
853 
854  //
855  // Queue handlers
856  //
857 
859  void process_credit_stall_queue_st();
860 
862  void process_wheel_st();
863 
865  void process_bg_queues_enqueue_request_st();
866 
868  void process_bg_queues_enqueue_response_st();
869 
874  void fault_inject_check_ok() const;
875 
876  //
877  // Packet loss handling
878  //
879 
881  void pkt_loss_scan_st();
882 
884  void pkt_loss_retransmit_st(SSlot *sslot);
885 
886  //
887  // Misc private functions
888  //
889 
891  inline bool in_dispatch() const { return get_etid() == creator_etid_; }
892 
894  inline bool is_usr_session_num_in_range_st(int session_num) const {
895  assert(in_dispatch());
896  return session_num >= 0 &&
897  static_cast<size_t>(session_num) < session_vec_.size();
898  }
899 
901  inline void lock_cond(std::mutex *mutex) {
902  if (unlikely(multi_threaded_)) mutex->lock();
903  }
904 
906  inline void unlock_cond(std::mutex *mutex) {
907  if (unlikely(multi_threaded_)) mutex->unlock();
908  }
909 
918  inline void update_timely_rate(SSlot *sslot, size_t pkt_num, size_t rx_tsc) {
919  size_t rtt_tsc =
920  rx_tsc - sslot->client_info_.tx_ts_[pkt_num % kSessionCredits];
921  // This might use Timely bypass
922  sslot->session_->client_info_.cc_.timely_.update_rate(rx_tsc, rtt_tsc);
923  }
924 
926  inline bool roll_pkt_drop() {
927  static constexpr uint32_t kBillion = 1000000000;
928  return ((fast_rand_.next_u32() % kBillion) <
929  faults_.pkt_drop_thresh_billion_);
930  }
931 
932  public:
933  // Hooks for apps to modify eRPC behavior
934 
937  bool retry_connect_on_invalid_rpc_id_ = false;
938 
939  private:
940  // Constructor args
941  Nexus *nexus_;
942  void *context_;
943  const uint8_t rpc_id_;
944  const sm_handler_t sm_handler_;
945  const uint8_t phy_port_;
946  const size_t numa_node_;
947 
948  // Derived
949  const size_t creation_tsc_;
950  const bool multi_threaded_;
951  const double freq_ghz_;
952  const size_t rpc_rto_cycles_;
953  const size_t rpc_pkt_loss_scan_cycles_;
954 
957  const std::array<ReqFunc, kReqTypeArraySize> req_func_arr_;
958 
959  // Rpc metadata
960  size_t creator_etid_;
961  TlsRegistry *tls_registry_;
962 
963  // Sessions
964 
968  std::vector<Session *> session_vec_;
969 
970  // Transport
971  TTr *transport_ = nullptr;
972 
974  size_t ring_entries_available_ = TTr::kNumRxRingEntries;
975 
976  Transport::tx_burst_item_t tx_burst_arr_[TTr::kPostlist];
977  size_t tx_batch_i_ = 0;
978 
984  uint8_t *rx_ring_[TTr::kNumRxRingEntries];
985  size_t rx_ring_head_ = 0;
986 
987  std::vector<SSlot *> stallq_;
988 
989  size_t ev_loop_tsc_;
990 
991  // Packet loss
992  size_t pkt_loss_scan_tsc_;
993 
1001  SSlot active_rpcs_root_sentinel_, active_rpcs_tail_sentinel_;
1002 
1003  // Allocator
1004  HugeAlloc *huge_alloc_ = nullptr;
1005  std::mutex huge_alloc_lock_;
1006 
1007  MsgBuffer ctrl_msgbufs_[2 * TTr::kUnsigBatch];
1008  size_t ctrl_msgbuf_head_ = 0;
1009  FastRand fast_rand_;
1010 
1011  // Cold members live below, in order of coolness
1012 
1015  TimingWheel *wheel_;
1016 
1018  struct {
1019  MtQueue<enq_req_args_t> enqueue_request_;
1020  MtQueue<enq_resp_args_t> enqueue_response_;
1021  } bg_queues_;
1022 
1023  // Misc
1024  SlowRand slow_rand_;
1025  UDPClient<SmPkt> udp_client_;
1026  Nexus::Hook nexus_hook_;
1027 
1033  std::map<conn_req_uniq_token_t, uint16_t> conn_req_token_map_;
1034 
1036  std::set<uint16_t> sm_pending_reqs_;
1037 
1039  struct {
1040  bool fail_resolve_rinfo_ = false;
1041  bool hard_wheel_bypass_ = false;
1042  double pkt_drop_prob_ = 0.0;
1043 
1044  // Derived: Drop packet iff urand[0, ..., one billion] is smaller than this
1045  uint32_t pkt_drop_thresh_billion_ = 0;
1046  } faults_;
1047 
1048  // Additional members for testing
1049  struct {
1050  FixedQueue<pkthdr_t, kSessionCredits> pkthdr_tx_queue_;
1051  } testing_;
1052 
1055  FILE *trace_file_;
1056 
1058  struct {
1059  size_t ev_loop_calls_ = 0;
1060  size_t pkts_tx_ = 0;
1061  size_t tx_burst_calls_ = 0;
1062  size_t pkts_rx_ = 0;
1063  size_t rx_burst_calls_ = 0;
1064  } dpath_stats_;
1065 
1066  public:
1067  struct {
1068  size_t num_re_tx_ = 0;
1069 
1072  size_t still_in_wheel_during_retx_ = 0;
1073  } pkt_loss_stats_;
1074 
1078  size_t pre_resp_msgbuf_size_ = TTr::kMaxDataPerPkt;
1079 };
1080 
1081 // This goes at the end of every Rpc implementation file to force compilation
1082 #define FORCE_COMPILE_TRANSPORTS template class Rpc<CTransport>;
1083 } // namespace erpc
Member documentation

int create_session(std::string remote_uri, uint8_t rem_rpc_id)  [rpc.h:208]
  A session is a connection between two eRPC endpoints (similar to a TCP connection). This function creates a session to a remote Rpc object and initiates session connection. A session management callback of type kConnected or kConnectFailed will be invoked after session creation completes or fails.
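For orientation, a minimal client-side sketch of session setup with this API. The URIs, Rpc ID values, and the empty handler body are illustrative; CTransport is the build-time transport alias that the FORCE_COMPILE_TRANSPORTS macro above also uses.

  #include "rpc.h"

  // Session-management handler: invoked with kConnected or kConnectFailed
  // once connection completes or fails
  void sm_handler(int session_num, erpc::SmEventType, erpc::SmErrType, void *) {}

  void connect_example() {
    erpc::Nexus nexus("192.168.1.10:31850");  // This process's eRPC URI
    erpc::Rpc<erpc::CTransport> rpc(&nexus, nullptr /* context */,
                                    0 /* rpc_id */, sm_handler);
    int sn = rpc.create_session("192.168.1.20:31850", 0 /* remote rpc_id */);
    while (!rpc.is_connected(sn)) rpc.run_event_loop_once();
  }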
void set_pre_resp_msgbuf_size(size_t new_pre_resp_msgbuf_size)  [rpc.h:350]
  Change this Rpc's preallocated response message buffer size.

size_t pre_resp_msgbuf_size_  [rpc.h:1078]

double pkt_drop_prob_  [rpc.h:1042]
  Probability of dropping an RPC packet.

size_t get_bandwidth() const  [rpc.h:311]
  Return the physical link bandwidth (bytes per second).

uint32_t pkt_drop_thresh_billion_  [rpc.h:1045]
  Derived: Drop packet iff urand[0, ..., one billion] is smaller than this.
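The member's name and roll_pkt_drop() in the listing suggest the threshold is simply the drop probability scaled to one billion. A sketch of that assumed conversion (the helper name is ours):

  #include <cstdint>

  // Assumed conversion, consistent with roll_pkt_drop(): drop a packet iff
  // urand % 10^9 < thresh. E.g., pkt_drop_prob = 0.01 -> thresh = 10,000,000.
  uint32_t drop_thresh_billion(double pkt_drop_prob) {
    return static_cast<uint32_t>(pkt_drop_prob * 1000000000.0);
  }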
Timely *get_timely(int session_num)  [rpc.h:334]
  Return the Timely instance for a connected session. Expert use only.

bool is_connected(int session_num) const  [rpc.h:306]

static void resize_msg_buffer(MsgBuffer *msg_buffer, size_t new_data_size)  [rpc.h:170]
  Resize a MsgBuffer to fit a request or response. Safe to call from background threads (TS).
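A common pattern this enables, as a sketch (the function name is ours, and `rpc` is a connected Rpc as in the earlier example): allocate one maximum-size buffer up front, then shrink it to fit each message without reallocating.

  void resize_example(erpc::Rpc<erpc::CTransport> &rpc) {
    erpc::MsgBuffer req = rpc.alloc_msg_buffer_or_die(rpc.get_max_msg_size());
    rpc.resize_msg_buffer(&req, 64);  // Now sized for a 64-byte request
    // ... fill req.buf_ and enqueue the request ...
    rpc.free_msg_buffer(req);  // Frees the full underlying allocation
  }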
void fault_inject_fail_resolve_rinfo_st()
  Inject a fault that always fails all routing info resolution.

static constexpr size_t get_max_data_per_pkt()  [rpc.h:362]
  Return the maximum data size in one packet for the (private) transport.

int destroy_session(int session_num)  [rpc.h:223]
  Disconnect and destroy a session. The application must not use this session number after this function returns.

std::string get_remote_hostname(int session_num) const  [rpc.h:367]
  Return the hostname of the remote endpoint for a connected session.

size_t get_stat_user_alloc_tot()  [rpc.h:326]
  Return the total amount of huge page memory allocated to the user.

void reset_num_re_tx(int session_num)  [rpc.h:320]
  Reset the number of retransmissions for a connected session.
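Together with get_num_re_tx(), this supports simple loss monitoring. A sketch (the helper name and one-second window are ours):

  #include <cstdio>

  // Sample per-session retransmissions over a ~1 second window
  void report_retx(erpc::Rpc<erpc::CTransport> &rpc, int session_num) {
    rpc.reset_num_re_tx(session_num);
    rpc.run_event_loop(1000);  // Timeout in milliseconds
    printf("re-tx in window: %zu\n", rpc.get_num_re_tx(session_num));
  }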
Rpc(Nexus *nexus, void *context, uint8_t rpc_id, sm_handler_t sm_handler, uint8_t phy_port = 0)  [rpc.h:111]
  Construct the Rpc object.

TimingWheel *get_wheel()  [rpc.h:340]
  Return the Timing Wheel for this Rpc. Expert use only.

~Rpc()  [rpc.h:115]
  Destroy the Rpc from a foreground thread.

static size_t get_max_msg_size()  [rpc.h:378]
  Return the data size in bytes that can be sent in one request or response.

static constexpr size_t get_max_num_sessions()  [rpc.h:373]
  Return the maximum number of sessions supported.

void run_event_loop(size_t timeout_ms)  [rpc.h:286]
  Run the event loop for some milliseconds.
void reset_dpath_stats()  [rpc.h:410]
  Reset all datapath stats to zero.

void free_msg_buffer(MsgBuffer msg_buffer)  [rpc.h:181]

MsgBuffer  [msg_buffer.h:28]
  Applications store request and response messages in hugepage-backed buffers called message buffers.

uint8_t get_rpc_id() const  [rpc.h:381]
  Return the ID of this Rpc object.

bool retry_connect_on_invalid_rpc_id_  [rpc.h:937]

size_t get_num_re_tx(int session_num) const  [rpc.h:314]
  Return the number of retransmissions for a connected session.

bool fail_resolve_rinfo_  [rpc.h:1040]
  Fail routing info resolution.
double get_avg_rx_batch()  [rpc.h:398]
  Return the average number of packets received in a call to rx_burst.

double get_freq_ghz() const  [rpc.h:390]
  Return RDTSC frequency in GHz.

rpc_types.h
  Types exposed to the eRPC user.

double sec_since_creation()  [rpc.h:393]
  Return the number of seconds elapsed since this Rpc was created.

void run_event_loop_once()  [rpc.h:291]
  Run the event loop once.

bool in_background() const  [rpc.h:384]
  Return true iff the caller is running in a background thread.

void set_context(void *_context)  [rpc.h:344]

double get_avg_tx_batch()  [rpc.h:404]
  Return the average number of packets sent in a call to tx_burst.

ReqHandle  [req_handle.h:7]
  Handle object passed by eRPC to the application's request handler callbacks.

static constexpr size_t kMaxMsgSize  [rpc.h:86]
  Max request or response data size, i.e., excluding packet headers.
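To make the formula at rpc.h:86 concrete, a worked example with assumed constants; the real values come from HugeAlloc and the transport class, not from this sketch.

  #include <cstddef>

  constexpr size_t kMaxClassSize = 8u << 20;  // Assume 8 MiB allocator class
  constexpr size_t kDataPerPkt = 1440;        // Assume 1440 B payload/packet
  constexpr size_t kPkthdrSize = 16;          // Assume 16 B packet header
  // Subtract header space for (kMaxClassSize / kDataPerPkt) packets:
  // 8,388,608 - 5,825 * 16 = 8,295,408 bytes of application data
  constexpr size_t kMaxMsgSizeExample =
      kMaxClassSize - (kMaxClassSize / kDataPerPkt) * kPkthdrSize;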
size_t num_active_sessions()  [rpc.h:302]

size_t still_in_wheel_during_retx_  [rpc.h:1072]
  Number of times a packet was ignored during retransmission handling because it was still in the timing wheel (see in_order_client).

MsgBuffer alloc_msg_buffer_or_die(size_t max_data_size)  [rpc.h:294]
  Identical to alloc_msg_buffer(), but throws an exception on failure.

void enqueue_request(int session_num, uint8_t req_type, MsgBuffer *req_msgbuf, MsgBuffer *resp_msgbuf, erpc_cont_func_t cont_func, void *tag, size_t cont_etid = kInvalidBgETid)  [rpc.h:255]
  Enqueue a request for transmission. This always succeeds. eRPC owns the request and response msgbufs until it invokes the continuation callback.
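A client-side sketch of the request path; the request type, buffer sizes, and helper names are illustrative, and the buffers would be freed once the continuation has run.

  #include <cstdio>

  void cont_func(void *context, void *tag) {
    // Ownership of the request/response MsgBuffers is back with the app here
    printf("response received, tag=%p\n", tag);
  }

  void send_one(erpc::Rpc<erpc::CTransport> &rpc, int session_num) {
    erpc::MsgBuffer req = rpc.alloc_msg_buffer_or_die(16);
    erpc::MsgBuffer resp = rpc.alloc_msg_buffer_or_die(rpc.get_max_msg_size());
    rpc.enqueue_request(session_num, 1 /* req_type */, &req, &resp,
                        cont_func, nullptr /* tag */);
    rpc.run_event_loop(100);  // Let the loop TX the request and RX the response
  }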
Nexus  [nexus.h:21]
  A per-process library object used for initializing eRPC.

void enqueue_response(ReqHandle *req_handle, MsgBuffer *resp_msgbuf)  [rpc.h:283]
  Enqueue a response for transmission at the server. This must be either the request handle's preallocated response msgbuf or its dynamic response msgbuf.
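A matching server-side sketch using the request handle's preallocated response buffer. The request type and 8-byte payload are illustrative, and the handler reaches its Rpc through the context pointer passed to the Rpc constructor.

  #include <cstring>

  void req_handler(erpc::ReqHandle *req_handle, void *context) {
    auto *rpc = static_cast<erpc::Rpc<erpc::CTransport> *>(context);
    erpc::MsgBuffer &resp = req_handle->pre_resp_msgbuf_;
    rpc->resize_msg_buffer(&resp, 8);
    memcpy(resp.buf_, "hello!!", 8);  // 7 chars + NUL terminator
    rpc->enqueue_response(req_handle, &resp);
  }
  // Registered with the Nexus before creating the server's Rpc object, e.g.:
  //   nexus.register_req_func(1 /* req_type */, req_handler);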
void fault_inject_set_pkt_drop_prob_st(double pkt_drop_prob)  [rpc.h:424]
  Set the TX packet drop probability for this Rpc.

bool hard_wheel_bypass_  [rpc.h:1041]
  Wheel bypass regardless of congestion.

size_t get_etid() const  [rpc.h:387]
  Return the eRPC thread ID of the caller.

uint8_t *buf_  [msg_buffer.h:152]

HugeAlloc *get_huge_alloc() const  [rpc.h:355]
  Retrieve this Rpc's hugepage allocator. For expert use only.

MsgBuffer alloc_msg_buffer(size_t max_data_size)  [rpc.h:137]
  Create a hugepage-backed buffer for storing request or response messages. Safe to call from background threads (TS).
void (*erpc_cont_func_t)(void *context, void *tag)  [rpc_types.h:49]
  The type of the continuation callback invoked at the client. This returns ownership of the request and response msgbufs to the application.