open62541 1.3.12
Open source implementation of OPC UA
Loading...
Searching...
No Matches
ua_subscription.h
Go to the documentation of this file.
1/** This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
4 *
5 * Copyright 2015-2018, 2021-2022 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
6 * Copyright 2015 (c) Chris Iatrou
7 * Copyright 2015-2016 (c) Sten Grüner
8 * Copyright 2015 (c) Oleksiy Vasylyev
9 * Copyright 2017 (c) Florian Palm
10 * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
11 * Copyright 2017 (c) Mattias Bornhager
12 * Copyright 2019 (c) HMS Industrial Networks AB (Author: Jonas Green)
13 * Copyright 2020 (c) Christian von Arnim, ISW University of Stuttgart (for VDW and umati)
14 * Copyright 2021 (c) Fraunhofer IOSB (Author: Andreas Ebner)
15 */
16
17#ifndef UA_SUBSCRIPTION_H_
18#define UA_SUBSCRIPTION_H_
19
20#include <open62541/types.h>
23
24#include "ua_session.h"
25#include "ua_timer.h"
26#include "ua_util_internal.h"
27
29
30#ifdef UA_ENABLE_SUBSCRIPTIONS
31
32/** MonitoredItems create Notifications. Subscriptions collect Notifications from
33 * (several) MonitoredItems and publish them to the client.
34 *
35 * Notifications are put into two queues at the same time. One for the
36 * MonitoredItem that generated the notification. Here we can remove it if the
37 * space reserved for the MonitoredItem runs full. The second queue is the
38 * "global" queue for all Notifications generated in a Subscription. For
39 * publication, the notifications are taken out of the "global" queue in the
40 * order of their creation. */
41
42
43/** Notifications */
44
45
46/** Sentinel value stored in the TAILQ_NEXT pointer of a notification to mark
47 * that the notification was not added to the global queue */
48#define UA_SUBSCRIPTION_QUEUE_SENTINEL ((UA_Notification*)0x01)
49
50typedef struct UA_Notification {
51 TAILQ_ENTRY(UA_Notification) localEntry; /* Notification list for the MonitoredItem */
52 TAILQ_ENTRY(UA_Notification) globalEntry; /* Notification list for the Subscription */
53 UA_MonitoredItem *mon; /* Always set */
54
55 /* The event field is used if mon->attributeId is the EventNotifier */
56 union {
58#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
60#endif
62
63#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
64 UA_Boolean isOverflowEvent; /* Counted manually */
66#endif
68
69/** Initializes and sets the sentinel pointers */
71
72/** Notifications are always added to the queue of the MonitoredItem. That queue
73 * can overflow. If Notifications are reported, they are also added to the
74 * global queue of the Subscription. There they are picked up by the publishing
75 * callback.
76 *
77 * There are two ways Notifications can be put into the global queue of the
78 * Subscription: They are added because the MonitoringMode of the MonitoredItem
79 * is "reporting". Or the MonitoringMode is "sampling" and a link is triggered
80 * that puts the last Notification into the global queue. */
83
84/** Dequeue and delete the notification */
86
87/** A NotificationMessage contains an array of notifications.
88 * Sent NotificationMessages are stored for the republish service. */
93
94/** Queue Definitions */
/* Tail-queue head type for lists of UA_Notification elements. */
95typedef TAILQ_HEAD(NotificationQueue, UA_Notification) NotificationQueue;
/* Tail-queue head type for lists of UA_NotificationMessageEntry elements. */
96typedef TAILQ_HEAD(NotificationMessageQueue, UA_NotificationMessageEntry)
97 NotificationMessageQueue;
98
99
100/** MonitoredItem */
101
102
103/** The type of sampling for MonitoredItems depends on the sampling interval.
104 *
105 * >0: Cyclic callback
106 * =0: Attached to the node. Sampling is triggered after every "write".
107 * <0: Attached to the subscription. Triggered just before every "publish". */
108typedef enum {
111 UA_MONITOREDITEMSAMPLINGTYPE_EVENT, /* Attached to the node. Can be a "write
112 * event" for DataChange MonitoredItems
113 * with a zero sampling interval. */
114 UA_MONITOREDITEMSAMPLINGTYPE_PUBLISH /* Attached to the subscription */
116
119 LIST_ENTRY(UA_MonitoredItem) listEntry; /* Linked list in the Subscription */
120 UA_Subscription *subscription; /* If NULL, then this is a Local MonitoredItem */
122
123 /* Status and Settings */
127 UA_Boolean registered; /* Registered in the server / Subscription */
128 UA_DateTime triggeredUntil; /* If the MonitoringMode is SAMPLING,
129 * triggering the MonitoredItem puts the latest
130 * Notification into the publishing queue (of
131 * the Subscription). In addition, the first
132 * new sample is also published (and not just
133 * sampled) if it occurs within the duration of
134 * one publishing cycle after the triggering. */
135
136 /* If the filter is a UA_DataChangeFilter: The DataChangeFilter always
137 * contains an absolute deadband definition. Part 8, §6.2 gives the
138 * following formula to test for percentage deadbands:
139 *
140 * DataChange if (absolute value of (last cached value - current value)
141 * > (deadbandValue/100.0) * ((high–low) of EURange)))
142 *
143 * So we can convert from a percentage to an absolute deadband and keep
144 * the hot code path simple.
145 *
146 * TODO: Store the percentage deadband to recompute when the EURange is
147 * changed at runtime of the MonitoredItem */
149
150 /* Sampling */
152 union {
154 UA_MonitoredItem *nodeListNext; /* Event-Based: Attached to Node */
155 LIST_ENTRY(UA_MonitoredItem) samplingListEntry; /* Publish-interval: Linked in
156 * Subscription */
159
160 /* Triggering Links */
163
164 /* Notification Queue */
165 NotificationQueue queue;
166 size_t queueSize; /* This is the current size. See also the configured
167 * (maximum) queueSize in the parameters. */
168 size_t eventOverflows; /* Separate counter for the queue. Can at most double
169 * the queue size */
170};
171
173
174void
176
177void
179
180void
182
183/** Register sampling. Either by adding a repeated callback or by adding the
184 * MonitoredItem to a linked list in the node. */
187
188void
190 UA_MonitoredItem *mon);
191
194 UA_MonitoringMode monitoringMode);
195
196void
198 UA_MonitoredItem *monitoredItem);
199
202 UA_MonitoredItem *mon, UA_DataValue *value);
203
206 UA_UInt32 linkId);
207
210 UA_UInt32 linkId);
211
214 UA_Subscription *sub,
215 UA_MonitoredItem *mon,
216 const UA_DataValue *value);
217
220 UA_MonitoredItem *mon);
221
224
225void
227 const UA_EventFilter *eventFilter,
228 UA_StatusCode *result);
229
232 const UA_ContentFilter *filter,
234
235/** Remove entries until mon->maxQueueSize is reached. Sets infobits for lost
236 * data if required. */
237void
239
240
241/** Subscription */
242
243
244/** We use only a subset of the states defined in the standard */
245typedef enum {
246 /* UA_SUBSCRIPTIONSTATE_CLOSED */
247 /* UA_SUBSCRIPTIONSTATE_CREATING */
252
253/** Subscriptions are managed in a server-wide linked list. If they are attached
254 * to a Session, then they are additionally in the per-Session linked-list. A
255 * subscription is always generated for a Session. But the CloseSession Service
256 * may keep Subscriptions intact beyond the Session lifetime. They can then be
257 * re-bound to a new Session with the TransferSubscription Service. */
260 LIST_ENTRY(UA_Subscription) serverListEntry;
261 /* Ordered according to the priority byte and round-robin scheduling for
262 * late subscriptions. See ua_session.h. Only set if session != NULL. */
263 TAILQ_ENTRY(UA_Subscription) sessionListEntry;
264 UA_Session *session; /* May be NULL if no session is attached. */
266
267 /* Settings */
274
275 /* Runtime information */
277 UA_StatusCode statusChange; /* If set, a notification is generated and the
278 * Subscription is deleted within
279 * UA_Subscription_publish. */
283
284 /* Publish Callback. Registered if id > 0. */
286
287 /* MonitoredItems */
288 UA_UInt32 lastMonitoredItemId; /* increase the identifiers */
289 LIST_HEAD(, UA_MonitoredItem) monitoredItems;
291
292 /* MonitoredItems that are sampled in every publish callback (with the
293 * publish interval of the subscription) */
294 LIST_HEAD(, UA_MonitoredItem) samplingMonitoredItems;
295
296 /* Global list of notifications from the MonitoredItems */
297 TAILQ_HEAD(, UA_Notification) notificationQueue;
298 UA_UInt32 notificationQueueSize; /* Total queue size */
301
302 /* Retransmission Queue */
303 NotificationMessageQueue retransmissionQueue;
305
306 /* Statistics for the server diagnostics. The fields are defined according
307 * to the SubscriptionDiagnosticsDataType (Part 5, §12.15). */
308#ifdef UA_ENABLE_DIAGNOSTICS
309 UA_UInt32 modifyCount;
310 UA_UInt32 enableCount;
311 UA_UInt32 disableCount;
312 UA_UInt32 republishRequestCount;
313 UA_UInt32 republishMessageCount;
314 UA_UInt32 transferRequestCount;
315 UA_UInt32 transferredToAltClientCount;
316 UA_UInt32 transferredToSameClientCount;
317 UA_UInt32 publishRequestCount;
318 UA_UInt32 dataChangeNotificationsCount;
319 UA_UInt32 eventNotificationsCount;
320 UA_UInt32 notificationsCount;
321 UA_UInt32 latePublishRequestCount;
322 UA_UInt32 discardedMessageCount;
323 UA_UInt32 monitoringQueueOverflowCount;
324 UA_UInt32 eventQueueOverFlowCount;
325#endif
326};
327
329
330void
332
335 UA_Subscription *sub);
336
337void
339 UA_Subscription *sub);
340
343 UA_UInt32 monitoredItemId);
344
345void
347
350
351void
353
356 UA_UInt32 sequenceNumber);
357
360
361/** Forward declaration for A&C used in ua_server_internal.h */
362struct UA_ConditionSource;
364
365
366/** Helpers */
367
368
369/** Evaluate the content filter. Only for unit testing. */
370#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
373 const UA_NodeId *eventNode,
374 const UA_ContentFilter *contentFilter,
375 UA_ContentFilterResult *contentFilterResult);
376#endif
377
/** Setting an integer value within bounds.
 *
 * Clamps SRC into the closed range [BOUNDS.min, BOUNDS.max] and stores the
 * result in DST.
 *
 * BOUNDS: expression of a struct type with numeric `min` and `max` members
 * SRC:    the requested value
 * DST:    lvalue that receives the clamped value (may be the same as SRC)
 *
 * The body is wrapped in do { } while(0) so that the macro behaves like a
 * single statement, e.g. inside an unbraced if/else. The invocation must be
 * terminated with a semicolon. All arguments are parenthesized so that
 * compound expressions are compared as a whole. Note that SRC and BOUNDS are
 * still expanded more than once; do not pass expressions with side effects. */
