The fromObservable() function creates actor logic from an observable stream. Each value the observable emits becomes the context of the actor's snapshot. Any object that implements the Subscribable interface can be used, including RxJS observables.
Signature
function fromObservable<
  TContext,
  TInput extends NonReducibleUnknown,
  TEmitted extends EventObject = EventObject
>(
  observableCreator: ({
    input,
    system,
    self,
    emit
  }: {
    input: TInput;
    system: AnyActorSystem;
    self: ObservableActorRef<TContext>;
    emit: (emitted: TEmitted) => void;
  }) => Subscribable<TContext>
): ObservableActorLogic<TContext, TInput, TEmitted>;
Parameters
observableCreator
A function that creates an observable. Receives an object with:
input
TInput
Data provided to the observable actor when created or invoked.
self
ObservableActorRef<TContext>
Reference to the observable actor itself.
system
AnyActorSystem
The actor system to which the observable actor belongs.
emit
(emitted: TEmitted) => void
Function to emit custom events to listeners registered with actor.on().
The function should return a Subscribable (compatible with RxJS Observable).
Returns
Actor logic that can be used with createActor() or invoked in a state machine.
Usage
Basic Example with RxJS
import { fromObservable, createActor } from 'xstate';
import { interval } from 'rxjs';
const intervalLogic = fromObservable(() => interval(1000));
const actor = createActor(intervalLogic);
actor.subscribe((snapshot) => {
  console.log('Count:', snapshot.context);
});
actor.start();
// Count: 0
// Count: 1
// Count: 2
// ...
With RxJS Operators
import { fromObservable, createActor } from 'xstate';
import { interval } from 'rxjs';
import { map, filter, take } from 'rxjs/operators';
const logic = fromObservable(() =>
  interval(1000).pipe(
    map(n => n * 2),
    filter(n => n % 4 === 0),
    take(5)
  )
);
const actor = createActor(logic);
actor.subscribe((snapshot) => {
  console.log('Value:', snapshot.context);
  if (snapshot.status === 'done') {
    console.log('Observable completed');
  }
});
actor.start();
// Value: 0
// Value: 4
// Value: 8
// Value: 12
// Value: 16
// Observable completed
With Input
import { fromObservable, createActor } from 'xstate';
import { interval } from 'rxjs';
import { map } from 'rxjs/operators';
type Input = {
  period: number;
  multiplier: number;
};
const logic = fromObservable<number, Input>(({ input }) =>
  interval(input.period).pipe(
    map(n => n * input.multiplier)
  )
);
const actor = createActor(logic, {
  input: { period: 500, multiplier: 10 }
});
actor.subscribe((snapshot) => {
  console.log(snapshot.context);
});
actor.start();
// 0, 10, 20, 30, ...
Invoking in a Machine
import { setup, fromObservable } from 'xstate';
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';
const timerLogic = fromObservable<number, { duration: number }>(({ input }) =>
  interval(1000).pipe(take(input.duration))
);
const machine = setup({
  actors: {
    timer: timerLogic
  }
}).createMachine({
  initial: 'idle',
  states: {
    idle: {
      on: {
        START: 'timing'
      }
    },
    timing: {
      invoke: {
        src: 'timer',
        input: { duration: 5 },
        onDone: 'complete'
      },
      on: {
        CANCEL: 'idle'
      }
    },
    complete: {}
  }
});
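The machine above reacts only to the observable completing. As a minimal sketch of also reading each emitted value in the parent, the invoke config can use onSnapshot, whose event carries the child actor's snapshot. The names countdownMachine, ticker, ticks, and lastTick are illustrative assumptions, not part of the original example.

```typescript
import { assign, createActor, fromObservable, setup } from 'xstate';
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

// Hypothetical machine: stores the latest value emitted by the invoked observable.
const countdownMachine = setup({
  types: {
    context: {} as { lastTick: number | undefined }
  },
  actors: {
    ticker: fromObservable<number, { ticks: number }>(({ input }) =>
      interval(1000).pipe(take(input.ticks))
    )
  }
}).createMachine({
  context: { lastTick: undefined },
  initial: 'timing',
  states: {
    timing: {
      invoke: {
        src: 'ticker',
        input: { ticks: 3 },
        // Handles snapshot events from the child; its context is the emitted value.
        onSnapshot: {
          actions: assign({
            lastTick: ({ event }) => event.snapshot.context
          })
        },
        // Taken when the observable completes.
        onDone: 'complete'
      }
    },
    complete: { type: 'final' }
  }
});

const countdown = createActor(countdownMachine);
countdown.subscribe((snapshot) => {
  console.log(snapshot.value, snapshot.context.lastTick);
});
countdown.start();
```

Copying the value into the parent's context keeps it available after the invoked actor has completed or been stopped.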
Custom Observable Implementation
import { fromObservable, createActor } from 'xstate';
import type { Subscribable } from 'xstate';
function customObservable(interval: number): Subscribable<number> {
  return {
    subscribe(observer) {
      let count = 0;
      const id = setInterval(() => {
        observer.next?.(count++);
      }, interval);
      return {
        unsubscribe() {
          clearInterval(id);
        }
      };
    }
  };
}
const logic = fromObservable(() => customObservable(1000));
const actor = createActor(logic);
actor.subscribe((snapshot) => {
  console.log(snapshot.context);
});
actor.start();
Mouse Position Stream
import { fromObservable } from 'xstate';
import { fromEvent } from 'rxjs';
import { map, throttleTime } from 'rxjs/operators';
type Position = { x: number; y: number };
const mousePositionLogic = fromObservable<Position>(() =>
  fromEvent<MouseEvent>(document, 'mousemove').pipe(
    throttleTime(100),
    map(event => ({ x: event.clientX, y: event.clientY }))
  )
);
WebSocket Stream
import { fromObservable } from 'xstate';
import { Observable } from 'rxjs';
type Message = { type: string; data: unknown };
type Input = { url: string };
const websocketLogic = fromObservable<Message, Input>(({ input }) =>
  new Observable<Message>(subscriber => {
    const ws = new WebSocket(input.url);
    ws.addEventListener('message', (event) => {
      subscriber.next(JSON.parse(event.data));
    });
    ws.addEventListener('error', (error) => {
      subscriber.error(error);
    });
    ws.addEventListener('close', () => {
      subscriber.complete();
    });
    // Teardown: close the socket when the actor stops or the stream ends.
    return () => {
      ws.close();
    };
  })
);
Combining Multiple Streams
import { fromObservable } from 'xstate';
import { combineLatest, interval } from 'rxjs';
import { map } from 'rxjs/operators';
type CombinedData = {
  counter: number;
  timestamp: number;
};
const combinedLogic = fromObservable<CombinedData>(() =>
  combineLatest([
    interval(1000),
    interval(500).pipe(map(() => Date.now()))
  ]).pipe(
    map(([counter, timestamp]) => ({ counter, timestamp }))
  )
);
Error Handling
import { fromObservable, createActor } from 'xstate';
import { Observable } from 'rxjs';
const riskyLogic = fromObservable(() =>
  new Observable(subscriber => {
    let count = 0;
    const id = setInterval(() => {
      if (count < 5) {
        subscriber.next(count++);
      } else {
        subscriber.error(new Error('Count exceeded!'));
      }
    }, 1000);
    return () => clearInterval(id);
  })
);
const actor = createActor(riskyLogic);
actor.subscribe({
  next: (snapshot) => {
    if (snapshot.status === 'error') {
      console.error('Error:', snapshot.error);
    } else {
      console.log('Value:', snapshot.context);
    }
  },
  error: (err) => {
    console.error('Observer error:', err);
  }
});
actor.start();
Emitting Custom Events
import { fromObservable, createActor } from 'xstate';
import { interval } from 'rxjs';
import { map } from 'rxjs/operators';
type TickEvent = { type: 'tick'; count: number };
const logic = fromObservable<number, undefined, TickEvent>(({ emit }) => {
  return interval(1000).pipe(
    map(n => {
      // Side effect: notify actor.on('tick') listeners, then pass the value through.
      emit({ type: 'tick', count: n });
      return n;
    })
  );
});
const actor = createActor(logic);
actor.on('tick', (event) => {
  console.log('Tick:', event.count);
});
actor.start();
Event Observable
XState also provides fromEventObservable() for observables whose emitted values are event objects that are sent to the parent actor:
import { fromEventObservable, createActor } from 'xstate';
import { fromEvent } from 'rxjs';
import { map } from 'rxjs/operators';
const clickLogic = fromEventObservable(() =>
  fromEvent<MouseEvent>(document, 'click').pipe(
    map(event => ({
      type: 'CLICK',
      x: event.clientX,
      y: event.clientY
    }))
  )
);
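To show where those events end up, here is a minimal sketch of a parent machine that invokes event-observable logic and handles the CLICK events it receives. The Subject named clicks$ is an assumed stand-in for fromEvent(document, 'click') so the sketch also runs outside a browser; clickTracker and the clicks counter are likewise illustrative names.

```typescript
import { assign, createActor, fromEventObservable, setup } from 'xstate';
import { Subject } from 'rxjs';

type ClickEvent = { type: 'CLICK'; x: number; y: number };

// Stand-in event source (assumption): in a browser this would be
// fromEvent<MouseEvent>(document, 'click') mapped to ClickEvent objects.
const clicks$ = new Subject<ClickEvent>();

const clickTracker = setup({
  types: {
    context: {} as { clicks: number },
    events: {} as ClickEvent
  },
  actors: {
    clickSource: fromEventObservable(() => clicks$)
  }
}).createMachine({
  context: { clicks: 0 },
  // The invoked actor forwards every emitted event object to this machine.
  invoke: { src: 'clickSource' },
  on: {
    CLICK: {
      actions: assign({ clicks: ({ context }) => context.clicks + 1 })
    }
  }
});

const tracker = createActor(clickTracker);
tracker.start();

// Simulate two clicks arriving from the event source.
clicks$.next({ type: 'CLICK', x: 10, y: 20 });
clicks$.next({ type: 'CLICK', x: 30, y: 40 });

console.log(tracker.getSnapshot().context.clicks);
```

Unlike fromObservable(), the emitted values here are handled by the parent's own event handlers, so no onSnapshot transition is needed.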
Snapshot
The observable actor snapshot has the following structure:
interface ObservableSnapshot<TContext, TInput> {
  status: 'active' | 'done' | 'error' | 'stopped';
  context: TContext | undefined;
  output: undefined;
  error: unknown;
  input: TInput | undefined;
}
Status Values
'active' - Observable is emitting values
'done' - Observable completed successfully
'error' - Observable encountered an error
'stopped' - Actor was stopped before observable completed
Behavior
- Context updates: Each emitted value updates the actor’s context
- Completion: Observable completion transitions the actor to done status
- Error handling: Observable errors transition the actor to error status
- Cleanup: Unsubscribes from the observable when the actor is stopped
- No restart: Completed observables are not restarted
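The completion and cleanup rules above can be checked with a short sketch. It assumes RxJS's of() for a finite stream and a hand-written Observable whose teardown sets a flag; the variable names finite, endless, and tornDown are illustrative only.

```typescript
import { createActor, fromObservable } from 'xstate';
import { Observable, of } from 'rxjs';

// Completion: a finite stream ends in 'done' and keeps its last value as context.
const finite = createActor(fromObservable(() => of(1, 2, 3)));
finite.start();
const finiteSnapshot = finite.getSnapshot();
console.log(finiteSnapshot.status, finiteSnapshot.context);

// Cleanup: stopping the actor should run the observable's teardown function.
let tornDown = false;
const endless = createActor(
  fromObservable(
    () =>
      new Observable<string>((subscriber) => {
        subscriber.next('ready');
        // Never completes; the returned function runs on unsubscribe.
        return () => {
          tornDown = true;
        };
      })
  )
);
endless.start();
const statusWhileRunning = endless.getSnapshot().status;
endless.stop();
console.log(statusWhileRunning, endless.getSnapshot().status, tornDown);
```

Reading getSnapshot() after start() works here because both streams emit synchronously on subscription; with timer-based streams the values arrive later and are better observed through actor.subscribe().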
Type Parameters
TContext
The type of values emitted by the observable.
TInput
extends NonReducibleUnknown
The type of the input data.
TEmitted
extends EventObject, default: EventObject
The type of events that can be emitted.
Subscribable Interface
XState’s Subscribable interface is compatible with RxJS Observable:
interface Subscribable<T> {
  subscribe(observer: {
    next?: (value: T) => void;
    error?: (err: unknown) => void;
    complete?: () => void;
  }): Subscription;
}
interface Subscription {
  unsubscribe(): void;
}
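To make the contract concrete, here is a minimal sketch of an object that satisfies this shape without RxJS. The interfaces are redeclared locally, mirroring the ones above, so the block needs no imports; fromArray is a hypothetical helper and not part of XState.

```typescript
// Local copies of the interfaces shown above, so the sketch needs no imports.
interface Subscription {
  unsubscribe(): void;
}

interface Observer<T> {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
}

interface Subscribable<T> {
  subscribe(observer: Observer<T>): Subscription;
}

// Hypothetical helper (not part of XState): emits every item synchronously,
// then signals completion.
function fromArray<T>(items: readonly T[]): Subscribable<T> {
  return {
    subscribe(observer) {
      for (const item of items) {
        observer.next?.(item);
      }
      observer.complete?.();
      // Everything was delivered synchronously, so there is nothing left to cancel.
      return { unsubscribe() {} };
    }
  };
}

const received: number[] = [];
let completed = false;

fromArray([1, 2, 3]).subscribe({
  next: (value) => {
    received.push(value);
  },
  complete: () => {
    completed = true;
  }
});

console.log(received, completed);
```

Because every observer callback is optional, the producer calls them with optional chaining; a source that emits over time would also need unsubscribe() to release its timers or listeners.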
See Also