p_sr.c 11.6 KB
Newer Older
1
2
/*********************************************************************************************************
* Software License Agreement (BSD License)                                                               *
Sebastien Decugis's avatar
Sebastien Decugis committed
3
* Author: Sebastien Decugis <sdecugis@freediameter.net>							 *
4
*													 *
5
* Copyright (c) 2012, WIDE Project and NICT								 *
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
* All rights reserved.											 *
* 													 *
* Redistribution and use of this software in source and binary forms, with or without modification, are  *
* permitted provided that the following conditions are met:						 *
* 													 *
* * Redistributions of source code must retain the above 						 *
*   copyright notice, this list of conditions and the 							 *
*   following disclaimer.										 *
*    													 *
* * Redistributions in binary form must reproduce the above 						 *
*   copyright notice, this list of conditions and the 							 *
*   following disclaimer in the documentation and/or other						 *
*   materials provided with the distribution.								 *
* 													 *
* * Neither the name of the WIDE Project or NICT nor the 						 *
*   names of its contributors may be used to endorse or 						 *
*   promote products derived from this software without 						 *
*   specific prior written permission of WIDE Project and 						 *
*   NICT.												 *
* 													 *
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 	 *
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 	 *
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF   *
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.								 *
*********************************************************************************************************/

Sebastien Decugis's avatar
Sebastien Decugis committed
36
#include "fdcore-internal.h"
37
38
39
40
41

/* Structure to store a sent request */
struct sentreq {
	struct fd_list	chain; 	/* the "o" field points directly to the hop-by-hop of the request (uint32_t *)  */
	struct msg	*req;	/* A request that was sent and not yet answered. */
42
	uint32_t	prevhbh;/* The value to set in the hbh header when the message is retrieved */
43
44
	struct fd_list  expire; /* the list of expiring requests */
	struct timespec added_on; /* the time the request was added */
45
46
};

47
/* Find an element in the hbh list, or the following one */
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
static struct fd_list * find_or_next(struct fd_list * srlist, uint32_t hbh, int * match)
{
	struct fd_list * li;
	*match = 0;
	for (li = srlist->next; li != srlist; li = li->next) {
		uint32_t * nexthbh = li->o;
		if (*nexthbh < hbh)
			continue;
		if (*nexthbh == hbh)
			*match = 1;
		break;
	}
	return li;
}

Sebastien Decugis's avatar
Sebastien Decugis committed
63
64
65
static void srl_dump(const char * text, struct fd_list * srlist)
{
	struct fd_list * li;
66
	struct timespec now;
67
68
	
	if (!TRACE_BOOL(ANNOYING))
Sebastien Decugis's avatar
Sebastien Decugis committed
69
		return;
70
	
Sebastien Decugis's avatar
Sebastien Decugis committed
71
	fd_log_debug("%sSentReq list @%p:\n", text, srlist);
72
73
74
	
	CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), );
	
Sebastien Decugis's avatar
Sebastien Decugis committed
75
76
77
	for (li = srlist->next; li != srlist; li = li->next) {
		struct sentreq * sr = (struct sentreq *)li;
		uint32_t * nexthbh = li->o;
78
79
		
		fd_log_debug(" - Next req (hbh:%x): [since %ld.%06ld sec]\n", *nexthbh, 
80
81
			(now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_sec - sr->added_on.tv_sec) : (now.tv_sec - sr->added_on.tv_sec - 1),
			(now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_nsec - sr->added_on.tv_nsec) / 1000 : (now.tv_nsec - sr->added_on.tv_nsec + 1000000000) / 1000);
82
83
		
		fd_msg_dump_one(ANNOYING + 1, sr->req);
Sebastien Decugis's avatar
Sebastien Decugis committed
84
85
86
	}
}

87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/* (detached) thread that calls the anscb on expired messages. 
  We do it in a separate thread to avoid blocking the reception of new messages during this time */
static void * call_anscb_expire(void * arg) {
	struct msg * expired_req = arg;
	
	void (*anscb)(void *, struct msg **);
	void * data;
	
	TRACE_ENTRY("%p", arg);
	CHECK_PARAMS_DO( arg, return NULL );
	
	/* Set the thread name */
	fd_log_threadname ( "Expired req cb." );
	
	/* Log */
	TRACE_DEBUG(INFO, "The expiration timer for a request has been reached, abording this attempt now & calling cb...");
	
	/* Retrieve callback in the message */
	CHECK_FCT_DO( fd_msg_anscb_get( expired_req, &anscb, &data ), return NULL);
	ASSERT(anscb);
107
108
109
	
	/* Clean up this data from the message */
	CHECK_FCT_DO( fd_msg_anscb_associate( expired_req, NULL, NULL, NULL ), return NULL);
110
111
112
113
114
115

	/* Call it */
	(*anscb)(data, &expired_req);
	
	/* If the callback did not dispose of the message, do it now */
	if (expired_req) {
116
		fd_msg_log(FD_MSG_LOG_DROPPED, expired_req, "Expiration period completed without an answer, and the expiry callback did not dispose of the message.");
117
118
119
120
121
122
123
		CHECK_FCT_DO( fd_msg_free(expired_req), /* ignore */ );
	}
	
	/* Finish */
	return NULL;
}