#define UA_BOUNDEDVALUE_SETWBOUNDS(BOUNDS, SRC, DST)           \
    do {                                                       \
        if((SRC) > (BOUNDS).max) (DST) = (BOUNDS).max;         \
        else if((SRC) < (BOUNDS).min) (DST) = (BOUNDS).min;    \
        else (DST) = (SRC);                                    \
    } while(0)
384
385/** Logging
386 * See a description of the tricks used in ua_session.h
 *
 * Common backend of the UA_LOG_<LEVEL>_SUBSCRIPTION macros. LEVEL is
 * token-pasted into the name of the logging macro that is called. Every
 * message is prefixed with "Subscription <subscriptionId> | ".
 *
 * - If SUB is non-NULL and has a session attached, the message is logged via
 *   UA_LOG_<LEVEL>_SESSION with that session.
 * - Otherwise the message is logged via UA_LOG_<LEVEL> with the category
 *   UA_LOGCATEGORY_SERVER. A NULL SUB is printed with the id 0.
 *
 * The format string ends in "%.0s". It matches the extra "" argument that
 * the level-specific wrappers append, so __VA_ARGS__ is never empty here and
 * the extra argument prints nothing.
 *
 * SUB is expanded several times; pass an expression without side effects. */
387#define UA_LOG_SUBSCRIPTION_INTERNAL(LOGGER, LEVEL, SUB, MSG, ...) \
388 do { \
389 if((SUB) && (SUB)->session) { \
390 UA_LOG_##LEVEL##_SESSION(LOGGER, (SUB)->session, \
391 "Subscription %" PRIu32 " | " MSG "%.0s", \
392 (SUB)->subscriptionId, __VA_ARGS__); \
393 } else { \
394 UA_LOG_##LEVEL(LOGGER, UA_LOGCATEGORY_SERVER, \
395 "Subscription %" PRIu32 " | " MSG "%.0s", \
396 (SUB) ? (SUB)->subscriptionId : 0, __VA_ARGS__); \
397 } \
398 } while(0)
399
/* Level-specific subscription logging macros:
 * UA_LOG_<LEVEL>_SUBSCRIPTION(LOGGER, SUB, <format>, ...).
 *
 * Each macro is only active when UA_LOGLEVEL is at or below its threshold
 * (TRACE 100, DEBUG 200, INFO 300, WARNING 400, ERROR 500, FATAL 600).
 * Otherwise it expands to an empty statement and its arguments are not
 * evaluated.
 *
 * The active form forwards to UA_LOG_SUBSCRIPTION_INTERNAL and appends an
 * empty string "" to the arguments, so the variadic part is never empty even
 * when the caller passes only a format string.
 *
 * NOTE(review): UA_MACRO_EXPAND is defined elsewhere; presumably it forces an
 * additional expansion pass for preprocessors that mishandle __VA_ARGS__.
 * Confirm against its definition. */
