Fix TX buffer wait blocking the receive callback

Currently, the mailbox uses the rpmsg_send function to send messages,
which can block for up to 15 seconds if no TX buffer is available for
the message. This is a problem because the device mutex is held while
waiting, and the receive callback takes the same mutex to prevent
concurrent access, so no received messages can be handled while a task
is waiting for a TX buffer.

To resolve this, the mailbox now uses the rpmsg_trysend function, which
returns immediately if no TX buffer is available, combined with a wait
queue. While a task waits in the queue to send its message, the device
mutex is released so that the receive callback and other users of the
mutex are not blocked.

Change-Id: I34fbfd21167b49fb83744ab2473ab02632a809ee
Signed-off-by: Mikael Olsson <mikael.olsson@arm.com>
diff --git a/kernel/ethosu_device.c b/kernel/ethosu_device.c
index 6866857..b889a7b 100644
--- a/kernel/ethosu_device.c
+++ b/kernel/ethosu_device.c
@@ -71,6 +71,7 @@
 {
 	struct ethosu_device *edev = dev_get_drvdata(&rpdev->dev);
 	struct device *dev = &edev->dev;
+	struct ethosu_mailbox *mbox = &edev->mailbox;
 	struct ethosu_core_rpmsg *rpmsg = data;
 	int length = len - sizeof(rpmsg->header);
 	int ret = 0;
@@ -106,7 +107,7 @@
 		break;
 	case ETHOSU_CORE_MSG_PING:
 		dev_info(dev, "Msg: Ping");
-		ret = ethosu_mailbox_pong(&edev->mailbox);
+		ret = ethosu_mailbox_pong(mbox);
 		break;
 	case ETHOSU_CORE_MSG_PONG:
 		dev_info(dev, "Msg: Pong");
@@ -124,7 +125,7 @@
 			 "Msg: Inference response. ofm_count=%u, status=%u",
 			 rpmsg->inf_rsp.ofm_count, rpmsg->inf_rsp.status);
 
-		ethosu_inference_rsp(&edev->mailbox, rpmsg->header.msg_id,
+		ethosu_inference_rsp(mbox, rpmsg->header.msg_id,
 				     &rpmsg->inf_rsp);
 		break;
 	case ETHOSU_CORE_MSG_CANCEL_INFERENCE_RSP:
@@ -139,7 +140,7 @@
 		dev_info(dev,
 			 "Msg: Cancel Inference response. status=%u",
 			 rpmsg->cancel_rsp.status);
-		ethosu_cancel_inference_rsp(&edev->mailbox,
+		ethosu_cancel_inference_rsp(mbox,
 					    rpmsg->header.msg_id,
 					    &rpmsg->cancel_rsp);
 		break;
@@ -156,7 +157,7 @@
 			rpmsg->version_rsp.major, rpmsg->version_rsp.minor,
 			rpmsg->version_rsp.patch);
 
-		ethosu_version_rsp(&edev->mailbox, rpmsg->header.msg_id,
+		ethosu_version_rsp(mbox, rpmsg->header.msg_id,
 				   &rpmsg->version_rsp);
 		break;
 	case ETHOSU_CORE_MSG_CAPABILITIES_RSP:
@@ -184,7 +185,7 @@
 			 rpmsg->cap_rsp.cmd_stream_version,
 			 rpmsg->cap_rsp.custom_dma);
 
-		ethosu_capability_rsp(&edev->mailbox, rpmsg->header.msg_id,
+		ethosu_capability_rsp(mbox, rpmsg->header.msg_id,
 				      &rpmsg->cap_rsp);
 		break;
 	case ETHOSU_CORE_MSG_NETWORK_INFO_RSP:
@@ -200,7 +201,7 @@
 			 "Msg: Network info response. status=%u",
 			 rpmsg->net_info_rsp.status);
 
-		ethosu_network_info_rsp(&edev->mailbox,
+		ethosu_network_info_rsp(mbox,
 					rpmsg->header.msg_id,
 					&rpmsg->net_info_rsp);
 
@@ -215,6 +216,8 @@
 
 	device_unlock(dev);
 
+	wake_up(&mbox->send_queue);
+
 	return ret;
 }
 
diff --git a/kernel/ethosu_mailbox.c b/kernel/ethosu_mailbox.c
index 4f7f5b7..5cc2465 100644
--- a/kernel/ethosu_mailbox.c
+++ b/kernel/ethosu_mailbox.c
@@ -28,6 +28,7 @@
 #include "ethosu_core_rpmsg.h"
 #include "ethosu_device.h"
 
+#include <linux/atomic.h>
 #include <linux/jiffies.h>
 #include <linux/resource.h>
 #include <linux/uio.h>
@@ -46,9 +47,81 @@
 #endif
 
 /****************************************************************************
+ * Defines
+ ****************************************************************************/
+
+#define MAILBOX_SEND_TIMEOUT_MS 15000
+
+/****************************************************************************
  * Functions
  ****************************************************************************/
 
+/**
+ * ethosu_send_locked() - Blocking mailbox message sender
+ *
+ * Context: Can sleep and must be called with the device mutex locked.
+ *
+ * Return: 0 on success, else error code.
+ */
+static int ethosu_send_locked(struct ethosu_mailbox *mbox,
+			      void *data,
+			      size_t length)
+{
+	DEFINE_WAIT_FUNC(wait, woken_wake_function);
+	struct device *dev = mbox->dev;
+	long timeout = msecs_to_jiffies(MAILBOX_SEND_TIMEOUT_MS);
+	bool try_send = !wq_has_sleeper(&mbox->send_queue);
+	int ret;
+
+	might_sleep();
+
+	/* Exclusive wait to only wake up one task at a time */
+	add_wait_queue_exclusive(&mbox->send_queue, &wait);
+	for (;;) {
+		/* Stop if the mailbox is closing down */
+		if (atomic_read(&mbox->done)) {
+			ret = -ENODEV;
+			break;
+		}
+
+		/* Attempt to send if queue is empty or task was woken up */
+		if (try_send) {
+			ret = rpmsg_trysend(mbox->ept, data, length);
+			if (ret != -ENOMEM)
+				break;
+		} else {
+			try_send = true;
+		}
+
+		/* Unlock device mutex while waiting to not block other tasks */
+		device_unlock(dev);
+		timeout = wait_woken(&wait, TASK_INTERRUPTIBLE, timeout);
+		device_lock(dev);
+
+		/* Stop if the wait was interrupted */
+		if (signal_pending(current)) {
+			ret = -EINTR;
+			break;
+		}
+
+		if (!timeout) {
+			ret = -ETIME;
+			break;
+		}
+	}
+
+	remove_wait_queue(&mbox->send_queue, &wait);
+
+	/*
+	 * If the message was sent successfully, there may be more TX buffers
+	 * available so wake up the next waiting task.
+	 */
+	if (!ret && wq_has_sleeper(&mbox->send_queue))
+		wake_up(&mbox->send_queue);
+
+	return ret;
+}
+
 static void ethosu_core_set_size(struct ethosu_buffer *buf,
 				 struct ethosu_core_buffer *cbuf)
 {
@@ -119,7 +192,7 @@
 		}
 	};
 
