Events and Observables

The project uses RxJS extensively for reactive communication between objects, services, and UI components. This page covers the observable patterns, the global event bus, the data model's refresh system, and subscription management best practices.

RxJS Patterns Used

Subject

A Subject is a multicast observable: each emitted value is delivered to every current subscriber. It does not replay past values, so it suits fire-and-forget events where a late subscriber has no need for earlier emissions:

private _onExpired = new Subject<void>();
public OnExpired = this._onExpired.asObservable();

// Emitting
this._onExpired.next();

// Subscribing
this.state.OnExpired.subscribe(() => {
  this.View = null;
});

Key uses in the codebase:

  • stateService.OnSession, OnDevice, OnExpired, OnReady
  • viewService.OnViewChanged, OnRightChanged, OnTabChanged, OnCloseRequest
  • DataObject.OnRefresh, OnChildChanged, OnCommit, OnUpdated
  • syncService.OnChange, OnReleased, OnExpired

ReplaySubject

A ReplaySubject(1) caches the last emitted value, so late subscribers immediately receive the current state:

private _isReady = new ReplaySubject<boolean>(1);
public IsReady = this._isReady.asObservable();

// Complete after first emission — acts as a one-shot promise-like observable
this._isReady.next(true);
this._isReady.complete();

Used for:

  • syncService.IsReady — resolves when device ID is available
  • BaseObject._waitLoaded — resolves when object data is loaded from server
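
The difference between the two subject types can be checked with a small standalone sketch. It is not taken from the codebase; the variable names (`plain`, `ready`, `plainSeen`, `readySeen`) are illustrative assumptions. It shows that a late subscriber to a plain Subject misses the earlier value, while a ReplaySubject(1) replays it even after complete():

```typescript
import { ReplaySubject, Subject } from 'rxjs';

// Plain Subject: a subscriber that arrives after next() receives nothing.
const plain = new Subject<boolean>();
const plainSeen: boolean[] = [];
plain.next(true);
plain.subscribe(v => plainSeen.push(v));

// ReplaySubject(1): the buffered value is replayed to late subscribers,
// followed by the completion signal.
const ready = new ReplaySubject<boolean>(1);
const readySeen: boolean[] = [];
let readyCompleted = false;
ready.next(true);
ready.complete();
ready.subscribe({
  next: v => readySeen.push(v),
  complete: () => { readyCompleted = true; },
});
```

Because the late subscriber receives both the value and the completion, it is released automatically, which is what makes the next-then-complete combination behave like a resolved promise.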

Observable Exposure Pattern

The codebase consistently follows the pattern of keeping the Subject private and exposing only the Observable:

private _onRefresh = new Subject<Array<string>>();
public OnRefresh = this._onRefresh.asObservable();

This ensures that only the owning class can emit values, while external consumers can only subscribe.
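
As a minimal sketch of what the pattern buys, the hypothetical `Counter` class below (not part of the codebase) exposes `OnChange` the same way. The object returned by asObservable() is a plain Observable wrapper, so consumers cannot reach next() even with a type cast:

```typescript
import { Observable, Subject } from 'rxjs';

// Hypothetical class following the private-Subject / public-Observable pattern.
class Counter {
  private _onChange = new Subject<number>();
  public OnChange: Observable<number> = this._onChange.asObservable();
  private value = 0;

  increment(): void {
    this.value++;
    this._onChange.next(this.value); // only the owning class can emit
  }
}

const counter = new Counter();
const received: number[] = [];
counter.OnChange.subscribe(v => received.push(v));
counter.increment();
counter.increment();

// The exposed object has no next() method at runtime.
const exposesNext = 'next' in (counter.OnChange as object);
```

Exposing the Subject itself under an `Observable<T>` type annotation would hide next() only at compile time; asObservable() also hides it at runtime.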

eventbusService — Global Event Bus

Located in libs/upp-base/src/modules/events.ts. Provides decoupled pub/sub communication:

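import { Injectable } from '@angular/core';
import { Observable, Subject } from 'rxjs';

@Injectable({ providedIn: 'root' })
export class eventbusService {
  private subjects: Map<string, Subject<any>> = new Map();

  // Not shown in the original excerpt; assumed to lazily create one Subject per event name.
  private getSubject<T>(event: string): Subject<T> {
    if (!this.subjects.has(event)) {
      this.subjects.set(event, new Subject<T>());
    }
    return this.subjects.get(event) as Subject<T>;
  }

  emit<T>(event: string, data: T): void {
    this.getSubject<T>(event).next(data);
  }

  on<T>(event: string): Observable<T> {
    return this.getSubject<T>(event).asObservable();
  }
}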

Usage

// Emitting an event
this.eventBus.emit('userLoggedIn', { id: 1, name: 'John' });

// Subscribing to an event
this.eventBus.on<{ id: number; name: string }>('userLoggedIn')
  .subscribe(user => {
    console.log('User logged in:', user);
  });

When to Use

  • Cross-feature communication where direct service injection would create circular dependencies
  • Loose coupling between components that should not know about each other
  • Prefer direct service observables when the producer and consumer have a natural dependency relationship
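
Because event names are free-form strings, a typo in a name or a mismatched payload type is not caught by the compiler. One possible mitigation, sketched here under assumptions, is a typed event map. `AppEvents` and `TypedEventBus` are hypothetical names and are not part of eventbusService:

```typescript
import { Observable, Subject } from 'rxjs';

// Hypothetical event map: event name -> payload type.
interface AppEvents {
  userLoggedIn: { id: number; name: string };
  cartCleared: { reason: string };
}

// Hypothetical typed variant of the bus; same lazy one-Subject-per-event idea.
class TypedEventBus<E> {
  private subjects = new Map<keyof E, Subject<any>>();

  private getSubject<K extends keyof E>(event: K): Subject<E[K]> {
    let subject = this.subjects.get(event);
    if (!subject) {
      subject = new Subject<E[K]>();
      this.subjects.set(event, subject);
    }
    return subject;
  }

  emit<K extends keyof E>(event: K, data: E[K]): void {
    this.getSubject(event).next(data);
  }

  on<K extends keyof E>(event: K): Observable<E[K]> {
    return this.getSubject(event).asObservable();
  }
}

const bus = new TypedEventBus<AppEvents>();
const logins: string[] = [];
bus.emit('userLoggedIn', { id: 0, name: 'Early' }); // no subscriber yet: dropped
bus.on('userLoggedIn').subscribe(user => logins.push(user.name));
bus.emit('userLoggedIn', { id: 1, name: 'John' });
```

The first emit also illustrates a property of any bus built on plain Subjects: events emitted before a consumer subscribes are lost, so state that late consumers need is better exposed through a ReplaySubject on a service.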

Data Model Refresh System

The data model uses a hierarchical refresh system to propagate change notifications through the object graph.

OnRefresh (DataObject)

Every DataObject has an OnRefresh observable that emits the list of affected table names:

protected _onRefresh = new Subject<Array<string>>();
public OnRefresh = this._onRefresh.asObservable();

DoRefresh(table: string[] | string, upwards = true): void {
  const _tables: string[] = Array.isArray(table) ? table : [table];
  this._onRefresh.next(_tables);
  if (upwards) {
    this.UpRefresh(_tables, null);
  }
}

Upward Propagation (UpRefresh)

When a child object changes, the notification propagates upward through parent relations:

graph BT
  TP["TicketProduct<br/>(changed)"] -->|UpRefresh| T["Ticket"]
  T -->|UpRefresh| S["Session"]

  TP2["TicketProduct"] -->|UpRefresh| T
  TO["TicketOffer"] -->|UpRefresh| T

UpRefresh(table: string[], _updates: Set<DataObject> | null): void {
  if (_updates == null) {
    _updates = new Set<DataObject>();
  }
  if (_updates.has(this)) return; // cycle prevention
  _updates.add(this);

  for (const _relation of this._schema.relate) {
    if (_relation.child == false) { // only traverse to parents
      const _related = this._children[_relation.name] || null;
      if (_related && (_related instanceof RelatedObject)) {
        _related.DoRefresh(table, false);
        _related.UpRefresh(table, _updates);
      }
    }
  }
}

The _updates Set prevents infinite loops in cyclic graphs.
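
To see what the visited set does and does not guarantee, here is a minimal standalone sketch. It is not the project's DataObject: `GraphNode`, `parents`, and `refreshed` are hypothetical names, and the Session node is deliberately given Ticket as a parent to form an artificial cycle.

```typescript
// Hypothetical stand-in for DataObject: only a name and parent links.
class GraphNode {
  parents: GraphNode[] = [];
  refreshed = 0; // how many times this node was notified

  constructor(public name: string) {}

  upRefresh(visited: Set<GraphNode> = new Set()): void {
    if (visited.has(this)) return; // cycle prevention, as in UpRefresh
    visited.add(this);
    for (const parent of this.parents) {
      parent.refreshed++;          // stands in for DoRefresh(table, false)
      parent.upRefresh(visited);
    }
  }
}

const session = new GraphNode('Session');
const ticket = new GraphNode('Ticket');
const product = new GraphNode('TicketProduct');
product.parents.push(ticket);
ticket.parents.push(session);
session.parents.push(ticket); // artificial cycle: Ticket -> Session -> Ticket

product.upRefresh(); // terminates despite the cycle
```

In this sketch the traversal terminates, but Ticket is notified twice, because the notification happens before the visited check of the node being entered. The same ordering appears in the excerpt above (DoRefresh is called before the recursive UpRefresh), so the set should be read as bounding the recursion rather than deduplicating notifications.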

OnChildChanged (DataObject)

Emits the relation name whenever children are added or removed:

protected _onChildChanged = new Subject<string>();
public OnChildChanged = this._onChildChanged.asObservable();

protected override DoChildChanged(target: string): void {
  this._onChildChanged.next(target);
}

This is triggered by SetChild(), AddChild(), and DelChild(), enabling views to re-register subscriptions when the child set changes.

ViewObject Refresh

ViewObject implements a two-stage refresh to separate data updates from UI updates:

private _onViewRefresh = new Subject<void>();
public OnViewRefresh = this._onViewRefresh.asObservable();

private _onDataRefresh = new Subject<void>();
public OnDataRefresh = this._onDataRefresh.asObservable();

Two-Stage Refresh

private _doRefreshView(mode: 'DATA' | 'VIEW') {
  switch (mode) {
    case 'DATA':
      this._onDataRefresh.next();
      break;
    case 'VIEW':
      if (!this._scheduled) {
        this._scheduled = true;
        Promise.resolve().then(() => {
          this._onViewRefresh.next();
          this._scheduled = false;
        });
      }
      break;
  }
}

  • OnDataRefresh: Emitted immediately when data changes. Used for logic that must react instantly.
  • OnViewRefresh: Coalesced via microtask (Promise.resolve().then(...)) to avoid multiple UI updates in the same tick. Further deferred via clockService.OnRefreshTick when delayed refresh is enabled.
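
The microtask coalescing can be isolated in a few lines. The sketch below is a simplified illustration, not the ViewObject implementation; `CoalescedNotifier`, `request`, and `emitted` are hypothetical names. Three synchronous requests result in a single emission once the current call stack unwinds:

```typescript
// Hypothetical minimal scheduler illustrating microtask coalescing.
class CoalescedNotifier {
  private scheduled = false;
  emitted = 0; // number of times the notification actually fired

  request(): void {
    if (this.scheduled) return;   // a flush is already queued
    this.scheduled = true;
    Promise.resolve().then(() => {
      this.scheduled = false;     // cleared first, see note below
      this.emitted++;             // stands in for _onViewRefresh.next()
    });
  }
}

const notifier = new CoalescedNotifier();
notifier.request();
notifier.request();
notifier.request();
const emittedSync = notifier.emitted; // nothing has fired synchronously

// Queued after the flush above, so it observes the coalesced result.
const emittedAfterFlush: Promise<number> =
  Promise.resolve().then(() => notifier.emitted);
```

The sketch clears the flag before firing, so a request raised by a handler during the emission schedules another flush. The excerpt above clears it after next(), in which case such a re-entrant request would be ignored; whether that matters depends on whether handlers ever request a view refresh themselves.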

Deferred Rendering

protected DoRefreshView(now = false) {
  this._doRefreshView('DATA');

  if (GenericUtils.DelayedRefresh() == false) {
    now = true; // force immediate refresh
  }

  if (now && this.data.clock.Enable) {
    this._doRefreshView('VIEW');
  } else {
    if (this._tick_subscription == null) {
      this._tick_subscription = this.data.clock.OnRefreshTick.subscribe(() => {
        this._tick_subscription?.unsubscribe();
        this._tick_subscription = null;
        this._doRefreshView('VIEW');
      });
    }
  }
}

This batches UI updates to the next clock tick, preventing excessive re-renders during bulk data loading (e.g., initial sync).
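
The effect of tick-based batching can be sketched in isolation. In the example below a plain Subject stands in for clock.OnRefreshTick, and `requestRender` and `renders` are hypothetical names; it assumes, as the original code appears to, that the tick source emits asynchronously rather than during subscribe():

```typescript
import { Subject, Subscription } from 'rxjs';

const tick = new Subject<void>(); // stands in for clock.OnRefreshTick
let renders = 0;
let tickSubscription: Subscription | null = null;

// Hypothetical helper: at most one pending render per tick.
function requestRender(): void {
  if (tickSubscription) return; // already waiting for the next tick
  tickSubscription = tick.subscribe(() => {
    tickSubscription?.unsubscribe();
    tickSubscription = null;
    renders++;                  // stands in for _doRefreshView('VIEW')
  });
}

// Simulate a bulk load issuing many refresh requests between two ticks.
for (let i = 0; i < 100; i++) {
  requestRender();
}
const rendersBeforeTick = renders;
tick.next();
const rendersAfterTick = renders;
tick.next(); // idle tick: no request pending, nothing renders
const rendersAfterIdleTick = renders;
```

A hundred requests cost one subscription and one render, and ticks with no pending request cost nothing because the subscription is released as soon as it fires.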

Subscription Management

In ViewObject

ViewObject maintains a _subscriptions array that is cleaned up in OnDestroy():

protected _subscriptions: Subscription[] = [];

OnDestroy(): void {
  for (const _subscription of this._subscriptions) {
    _subscription.unsubscribe();
  }
  this._subscriptions = [];
}

Derived views register their subscriptions in doRegister():

doRegister(): void {
  this._subscriptions.push(
    this.dataobject.OnRefresh.subscribe((tables) => {
      if (tables.includes('TICKETPRODUCT')) {
        this.DoRefreshView();
      }
    })
  );

  this._subscriptions.push(
    this.dataobject.OnChildChanged.subscribe((relation) => {
      if (relation === 'products') {
        // Re-register subscriptions for new children
        this.registerProductSubscriptions();
        this.DoRefreshView();
      }
    })
  );
}
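
The body of registerProductSubscriptions() is not shown here. One way such a method could avoid leaking subscriptions to removed children is to keep child subscriptions in a separate group that is torn down and rebuilt on every change. The sketch below illustrates that idea only; `Child`, `ParentView`, and `registerChildSubscriptions` are hypothetical names:

```typescript
import { Subject, Subscription } from 'rxjs';

// Hypothetical child object with its own refresh stream.
class Child {
  readonly onRefresh = new Subject<void>();
}

class ParentView {
  children: Child[] = [];
  refreshes = 0;
  private childSubscriptions = new Subscription();

  // Drop subscriptions to the old child set, then subscribe to the current one.
  registerChildSubscriptions(): void {
    this.childSubscriptions.unsubscribe();
    this.childSubscriptions = new Subscription();
    for (const child of this.children) {
      this.childSubscriptions.add(
        child.onRefresh.subscribe(() => { this.refreshes++; })
      );
    }
  }
}

const view = new ParentView();
const removed = new Child();
const added = new Child();

view.children = [removed];
view.registerChildSubscriptions();

view.children = [added];          // child set changed
view.registerChildSubscriptions();

removed.onRefresh.next();         // ignored: old subscription was released
added.onRefresh.next();           // counted
```

Without the teardown step, each change to the child set would stack another subscription on every surviving child, and one refresh would trigger several view updates.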

In viewService

The service keeps its own subscription array:

private _subscriptions: Array<Subscription> = [];

ngOnDestroy() {
  for (const _subscription of this._subscriptions) {
    _subscription.unsubscribe();
  }
  this._subscriptions = [];
}

In PendingQueue (Sync)

The sync queue uses explicit subscription tracking:

private _onblock_subscription: Subscription | null = null;

OnDestroy() {
  if (this._onblock_subscription) {
    this._onblock_subscription.unsubscribe();
    this._onblock_subscription = null;
  }
}
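
As an alternative to a hand-managed array, RxJS lets one Subscription act as a parent for others through add(). The following sketch is not taken from the codebase and uses illustrative names (`source`, `seen`, `subscriptions`); it shows a single unsubscribe() call releasing every child:

```typescript
import { Subject, Subscription } from 'rxjs';

const source = new Subject<number>();
const seen: number[] = [];

// One parent Subscription instead of an array plus a loop.
const subscriptions = new Subscription();
subscriptions.add(source.subscribe(v => seen.push(v)));
subscriptions.add(source.subscribe(v => seen.push(v * 10)));

source.next(1);              // both handlers run
subscriptions.unsubscribe(); // tears down every child at once
source.next(2);              // ignored: no subscribers remain
```

A Subscription that has been unsubscribed stays closed, and anything added to it afterwards is unsubscribed immediately, so an object that re-registers after cleanup needs a fresh instance. The array approach used in the codebase avoids that pitfall by resetting to an empty array.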

Best Practices

  1. Always unsubscribe: Store subscriptions and clean them up in OnDestroy() or ngOnDestroy().
  2. Use take(1) for one-shot subscriptions: When you only need a single emission:

     this.data.OnRefreshCompleted.pipe(take(1)).subscribe(() => {
       // runs once then auto-unsubscribes
     });

  3. Prefer firstValueFrom() for async/await: Convert an observable to a promise:

     async WaitLoaded(): Promise<void> {
       if (!this.ToInsert && !this._isLoaded) {
         await firstValueFrom(this._waitLoaded.asObservable());
       }
     }
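
The one-shot practices can be exercised in a standalone sketch. The names `loaded`, `waitLoaded`, `ticks`, and `fallback` are illustrative assumptions, not project code. The last part covers an edge case worth knowing: firstValueFrom() rejects with an EmptyError when the source completes without emitting, unless a defaultValue is supplied.

```typescript
import { ReplaySubject, Subject, firstValueFrom } from 'rxjs';
import { take } from 'rxjs/operators';

// A completed ReplaySubject(1) behaves like an already-resolved promise.
const loaded = new ReplaySubject<boolean>(1);
loaded.next(true);
loaded.complete();

// Hypothetical helper mirroring the shape of WaitLoaded().
async function waitLoaded(): Promise<boolean> {
  return firstValueFrom(loaded);
}

// take(1): the handler runs for the first value only, then unsubscribes.
const ticks = new Subject<number>();
const firstTicks: number[] = [];
ticks.pipe(take(1)).subscribe(v => firstTicks.push(v));
ticks.next(1);
ticks.next(2);

// A source that completes without emitting: defaultValue avoids the rejection.
const empty = new Subject<boolean>();
const fallback: Promise<boolean> = firstValueFrom(empty, { defaultValue: false });
empty.complete();
```

If a plain Subject were used for the loaded signal instead of a ReplaySubject(1), a caller arriving after the emission would wait indefinitely, which is why the replaying variant is the safer source for awaitable state.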

WeakRef Pattern for ViewObjects

DataObject uses WeakRef to cache ViewObject instances:

private _objectviewref: WeakRef<ViewObject<DataObject>> | null = null;

get View(): ViewObject<DataObject> | null {
  let view = this._objectviewref?.deref() ?? null;
  if (view == null) {
    view = this.viewProxy;
    if (view) {
      this._objectviewref = new WeakRef(view);
    }
  }
  return view;
}

Benefits:

  • Memory efficiency: The ViewObject can be garbage-collected when no component holds a strong reference
  • Lazy creation: The view is only created when first accessed
  • Automatic recreation: If the GC collects the view, it is transparently recreated on next access
  • No memory leaks: Unlike a strong reference, this pattern does not prevent GC of unused views
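
The caching behaviour can be reproduced with a small standalone sketch. `Model` and `View` are hypothetical stand-ins for DataObject and ViewObject, and the `created` counter exists only to make the behaviour observable:

```typescript
// Hypothetical stand-ins for ViewObject and DataObject.
class View {
  constructor(public owner: Model) {}
}

class Model {
  private viewRef: WeakRef<View> | null = null;
  created = 0; // how many View instances have been built

  get view(): View {
    let view = this.viewRef?.deref();
    if (!view) {
      view = new View(this);            // lazy creation, or recreation after GC
      this.created++;
      this.viewRef = new WeakRef(view); // cache without keeping it alive
    }
    return view;
  }
}

const model = new Model();
const first = model.view;  // this strong reference keeps the view alive
const second = model.view; // served from the WeakRef cache
```

While any caller holds the view, repeated access returns the same instance. Once every strong reference is gone the view may be collected at a time the program cannot predict, and the next access builds a new instance, so state that must survive should live on the data object rather than on the view.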

Event Classes (Registry Events)

Located in libs/upp-data/src/modules/events.ts. These are DataObject subclasses used for server-side event logging:

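| Class       | Purpose            | Key Properties               |
|-------------|--------------------|------------------------------|
| DrawerEvent | Cash drawer opened | printer: PrintDevice         |
| LoginEvent  | User login/logout  | place: Place, login: boolean |
| GuestEvent  | Guest login/logout | qrcode: QrCode, login: boolean |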

Events use the ToNotify flag and Notify() method to send one-shot notifications to the server via the commit pipeline:

Notify(): void {
  if (this.ToNotify) {
    this.DoCommit(true); // force = true for immediate send
  }
}

The eventsNotifier class provides a simple API for logging:

const notifier = new eventsNotifier(dataService);
notifier.LogAction(LogActions.LOGIN, { place: myPlace, login: true });
notifier.LogAction(LogActions.DRAWER, { printer: myPrinter });

Summary of Observable Patterns

| Pattern                   | Where                       | Purpose                            |
|---------------------------|-----------------------------|------------------------------------|
| Subject<void>             | State changes, UI events    | Simple notification without data   |
| Subject<T>                | Data changes, sync updates  | Notification with payload          |
| ReplaySubject<boolean>(1) | Ready/loaded state          | Late subscribers get current state |
| eventbusService           | Cross-feature communication | Decoupled pub/sub                  |
| OnRefresh + UpRefresh     | Data model                  | Hierarchical change propagation    |
| OnViewRefresh (deferred)  | ViewObjects                 | Batched UI updates                 |
| OnChildChanged            | DataObject                  | Child list mutation notification   |
| WeakRef + lazy init       | DataObject.View             | GC-friendly view caching           |