Index: src/InfRcTransport.cc |
diff --git a/src/InfRcTransport.cc b/src/InfRcTransport.cc |
index 25ada3bfc46362599b424c4e62ea54d2d9b70ca8..c2322d737e41a2005c44bdca049737cb5a6cc5d3 100644 |
--- a/src/InfRcTransport.cc |
+++ b/src/InfRcTransport.cc |
@@ -93,6 +93,7 @@ |
#include "ShortMacros.h" |
#include "PerfCounter.h" |
#include "TimeTrace.h" |
+#include "Util.h" |
#define check_error_null(x, s) \ |
do { \ |
@@ -1015,7 +1016,6 @@ InfRcTransport::sendZeroCopy(Buffer* message, QueuePair* qp) |
CycleCounter<RawMetric> _(&metrics->transport.transmit.ticks); |
ibv_send_wr* badTxWorkRequest; |
if (expect_true(!testingDontReallySend)) { |
- |
if (ibv_post_send(qp->qp, &txWorkRequest, &badTxWorkRequest)) { |
throw TransportException(HERE, "ibv_post_send failed"); |
} |
@@ -1192,6 +1192,7 @@ InfRcTransport::ClientRpc::sendOrQueue() |
++metrics->transport.transmit.packetCount; |
request->emplacePrepend<Header>(nonce); |
+ |
t->sendZeroCopy(request, session->qp); |
request->truncateFront(sizeof(Header)); // for politeness |
@@ -1272,6 +1273,7 @@ InfRcTransport::Poller::poll() |
rpc.response->size(); |
metrics->transport.receive.ticks += receiveTicks.stop(); |
rpc.notifier->completed(); |
+ |
t->clientRpcPool.destroy(&rpc); |
if (t->outstandingRpcs.empty()) |
t->clientRpcsActiveTime.destroy(); |
@@ -1439,13 +1441,17 @@ InfRcTransport::PayloadChunk::appendToBuffer(Buffer* buffer, |
/// Returns memory to the HCA once the Chunk is discarded. |
InfRcTransport::PayloadChunk::~PayloadChunk() |
{ |
- // It's crucial that we take the Dispatch lock here before invoking any |
+ // It's crucial that we either make sure we are running in the Dispatch |
+ // thread or hand the work off to the dispatch thread before invoking any |
// transport methods. In all other cases we go through layers (such as |
// WorkerSession) that ensure serialized access to the transport. This |
// fancy Buffer destructor trick is an exception: it may be directly |
// invoked by a worker. |
- Dispatch::Lock lock(transport->context->dispatch); |
- transport->postSrqReceiveAndKickTransmit(srq, bd); |
+ if (transport->context->dispatch->isDispatchThread()) |
+ transport->postSrqReceiveAndKickTransmit(srq, bd); |
+ else |
+ transport->context->dispatchExec->addRequest<ReturnNICBuffer>( |
+ transport, srq, bd); |
} |
/** |