124
/* thread that handles messages expiring. The thread is started only when needed */
125
126
127
128
129
130
131
132
133
134
135
static void * sr_expiry_th(void * arg) {
	struct sr_list * srlist = arg;
	struct msg * expired_req;
	pthread_attr_t detached;
	
	TRACE_ENTRY("%p", arg);
	CHECK_PARAMS_DO( arg, return NULL );
	
	/* Set the thread name */
	{
		char buf[48];
136
		snprintf(buf, sizeof(buf), "ReqExp/%s", ((struct fd_peer *)(srlist->exp.o))->p_hdr.info.pi_diamid);
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
		fd_log_threadname ( buf );
	}
	
	CHECK_POSIX_DO( pthread_attr_init(&detached), return NULL );
	CHECK_POSIX_DO( pthread_attr_setdetachstate(&detached, PTHREAD_CREATE_DETACHED), return NULL );
	
	CHECK_POSIX_DO( pthread_mutex_lock(&srlist->mtx),  return NULL );
	pthread_cleanup_push( fd_cleanup_mutex, &srlist->mtx );
	
	do {
		struct timespec	now, *t;
		struct sentreq * first;
		pthread_t th;
		
		/* Check if there are expiring requests available */
		if (FD_IS_LIST_EMPTY(&srlist->exp)) {
			/* Just wait for a change or cancelation */
			CHECK_POSIX_DO( pthread_cond_wait( &srlist->cnd, &srlist->mtx ), goto error );
			/* Restart the loop on wakeup */
			continue;
		}
		
		/* Get the pointer to the request that expires first */
		first = (struct sentreq *)(srlist->exp.next->o);
		t = fd_msg_anscb_gettimeout( first->req );
		ASSERT(t);
		
		/* Get the current time */
		CHECK_SYS_DO(  clock_gettime(CLOCK_REALTIME, &now),  goto error  );

		/* If first request is not expired, we just wait until it happens */
		if ( TS_IS_INFERIOR( &now, t ) ) {
			
			CHECK_POSIX_DO2(  pthread_cond_timedwait( &srlist->cnd, &srlist->mtx, t ),  
					ETIMEDOUT, /* ETIMEDOUT is a normal return value, continue */,
					/* on other error, */ goto error );
	
			/* on wakeup, loop */
			continue;
		}
		
		/* Now, the first request in the list is expired; remove it and call the anscb for it in a new thread */
		fd_list_unlink(&first->chain);
		fd_list_unlink(&first->expire);
		expired_req = first->req;
		free(first);
		
		CHECK_POSIX_DO( pthread_create( &th, &detached, call_anscb_expire, expired_req ), goto error );

		/* loop */
	} while (1);
error:	
Sebastien Decugis's avatar
Sebastien Decugis committed
189
	; /* pthread_cleanup_pop sometimes expands as "} ..." and the label beofre this cause some compilers to complain... */
190
	pthread_cleanup_pop( 1 );
191
	ASSERT(0); /* we have encountered a problem, maybe time to signal the framework to terminate? */
192
193
194
195
	return NULL;
}


196
/* Store a new sent request */
197
int fd_p_sr_store(struct sr_list * srlist, struct msg **req, uint32_t *hbhloc, uint32_t hbh_restore)
198
199
200
201
{
	struct sentreq * sr;
	struct fd_list * next;
	int match;
202
	struct timespec * ts;
203
	
204
	TRACE_ENTRY("%p %p %p %x", srlist, req, hbhloc, hbh_restore);
205
206
207
208
209
210
	CHECK_PARAMS(srlist && req && *req && hbhloc);
	
	CHECK_MALLOC( sr = malloc(sizeof(struct sentreq)) );
	memset(sr, 0, sizeof(struct sentreq));
	fd_list_init(&sr->chain, hbhloc);
	sr->req = *req;
211
	sr->prevhbh = hbh_restore;
212
213
	fd_list_init(&sr->expire, sr);
	CHECK_SYS( clock_gettime(CLOCK_REALTIME, &sr->added_on) );
214
215
216
217
218
219
220
221
222
223
224
225
226
227
	
	/* Search the place in the list */
	CHECK_POSIX( pthread_mutex_lock(&srlist->mtx) );
	next = find_or_next(&srlist->srs, *hbhloc, &match);
	if (match) {
		TRACE_DEBUG(INFO, "A request with the same hop-by-hop Id was already sent: error");
		free(sr);
		CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* ignore */ );
		return EINVAL;
	}
	
	/* Save in the list */
	*req = NULL;
	fd_list_insert_before(next, &sr->chain);
