Skip to main content
The fromObservable() function creates actor logic from an observable stream. This is useful for reactive programming patterns and is compatible with RxJS observables.

Signature

function fromObservable<
  TContext,
  TInput extends NonReducibleUnknown,
  TEmitted extends EventObject = EventObject
>(
  observableCreator: ({
    input,
    system,
    self,
    emit
  }: {
    input: TInput;
    system: AnyActorSystem;
    self: ObservableActorRef<TContext>;
    emit: (emitted: TEmitted) => void;
  }) => Subscribable<TContext>
): ObservableActorLogic<TContext, TInput, TEmitted>;

Parameters

observableCreator
function
required
A function that creates an observable. Receives an object with:
input
TInput
Data provided to the observable actor when created or invoked.
self
ObservableActorRef<TContext>
Reference to the observable actor itself.
system
AnyActorSystem
The actor system to which the observable actor belongs.
emit
(emitted: TEmitted) => void
Function to emit custom events to subscribers.
Should return a Subscribable (compatible with RxJS Observable).

Returns

ObservableActorLogic
ActorLogic
Actor logic that can be used with createActor() or invoked in a state machine.

Usage

Basic Example with RxJS

import { fromObservable, createActor } from 'xstate';
import { interval } from 'rxjs';

const intervalLogic = fromObservable(() => interval(1000));

const actor = createActor(intervalLogic);

actor.subscribe((snapshot) => {
  console.log('Count:', snapshot.context);
});

actor.start();
// Count: 0
// Count: 1
// Count: 2
// ...

With RxJS Operators

import { fromObservable, createActor } from 'xstate';
import { interval } from 'rxjs';
import { map, filter, take } from 'rxjs/operators';

const logic = fromObservable(() => 
  interval(1000).pipe(
    map(n => n * 2),
    filter(n => n % 4 === 0),
    take(5)
  )
);

const actor = createActor(logic);

actor.subscribe((snapshot) => {
  console.log('Value:', snapshot.context);
  if (snapshot.status === 'done') {
    console.log('Observable completed');
  }
});

actor.start();
// Value: 0
// Value: 4
// Value: 8
// Value: 12
// Value: 16
// Observable completed

With Input

import { fromObservable, createActor } from 'xstate';
import { interval } from 'rxjs';
import { map } from 'rxjs/operators';

type Input = { 
  period: number;
  multiplier: number;
};

const logic = fromObservable<number, Input>(({ input }) => 
  interval(input.period).pipe(
    map(n => n * input.multiplier)
  )
);

const actor = createActor(logic, {
  input: { period: 500, multiplier: 10 }
});

actor.subscribe((snapshot) => {
  console.log(snapshot.context);
});

actor.start();
// 0, 10, 20, 30, ...

Invoking in a Machine

import { setup, fromObservable } from 'xstate';
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

const timerLogic = fromObservable<number, { duration: number }>(({ input }) =>
  interval(1000).pipe(take(input.duration))
);

const machine = setup({
  actors: {
    timer: timerLogic
  }
}).createMachine({
  initial: 'idle',
  states: {
    idle: {
      on: {
        START: 'timing'
      }
    },
    timing: {
      invoke: {
        src: 'timer',
        input: { duration: 5 },
        onDone: 'complete'
      },
      on: {
        CANCEL: 'idle'
      }
    },
    complete: {}
  }
});

Custom Observable Implementation

import { fromObservable, createActor } from 'xstate';
import type { Subscribable } from 'xstate';

function customObservable(interval: number): Subscribable<number> {
  return {
    subscribe(observer) {
      let count = 0;
      const id = setInterval(() => {
        observer.next?.(count++);
      }, interval);
      
      return {
        unsubscribe() {
          clearInterval(id);
        }
      };
    }
  };
}

const logic = fromObservable(() => customObservable(1000));

const actor = createActor(logic);
actor.subscribe((snapshot) => {
  console.log(snapshot.context);
});
actor.start();

Mouse Position Stream

import { fromObservable } from 'xstate';
import { fromEvent } from 'rxjs';
import { map, throttleTime } from 'rxjs/operators';

type Position = { x: number; y: number };

const mousePositionLogic = fromObservable<Position>(() =>
  fromEvent<MouseEvent>(document, 'mousemove').pipe(
    throttleTime(100),
    map(event => ({ x: event.clientX, y: event.clientY }))
  )
);

