gwenhywfar 5.12.0
endpoint_msgio.c
Go to the documentation of this file.
1/****************************************************************************
2 * This file is part of the project Gwenhywfar.
3 * Gwenhywfar (c) by 2023 Martin Preuss, all rights reserved.
4 *
5 * The license for this file can be found in the file COPYING which you
6 * should have received along with this file.
7 ****************************************************************************/
8
9#ifdef HAVE_CONFIG_H
10# include <config.h>
11#endif
12
13/*#define DISABLE_DEBUGLOG*/
14
15
16#include "./endpoint_msgio_p.h"
17
18#include <gwenhywfar/debug.h>
19
20
21#define GWEN_ENDPOINT_MSGIO_BUFFERSIZE 1024
22
23
24
25/* ------------------------------------------------------------------------------------------------
26 * forward declarations
27 * ------------------------------------------------------------------------------------------------
28 */
29
30static void GWENHYWFAR_CB _freeData(void *bp, void *p);
31
32static void _addSockets(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_UNUSED GWEN_SOCKETSET *xSet);
33static void _checkSockets(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet);
34static int _sendMsgStart(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg);
35static void _sendMsgFinish(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg);
38static int _distributeBufferContent(GWEN_MSG_ENDPOINT *ep, const uint8_t *bufferPtr, int bufferLen);
39
40
41
42/* ------------------------------------------------------------------------------------------------
43 * implementations
44 * ------------------------------------------------------------------------------------------------
45 */
46
47GWEN_INHERIT(GWEN_MSG_ENDPOINT, GWEN_ENDPOINT_MSGIO)
48
49
50
52{
53 GWEN_ENDPOINT_MSGIO *xep;
54
55 GWEN_NEW_OBJECT(GWEN_ENDPOINT_MSGIO, xep);
56 GWEN_INHERIT_SETDATA(GWEN_MSG_ENDPOINT, GWEN_ENDPOINT_MSGIO, ep, xep, _freeData);
57
58 xep->addSocketsFn=GWEN_MsgEndpoint_SetAddSocketsFn(ep, _addSockets);
60}
61
62
63
64void GWENHYWFAR_CB _freeData(void *bp, void *p)
65{
67 GWEN_ENDPOINT_MSGIO *xep;
68
69 ep=(GWEN_MSG_ENDPOINT*) bp;
70 xep=(GWEN_ENDPOINT_MSGIO*) p;
71 GWEN_MsgEndpoint_SetCheckSocketsFn(ep, xep->checkSocketsFn);
73}
74
75
76
78{
79 if (ep) {
80 GWEN_ENDPOINT_MSGIO *xep;
81
82 xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, GWEN_ENDPOINT_MSGIO, ep);
83 if (xep)
84 xep->getBytesNeededFn=f;
85 }
86}
87
88
89
91{
92 if (ep) {
93 GWEN_ENDPOINT_MSGIO *xep;
94
95 xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, GWEN_ENDPOINT_MSGIO, ep);
96 if (xep)
97 xep->sendMsgStartFn=f;
98 }
99}
100
101
102
104{
105 if (ep) {
106 GWEN_ENDPOINT_MSGIO *xep;
107
108 xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, GWEN_ENDPOINT_MSGIO, ep);
109 if (xep)
110 xep->sendMsgFinishFn=f;
111 }
112}
113
114
115
117{
118 if (ep) {
119 GWEN_ENDPOINT_MSGIO *xep;
120
121 xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, GWEN_ENDPOINT_MSGIO, ep);
122 if (xep && xep->sendMsgStartFn)
123 return xep->sendMsgStartFn(ep, msg);
124 }
125
126 return 0;
127}
128
129
130
132{
133 if (ep) {
134 GWEN_ENDPOINT_MSGIO *xep;
135
136 xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, GWEN_ENDPOINT_MSGIO, ep);
137 if (xep && xep->sendMsgStartFn)
138 xep->sendMsgFinishFn(ep, msg);
139 }
140}
141
142
143
145{
146 if (ep) {
147 GWEN_ENDPOINT_MSGIO *xep;
148
149 xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, GWEN_ENDPOINT_MSGIO, ep);
150 if (xep) {
152 GWEN_SOCKET *sk;
153
155 if (sk) {
156 DBG_DEBUG(GWEN_LOGDOMAIN, "Endpoint %s: Adding socket %d to read set",
159 GWEN_SocketSet_AddSocket(readSet, sk);
161 DBG_DEBUG(GWEN_LOGDOMAIN, "Endpoint %s: Adding socket %d to write set",
164 GWEN_SocketSet_AddSocket(writeSet, sk);
165 }
166 } /* if socket */
167 }
168 else if (xep->addSocketsFn) {
169 DBG_INFO(GWEN_LOGDOMAIN, "Endpoint %s: Not connected, calling base function", GWEN_MsgEndpoint_GetName(ep));
170 xep->addSocketsFn(ep, readSet, writeSet, xSet);
171 }
172 } /* if (xep) */
173 } /* if (ep) */
174}
175
176
177
179{
180 if (ep) {
181 GWEN_ENDPOINT_MSGIO *xep;
182
183 xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, GWEN_ENDPOINT_MSGIO, ep);
184 if (xep) {
185 int rv;
186
188 GWEN_SOCKET *sk;
189
191 if (sk) {
192 if (GWEN_SocketSet_HasSocket(writeSet, sk)) {
193 DBG_DEBUG(GWEN_LOGDOMAIN, "Endpoint %s: Has socket in write set", GWEN_MsgEndpoint_GetName(ep));
195 if (rv<0 && rv!=GWEN_ERROR_TIMEOUT) {
197 "Endpoint %s: Error writing current message (%d), disconnecting",
199 rv);
201 return;
202 }
203 }
204
205 if (GWEN_SocketSet_HasSocket(readSet, sk)) {
206 DBG_DEBUG(GWEN_LOGDOMAIN, "Endpoint %s: Has socket in read set", GWEN_MsgEndpoint_GetName(ep));
207 rv=_readCurrentMessage(ep);
208 if (rv<0 && rv!=GWEN_ERROR_TIMEOUT) {
210 "Endpoint %s: Error reading current message (%d), disconnecting",
212 rv);
214 return;
215 }
216 }
217 }
218 } /* if connected */
219 else if (xep->checkSocketsFn) {
220 DBG_INFO(GWEN_LOGDOMAIN, "Endpoint %s: Not connected, calling base function", GWEN_MsgEndpoint_GetName(ep));
221 xep->checkSocketsFn(ep, readSet, writeSet, xSet);
222 }
223 }
224 }
225}
226
227
228
230{
231 GWEN_MSG *msg;
232
233 DBG_DEBUG(GWEN_LOGDOMAIN, "Writing to endpoint %s", GWEN_MsgEndpoint_GetName(ep));
235 if (msg) {
236 uint8_t pos;
237 int remaining;
238 int rv;
239
240 pos=GWEN_Msg_GetCurrentPos(msg);
241 remaining=GWEN_Msg_GetRemainingBytes(msg);
242 if (pos==0 && remaining>0) {
243 DBG_DEBUG(GWEN_LOGDOMAIN, "Starting to write packet");
244 rv=_sendMsgStart(ep, msg);
245 if (rv<0) {
246 if (rv==GWEN_ERROR_TIMEOUT) {
247 DBG_INFO(GWEN_LOGDOMAIN, "Line busy");
248 return rv;
249 }
250 else {
251 DBG_INFO(GWEN_LOGDOMAIN, "Error starting message (%d)", rv);
252 return rv;
253 }
254 }
255 else {
256 DBG_DEBUG(GWEN_LOGDOMAIN, "Okay to write packet");
257 }
258 }
259 if (remaining>0) {
260 const uint8_t *buf;
261
262 /* start new message */
263 buf=GWEN_Msg_GetBuffer(msg)+pos;
264 rv=GWEN_MsgEndpoint_WriteToSocket(ep, buf, remaining);
265 if (rv<0) {
266 if (rv==GWEN_ERROR_TIMEOUT)
267 return rv;
268 DBG_ERROR(GWEN_LOGDOMAIN, "Error on write() (%d)", rv);
269 return rv;
270 }
271 GWEN_Msg_IncCurrentPos(msg, rv);
272 if (rv==remaining) {
273 DBG_INFO(GWEN_LOGDOMAIN, "Message completely sent");
274 _sendMsgFinish(ep, msg);
275 /* end current message */
277 GWEN_Msg_free(msg);
278 }
279 }
280 }
281 else {
282 DBG_INFO(GWEN_LOGDOMAIN, "Nothing to send");
283 }
284 return 0;
285}
286
287
288
289
291{
292 int rv;
293 uint8_t buffer[GWEN_ENDPOINT_MSGIO_BUFFERSIZE];
294
295 DBG_DEBUG(GWEN_LOGDOMAIN, "Reading from endpoint %s", GWEN_MsgEndpoint_GetName(ep));
296 rv=GWEN_MsgEndpoint_ReadFromSocket(ep, buffer, sizeof(buffer));
297 if (rv<0) {
298 if (rv==GWEN_ERROR_TIMEOUT) {
299 DBG_DEBUG(GWEN_LOGDOMAIN, "Timeout (%d)", rv);
300 }
301 else {
302 DBG_INFO(GWEN_LOGDOMAIN, "here (%d)", rv);
303 }
304 return rv;
305 }
306 else if (rv==0) {
307 DBG_INFO(GWEN_LOGDOMAIN, "EOF met on read()");
308 return GWEN_ERROR_IO;
309 }
310
311 rv=_distributeBufferContent(ep, buffer, rv);
312 if (rv<0) {
313 DBG_INFO(GWEN_LOGDOMAIN, "here (%d)", rv);
314 return rv;
315 }
316
317 return 0;
318}
319
320
321
322int _distributeBufferContent(GWEN_MSG_ENDPOINT *ep, const uint8_t *bufferPtr, int bufferLen)
323{
324 if (ep) {
325 GWEN_ENDPOINT_MSGIO *xep;
326
327 xep=GWEN_INHERIT_GETDATA(GWEN_MSG_ENDPOINT, GWEN_ENDPOINT_MSGIO, ep);
328 if (xep) {
329 if (xep->getBytesNeededFn) {
330 GWEN_MSG *msg;
331
332 DBG_DEBUG(GWEN_LOGDOMAIN, "Distributing %d received bytes", bufferLen);
334 while(bufferLen) {
335 int bytesNeeded;
336
337 DBG_DEBUG(GWEN_LOGDOMAIN, "%d remaining bytes in buffer", bufferLen);
338 if (msg==NULL) {
339 DBG_DEBUG(GWEN_LOGDOMAIN, "Creating new message");
343 }
344
345 bytesNeeded=xep->getBytesNeededFn(ep, msg);
346 DBG_DEBUG(GWEN_LOGDOMAIN, "current message still needs %d bytes", bytesNeeded);
347 if (bytesNeeded==0) {
348 /* message finished already before adding bytes?? */
349 DBG_ERROR(GWEN_LOGDOMAIN, "Incoming message complete, SNH!");
350 }
351 else if (bytesNeeded<0) {
352 DBG_ERROR(GWEN_LOGDOMAIN, "Unknown how many bytes needed? SNH! (%d)", bytesNeeded);
353 return GWEN_ERROR_IO;
354 }
355 else {
356 int rv;
357
358 /* add bytes to message */
359 if (bytesNeeded>bufferLen)
360 bytesNeeded=bufferLen;
361 DBG_DEBUG(GWEN_LOGDOMAIN, "adding %d bytes to current message", bytesNeeded);
362 rv=GWEN_Msg_AddBytes(msg, bufferPtr, bytesNeeded);
363 if (rv<0) {
364 DBG_INFO(GWEN_LOGDOMAIN, "here (%d)", rv);
365 return rv;
366 }
367 if (xep->getBytesNeededFn(ep, msg)==0) {
368 /* message finished */
369 DBG_DEBUG(GWEN_LOGDOMAIN, "Incoming message complete");
372 msg=NULL;
373 }
374 bufferPtr+=bytesNeeded;
375 bufferLen-=bytesNeeded;
376 }
377 } /* while */
378
379 return 0;
380 }
381 else {
382 DBG_INFO(GWEN_LOGDOMAIN, "Endpoint %s: Function \"getBytesNeeded\" not set", GWEN_MsgEndpoint_GetName(ep));
383 }
384 } /* if (xep) */
385 } /* if (ep) */
386 return GWEN_ERROR_GENERIC;
387}
388
389
390
#define NULL
Definition binreloc.c:300
#define DBG_INFO(dbg_logger, format,...)
Definition debug.h:181
#define DBG_ERROR(dbg_logger, format,...)
Definition debug.h:97
#define DBG_DEBUG(dbg_logger, format,...)
Definition debug.h:214
GWEN_MSG * GWEN_MsgEndpoint_GetCurrentlyReceivedMsg(const GWEN_MSG_ENDPOINT *ep)
Definition endpoint.c:274
GWEN_SOCKET * GWEN_MsgEndpoint_GetSocket(const GWEN_MSG_ENDPOINT *ep)
Definition endpoint.c:104
void GWEN_MsgEndpoint_SetCurrentlyReceivedMsg(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *m)
Definition endpoint.c:281
int GWEN_MsgEndpoint_GetState(const GWEN_MSG_ENDPOINT *ep)
Definition endpoint.c:124
int GWEN_MsgEndpoint_WriteToSocket(GWEN_MSG_ENDPOINT *ep, const uint8_t *bufferPtr, uint32_t bufferLen)
Definition endpoint.c:444
int GWEN_MsgEndpoint_GetGroupId(const GWEN_MSG_ENDPOINT *ep)
Definition endpoint.c:97
GWEN_MSG_ENDPOINT_ADDSOCKETS_FN GWEN_MsgEndpoint_SetAddSocketsFn(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT_ADDSOCKETS_FN fn)
Definition endpoint.c:504
void GWEN_MsgEndpoint_Disconnect(GWEN_MSG_ENDPOINT *ep)
Definition endpoint.c:483
void GWEN_MsgEndpoint_AddReceivedMessage(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *m)
Definition endpoint.c:221
const char * GWEN_MsgEndpoint_GetName(const GWEN_MSG_ENDPOINT *ep)
Definition endpoint.c:90
int GWEN_MsgEndpoint_HaveMessageToSend(const GWEN_MSG_ENDPOINT *ep)
Definition endpoint.c:267
int GWEN_MsgEndpoint_GetDefaultMessageSize(const GWEN_MSG_ENDPOINT *ep)
Definition endpoint.c:185
GWEN_MSG_ENDPOINT_CHECKSOCKETS_FN GWEN_MsgEndpoint_SetCheckSocketsFn(GWEN_MSG_ENDPOINT *ep, GWEN_MSG_ENDPOINT_CHECKSOCKETS_FN fn)
Definition endpoint.c:518
int GWEN_MsgEndpoint_ReadFromSocket(GWEN_MSG_ENDPOINT *ep, uint8_t *bufferPtr, uint32_t bufferLen)
Definition endpoint.c:424
GWEN_MSG * GWEN_MsgEndpoint_GetFirstSendMessage(const GWEN_MSG_ENDPOINT *ep)
Definition endpoint.c:260
struct GWEN_MSG_ENDPOINT GWEN_MSG_ENDPOINT
Object which can send and receive messages (base class).
Definition endpoint.h:37
#define GWEN_MSG_ENDPOINT_STATE_CONNECTED
Definition endpoint.h:25
#define GWEN_ENDPOINT_MSGIO_BUFFERSIZE
void GWEN_MsgIoEndpoint_SetSendMsgFinishFn(GWEN_MSG_ENDPOINT *ep, GWEN_ENDPOINT_MSGIO_SENDMSGFINISH_FN f)
static int _distributeBufferContent(GWEN_MSG_ENDPOINT *ep, const uint8_t *bufferPtr, int bufferLen)
static void _sendMsgFinish(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg)
static void _addSockets(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_UNUSED GWEN_SOCKETSET *xSet)
static void GWENHYWFAR_CB _freeData(void *bp, void *p)
static int _readCurrentMessage(GWEN_MSG_ENDPOINT *ep)
void GWEN_MsgIoEndpoint_Extend(GWEN_MSG_ENDPOINT *ep)
static void _checkSockets(GWEN_MSG_ENDPOINT *ep, GWEN_SOCKETSET *readSet, GWEN_SOCKETSET *writeSet, GWEN_SOCKETSET *xSet)
static int _writeCurrentMessage(GWEN_MSG_ENDPOINT *ep)
void GWEN_MsgIoEndpoint_SetSendMsgStartFn(GWEN_MSG_ENDPOINT *ep, GWEN_ENDPOINT_MSGIO_SENDMSGSTART_FN f)
static int _sendMsgStart(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg)
void GWEN_MsgIoEndpoint_SetGetNeededBytesFn(GWEN_MSG_ENDPOINT *ep, GWEN_ENDPOINT_MSGIO_GETBYTESNEEDED_FN f)
int(* GWEN_ENDPOINT_MSGIO_SENDMSGSTART_FN)(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg)
void(* GWEN_ENDPOINT_MSGIO_SENDMSGFINISH_FN)(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg)
int(* GWEN_ENDPOINT_MSGIO_GETBYTESNEEDED_FN)(GWEN_MSG_ENDPOINT *ep, GWEN_MSG *msg)
#define GWEN_ERROR_TIMEOUT
Definition error.h:71
#define GWEN_ERROR_IO
Definition error.h:123
#define GWEN_ERROR_GENERIC
Definition error.h:62
GWENHYWFAR_API int GWEN_SocketSet_HasSocket(GWEN_SOCKETSET *ssp, const GWEN_SOCKET *sp)
GWENHYWFAR_API int GWEN_SocketSet_AddSocket(GWEN_SOCKETSET *ssp, const GWEN_SOCKET *sp)
GWENHYWFAR_API int GWEN_Socket_GetSocketInt(const GWEN_SOCKET *sp)
#define GWEN_UNUSED
#define GWENHYWFAR_CB
struct GWEN_SOCKETSETSTRUCT GWEN_SOCKETSET
Definition inetsocket.h:41
struct GWEN_SOCKET GWEN_SOCKET
Definition inetsocket.h:40
#define GWEN_INHERIT_SETDATA(bt, t, element, data, fn)
Definition inherit.h:300
#define GWEN_INHERIT(bt, t)
Definition inherit.h:264
#define GWEN_INHERIT_GETDATA(bt, t, element)
Definition inherit.h:279
void GWEN_Msg_List_Del(GWEN_MSG *element)
#define GWEN_LOGDOMAIN
Definition logger.h:35
#define GWEN_FREE_OBJECT(varname)
Definition memory.h:61
#define GWEN_NEW_OBJECT(typ, varname)
Definition memory.h:55
uint32_t GWEN_Msg_GetCurrentPos(const GWEN_MSG *msg)
Definition msg.c:176
int GWEN_Msg_AddBytes(GWEN_MSG *msg, const uint8_t *bufferPtr, uint32_t bufferLen)
Definition msg.c:193
int GWEN_Msg_GetRemainingBytes(const GWEN_MSG *msg)
Definition msg.c:493
int GWEN_Msg_IncCurrentPos(GWEN_MSG *msg, uint32_t i)
Definition msg.c:468
void GWEN_Msg_SetGroupId(GWEN_MSG *msg, int groupId)
Definition msg.c:126
uint8_t * GWEN_Msg_GetBuffer(GWEN_MSG *msg)
Definition msg.c:133
void GWEN_Msg_free(GWEN_MSG *msg)
Definition msg.c:78
GWEN_MSG * GWEN_Msg_new(uint32_t bufferSize)
Definition msg.c:37
struct GWEN_MSG GWEN_MSG
Definition msg.h:24
static void GWENHYWFAR_CB _freeData(GWEN_UNUSED void *bp, void *p)
Definition nogui.c:222