Events and Observables
The project uses RxJS extensively for reactive communication between objects, services, and UI components. This page covers the observable patterns, the global event bus, the data model's refresh system, and subscription management best practices.
RxJS Patterns Used
Subject
A Subject is a multicast observable that can emit values to multiple subscribers. Used for one-time or fire-and-forget events:
private _onExpired = new Subject<void>();
public OnExpired = this._onExpired.asObservable();
// Emitting
this._onExpired.next();
// Subscribing
this.state.OnExpired.subscribe(() => {
this.View = null;
});
Key uses in the codebase:
stateService.OnSession,OnDevice,OnExpired,OnReadyviewService.OnViewChanged,OnRightChanged,OnTabChanged,OnCloseRequestDataObject.OnRefresh,OnChildChanged,OnCommit,OnUpdatedsyncService.OnChange,OnReleased,OnExpired
ReplaySubject
A ReplaySubject(1) caches the last emitted value, so late subscribers immediately receive the current state:
private _isReady = new ReplaySubject<boolean>(1);
public IsReady = this._isReady.asObservable();
// Complete after first emission — acts as a one-shot promise-like observable
this._isReady.next(true);
this._isReady.complete();
Used for:
syncService.IsReady— resolves when device ID is availableBaseObject._waitLoaded— resolves when object data is loaded from server
Observable Exposure Pattern
The codebase consistently follows the pattern of keeping the Subject private and exposing only the Observable:
private _onRefresh = new Subject<Array<string>>();
public OnRefresh = this._onRefresh.asObservable();
This ensures that only the owning class can emit values, while external consumers can only subscribe.
eventbusService — Global Event Bus
Located in libs/upp-base/src/modules/events.ts. Provides decoupled pub/sub communication:
@Injectable({ providedIn: 'root' })
export class eventbusService {
private subjects: Map<string, Subject<any>> = new Map();
emit<T>(event: string, data: T): void {
this.getSubject<T>(event).next(data);
}
on<T>(event: string): Observable<T> {
return this.getSubject<T>(event).asObservable();
}
}
Usage
// Emitting an event
this.eventBus.emit('userLoggedIn', { id: 1, name: 'John' });
// Subscribing to an event
this.eventBus.on<{ id: number; name: string }>('userLoggedIn')
.subscribe(user => {
console.log('User logged in:', user);
});
When to Use
- Cross-feature communication where direct service injection would create circular dependencies
- Loose coupling between components that should not know about each other
- Prefer direct service observables when the producer and consumer have a natural dependency relationship
Data Model Refresh System
The data model uses a hierarchical refresh system to propagate change notifications through the object graph.
OnRefresh (DataObject)
Every DataObject has an OnRefresh observable that emits the list of affected table names:
protected _onRefresh = new Subject<Array<string>>();
public OnRefresh = this._onRefresh.asObservable();
DoRefresh(table: string[] | string, upwards = true): void {
const _tables: string[] = Array.isArray(table) ? table : [table];
this._onRefresh.next(_tables);
if (upwards) {
this.UpRefresh(_tables, null);
}
}
Upward Propagation (UpRefresh)
When a child object changes, the notification propagates upward through parent relations:
graph BT
TP["TicketProduct<br/>(changed)"] -->|UpRefresh| T["Ticket"]
T -->|UpRefresh| S["Session"]
TP2["TicketProduct"] -->|UpRefresh| T
TO["TicketOffer"] -->|UpRefresh| T
UpRefresh(table: string[], _updates: Set<DataObject> | null): void {
if (_updates == null) {
_updates = new Set<DataObject>();
}
if (_updates.has(this)) return; // cycle prevention
_updates.add(this);
for (const _relation of this._schema.relate) {
if (_relation.child == false) { // only traverse to parents
const _related = this._children[_relation.name] || null;
if (_related && (_related instanceof RelatedObject)) {
_related.DoRefresh(table, false);
_related.UpRefresh(table, _updates);
}
}
}
}
The _updates Set prevents infinite loops in cyclic graphs.
OnChildChanged (DataObject)
Emits the relation name whenever children are added or removed:
protected _onChildChanged = new Subject<string>();
public OnChildChanged = this._onChildChanged.asObservable();
protected override DoChildChanged(target: string): void {
this._onChildChanged.next(target);
}
This is triggered by SetChild(), AddChild(), and DelChild(), enabling views to re-register subscriptions when the child set changes.
ViewObject Refresh
ViewObject implements a two-stage refresh to separate data updates from UI updates:
private _onViewRefresh = new Subject<void>();
public OnViewRefresh = this._onViewRefresh.asObservable();
private _onDataRefresh = new Subject<void>();
public OnDataRefresh = this._onDataRefresh.asObservable();
Two-Stage Refresh
private _doRefreshView(mode: 'DATA' | 'VIEW') {
switch(mode) {
case 'DATA':
this._onDataRefresh.next();
break;
case 'VIEW':
if (!this._scheduled) {
this._scheduled = true;
Promise.resolve().then(() => {
this._onViewRefresh.next();
this._scheduled = false;
});
}
break;
}
}
OnDataRefresh: Emitted immediately when data changes. Used for logic that must react instantly.OnViewRefresh: Coalesced via microtask (Promise.resolve().then(...)) to avoid multiple UI updates in the same tick. Further deferred viaclockService.OnRefreshTickwhen delayed refresh is enabled.
Deferred Rendering
protected DoRefreshView(now = false) {
this._doRefreshView('DATA');
if (GenericUtils.DelayedRefresh() == false) {
now = true; // force immediate refresh
}
if (now && this.data.clock.Enable) {
this._doRefreshView('VIEW');
} else {
if (this._tick_subscription == null) {
this._tick_subscription = this.data.clock.OnRefreshTick.subscribe(() => {
this._tick_subscription?.unsubscribe();
this._tick_subscription = null;
this._doRefreshView('VIEW');
});
}
}
}
This batches UI updates to the next clock tick, preventing excessive re-renders during bulk data loading (e.g., initial sync).
Subscription Management
In ViewObject
ViewObject maintains a _subscriptions array that is cleaned up in OnDestroy():
protected _subscriptions: Subscription[] = [];
OnDestroy(): void {
for (const _subscription of this._subscriptions) {
_subscription.unsubscribe();
}
this._subscriptions = [];
}
Derived views register their subscriptions in doRegister():
doRegister(): void {
this._subscriptions.push(
this.dataobject.OnRefresh.subscribe((tables) => {
if (tables.includes('TICKETPRODUCT')) {
this.DoRefreshView();
}
})
);
this._subscriptions.push(
this.dataobject.OnChildChanged.subscribe((relation) => {
if (relation === 'products') {
// Re-register subscriptions for new children
this.registerProductSubscriptions();
this.DoRefreshView();
}
})
);
}
In viewService
The service keeps its own subscription array:
private _subscriptions: Array<Subscription> = [];
ngOnDestroy() {
for (const _subscription of this._subscriptions) {
_subscription.unsubscribe();
}
this._subscriptions = [];
}
In PendingQueue (Sync)
The sync queue uses explicit subscription tracking:
private _onblock_subscription: Subscription | null = null;
OnDestroy() {
if (this._onblock_subscription) {
this._onblock_subscription.unsubscribe();
this._onblock_subscription = null;
}
}
Best Practices
- Always unsubscribe: Store subscriptions and clean them up in
OnDestroy()orngOnDestroy(). - Use
take(1)for one-shot subscriptions: When you only need a single emission:
this.data.OnRefreshCompleted.pipe(take(1)).subscribe(() => {
// runs once then auto-unsubscribes
});
- Prefer
firstValueFrom()for async/await: Convert an observable to a promise:
async WaitLoaded(): Promise<void> {
if (!this.ToInsert && !this._isLoaded) {
await firstValueFrom(this._waitLoaded.asObservable());
}
}
WeakRef Pattern for ViewObjects
DataObject uses WeakRef to cache ViewObject instances:
private _objectviewref: WeakRef<ViewObject<DataObject>> | null = null;
get View(): ViewObject<DataObject> | null {
let view = this._objectviewref?.deref() ?? null;
if (view == null) {
view = this.viewProxy;
if (view) {
this._objectviewref = new WeakRef(view);
}
}
return view;
}
Benefits:
- Memory efficiency: The
ViewObjectcan be garbage-collected when no component holds a strong reference - Lazy creation: The view is only created when first accessed
- Automatic recreation: If the GC collects the view, it is transparently recreated on next access
- No memory leaks: Unlike a strong reference, this pattern does not prevent GC of unused views
Event Classes (Registry Events)
Located in libs/upp-data/src/modules/events.ts. These are DataObject subclasses used for server-side event logging:
| Class | Purpose | Key Properties |
|---|---|---|
DrawerEvent | Cash drawer opened | printer: PrintDevice |
LoginEvent | User login/logout | place: Place, login: boolean |
GuestEvent | Guest login/logout | qrcode: QrCode, login: boolean |
Events use the ToNotify flag and Notify() method to send one-shot notifications to the server via the commit pipeline:
Notify(): void {
if (this.ToNotify) {
this.DoCommit(true); // force = true for immediate send
}
}
The eventsNotifier class provides a simple API for logging:
const notifier = new eventsNotifier(dataService);
notifier.LogAction(LogActions.LOGIN, { place: myPlace, login: true });
notifier.LogAction(LogActions.DRAWER, { printer: myPrinter });
Summary of Observable Patterns
| Pattern | Where | Purpose |
|---|---|---|
Subject<void> | State changes, UI events | Simple notification without data |
Subject<T> | Data changes, sync updates | Notification with payload |
ReplaySubject<boolean>(1) | Ready/loaded state | Late subscribers get current state |
eventbusService | Cross-feature communication | Decoupled pub/sub |
OnRefresh + UpRefresh | Data model | Hierarchical change propagation |
OnViewRefresh (deferred) | ViewObjects | Batched UI updates |
OnChildChanged | DataObject | Child list mutation notification |
WeakRef + lazy init | DataObject.View | GC-friendly view caching |