👀

How to use Queue with FreeRTOS and WiFi on Wio Terminal

2024/09/22に公開

Queueを使ったタスク間の連携

Wi-FiとFreeRTOSとの両立については、既にerpc::Threadという解決策を見つけている。

https://zenn.dev/amenaruya/articles/801a76734428ec

これにより、Wi-Fiによる通信を行う傍ら、並行処理によって他の作業を同時に進めることができるようになった。しかしこれで実現したのは単なる並行処理であり、各タスクは未だ独立したまま、連携するには至っていない。本記事では、Queueを使用する方法を示す。

当然乍らxQueueSendやxQueueReceiveなどは使えないため、代替として実装されているerpc::StaticQueueを使う。

erpc::StaticQueue

erpc::StaticQueueの用例を示す。また、排他制御のためerpc::Mutexを併用している。

StaticQueue sample
#include "rpcWiFi.h"
#include "erpc/erpc_static_queue.h" // necessary

/* Stack size handed to each erpc::Thread created in setup(). */
constexpr uint32_t STACK_SIZE = 256;
/* Capacity template argument of the shared queue. */
constexpr uint32_t QUEUE_SIZE = 4;

/* Queue that carries int values from ThreadA (producer) to ThreadB (consumer). */
erpc::StaticQueue<int, QUEUE_SIZE> g_QUEUE;

/* Guards every access to g_QUEUE; both threads take it through erpc::Mutex::Guard. */
erpc::Mutex g_MUTEX;

/*
 * Producer task: every 2 seconds tries to enqueue the next integer into
 * g_QUEUE. The counter only advances (and the value is only logged) when
 * the queue accepted the element.
 */
static void ThreadA(void* pvParameters) {
    (void) pvParameters;
    Serial.println("ThreadA\tbegin");

    int iNext = 0;
    for (;;) {
        bool bQueued = false;
        {
            // hold the mutex only for the duration of the enqueue
            erpc::Mutex::Guard lock(g_MUTEX);
            bQueued = g_QUEUE.add(iNext);
        }

        if (bQueued) {
            Serial.print("ThreadA\tsend: ");
            Serial.println(iNext);

            ++iNext;
        }
        delay(2000);
    }
}
/*
 * Consumer task: once per second reports how many elements are waiting in
 * g_QUEUE and, if there is at least one, dequeues and logs it.
 */
static void ThreadB(void* pvParameters) {
    (void) pvParameters;
    Serial.println("ThreadB\tbegin");

    int iReceived = 0;
    for (;;) {
        int iPending = 0;
        {
            // read the element count under the mutex
            erpc::Mutex::Guard sizeLock(g_MUTEX);
            iPending = g_QUEUE.size();
        }
        Serial.print("ThreadB\tnumber of elements in queue: ");
        Serial.println(iPending);

        bool bDequeued = false;
        if (iPending != 0) {
            // take one element under the mutex
            erpc::Mutex::Guard getLock(g_MUTEX);
            bDequeued = g_QUEUE.get(&iReceived);
        }

        if (bDequeued) {
            Serial.print("ThreadB\treceive: ");
            Serial.println(iReceived);
        }
        delay(1000);
    }
}

void setup() {
    Serial.begin(115200);
    vNopDelayMS(1000);
    while(!Serial);

    erpc::Thread taskA(
        &ThreadA,
        configMAX_PRIORITIES - 10,
        STACK_SIZE,
        "Task A"
    );

    erpc::Thread taskB(
        &ThreadB,
        configMAX_PRIORITIES - 11,
        STACK_SIZE,
        "Task B"
    );

    erpc::Thread* tasks[] = {&taskA, &taskB};
    for (erpc::Thread* pt : tasks) {
        pt -> start();
    }
}

void loop() {}

serial monitor
ThreadA	begin
ThreadA	send: 0
ThreadB	begin
ThreadB	number of elements in queue: 1
ThreadB	receive: 0
ThreadB	number of elements in queue: 0
ThreadA	send: 1
ThreadB	number of elements in queue: 1
ThreadB	receive: 1
ThreadB	number of elements in queue: 0
ThreadA	send: 2
ThreadB	number of elements in queue: 1
ThreadB	receive: 2
ThreadB	number of elements in queue: 0
ThreadA	send: 3
ThreadB	number of elements in queue: 1
ThreadB	receive: 3
ThreadB	number of elements in queue: 0
ThreadA	send: 4
ThreadB	number of elements in queue: 1
ThreadB	receive: 4
ThreadB	number of elements in queue: 0
ThreadA	send: 5
ThreadB	number of elements in queue: 1
ThreadB	receive: 5
ThreadB	number of elements in queue: 0