Sebastien Decugis's avatar
Sebastien Decugis committed
228
	srl_dump("Saved new request, ", &srlist->srs);
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
	
	/* In case of request with a timeout, also store in the timeout list */
	ts = fd_msg_anscb_gettimeout( sr->req );
	if (ts) {
		struct fd_list * li;
		struct timespec * t;
		
		/* browse srlist->exp from the end */
		for (li = srlist->exp.prev; li != &srlist->exp; li = li->prev) {
			struct sentreq * s = (struct sentreq *)(li->o);
			t = fd_msg_anscb_gettimeout( s->req );
			ASSERT( t ); /* sanity */
			if (TS_IS_INFERIOR(t, ts))
				break;
		}
		
		fd_list_insert_after(li, &sr->expire);
	
		/* if the thread does not exist yet, create it */
		if (srlist->thr == (pthread_t)NULL) {
			CHECK_POSIX_DO( pthread_create(&srlist->thr, NULL, sr_expiry_th, srlist), /* continue anyway */);
		} else {
			/* or, if added in first position, signal the condvar to update the sleep time of the thread */
			if (li == &srlist->exp) {
				CHECK_POSIX_DO( pthread_cond_signal(&srlist->cnd), /* continue anyway */);
			}
		}
	}
	
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
	CHECK_POSIX( pthread_mutex_unlock(&srlist->mtx) );
	return 0;
}

/* Fetch a request by hbh */
int fd_p_sr_fetch(struct sr_list * srlist, uint32_t hbh, struct msg **req)
{
	struct sentreq * sr;
	int match;
	
	TRACE_ENTRY("%p %x %p", srlist, hbh, req);
	CHECK_PARAMS(srlist && req);
	
	/* Search the request in the list */
	CHECK_POSIX( pthread_mutex_lock(&srlist->mtx) );
Sebastien Decugis's avatar
Sebastien Decugis committed
273
	srl_dump("Fetching a request, ", &srlist->srs);
274
275
	sr = (struct sentreq *)find_or_next(&srlist->srs, hbh, &match);
	if (!match) {
276
		TRACE_DEBUG(INFO, "There is no saved request with this hop-by-hop id (%x)", hbh);
277
278
		*req = NULL;
	} else {
279
280
		/* Restore hop-by-hop id */
		*((uint32_t *)sr->chain.o) = sr->prevhbh;
281
282
		/* Unlink */
		fd_list_unlink(&sr->chain);
283
		fd_list_unlink(&sr->expire);
284
285
286
287
		*req = sr->req;
		free(sr);
	}
	CHECK_POSIX( pthread_mutex_unlock(&srlist->mtx) );
288
289
	
	/* do not stop the expire thread here, it might cause creating/destroying it very often otherwise */
290
291
292
293
294
295
296
297
298
299
300
301

	/* Done */
	return 0;
}

/* Failover requests (free or requeue routables) */
void fd_p_sr_failover(struct sr_list * srlist)
{
	CHECK_POSIX_DO( pthread_mutex_lock(&srlist->mtx), /* continue anyway */ );
	while (!FD_IS_LIST_EMPTY(&srlist->srs)) {
		struct sentreq * sr = (struct sentreq *)(srlist->srs.next);
		fd_list_unlink(&sr->chain);
302
		fd_list_unlink(&sr->expire);
303
304
		if (fd_msg_is_routable(sr->req)) {
			struct msg_hdr * hdr = NULL;
305
			int ret;
306
307
308
309
310
311
312
			
			/* Set the 'T' flag */
			CHECK_FCT_DO(fd_msg_hdr(sr->req, &hdr), /* continue */);
			if (hdr)
				hdr->msg_flags |= CMD_FLAG_RETRANSMIT;
			
			/* Requeue for sending to another peer */
313
314
315
316
317
			CHECK_FCT_DO( ret = fd_fifo_post(fd_g_outgoing, &sr->req),
				{
					fd_msg_log( FD_MSG_LOG_DROPPED, sr->req, "Internal error: error while requeuing during failover: %s", strerror(ret) );
					CHECK_FCT_DO(fd_msg_free(sr->req), /* What can we do more? */)
				});
318
		} else {
319
			/* Just free the request. */
320
			fd_msg_log( FD_MSG_LOG_DROPPED, sr->req, "Sent & unanswered local message discarded during failover." );
321
322
323
324
			CHECK_FCT_DO(fd_msg_free(sr->req), /* Ignore */);
		}
		free(sr);
	}
325
326
327
	/* The list of expiring requests must be empty now */
	ASSERT( FD_IS_LIST_EMPTY(&srlist->exp) );
	
328
	CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* continue anyway */ );
329
330
331
	
	/* Terminate the expiry thread (must be done when the lock can be taken) */
	CHECK_FCT_DO( fd_thr_term(&srlist->thr), /* ignore error */ );
332
333
}