Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#include <assert.h>
#include <string.h>
#include <stdio.h>
#include "../util/alg_ds/alg/defer.h"
#include "msg_dispatcher_xapp.h"
// Single file-scope message buffer. create_val() copies each popped message
// into it and hands back its address, so no heap allocation is performed.
// NOTE(review): the buffer is shared by every caller of create_val();
// presumably only one worker thread is expected to be alive at a time -- confirm.
static
msg_dispatch_t static_msg;
// Copy callback passed to wait_and_pop_tsnq() by the worker thread.
//
// it: pointer to the element to copy; may be NULL.
//
// Returns NULL when `it` is NULL. Otherwise copies sizeof(msg_dispatch_t)
// bytes from `it` into the file-scope static_msg buffer and returns the
// address of that buffer. Nothing is allocated, so each call overwrites the
// message produced by the previous call.
static
void* create_val(void* it)
{
  if(it != NULL){
    memcpy(&static_msg, it, sizeof static_msg);
    return &static_msg;
  }

  return NULL;
}
// Thread entry point launched by init_msg_dispatcher().
//
// arg: the tsnq_t queue this thread consumes.
//
// Repeatedly pops a message through wait_and_pop_tsnq() (using create_val as
// the copy callback), invokes the message's sm_cb callback on its rd payload
// and then releases that payload with free_sm_ag_if_rd(). The loop ends as
// soon as wait_and_pop_tsnq() yields NULL; the queue's `stopped` flag is then
// set. Always returns NULL.
static
void* worker_thread(void* arg)
{
  tsnq_t* queue = (tsnq_t*)arg;
  msg_dispatch_t* cur = NULL;

  while((cur = wait_and_pop_tsnq(queue, create_val)) != NULL){
    cur->sm_cb(&cur->rd);
    free_sm_ag_if_rd(&cur->rd);
  }

  // Record on the queue that this consumer has left its loop.
  queue->stopped = true;
  return NULL;
}
// Initialise a dispatcher: set up its queue for elements of
// sizeof(msg_dispatch_t) bytes and start the worker thread that consumes it.
//
// d: dispatcher to initialise; must not be NULL (asserted).
//
// A non-zero result from pthread_create() is treated as fatal via assert().
void init_msg_dispatcher( msg_dispatcher_xapp_t* d)
{
  assert(d != NULL);

  tsnq_t* const queue = &d->q;
  init_tsnq(queue, sizeof(msg_dispatch_t));

  // The worker receives the queue itself as its only argument.
  int const rc = pthread_create(&d->p, NULL, worker_thread, queue);
  assert(rc == 0);
}
// Tear down a dispatcher: release its queue, then join the worker thread.
//
// d: dispatcher to release; must not be NULL (asserted).
//
// free_tsnq() is called with a NULL element callback before the join.
// NOTE(review): presumably free_tsnq() is what makes the worker leave its
// loop, which would be why it has to run before pthread_join() -- confirm
// against the tsnq implementation.
// A non-zero result from pthread_join() is treated as fatal via assert().
void free_msg_dispatcher(msg_dispatcher_xapp_t* d)
{
  assert(d != NULL);

  free_tsnq(&d->q, NULL);

  int const join_rc = pthread_join(d->p, NULL);
  assert(join_rc == 0);
}
// Enqueue a message on the dispatcher's queue.
//
// d:   dispatcher that owns the queue; must not be NULL (asserted).
// msg: message to enqueue; must not be NULL (asserted). It is handed to
//      push_tsnq() together with the size of one msg_dispatch_t.
void send_msg_dispatcher( msg_dispatcher_xapp_t* d, msg_dispatch_t* msg )
{
  assert(d != NULL);
  assert(msg != NULL);

  tsnq_t* const queue = &d->q;
  push_tsnq(queue, msg, sizeof *msg);
}
// Report the value size_tsnq() returns for the dispatcher's queue.
//
// d: dispatcher to query; must not be NULL (asserted).
size_t size_msg_dispatcher(msg_dispatcher_xapp_t* d)
{
  assert(d != NULL);

  size_t const pending = size_tsnq(&d->q);
  return pending;
}