400#if UA_LOGLEVEL <= 100
401# define UA_LOG_TRACE_SUBSCRIPTION(LOGGER, SUB, ...) \
402 UA_MACRO_EXPAND(UA_LOG_SUBSCRIPTION_INTERNAL(LOGGER, TRACE, SUB, __VA_ARGS__, ""))
403#else
404# define UA_LOG_TRACE_SUBSCRIPTION(LOGGER, SUB, ...) do {} while(0)
405#endif
406
407#if UA_LOGLEVEL <= 200
408# define UA_LOG_DEBUG_SUBSCRIPTION(LOGGER, SUB, ...) \
409 UA_MACRO_EXPAND(UA_LOG_SUBSCRIPTION_INTERNAL(LOGGER, DEBUG, SUB, __VA_ARGS__, ""))
410#else
411# define UA_LOG_DEBUG_SUBSCRIPTION(LOGGER, SUB, ...) do {} while(0)
412#endif
413
414#if UA_LOGLEVEL <= 300
415# define UA_LOG_INFO_SUBSCRIPTION(LOGGER, SUB, ...) \
416 UA_MACRO_EXPAND(UA_LOG_SUBSCRIPTION_INTERNAL(LOGGER, INFO, SUB, __VA_ARGS__, ""))
417#else
418# define UA_LOG_INFO_SUBSCRIPTION(LOGGER, SUB, ...) do {} while(0)
419#endif
420
421#if UA_LOGLEVEL <= 400
422# define UA_LOG_WARNING_SUBSCRIPTION(LOGGER, SUB, ...) \
423 UA_MACRO_EXPAND(UA_LOG_SUBSCRIPTION_INTERNAL(LOGGER, WARNING, SUB, __VA_ARGS__, ""))
424#else
425# define UA_LOG_WARNING_SUBSCRIPTION(LOGGER, SUB, ...) do {} while(0)
426#endif
427
428#if UA_LOGLEVEL <= 500
429# define UA_LOG_ERROR_SUBSCRIPTION(LOGGER, SUB, ...) \
430 UA_MACRO_EXPAND(UA_LOG_SUBSCRIPTION_INTERNAL(LOGGER, ERROR, SUB, __VA_ARGS__, ""))
431#else
432# define UA_LOG_ERROR_SUBSCRIPTION(LOGGER, SUB, ...) do {} while(0)
433#endif
434
435#if UA_LOGLEVEL <= 600
436# define UA_LOG_FATAL_SUBSCRIPTION(LOGGER, SUB, ...) \
437 UA_MACRO_EXPAND(UA_LOG_SUBSCRIPTION_INTERNAL(LOGGER, FATAL, SUB, __VA_ARGS__, ""))
438#else
439# define UA_LOG_FATAL_SUBSCRIPTION(LOGGER, SUB, ...) do {} while(0)
440#endif
441
442#endif /* UA_ENABLE_SUBSCRIPTIONS */
443
445
446#endif /* UA_SUBSCRIPTION_H_ */
#define _UA_BEGIN_DECLS
#undef UA_DEBUG_DUMP_PKGS
Definition config.h:89
#define _UA_END_DECLS
Definition config.h:96
#define TAILQ_HEAD(name, type)
#define LIST_ENTRY(type)
LIST_ENTRY(UA_MonitoredItem) listEntry
UA_DataValue lastValue
UA_MonitoringMode monitoringMode
UA_ReadValueId itemToMonitor
union UA_MonitoredItem::@45 sampling
UA_Subscription * subscription
UA_MonitoringParameters parameters
UA_TimestampsToReturn timestampsToReturn
UA_MonitoredItem * nodeListNext
UA_TimerEntry delayedFreePointers
NotificationQueue queue
UA_UInt32 * triggeringLinks
UA_DateTime triggeredUntil
UA_MonitoredItemSamplingType samplingType
UA_UInt32 monitoredItemId
A NotificationMessage contains an array of notifications.
TAILQ_ENTRY(UA_NotificationMessageEntry) listEntry
UA_NotificationMessage message
UA_EventFieldList event
TAILQ_ENTRY(UA_Notification) globalEntry
UA_MonitoredItem * mon
TAILQ_ENTRY(UA_Notification) localEntry
UA_EventFilterResult result
UA_Boolean isOverflowEvent
union UA_Notification::@44 data
UA_MonitoredItemNotification dataChange
Subscriptions are managed in a server-wide linked list.
UA_UInt32 currentLifetimeCount
UA_UInt32 lifeTimeCount
UA_UInt32 notificationsPerPublish
size_t retransmissionQueueSize
LIST_HEAD(, UA_MonitoredItem) monitoredItems
UA_UInt32 lastMonitoredItemId
UA_UInt32 subscriptionId
UA_UInt32 monitoredItemsSize
UA_UInt32 notificationQueueSize
UA_Boolean publishingEnabled
UA_Session * session
UA_UInt32 dataChangeNotifications
UA_UInt32 currentKeepAliveCount
UA_Double publishingInterval
TAILQ_HEAD(, UA_Notification) notificationQueue
UA_UInt32 eventNotifications
UA_SubscriptionState state
UA_UInt32 nextSequenceNumber
UA_UInt64 publishCallbackId
LIST_HEAD(, UA_MonitoredItem) samplingMonitoredItems
UA_TimerEntry delayedFreePointers
NotificationMessageQueue retransmissionQueue
UA_StatusCode statusChange
LIST_ENTRY(UA_Subscription) serverListEntry
UA_UInt32 maxKeepAliveCount
TAILQ_ENTRY(UA_Subscription) sessionListEntry
_UA_BEGIN_DECLS typedef bool UA_Boolean
This Source Code Form is subject to the terms of the Mozilla Public License, v.
Definition types.h:26
uint32_t UA_UInt32
Definition types.h:56
int64_t UA_DateTime
Definition types.h:144
uint32_t UA_StatusCode
Definition types.h:77
double UA_Double
Definition types.h:74
uint8_t UA_Byte
Definition types.h:36
uint64_t UA_UInt64
Definition types.h:66
UA_TimestampsToReturn
UA_MonitoringMode
UA_StatusCode UA_MonitoredItem_registerSampling(UA_Server *server, UA_MonitoredItem *mon)
Register sampling.
UA_StatusCode UA_MonitoredItem_createDataChangeNotification(UA_Server *server, UA_Subscription *sub, UA_MonitoredItem *mon, const UA_DataValue *value)
void UA_Event_staticSelectClauseValidation(UA_Server *server, const UA_EventFilter *eventFilter, UA_StatusCode *result)
struct UA_NotificationMessageEntry UA_NotificationMessageEntry
A NotificationMessage contains an array of notifications.
UA_SubscriptionState
Subscription.
@ UA_SUBSCRIPTIONSTATE_KEEPALIVE
@ UA_SUBSCRIPTIONSTATE_LATE
@ UA_SUBSCRIPTIONSTATE_NORMAL
void UA_MonitoredItem_removeOverflowInfoBits(UA_MonitoredItem *mon)
UA_Boolean UA_Subscription_publishOnce(UA_Server *server, UA_Subscription *sub)
void UA_Notification_delete(UA_Notification *n)
Dequeue and delete the notification.
void UA_MonitoredItem_sampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem)
UA_StatusCode sampleCallbackWithValue(UA_Server *server, UA_Subscription *sub, UA_MonitoredItem *mon, UA_DataValue *value)
UA_StatusCode UA_MonitoredItem_removeLink(UA_Subscription *sub, UA_MonitoredItem *mon, UA_UInt32 linkId)
void UA_Subscription_delete(UA_Server *server, UA_Subscription *sub)
UA_StatusCode UA_Server_evaluateWhereClauseContentFilter(UA_Server *server, UA_Session *session, const UA_NodeId *eventNode, const UA_ContentFilter *contentFilter, UA_ContentFilterResult *contentFilterResult)
Helpers.
UA_StatusCode UA_Event_addEventToMonitoredItem(UA_Server *server, const UA_NodeId *event, UA_MonitoredItem *mon)
void UA_Notification_enqueueAndTrigger(UA_Server *server, UA_Notification *n)
Notifications are always added to the queue of the MonitoredItem.
void UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem)
struct UA_Notification UA_Notification
UA_StatusCode UA_Event_generateEventId(UA_ByteString *generatedId)
UA_MonitoredItem * UA_Subscription_getMonitoredItem(UA_Subscription *sub, UA_UInt32 monitoredItemId)
UA_StatusCode UA_MonitoredItem_setMonitoringMode(UA_Server *server, UA_MonitoredItem *mon, UA_MonitoringMode monitoringMode)
UA_Notification * UA_Notification_new(void)
Initializes and sets the sentinel pointers.
void UA_MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon)
Remove entries until mon->maxQueueSize is reached.
void UA_Server_registerMonitoredItem(UA_Server *server, UA_MonitoredItem *mon)
UA_StatusCode UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub, UA_UInt32 sequenceNumber)
void Subscription_unregisterPublishCallback(UA_Server *server, UA_Subscription *sub)
struct UA_ConditionSource UA_ConditionSource
void UA_Subscription_sampleAndPublish(UA_Server *server, UA_Subscription *sub)
UA_MonitoredItemSamplingType
MonitoredItem.
@ UA_MONITOREDITEMSAMPLINGTYPE_CYCLIC
@ UA_MONITOREDITEMSAMPLINGTYPE_EVENT
@ UA_MONITOREDITEMSAMPLINGTYPE_PUBLISH
@ UA_MONITOREDITEMSAMPLINGTYPE_NONE
void UA_MonitoredItem_init(UA_MonitoredItem *mon)
void UA_MonitoredItem_unregisterSampling(UA_Server *server, UA_MonitoredItem *mon)
UA_Subscription * UA_Subscription_new(void)
UA_StatusCode UA_Event_staticWhereClauseValidation(UA_Server *server, const UA_ContentFilter *filter, UA_ContentFilterResult *)
void UA_Subscription_publish(UA_Server *server, UA_Subscription *sub)
UA_StatusCode Subscription_registerPublishCallback(UA_Server *server, UA_Subscription *sub)
UA_Boolean UA_Session_reachedPublishReqLimit(UA_Server *server, UA_Session *session)
UA_StatusCode UA_MonitoredItem_addLink(UA_Subscription *sub, UA_MonitoredItem *mon, UA_UInt32 linkId)