-	return rpmsg_send(mbox->ept, &rpmsg, sizeof(rpmsg.header));
+	return ethosu_send_locked(mbox, &rpmsg, sizeof(rpmsg.header));
 }
 
 int ethosu_mailbox_pong(struct ethosu_mailbox *mbox)
@@ -131,7 +204,7 @@
 		}
 	};
 
-	return rpmsg_send(mbox->ept, &rpmsg, sizeof(rpmsg.header));
+	return ethosu_send_locked(mbox, &rpmsg, sizeof(rpmsg.header));
 }
 
 int ethosu_mailbox_version_request(struct ethosu_mailbox *mbox,
@@ -147,7 +220,7 @@
 
 	msg->type = rpmsg.header.type;
 
-	return rpmsg_send(mbox->ept, &rpmsg, sizeof(rpmsg.header));
+	return ethosu_send_locked(mbox, &rpmsg, sizeof(rpmsg.header));
 }
 
 int ethosu_mailbox_capabilities_request(struct ethosu_mailbox *mbox,
@@ -163,7 +236,7 @@
 
 	msg->type = rpmsg.header.type;
 
-	return rpmsg_send(mbox->ept, &rpmsg, sizeof(rpmsg.header));
+	return ethosu_send_locked(mbox, &rpmsg, sizeof(rpmsg.header));
 }
 
 int ethosu_mailbox_inference(struct ethosu_mailbox *mbox,
@@ -218,8 +291,8 @@
 		inf_req->network.index = network_index;
 	}
 
-	return rpmsg_send(mbox->ept, &rpmsg,
-			  sizeof(rpmsg.header) + sizeof(rpmsg.inf_req));
+	return ethosu_send_locked(mbox, &rpmsg,
+				  sizeof(rpmsg.header) + sizeof(rpmsg.inf_req));
 }
 
 int ethosu_mailbox_network_info_request(struct ethosu_mailbox *mbox,
@@ -246,8 +319,9 @@
 		info_req->network.index = network_index;
 	}
 
-	return rpmsg_send(mbox->ept, &rpmsg,
-			  sizeof(rpmsg.header) + sizeof(rpmsg.net_info_req));
+	return ethosu_send_locked(mbox, &rpmsg,
+				  sizeof(rpmsg.header) +
+				  sizeof(rpmsg.net_info_req));
 }
 
 int ethosu_mailbox_cancel_inference(struct ethosu_mailbox *mbox,
@@ -268,8 +342,9 @@
 
 	msg->type = rpmsg.header.type;
 
-	return rpmsg_send(mbox->ept, &rpmsg,
-			  sizeof(rpmsg.header) + sizeof(rpmsg.cancel_req));
+	return ethosu_send_locked(mbox, &rpmsg,
+				  sizeof(rpmsg.header) +
+				  sizeof(rpmsg.cancel_req));
 }
 
 int ethosu_mailbox_init(struct ethosu_mailbox *mbox,
@@ -279,9 +354,13 @@
 	mbox->dev = dev;
 	mbox->ept = ept;
 	idr_init(&mbox->msg_idr);
+	init_waitqueue_head(&mbox->send_queue);
 
 	return 0;
 }
 
 void ethosu_mailbox_deinit(struct ethosu_mailbox *mbox)
-{}
+{
+	atomic_set(&mbox->done, 1);
+	wake_up_all(&mbox->send_queue);
+}
diff --git a/kernel/ethosu_mailbox.h b/kernel/ethosu_mailbox.h
index a3e2c14..c4c71a9 100644
--- a/kernel/ethosu_mailbox.h
+++ b/kernel/ethosu_mailbox.h
@@ -28,7 +28,7 @@
 
 #include <linux/types.h>
 #include <linux/mailbox_client.h>
-#include <linux/workqueue.h>
+#include <linux/wait.h>
 #include <linux/idr.h>
 
 /****************************************************************************
@@ -48,6 +48,8 @@
 	struct device         *dev;
 	struct rpmsg_endpoint *ept;
 	struct idr            msg_idr;
+	atomic_t              done;
+	wait_queue_head_t     send_queue;
 };
 
 /**