要点

  1. erpc::StaticQueueを使用する際は、#include "erpc/erpc_static_queue.h"と明記する必要がある。
  2. erpc::Mutexの使用には、inner classであるerpc::Mutex::Guardを用いる。この手法はResource Acquisition Is Initializationと呼ばれる。erpc::Mutex::Guard::Guard()はerpc::Mutex::lock()に対応し、erpc::Mutex::Guard::~Guard()はerpc::Mutex::unlock()に対応する。
  3. erpc::StaticQueueの実装にはtemplateが用いられている。第一テンプレート引数にはデータ型を、第二テンプレート引数にはQueueの容量を指定する。

クラス化

erpc::StaticQueueとerpc::Mutexとの併用は、艱難ならずとも稍煩雑である。これを扱いよくするため、蕪浅ながら一つのクラスにまとめた。

MutexStaticQueue.hpp
#ifndef __MUTEX_STATIC_QUEUE_H__
#define __MUTEX_STATIC_QUEUE_H__

/* Wi-Fi, FreeRTOS */
#include <rpcWiFi.h>
/* erpc::StaticQueue */
#include "erpc/erpc_static_queue.h"

using namespace erpc;

template <typename T, uint32_t elementCount>
class MutexStaticQueue {
private:
    /* StaticQueue */
    StaticQueue<T, elementCount>* pmpStaticQueue;
    /* Mutex */
    Mutex* pmpMutex;

public:
    /* constructor */
    MutexStaticQueue();

    /* send */
    bool mFbQueueSend(T* const pTElement);

    /* receive */
    bool mFbQueueReceive(T* pTBuffer);

    /* size */
    uint32_t mFui32QueueMessagesWaiting();

    bool mFbIsQueueFull();

    bool mFbIsQueueEmpty();
};

/*
 * Variable templates holding pointers to the MutexStaticQueue member
 * functions, one per public operation. Instantiate them with the same
 * template arguments as the queue object and invoke through `.*`, e.g.
 *     (queue.*pmFbQueueSend<uint16_t, 4>)(&value);
 */
template <typename T, uint32_t elementCount>
bool (MutexStaticQueue<T, elementCount>::*pmFbQueueSend)(T* const) =
    &MutexStaticQueue<T, elementCount>::mFbQueueSend;

template <typename T, uint32_t elementCount>
bool (MutexStaticQueue<T, elementCount>::*pmFbQueueReceive)(T*) =
    &MutexStaticQueue<T, elementCount>::mFbQueueReceive;

template <typename T, uint32_t elementCount>
uint32_t (MutexStaticQueue<T, elementCount>::*pmFui32QueueMessagesWaiting)() =
    &MutexStaticQueue<T, elementCount>::mFui32QueueMessagesWaiting;

template <typename T, uint32_t elementCount>
bool (MutexStaticQueue<T, elementCount>::*pmFbIsQueueFull)() =
    &MutexStaticQueue<T, elementCount>::mFbIsQueueFull;

template <typename T, uint32_t elementCount>
bool (MutexStaticQueue<T, elementCount>::*pmFbIsQueueEmpty)() =
    &MutexStaticQueue<T, elementCount>::mFbIsQueueEmpty;

#include "MutexStaticQueue.ipp"

#endif // __MUTEX_STATIC_QUEUE_H__

MutexStaticQueue.ipp
/* constructor: allocates the wrapped queue and its mutex with new */
template <typename T, uint32_t elementCount>
MutexStaticQueue<T, elementCount>::MutexStaticQueue()
    : pmpStaticQueue(new StaticQueue<T, elementCount>()),
      pmpMutex(new Mutex())
{
}

/*
 * send: adds *pTElement to the queue while holding the mutex.
 * Returns the value reported by StaticQueue::add().
 */
template <typename T, uint32_t elementCount>
bool MutexStaticQueue<T, elementCount>::mFbQueueSend(T* const pTElement) {
    Mutex::Guard lock(*pmpMutex);
    const bool bAdded = pmpStaticQueue->add(*pTElement);
    return bAdded;
}

/*
 * receive: takes one element from the queue into *pTBuffer while holding
 * the mutex. Returns the value reported by StaticQueue::get().
 */
template <typename T, uint32_t elementCount>
bool MutexStaticQueue<T, elementCount>::mFbQueueReceive(T* pTBuffer) {
    Mutex::Guard lock(*pmpMutex);
    const bool bFetched = pmpStaticQueue->get(pTBuffer);
    return bFetched;
}

