File

src/app/shared/services/message/message.service.ts

Description

Service for creating message channels.

Index

Methods

Constructor

constructor(config: MessageServiceConfig, parent: MessageService<T> | null)

Creates an instance of message service.

Parameters :
Name Type Optional Description
config MessageServiceConfig No

The configuration for this service.

parent MessageService<T> | null No

The parent message service, if any.

Methods

connect
connect(source)

Connects to a message channel.

Parameters :
Name Optional Description
source No

The source of messages sent.

Returns : MessageChannel<T>

A new connected message channel.

import { Inject, Injectable, InjectionToken, OnDestroy, Optional, SkipSelf } from '@angular/core';
import { Observable, Subject, from } from 'rxjs';
import { filter, mergeAll, takeWhile } from 'rxjs/operators';

/**
 * Message service configuration options.
 */
export interface MessageServiceConfig {
  /** Whether to listen to parent messages. */
  isolated: boolean;
}

/**
 * A message sent on a channel.
 *
 * @template T The payload type.
 */
export interface Message<T> {
  /** Unique identifier for a message. */
  id: number;
  /** Sender of the message. */
  source: unknown;
  /** Data associated with this message. */
  payload: T;
}

/** Default message service configuration. */
export const MESSAGE_SERVICE_DEFAULT_CONFIG: MessageServiceConfig = {
  isolated: true,
};

/** Token for specifying the message service configuration. */
export const MESSAGE_SERVICE_CONFIG = new InjectionToken<MessageServiceConfig>('Message service configuration', {
  providedIn: 'root',
  factory: () => MESSAGE_SERVICE_DEFAULT_CONFIG,
});

/**
 * Channel for sending and receiving messages.
 *
 * @template T The payload type.
 */
export class MessageChannel<T> {
  /** Counter used to give each message a unique id. */
  private static counter = 0;

  /**
   * Creates a new channel.
   *
   * @param source The message source.
   * @param channel The message send subject.
   * @param messages The message receive observable.
   */
  constructor(
    readonly source: unknown,
    private readonly channel: Subject<Message<T>>,
    private readonly messages: Observable<Message<T>>,
  ) {}

  /**
   * Sends a single message with a payload.
   *
   * @param payload The message payload.
   */
  sendMessage(payload: T): void {
    this.channel.next({
      id: this.nextMessageId(),
      source: this.source,
      payload,
    });
  }

  /**
   * Gets an observable emitting all messages except for those sent by this source.
   *
   * @returns The message observable.
   */
  getMessages(): Observable<Message<T>> {
    return this.messages.pipe(filter((msg) => msg.source !== this.source));
  }

  /**
   * Gets an observable emitting messages sent from a specific source.
   *
   * @param source The source messages are filtered on.
   * @returns The message observable.
   */
  getMessagesFromSource(source: unknown): Observable<Message<T>> {
    return this.getMessages().pipe(filter((msg) => msg.source === source));
  }

  /**
   * Gets an observable emitting messages sent from any of the specified sources.
   *
   * @param sources The sources messages are filtered on.
   * @returns The message observable.
   */
  getMessagesFromSources(sources: unknown[]): Observable<Message<T>> {
    return this.getMessages().pipe(filter((msg) => sources.includes(msg.source)));
  }

  /**
   * Creates a unique message identifier.
   *
   * @returns The identifier number.
   */
  private nextMessageId(): number {
    const result = MessageChannel.counter;
    MessageChannel.counter++;
    return result;
  }
}

/**
 * Service for creating message channels.
 *
 * @template T The message payload type.
 */
@Injectable({
  providedIn: 'root',
})
export class MessageService<T> implements OnDestroy {
  /** The message channel. */
  private readonly channel = new Subject<Message<T>>();
  /** The message listener. */
  private readonly messages: Observable<Message<T>>;

  /**
   * Creates an instance of message service.
   *
   * @param [config] The configuration for this service.
   * @param [parent] The parent message service, if any.
   */
  constructor(
    @Inject(MESSAGE_SERVICE_CONFIG) readonly config: MessageServiceConfig,
    @Optional() @SkipSelf() readonly parent: MessageService<T> | null,
  ) {
    if (config.isolated || !parent) {
      this.messages = this.channel.asObservable();
    } else {
      this.messages = from([this.channel, parent.messages.pipe(takeWhile(() => !this.channel.closed))]).pipe(
        mergeAll(),
      );
    }
  }

  /**
   * Clean up.
   */
  ngOnDestroy(): void {
    this.channel.complete();
  }

  /**
   * Connects to a message channel.
   *
   * @param source The source of messages sent.
   * @returns A new connected message channel.
   */
  connect(source: unknown): MessageChannel<T> {
    return new MessageChannel(source, this.channel, this.messages);
  }
}

results matching ""

    No results matching ""