Wednesday, April 29, 2015

How to Make a Message Queue with Condition Variables - Implementation

3. SisoQueue Class Definition


We will implement the message queue in C++ and name its class "SisoQueue". SISO stands for "single input, single output": the queue has exactly one input entity (the producer) and one output entity (the consumer). The member variables and methods of SisoQueue are shown below.

#include <pthread.h>   // pthread_mutex_t, pthread_cond_t
#include <stdlib.h>    // malloc, free

#define CD_TRUE     (1)
#define CD_FALSE    (0)
#define CD_ERROR    (-1)
#define CD_SUCCESS  (0)

class SisoQueue
{
    int m_id;          // unique number for debug tracking
    int m_len;         // queue length (3 is recommended)

    void **m_pp_data;  // array that holds the message pointers

    // The read index is the slot that the next get reads from.
    // The write index is the slot that the next put writes to.
    // Equal indices mean that the queue is either empty or full,
    // so the m_is_empty flag is used to tell the two cases apart.
    int m_write_index; // index position for queue put
    int m_read_index;  // index position for queue get
    int m_is_empty;    // CD_TRUE while the queue holds no message

    // condition variable that put waits on while the queue is full
    pthread_cond_t m_put_cond;
    // condition variable that get waits on while the queue is empty
    pthread_cond_t m_get_cond;
    pthread_mutex_t m_mutex; // mutex for access control between threads

public:
    // prepares the resource and initialize member variables
    SisoQueue(int id, int len);

    // releases the resource
    ~SisoQueue(); 

    // returns whether the queue is full (CD_TRUE) or not (CD_FALSE)
    int is_full();

    // puts a message into the queue
    //   if the queue is full, this waits until a slot becomes available
    int put(void * p_data);

    // gets a message from the queue
    //   if the queue is empty, this waits until a message arrives
    void * get();

};
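
The index bookkeeping described in the comments can be tried out on its own, without threads. The following is a minimal sketch: the `RingIndex` struct and its `advance_write` / `advance_read` helpers are hypothetical names used only for illustration and are not part of SisoQueue.

```cpp
#include <cassert>

// Hypothetical helper that mirrors only the index logic of SisoQueue.
struct RingIndex
{
    int len;
    int write_index;
    int read_index;
    bool is_empty;

    explicit RingIndex(int n)
        : len(n), write_index(0), read_index(0), is_empty(true)
    {
        assert(n > 0);
    }

    // Equal indices are ambiguous, so the flag decides between empty and full.
    bool is_full() const
    {
        return !is_empty && (write_index == read_index);
    }

    // Bookkeeping for one put; the caller must make sure the queue is not full.
    void advance_write()
    {
        assert(!is_full());
        write_index = (write_index + 1) % len;
        is_empty = false;
    }

    // Bookkeeping for one get; the caller must make sure the queue is not empty.
    void advance_read()
    {
        assert(!is_empty);
        read_index = (read_index + 1) % len;
        if (write_index == read_index)
            is_empty = true;
    }
};
```

With a length of 3, three consecutive puts bring the write index back to 0, which equals the read index. Only the flag tells this full state apart from the initial empty state.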

4. SisoQueue Method Implementation


4.1 Constructor and Destructor

The constructor initializes the member variables and allocates an array of m_len pointers to hold the messages. The destructor frees the array and destroys the mutex and the condition variables.

SisoQueue::SisoQueue(int id, int len)
{
    m_id = id;
    m_len = len;
    m_write_index = 0;
    m_read_index = 0;
    m_is_empty = CD_TRUE;
    m_pp_data = (void **) malloc(sizeof(void *) * len);

    pthread_cond_init(&m_put_cond, NULL);
    pthread_cond_init(&m_get_cond, NULL);
    pthread_mutex_init(&m_mutex, NULL);
}

SisoQueue::~SisoQueue()
{
    free(m_pp_data);

    pthread_cond_destroy(&m_put_cond);
    pthread_cond_destroy(&m_get_cond);
    pthread_mutex_destroy(&m_mutex);
}
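
The constructor above ignores the return values of malloc and of the pthread init calls. A more defensive variant might check each step and undo the earlier ones on failure. The sketch below shows one way to do that. The free function `init_queue_resources` and its parameter list are assumptions made for illustration, not part of the original class.

```cpp
#include <cassert>
#include <cstdlib>
#include <pthread.h>

// Hypothetical helper: allocates the slot array and initializes the
// synchronization objects, undoing earlier steps if a later one fails.
// Returns 0 on success and -1 on failure (the values of CD_SUCCESS / CD_ERROR).
int init_queue_resources(int len, void ***pp_data,
                         pthread_mutex_t *mutex,
                         pthread_cond_t *put_cond,
                         pthread_cond_t *get_cond)
{
    assert(pp_data && mutex && put_cond && get_cond);
    if (len <= 0)
        return -1;

    // calloc zero-fills the array, so every slot starts out cleared.
    *pp_data = static_cast<void **>(calloc(len, sizeof(void *)));
    if (*pp_data == nullptr)
        return -1;

    // The pthread init functions return 0 on success.
    if (pthread_mutex_init(mutex, nullptr) == 0)
    {
        if (pthread_cond_init(put_cond, nullptr) == 0)
        {
            if (pthread_cond_init(get_cond, nullptr) == 0)
                return 0; // everything is ready
            pthread_cond_destroy(put_cond);
        }
        pthread_mutex_destroy(mutex);
    }

    free(*pp_data);
    *pp_data = nullptr;
    return -1;
}
```

A constructor cannot return an error code, so a class built on such a helper would need another way to report the failure, for example a status member that the caller checks after construction.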

4.2 put

put inserts a message into the queue. If the queue is full, it waits on m_put_cond until the consumer removes a message. If the queue was empty before the insert, it signals m_get_cond to wake a consumer thread that may be waiting in get.

int SisoQueue::put(void * p_data)
{
    pthread_mutex_lock(&m_mutex);

    // Wait while the queue is full. The check is a loop because
    // pthread_cond_wait() may return without a matching signal.
    while( !m_is_empty && (m_write_index == m_read_index) )
    {
        pthread_cond_wait(&m_put_cond, &m_mutex);
    }

    m_pp_data[m_write_index] = p_data;
    m_write_index = (m_write_index+1) % m_len; // update write_index

    if( m_is_empty )
    {
        // The queue was empty, so the consumer may be waiting in get().
        m_is_empty = CD_FALSE;
        pthread_cond_signal(&m_get_cond);
    }

    pthread_mutex_unlock(&m_mutex);
    return CD_SUCCESS;
}

4.3 get

get retrieves a message from the queue. If the queue is empty, it waits on m_get_cond. Once a message is available, it returns the message at the read index and signals m_put_cond to wake a producer thread that may be waiting in put. If no producer is waiting on m_put_cond, the signal has no effect and causes no problem.

void * SisoQueue::get()
{
    void * p_data;

    pthread_mutex_lock(&m_mutex);

    // Wait while the queue is empty. The check is a loop because
    // pthread_cond_wait() may return without a matching signal.
    while( m_is_empty )
    {
        pthread_cond_wait(&m_get_cond, &m_mutex);
    }

    p_data = m_pp_data[m_read_index];
    m_pp_data[m_read_index] = NULL;
    m_read_index = (m_read_index+1) % m_len; // update read_index

    if( m_write_index == m_read_index ) // if queue is empty
        m_is_empty = CD_TRUE;

    pthread_cond_signal(&m_put_cond);
    pthread_mutex_unlock(&m_mutex);
    return p_data;
}
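
To see put and get working together, the sketch below runs one producer thread against one consumer. It is a self-contained approximation, not the author's code: `MiniQueue`, `ProducerArgs`, `producer_main` and `run_demo` are hypothetical names, the slot array is a std::vector instead of a malloc'ed buffer, and both methods wait in a `while` loop so that a spurious wakeup simply re-checks the condition.

```cpp
#include <cassert>
#include <cstdint>
#include <pthread.h>
#include <vector>

// Compact stand-in for SisoQueue (hypothetical class, for illustration only).
class MiniQueue
{
    std::vector<void *> m_slots;
    int m_write_index;
    int m_read_index;
    bool m_is_empty;
    pthread_mutex_t m_mutex;
    pthread_cond_t m_put_cond;
    pthread_cond_t m_get_cond;

    int next(int index) const
    {
        return (index + 1) % static_cast<int>(m_slots.size());
    }

public:
    explicit MiniQueue(int len)
        : m_slots(len), m_write_index(0), m_read_index(0), m_is_empty(true)
    {
        assert(len > 0);
        pthread_mutex_init(&m_mutex, nullptr);
        pthread_cond_init(&m_put_cond, nullptr);
        pthread_cond_init(&m_get_cond, nullptr);
    }

    ~MiniQueue()
    {
        pthread_cond_destroy(&m_put_cond);
        pthread_cond_destroy(&m_get_cond);
        pthread_mutex_destroy(&m_mutex);
    }

    void put(void *p_data)
    {
        pthread_mutex_lock(&m_mutex);
        while (!m_is_empty && m_write_index == m_read_index) // full
            pthread_cond_wait(&m_put_cond, &m_mutex);
        m_slots[m_write_index] = p_data;
        m_write_index = next(m_write_index);
        m_is_empty = false;
        pthread_cond_signal(&m_get_cond); // no effect if nobody is waiting
        pthread_mutex_unlock(&m_mutex);
    }

    void *get()
    {
        pthread_mutex_lock(&m_mutex);
        while (m_is_empty)
            pthread_cond_wait(&m_get_cond, &m_mutex);
        void *p_data = m_slots[m_read_index];
        m_read_index = next(m_read_index);
        m_is_empty = (m_write_index == m_read_index);
        pthread_cond_signal(&m_put_cond); // no effect if nobody is waiting
        pthread_mutex_unlock(&m_mutex);
        return p_data;
    }
};

struct ProducerArgs
{
    MiniQueue *queue;
    int count;
};

// Producer thread: sends the integers 1..count, packed into the pointer value.
void *producer_main(void *arg)
{
    ProducerArgs *args = static_cast<ProducerArgs *>(arg);
    for (int i = 1; i <= args->count; ++i)
        args->queue->put(reinterpret_cast<void *>(static_cast<std::intptr_t>(i)));
    return nullptr;
}

// Runs one producer against one consumer (the calling thread). Returns the
// sum of the received values, or -1 if a message arrived out of order.
long run_demo(int queue_len, int count)
{
    MiniQueue queue(queue_len);
    ProducerArgs args = { &queue, count };
    pthread_t producer;
    if (pthread_create(&producer, nullptr, producer_main, &args) != 0)
        return -1;

    long sum = 0;
    bool in_order = true;
    for (int expected = 1; expected <= count; ++expected)
    {
        std::intptr_t value = reinterpret_cast<std::intptr_t>(queue.get());
        if (value != expected)
            in_order = false;
        sum += static_cast<long>(value);
    }
    pthread_join(producer, nullptr);
    return in_order ? sum : -1;
}
```

Calling `run_demo(3, 1000)` pushes 1000 messages through a three-slot queue, so the producer blocks in put whenever it gets three messages ahead of the consumer. On most toolchains the program has to be built with the `-pthread` option.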

4.4 is_full

is_full reports whether the SisoQueue is full. The return value is CD_TRUE or CD_FALSE.

int SisoQueue::is_full()
{
    int rv;
    
    pthread_mutex_lock(&m_mutex);
    rv = CD_FALSE;
    if( !m_is_empty && (m_write_index==m_read_index) ) // if queue is full
    {
        rv = CD_TRUE;
    }
    pthread_mutex_unlock(&m_mutex);
    return rv;
}
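
One possible use of is_full is a non-blocking put on the producer side. Because there is only one producer and the consumer can only make the queue less full, a CD_FALSE result guarantees that the following put finds a free slot. The sketch below assumes this single-producer setup. `try_put` is a hypothetical helper, and `OneSlotQueue` is a single-threaded stand-in that only mimics the is_full / put interface so that the example runs on its own.

```cpp
#include <cassert>

// Hypothetical helper, not part of SisoQueue: puts a message only when there
// is room. Returns true if the message was queued, false if the queue was full.
// Safe only when a single thread calls put on the queue.
template <typename Queue>
bool try_put(Queue &queue, void *p_data)
{
    if (queue.is_full())           // CD_TRUE (1) means no free slot
        return false;
    return queue.put(p_data) == 0; // 0 is CD_SUCCESS
}

// Single-threaded stand-in with the same is_full()/put()/get() shape as
// SisoQueue; it holds at most one message and never blocks.
class OneSlotQueue
{
    void *m_slot;
    bool m_has_data;

public:
    OneSlotQueue() : m_slot(nullptr), m_has_data(false) {}

    int is_full() { return m_has_data ? 1 : 0; }

    int put(void *p_data)
    {
        assert(!m_has_data);
        m_slot = p_data;
        m_has_data = true;
        return 0;
    }

    void *get()
    {
        assert(m_has_data);
        m_has_data = false;
        return m_slot;
    }
};
```

With more than one producer this check-then-put sequence would no longer be safe, because another producer could fill the last slot between the two calls.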
