eRPC
rpc.h
1 #pragma once
2 
3 #include <set>
4 #include "cc/timing_wheel.h"
5 #include "common.h"
6 #include "msg_buffer.h"
7 #include "nexus.h"
8 #include "pkthdr.h"
9 #include "rpc_types.h"
10 #include "session.h"
11 #include "transport.h"
12 #include "transport_impl/infiniband/ib_transport.h"
13 #include "transport_impl/raw/raw_transport.h"
14 #include "util/buffer.h"
15 #include "util/fixed_queue.h"
16 #include "util/huge_alloc.h"
17 #include "util/logger.h"
18 #include "util/mt_queue.h"
19 #include "util/rand.h"
20 #include "util/timer.h"
21 #include "util/udp_client.h"
22 
23 namespace erpc {
24 
69 template <class TTr>
70 class Rpc {
71  friend class RpcTest;
72 
73  private:
75  static constexpr size_t kInitialHugeAllocSize = MB(8);
76 
78  static constexpr size_t kSMTimeoutMs = kTesting ? 10 : 100;
79 
80  public:
82  static constexpr size_t kMaxMsgSize =
83  HugeAlloc::kMaxClassSize -
84  ((HugeAlloc::kMaxClassSize / TTr::kMaxDataPerPkt) * sizeof(pkthdr_t));
85  static_assert((1 << kMsgSizeBits) >= kMaxMsgSize, "");
86  static_assert((1 << kPktNumBits) * TTr::kMaxDataPerPkt > 2 * kMaxMsgSize, "");
87 
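The kMaxMsgSize arithmetic above reserves space for one packet header per packet inside the largest allocator class. A minimal sketch of the computation, using hypothetical values for kMaxClassSize, kMaxDataPerPkt, and sizeof(pkthdr_t) (the real values come from HugeAlloc and the transport):

#include <cstddef>
#include <cstdio>

int main() {
  constexpr size_t kMaxClassSize = 8 * 1024 * 1024;  // hypothetical largest class
  constexpr size_t kMaxDataPerPkt = 4096;            // hypothetical per-packet payload
  constexpr size_t kPktHdrSize = 16;                 // hypothetical sizeof(pkthdr_t)

  // Subtract one header's worth of space for every packet of a max-size message
  constexpr size_t kMaxMsgSize =
      kMaxClassSize - ((kMaxClassSize / kMaxDataPerPkt) * kPktHdrSize);

  printf("kMaxMsgSize = %zu\n", kMaxMsgSize);  // 8388608 - 2048 * 16 = 8355840
  return 0;
}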
107  Rpc(Nexus *nexus, void *context, uint8_t rpc_id, sm_handler_t sm_handler,
108  uint8_t phy_port = 0);
109 
111  ~Rpc();
112 
133  inline MsgBuffer alloc_msg_buffer(size_t max_data_size) {
134  assert(max_data_size > 0); // Doesn't work for max_data_size = 0
135 
136  // This function avoids division for small data sizes
137  size_t max_num_pkts = data_size_to_num_pkts(max_data_size);
138 
139  lock_cond(&huge_alloc_lock);
140  Buffer buffer =
141  huge_alloc->alloc(max_data_size + (max_num_pkts * sizeof(pkthdr_t)));
142  unlock_cond(&huge_alloc_lock);
143 
144  if (unlikely(buffer.buf == nullptr)) {
145  MsgBuffer msg_buffer;
146  msg_buffer.buf = nullptr;
147  return msg_buffer;
148  }
149 
150  MsgBuffer msg_buffer(buffer, max_data_size, max_num_pkts);
151  return msg_buffer;
152  }
153 
166  static inline void resize_msg_buffer(MsgBuffer *msg_buffer,
167  size_t new_data_size) {
168  assert(new_data_size <= msg_buffer->max_data_size);
169 
170  // Avoid division for single-packet data sizes
171  size_t new_num_pkts = data_size_to_num_pkts(new_data_size);
172  msg_buffer->resize(new_data_size, new_num_pkts);
173  }
174 
177  inline void free_msg_buffer(MsgBuffer msg_buffer) {
178  lock_cond(&huge_alloc_lock);
179  huge_alloc->free_buf(msg_buffer.buffer);
180  unlock_cond(&huge_alloc_lock);
181  }
182 
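A hedged sketch of the MsgBuffer lifecycle using the three functions above, assuming a constructed Rpc<CTransport> (CTransport is the build-selected transport that FORCE_COMPILE_TRANSPORTS instantiates at the bottom of this file):

#include "rpc.h"

void msgbuf_lifecycle_demo(erpc::Rpc<erpc::CTransport> *rpc) {
  // Allocate for the worst case; alloc_msg_buffer_or_die() throws on failure
  erpc::MsgBuffer msgbuf = rpc->alloc_msg_buffer_or_die(4096);

  // Shrink to the actual payload; resize_msg_buffer() never reallocates
  erpc::Rpc<erpc::CTransport>::resize_msg_buffer(&msgbuf, 512);

  // ... fill msgbuf.buf with up to 512 bytes ...

  rpc->free_msg_buffer(msgbuf);  // return hugepage memory to the allocator
}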
194  int create_session(std::string remote_uri, uint8_t rem_rpc_id) {
195  return create_session_st(remote_uri, rem_rpc_id);
196  }
197 
209  int destroy_session(int session_num) {
210  return destroy_session_st(session_num);
211  }
212 
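Session creation is asynchronous: create_session() only initiates the handshake, and the sm_handler passed to the Rpc constructor is invoked when it completes. A hedged usage sketch, with a hypothetical server URI in eRPC's "hostname:udp_port" form:

void connect_demo(erpc::Rpc<erpc::CTransport> *rpc) {
  // Connect to the remote Rpc object with ID 0 at a hypothetical address
  int session_num = rpc->create_session("192.168.1.2:31850", 0);
  if (session_num < 0) { /* invalid arguments; no handshake was started */ }

  // Poll the event loop until the session-management handshake finishes
  while (!rpc->is_connected(session_num)) rpc->run_event_loop_once();
}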
241  void enqueue_request(int session_num, uint8_t req_type, MsgBuffer *req_msgbuf,
242  MsgBuffer *resp_msgbuf, erpc_cont_func_t cont_func,
243  void *tag, size_t cont_etid = kInvalidBgETid);
244 
266  void enqueue_response(ReqHandle *req_handle, MsgBuffer *resp_msgbuf);
267 
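On the server, enqueue_response() is called from a request handler registered with the Nexus. A heavily hedged sketch, assuming (as in eRPC's sample code) that ReqHandle exposes a preallocated single-packet response buffer named pre_resp_msgbuf; the exact field names may differ by version:

void req_handler(erpc::ReqHandle *req_handle, void *context) {
  auto *rpc = static_cast<erpc::Rpc<erpc::CTransport> *>(context);

  // Build a small response in the preallocated buffer and shrink it to fit
  erpc::MsgBuffer &resp = req_handle->pre_resp_msgbuf;  // assumed field name
  erpc::Rpc<erpc::CTransport>::resize_msg_buffer(&resp, 8);
  // ... write 8 bytes of response data to resp.buf ...

  rpc->enqueue_response(req_handle, &resp);
}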
269  inline void run_event_loop(size_t timeout_ms) {
270  run_event_loop_timeout_st(timeout_ms);
271  }
272 
274  inline void run_event_loop_once() { run_event_loop_do_one_st(); }
275 
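Putting the client-side pieces together: enqueue_request() transfers ownership of both MsgBuffers to eRPC until the continuation runs, so the simplest pattern spins the event loop until the continuation sets a flag. A sketch with hypothetical sizes and request type; the continuation signature matches erpc_cont_func_t, i.e., void(void *context, void *tag):

struct AppContext {
  erpc::Rpc<erpc::CTransport> *rpc = nullptr;
  bool done = false;
};

// Invoked by the event loop when the full response has arrived
void cont_func(void *context, void * /*tag*/) {
  static_cast<AppContext *>(context)->done = true;
}

void request_demo(AppContext *c, int session_num) {
  erpc::MsgBuffer req = c->rpc->alloc_msg_buffer_or_die(64);
  erpc::MsgBuffer resp = c->rpc->alloc_msg_buffer_or_die(64);
  // ... fill req.buf with the request payload ...

  c->rpc->enqueue_request(session_num, /*req_type=*/1, &req, &resp, cont_func,
                          /*tag=*/nullptr);
  while (!c->done) c->rpc->run_event_loop_once();  // wait for cont_func

  c->rpc->free_msg_buffer(req);
  c->rpc->free_msg_buffer(resp);
}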
277  inline MsgBuffer alloc_msg_buffer_or_die(size_t max_data_size) {
278  MsgBuffer m = alloc_msg_buffer(max_data_size);
279  rt_assert(m.buf != nullptr);
280  return m;
281  }
282 
285  size_t num_active_sessions() { return num_active_sessions_st(); }
286 
289  bool is_connected(int session_num) const {
290  return session_vec[static_cast<size_t>(session_num)]->is_connected();
291  }
292 
294  size_t get_bandwidth() const { return transport->get_bandwidth(); }
295 
297  size_t get_num_re_tx(int session_num) const {
298  Session *session = session_vec[static_cast<size_t>(session_num)];
299  return session->client_info.num_re_tx;
300  }
301 
303  void reset_num_re_tx(int session_num) {
304  Session *session = session_vec[static_cast<size_t>(session_num)];
305  session->client_info.num_re_tx = 0;
306  }
307 
309  inline size_t get_stat_user_alloc_tot() {
310  lock_cond(&huge_alloc_lock);
311  size_t ret = huge_alloc->get_stat_user_alloc_tot();
312  unlock_cond(&huge_alloc_lock);
313  return ret;
314  }
315 
317  Timely *get_timely(int session_num) {
318  Session *session = session_vec[static_cast<size_t>(session_num)];
319  return &session->client_info.cc.timely;
320  }
321 
323  TimingWheel *get_wheel() { return wheel; }
324 
327  inline void set_context(void *_context) {
328  rt_assert(context == nullptr, "Cannot reset non-null Rpc context");
329  context = _context;
330  }
331 
333  inline void set_pre_resp_msgbuf_size(size_t new_pre_resp_msgbuf_size) {
334  pre_resp_msgbuf_size = new_pre_resp_msgbuf_size;
335  }
336 
338  inline HugeAlloc *get_huge_alloc() const {
339  rt_assert(nexus->num_bg_threads == 0,
340  "Cannot extract allocator because background threads exist.");
341  return huge_alloc;
342  }
343 
345  static inline constexpr size_t get_max_data_per_pkt() {
346  return TTr::kMaxDataPerPkt;
347  }
348 
350  std::string get_remote_hostname(int session_num) const {
351  return session_vec[static_cast<size_t>(session_num)]->get_remote_hostname();
352  }
353 
355  static inline constexpr size_t get_max_num_sessions() {
356  return Transport::kNumRxRingEntries / kSessionCredits;
357  }
358 
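get_max_num_sessions() is a simple ratio: every session statically reserves kSessionCredits slots in the shared RX ring. With hypothetical constants, for illustration only:

#include <cstddef>

constexpr size_t kNumRxRingEntries = 4096;  // hypothetical ring size
constexpr size_t kSessionCredits = 8;       // hypothetical per-session credits
static_assert(kNumRxRingEntries / kSessionCredits == 512,
              "these constants would allow up to 512 sessions");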
360  static inline size_t get_max_msg_size() { return kMaxMsgSize; }
361 
363  inline uint8_t get_rpc_id() const { return rpc_id; }
364 
366  inline bool in_background() const { return !in_dispatch(); }
367 
369  inline size_t get_etid() const { return tls_registry->get_etid(); }
370 
372  inline double get_freq_ghz() const { return freq_ghz; }
373 
375 double sec_since_creation() {
376 return to_sec(rdtsc() - creation_tsc, freq_ghz);
377  }
378 
380  double get_avg_rx_batch() {
381  if (!kDatapathStats || dpath_stats.rx_burst_calls == 0) return -1.0;
382  return dpath_stats.pkts_rx * 1.0 / dpath_stats.rx_burst_calls;
383  }
384 
386  double get_avg_tx_batch() {
387  if (!kDatapathStats || dpath_stats.tx_burst_calls == 0) return -1.0;
388  return dpath_stats.pkts_tx * 1.0 / dpath_stats.tx_burst_calls;
389  }
390 
392 void reset_dpath_stats() {
393 memset(reinterpret_cast<void *>(&dpath_stats), 0, sizeof(dpath_stats));
394  }
395 
401 void fault_inject_fail_resolve_rinfo_st();
407 void fault_inject_set_pkt_drop_prob_st(double pkt_drop_prob);
408  private:
409  int create_session_st(std::string remote_uri, uint8_t rem_rpc_id);
410  int destroy_session_st(int session_num);
411  size_t num_active_sessions_st();
412 
413  //
414  // Session management helper functions
415  //
416 
418  void handle_sm_rx_st();
419 
423  void bury_session_st(Session *);
424 
427  void sm_pkt_udp_tx_st(const SmPkt &);
428 
432  void send_sm_req_st(Session *);
433 
434  //
435  // Session management packet handlers
436  //
437  void handle_connect_req_st(const SmPkt &);
438  void handle_connect_resp_st(const SmPkt &);
439 
440  void handle_disconnect_req_st(const SmPkt &);
441  void handle_disconnect_resp_st(const SmPkt &);
442 
445  bool handle_reset_client_st(Session *session);
446 
449  bool handle_reset_server_st(Session *session);
450 
451  //
452  // Methods to bury server-side request and response MsgBuffers. Client-side
453  // request and response MsgBuffers are owned by user apps, so eRPC doesn't
454  // free their backing memory.
455  //
456 
465  inline void bury_resp_msgbuf_server_st(SSlot *sslot) {
466  assert(in_dispatch());
467 
468  // Free the response MsgBuffer iff it's the dynamically allocated response.
469 // This high-specificity check prevents freeing a null tx_msgbuf.
470  if (sslot->tx_msgbuf == &sslot->dyn_resp_msgbuf) {
471  MsgBuffer *tx_msgbuf = sslot->tx_msgbuf;
472  free_msg_buffer(*tx_msgbuf);
473  // Need not nullify tx_msgbuf->buffer.buf: we'll just nullify tx_msgbuf
474  }
475 
476  sslot->tx_msgbuf = nullptr;
477  }
478 
486  inline void bury_req_msgbuf_server_st(SSlot *sslot) {
487  MsgBuffer &req_msgbuf = sslot->server_info.req_msgbuf;
488  if (unlikely(req_msgbuf.is_dynamic())) {
489  free_msg_buffer(req_msgbuf);
490  req_msgbuf.buffer.buf = nullptr; // Mark invalid for future
491  }
492 
493  req_msgbuf.buf = nullptr;
494  }
495 
496  //
497  // Handle available ring entries
498  //
499 
501  bool have_ring_entries() const {
502  return ring_entries_available >= kSessionCredits;
503  }
504 
506  void alloc_ring_entries() {
507  assert(have_ring_entries());
508  ring_entries_available -= kSessionCredits;
509  }
510 
512  void free_ring_entries() {
513  ring_entries_available += kSessionCredits;
514  assert(ring_entries_available <= Transport::kNumRxRingEntries);
515  }
516 
517  //
518  // Datapath helpers
519  //
520 
523  static inline size_t resp_ntoi(size_t pkt_num, size_t num_req_pkts) {
524  return pkt_num - (num_req_pkts - 1);
525  }
526 
529 inline bool in_order_client(const SSlot *sslot, const pkthdr_t *pkthdr) {
530  // Counters for pkthdr's request number are valid only if req numbers match
531  if (unlikely(pkthdr->req_num != sslot->cur_req_num)) return false;
532 
533  const auto &ci = sslot->client_info;
534  if (unlikely(pkthdr->pkt_num != ci.num_rx)) return false;
535 
536  // Ignore spurious packets received as a consequence of rollback:
537  // 1. We've only sent pkts up to (ci.num_tx - 1). Ignore later packets.
538  // 2. Ignore if the corresponding client packet for pkthdr is still in wheel
539  if (unlikely(pkthdr->pkt_num >= ci.num_tx)) return false;
540 
541  if (kCcPacing && unlikely(ci.in_wheel[pkthdr->pkt_num % kSessionCredits])) {
542  pkt_loss_stats.still_in_wheel_during_retx++;
543  return false;
544  }
545 
546  return true;
547  }
548 
555  static size_t data_size_to_num_pkts(size_t data_size) {
556  if (data_size <= TTr::kMaxDataPerPkt) return 1;
557  return (data_size + TTr::kMaxDataPerPkt - 1) / TTr::kMaxDataPerPkt;
558  }
559 
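data_size_to_num_pkts() is ceiling division with a branch that skips the divide for single-packet messages. A standalone copy of the logic, with a hypothetical kMaxDataPerPkt of 4096 bytes:

#include <cassert>
#include <cstddef>

constexpr size_t kMaxDataPerPkt = 4096;  // hypothetical

size_t data_size_to_num_pkts(size_t data_size) {
  if (data_size <= kMaxDataPerPkt) return 1;  // fast path: no division
  return (data_size + kMaxDataPerPkt - 1) / kMaxDataPerPkt;  // ceiling division
}

int main() {
  assert(data_size_to_num_pkts(1000) == 1);
  assert(data_size_to_num_pkts(4096) == 1);
  assert(data_size_to_num_pkts(10000) == 3);  // ceil(10000 / 4096) = 3
  return 0;
}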
562  static inline size_t wire_pkts(MsgBuffer *req_msgbuf,
563  MsgBuffer *resp_msgbuf) {
564  return req_msgbuf->num_pkts + resp_msgbuf->num_pkts - 1;
565  }
566 
568  static inline bool req_pkts_pending(SSlot *sslot) {
569  return sslot->client_info.num_tx < sslot->tx_msgbuf->num_pkts;
570  }
571 
573  inline bool can_bypass_wheel(SSlot *sslot) const {
574  if (!kCcPacing) return true;
575  if (kTesting) return faults.hard_wheel_bypass;
576  if (kCcOptWheelBypass) {
577  // To prevent reordering, do not bypass the wheel if it contains packets
578  // for this session.
579  return sslot->client_info.wheel_count == 0 &&
580  sslot->session->is_uncongested();
581  }
582  return false;
583  }
584 
587  void drain_tx_batch_and_dma_queue() {
588  if (tx_batch_i > 0) do_tx_burst_st();
589  transport->tx_flush();
590  }
591 
593  inline void add_to_active_rpc_list(SSlot &sslot) {
594  SSlot *prev_tail = active_rpcs_tail_sentinel.client_info.prev;
595 
596  prev_tail->client_info.next = &sslot;
597  sslot.client_info.prev = prev_tail;
598 
599  sslot.client_info.next = &active_rpcs_tail_sentinel;
600  active_rpcs_tail_sentinel.client_info.prev = &sslot;
601  }
602 
604  inline void delete_from_active_rpc_list(SSlot &sslot) {
605  sslot.client_info.prev->client_info.next = sslot.client_info.next;
606  sslot.client_info.next->client_info.prev = sslot.client_info.prev;
607  }
608 
609  //
610  // Datapath processing
611  //
612 
614  void run_event_loop_timeout_st(size_t timeout_ms);
615 
617  void run_event_loop_do_one_st();
618 
622  void kick_req_st(SSlot *);
623 
627  void kick_rfr_st(SSlot *);
628 
631  void process_small_req_st(SSlot *, pkthdr_t *);
632 
634  void process_large_req_one_st(SSlot *, const pkthdr_t *);
635 
640  void process_resp_one_st(SSlot *, const pkthdr_t *, size_t rx_tsc);
641 
649  void enqueue_cr_st(SSlot *sslot, const pkthdr_t *req_pkthdr);
650 
655  void process_expl_cr_st(SSlot *, const pkthdr_t *, size_t rx_tsc);
656 
666  void enqueue_rfr_st(SSlot *sslot, const pkthdr_t *resp_pkthdr);
667 
669  void process_rfr_st(SSlot *, const pkthdr_t *);
670 
675  inline void enqueue_pkt_tx_burst_st(SSlot *sslot, size_t pkt_idx,
676  size_t *tx_ts) {
677  assert(in_dispatch());
678  const MsgBuffer *tx_msgbuf = sslot->tx_msgbuf;
679 
680  Transport::tx_burst_item_t &item = tx_burst_arr[tx_batch_i];
681  item.routing_info = sslot->session->remote_routing_info;
682  item.msg_buffer = const_cast<MsgBuffer *>(tx_msgbuf);
683  item.pkt_idx = pkt_idx;
684  if (kCcRTT) item.tx_ts = tx_ts;
685 
686  if (kTesting) {
687  item.drop = roll_pkt_drop();
688  testing.pkthdr_tx_queue.push(*tx_msgbuf->get_pkthdr_n(pkt_idx));
689  }
690 
691  ERPC_TRACE("Rpc %u, lsn %u (%s): TX %s. Slot %s.%s\n", rpc_id,
692  sslot->session->local_session_num,
693  sslot->session->get_remote_hostname().c_str(),
694  tx_msgbuf->get_pkthdr_str(pkt_idx).c_str(),
695  sslot->progress_str().c_str(), item.drop ? " Drop." : "");
696 
697  tx_batch_i++;
698  if (tx_batch_i == TTr::kPostlist) do_tx_burst_st();
699  }
700 
703  inline void enqueue_hdr_tx_burst_st(SSlot *sslot, MsgBuffer *ctrl_msgbuf,
704  size_t *tx_ts) {
705  assert(in_dispatch());
706 
707  Transport::tx_burst_item_t &item = tx_burst_arr[tx_batch_i];
708  item.routing_info = sslot->session->remote_routing_info;
709  item.msg_buffer = ctrl_msgbuf;
710  item.pkt_idx = 0;
711  if (kCcRTT) item.tx_ts = tx_ts;
712 
713  if (kTesting) {
714  item.drop = roll_pkt_drop();
715  testing.pkthdr_tx_queue.push(*ctrl_msgbuf->get_pkthdr_0());
716  }
717 
718  ERPC_TRACE("Rpc %u, lsn %u (%s): TX %s. Slot %s.%s.\n", rpc_id,
719  sslot->session->local_session_num,
720  sslot->session->get_remote_hostname().c_str(),
721  ctrl_msgbuf->get_pkthdr_str(0).c_str(),
722  sslot->progress_str().c_str(), item.drop ? " Drop." : "");
723 
724  tx_batch_i++;
725  if (tx_batch_i == TTr::kPostlist) do_tx_burst_st();
726  }
727 
729  inline void enqueue_wheel_req_st(SSlot *sslot, size_t pkt_num) {
730  const size_t pkt_idx = pkt_num;
731  size_t pktsz = sslot->tx_msgbuf->get_pkt_size<TTr::kMaxDataPerPkt>(pkt_idx);
732  size_t ref_tsc = dpath_rdtsc();
733  size_t desired_tx_tsc = sslot->session->cc_getupdate_tx_tsc(ref_tsc, pktsz);
734 
735  ERPC_CC("Rpc %u: lsn/req/pkt %u/%zu/%zu, REQ wheeled for %.3f us.\n",
736  rpc_id, sslot->session->local_session_num, sslot->cur_req_num,
737  pkt_num, to_usec(desired_tx_tsc - creation_tsc, freq_ghz));
738 
739  wheel->insert(wheel_ent_t(sslot, pkt_num), ref_tsc, desired_tx_tsc);
740  sslot->client_info.in_wheel[pkt_num % kSessionCredits] = true;
741  sslot->client_info.wheel_count++;
742  }
743 
745  inline void enqueue_wheel_rfr_st(SSlot *sslot, size_t pkt_num) {
746  const size_t pkt_idx = resp_ntoi(pkt_num, sslot->tx_msgbuf->num_pkts);
747  const MsgBuffer *resp_msgbuf = sslot->client_info.resp_msgbuf;
748  size_t pktsz = resp_msgbuf->get_pkt_size<TTr::kMaxDataPerPkt>(pkt_idx);
749  size_t ref_tsc = dpath_rdtsc();
750  size_t desired_tx_tsc = sslot->session->cc_getupdate_tx_tsc(ref_tsc, pktsz);
751 
752  ERPC_CC("Rpc %u: lsn/req/pkt %u/%zu/%zu, RFR wheeled for %.3f us.\n",
753  rpc_id, sslot->session->local_session_num, sslot->cur_req_num,
754  pkt_num, to_usec(desired_tx_tsc - creation_tsc, freq_ghz));
755 
756  wheel->insert(wheel_ent_t(sslot, pkt_num), ref_tsc, desired_tx_tsc);
757  sslot->client_info.in_wheel[pkt_num % kSessionCredits] = true;
758  sslot->client_info.wheel_count++;
759  }
760 
762  inline void do_tx_burst_st() {
763  assert(in_dispatch());
764  assert(tx_batch_i > 0);
765 
766  // Measure TX burst size
767  dpath_stat_inc(dpath_stats.tx_burst_calls, 1);
768  dpath_stat_inc(dpath_stats.pkts_tx, tx_batch_i);
769 
770  if (kCcRTT) {
771  size_t batch_tsc = 0;
772  if (kCcOptBatchTsc) batch_tsc = dpath_rdtsc();
773 
774  for (size_t i = 0; i < tx_batch_i; i++) {
775  if (tx_burst_arr[i].tx_ts != nullptr) {
776  *tx_burst_arr[i].tx_ts = kCcOptBatchTsc ? batch_tsc : dpath_rdtsc();
777  }
778  }
779  }
780 
781  transport->tx_burst(tx_burst_arr, tx_batch_i);
782  tx_batch_i = 0;
783  }
784 
786  static inline void bump_credits(Session *session) {
787  assert(session->is_client());
788  assert(session->client_info.credits < kSessionCredits);
789  session->client_info.credits++;
790  }
791 
793  static inline void copy_data_to_msgbuf(MsgBuffer *msgbuf, size_t pkt_idx,
794  const pkthdr_t *pkthdr) {
795  size_t offset = pkt_idx * TTr::kMaxDataPerPkt;
796  size_t to_copy = std::min(TTr::kMaxDataPerPkt, pkthdr->msg_size - offset);
797  memcpy(&msgbuf->buf[offset], pkthdr + 1, to_copy); // From end of pkthdr
798  }
799 
809  void process_comps_st();
810 
817  void submit_bg_req_st(SSlot *sslot);
818 
829  void submit_bg_resp_st(erpc_cont_func_t cont_func, void *tag, size_t bg_etid);
830 
831  //
832  // Queue handlers
833  //
834 
836  void process_credit_stall_queue_st();
837 
839  void process_wheel_st();
840 
842  void process_bg_queues_enqueue_request_st();
843 
845  void process_bg_queues_enqueue_response_st();
846 
851  void fault_inject_check_ok() const;
852 
853  //
854  // Packet loss handling
855  //
856 
858  void pkt_loss_scan_st();
859 
861  void pkt_loss_retransmit_st(SSlot *sslot);
862 
863  //
864  // Misc private functions
865  //
866 
868  inline bool in_dispatch() const { return get_etid() == creator_etid; }
869 
871  inline bool is_usr_session_num_in_range_st(int session_num) const {
872  assert(in_dispatch());
873  return session_num >= 0 &&
874  static_cast<size_t>(session_num) < session_vec.size();
875  }
876 
878  inline void lock_cond(std::mutex *mutex) {
879  if (unlikely(multi_threaded)) mutex->lock();
880  }
881 
883  inline void unlock_cond(std::mutex *mutex) {
884  if (unlikely(multi_threaded)) mutex->unlock();
885  }
886 
895  inline void update_timely_rate(SSlot *sslot, size_t pkt_num, size_t rx_tsc) {
896  size_t rtt_tsc =
897  rx_tsc - sslot->client_info.tx_ts[pkt_num % kSessionCredits];
898  // This might use Timely bypass
899  sslot->session->client_info.cc.timely.update_rate(rx_tsc, rtt_tsc);
900  }
901 
903  inline bool roll_pkt_drop() {
904  static constexpr uint32_t billion = 1000000000;
905  return ((fast_rand.next_u32() % billion) < faults.pkt_drop_thresh_billion);
906  }
907 
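roll_pkt_drop() avoids floating point on the datapath by comparing a uniform draw in [0, one billion) against the precomputed faults.pkt_drop_thresh_billion, which fault_inject_set_pkt_drop_prob_st() presumably derives from the drop probability. A sketch of the scaling:

#include <cassert>
#include <cstdint>

int main() {
  const double pkt_drop_prob = 0.01;  // hypothetical 1% drop probability
  const auto pkt_drop_thresh_billion =
      static_cast<uint32_t>(pkt_drop_prob * 1000000000);

  // A draw in [0, 1e9) falls below this threshold with probability ~1%
  assert(pkt_drop_thresh_billion == 10000000);
  return 0;
}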
908  public:
909  // Hooks for apps to modify eRPC behavior
910 
913 bool retry_connect_on_invalid_rpc_id = false;
914
915  private:
916  // Constructor args
917  Nexus *nexus;
918  void *context;
919  const uint8_t rpc_id;
920  const sm_handler_t sm_handler;
921  const uint8_t phy_port;
922  const size_t numa_node;
923 
924  // Derived
925  const size_t creation_tsc;
926  const bool multi_threaded;
927  const double freq_ghz;
928  const size_t rpc_rto_cycles;
929  const size_t rpc_pkt_loss_scan_cycles;
930 
933  const std::array<ReqFunc, kReqTypeArraySize> req_func_arr;
934 
935  // Rpc metadata
936  size_t creator_etid;
937  TlsRegistry *tls_registry;
938 
939  // Sessions
940 
944  std::vector<Session *> session_vec;
945 
946  // Transport
947  TTr *transport = nullptr;
948 
950  size_t ring_entries_available = TTr::kNumRxRingEntries;
951 
952  Transport::tx_burst_item_t tx_burst_arr[TTr::kPostlist];
953  size_t tx_batch_i = 0;
954 
960  uint8_t *rx_ring[TTr::kNumRxRingEntries];
961  size_t rx_ring_head = 0;
962 
963  std::vector<SSlot *> stallq;
964 
965  size_t ev_loop_tsc;
966 
967  // Packet loss
968  size_t pkt_loss_scan_tsc;
969 
977  SSlot active_rpcs_root_sentinel, active_rpcs_tail_sentinel;
978 
979  // Allocator
980  HugeAlloc *huge_alloc = nullptr;
981  std::mutex huge_alloc_lock;
982 
983  MsgBuffer ctrl_msgbufs[2 * TTr::kUnsigBatch];
984  size_t ctrl_msgbuf_head = 0;
985  FastRand fast_rand;
986 
987  // Cold members live below, in order of coolness
988 
991  TimingWheel *wheel;
992 
994  struct {
995  MtQueue<enq_req_args_t> _enqueue_request;
996  MtQueue<enq_resp_args_t> _enqueue_response;
997  } bg_queues;
998 
999  // Misc
1000  SlowRand slow_rand;
1001  UDPClient<SmPkt> udp_client;
1002  Nexus::Hook nexus_hook;
1003 
1009  std::map<conn_req_uniq_token_t, uint16_t> conn_req_token_map;
1010 
1012  std::set<uint16_t> sm_pending_reqs;
1013 
1015  struct {
1016  bool fail_resolve_rinfo = false;
1017  bool hard_wheel_bypass = false;
1018  double pkt_drop_prob = 0.0;
1019 
1021 uint32_t pkt_drop_thresh_billion = 0;
1022 } faults;
1023 
1024  // Additional members for testing
1025  struct {
1026  FixedQueue<pkthdr_t, kSessionCredits> pkthdr_tx_queue;
1027  } testing;
1028 
1031  FILE *trace_file;
1032 
1034  struct {
1035  size_t ev_loop_calls = 0;
1036  size_t pkts_tx = 0;
1037  size_t tx_burst_calls = 0;
1038  size_t pkts_rx = 0;
1039  size_t rx_burst_calls = 0;
1040  } dpath_stats;
1041 
1042  public:
1043  struct {
1044  size_t num_re_tx = 0;
1045 
1048 size_t still_in_wheel_during_retx = 0;
1049 } pkt_loss_stats;
1050 
1054  size_t pre_resp_msgbuf_size = TTr::kMaxDataPerPkt;
1055 };
1056 
1057 // This goes at the end of every Rpc implementation file to force compilation
1058 #define FORCE_COMPILE_TRANSPORTS template class Rpc<CTransport>;
1059 } // namespace erpc
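For context, the macro above is meant to appear in the library's Rpc implementation files. A hypothetical rpc_impl.cc: the explicit instantiation emits Rpc<CTransport>'s templated member code into the library, so applications can link against it without seeing the definitions:

#include "rpc.h"

namespace erpc {

// ... definitions of Rpc<TTr> member functions ...

FORCE_COMPILE_TRANSPORTS  // expands to: template class Rpc<CTransport>;

}  // namespace erpc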