WebSocket Stream

import { fromObservable } from 'xstate';
import { Observable } from 'rxjs';

type Message = { type: string; data: unknown };
type Input = { url: string };

const websocketLogic = fromObservable<Message, Input>(({ input }) =>
  new Observable<Message>(subscriber => {
    const ws = new WebSocket(input.url);
    
    ws.addEventListener('message', (event) => {
      subscriber.next(JSON.parse(event.data));
    });
    
    ws.addEventListener('error', (error) => {
      subscriber.error(error);
    });
    
    ws.addEventListener('close', () => {
      subscriber.complete();
    });
    
    return () => {
      ws.close();
    };
  })
);

Combining Multiple Streams

import { fromObservable } from 'xstate';
import { combineLatest, interval } from 'rxjs';
import { map } from 'rxjs/operators';

type CombinedData = {
  counter: number;
  timestamp: number;
};

const combinedLogic = fromObservable<CombinedData>(() =>
  combineLatest([
    interval(1000),
    interval(500).pipe(map(() => Date.now()))
  ]).pipe(
    map(([counter, timestamp]) => ({ counter, timestamp }))
  )
);

Error Handling

import { fromObservable, createActor } from 'xstate';
import { Observable } from 'rxjs';

const riskyLogic = fromObservable(() =>
  new Observable(subscriber => {
    let count = 0;
    const id = setInterval(() => {
      if (count < 5) {
        subscriber.next(count++);
      } else {
        subscriber.error(new Error('Count exceeded!'));
      }
    }, 1000);
    
    return () => clearInterval(id);
  })
);

const actor = createActor(riskyLogic);

actor.subscribe({
  next: (snapshot) => {
    if (snapshot.status === 'error') {
      console.error('Error:', snapshot.error);
    } else {
      console.log('Value:', snapshot.context);
    }
  },
  error: (err) => {
    console.error('Observer error:', err);
  }
});

actor.start();

Emitting Custom Events

import { fromObservable, createActor } from 'xstate';
import { interval } from 'rxjs';

type TickEvent = { type: 'tick'; count: number };

const logic = fromObservable<number, void, TickEvent>(({ emit }) => {
  return interval(1000).pipe(
    map(n => {
      emit({ type: 'tick', count: n });
      return n;
    })
  );
});

const actor = createActor(logic);

actor.on('tick', (event) => {
  console.log('Tick:', event.count);
});

actor.start();

Event Observable

XState also provides fromEventObservable() for observables that emit events to the parent:
import { fromEventObservable, createActor } from 'xstate';
import { fromEvent } from 'rxjs';
import { map } from 'rxjs/operators';

const clickLogic = fromEventObservable(() =>
  fromEvent<MouseEvent>(document, 'click').pipe(
    map(event => ({
      type: 'CLICK',
      x: event.clientX,
      y: event.clientY
    }))
  )
);

Snapshot

The observable actor snapshot has the following structure:
interface ObservableSnapshot<TContext, TInput> {
  status: 'active' | 'done' | 'error' | 'stopped';
  context: TContext | undefined;
  output: undefined;
  error: unknown;
  input: TInput | undefined;
}

Status Values

  • 'active' - Observable is emitting values
  • 'done' - Observable completed successfully
  • 'error' - Observable encountered an error
  • 'stopped' - Actor was stopped before observable completed

Behavior

  • Context updates: Each emitted value updates the actor’s context
  • Completion: Observable completion transitions the actor to done status
  • Error handling: Observable errors transition the actor to error status
  • Cleanup: Unsubscribes from the observable when actor is stopped
  • No restart: Completed observables are not restarted

Type Parameters

TContext
type
The type of values emitted by the observable.
TInput
type
default:"NonReducibleUnknown"
The type of the input data.
TEmitted
EventObject
default:"EventObject"
The type of events that can be emitted.

Subscribable Interface

XState’s Subscribable interface is compatible with RxJS Observable:
interface Subscribable<T> {
  subscribe(observer: {
    next?: (value: T) => void;
    error?: (err: unknown) => void;
    complete?: () => void;
  }): Subscription;
}

interface Subscription {
  unsubscribe(): void;
}

See Also