/* size: returns StaticQueue::size(), read while holding the mutex */
template <typename T, uint32_t elementCount>
uint32_t MutexStaticQueue<T, elementCount>::mFui32QueueMessagesWaiting() {
    Mutex::Guard lock(*pmpMutex);
    const uint32_t ui32Waiting = pmpStaticQueue->size();
    return ui32Waiting;
}

/*
 * Reports whether the queue is full, i.e. whether StaticQueue::size()
 * equals elementCount, checked while holding the mutex.
 * NOTE(review): this assumes size() can actually reach elementCount; if the
 * underlying StaticQueue keeps one slot unused, this never returns true -
 * confirm against erpc_static_queue.h.
 */
template <typename T, uint32_t elementCount>
bool MutexStaticQueue<T, elementCount>::mFbIsQueueFull() {
    Mutex::Guard lock(*pmpMutex);
    return pmpStaticQueue->size() == elementCount;
}

/*
 * Reports whether the queue is empty, i.e. whether StaticQueue::size()
 * is 0, checked while holding the mutex.
 */
template <typename T, uint32_t elementCount>
bool MutexStaticQueue<T, elementCount>::mFbIsQueueEmpty() {
    Mutex::Guard lock(*pmpMutex);
    return pmpStaticQueue->size() == 0;
}

mutex static queue sample
#include <rpcWiFi.h>

#include "MutexStaticQueue.hpp"

/* Base priority: Task A runs at this value, Task B one below it. */
constexpr uint8_t PRIORITY_CRITERION = configMAX_PRIORITIES - 10;
/* Stack size passed to each Thread. */
constexpr uint16_t STACK_SIZE = 256;
/* Capacity template argument of the shared queue. */
constexpr uint8_t QUEUE_SIZE = 4;

/* Mutex-protected queue of uint16_t shared by Thread A and Thread B. */
MutexStaticQueue<uint16_t, QUEUE_SIZE> u16_queue;

/*
 * Producer task: every 3 seconds tries to enqueue the current counter value
 * into u16_queue. The value is logged and the counter advanced only when
 * the queue actually accepted it.
 */
static void ThreadA(void* pvParameters) {
    (void) pvParameters;
    Serial.println("Thread A\tStarted");

    uint16_t u16Data = 0;

    while (1) {
        /*
         * Let the send itself decide whether there was room: its return
         * value is the result of the underlying add(). Checking "is full"
         * first and then ignoring the send result would log a send and skip
         * a value whenever the enqueue is rejected, and would take the
         * mutex twice per iteration.
         */
        if ( (u16_queue.*pmFbQueueSend<uint16_t, QUEUE_SIZE>)(&u16Data) ) {
            Serial.print("Thread A\tsend: ");
            Serial.println(u16Data);

            u16Data++;
        }

        delay(3000);
    }
}

/*
 * Consumer task: once per second checks whether u16_queue holds data and,
 * if so, dequeues one element and logs it.
 */
static void ThreadB(void* pvParameters) {
    (void) pvParameters;
    Serial.println("Thread B\tStarted");

    uint16_t u16Received = 0;

    for (;;) {
        /* the receive is attempted only when the queue reports data */
        const bool bHasData = !(u16_queue.*pmFbIsQueueEmpty<uint16_t, QUEUE_SIZE>)();
        if ( bHasData && (u16_queue.*pmFbQueueReceive<uint16_t, QUEUE_SIZE>)(&u16Received) ) {
            Serial.print("Thread B\treceive: ");
            Serial.println(u16Received);
        }
        delay(1000);
    }
}

void setup() {
    Serial.begin(115200);
    vNopDelayMS(1000);
    while(!Serial);

    Thread TaskA(
        &ThreadA,
        PRIORITY_CRITERION,
        STACK_SIZE,
        "Task A"
    );

    Thread TaskB(
        &ThreadB,
        PRIORITY_CRITERION - 1,
        STACK_SIZE,
        "Task B"
    );

    Thread* tasksArray[2] = {&TaskA, &TaskB};

    for (Thread* t : tasksArray) {
        t -> start();
    }
}

void loop() {}

serial monitor
Thread A	Started
Thread A	send: 0
Thread B	Started
Thread B	receive: 0
Thread A	send: 1
Thread B	receive: 1
Thread A	send: 2
Thread B	receive: 2
Thread A	send: 3
Thread B	receive: 3
Thread A	send: 4
Thread B	receive: 4
Thread A	send: 5
Thread B	receive: 5

Discussion