Improving Stream Management in Bloc
If you’ve been following the reactive repository pattern, you’re likely familiar with subscribing to a stream in a bloc. While the traditional approach works, there’s a more efficient and cleaner way to manage streams and subscriptions in BLoC.
The "traditional" approach
Let’s first take a look at the typical pattern for subscribing to a stream in a bloc. Here's an example of a bloc that listens to a stream of todos:
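import 'dart:async';

import 'package:bloc/bloc.dart';

class TodoBloc extends Bloc<TodoEvent, TodoState> {
  TodoBloc({
    required TodoRepository repository,
  })  : _repository = repository,
        super(const TodoInitialState()) {
    on<TodoEvent>(
      (event, emit) => switch (event) {
        WatchTodos() => _onWatchTodos(event, emit),
      },
    );
  }

  final TodoRepository _repository;

  // Held so the subscription can be canceled when the bloc closes.
  StreamSubscription<List<Todo>>? _todosSubscription;

  @override
  Future<void> close() {
    // Forgetting this call leaves the subscription alive after the bloc is gone.
    _todosSubscription?.cancel();
    return super.close();
  }

  FutureOr<void> _onWatchTodos(
    WatchTodos event,
    Emitter<TodoState> emit,
  ) async {
    // Note: this handler returns right after listen(). Recent bloc versions
    // assert in debug mode when emit is called after a handler has completed,
    // so many codebases add an internal event from these callbacks instead.
    _todosSubscription = _repository.todosStream.listen(
      (todos) {
        emit(TodoSuccessState(todos));
      },
      onError: (error) {
        emit(TodoErrorState(error));
      },
    );
  }
}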
This approach is totally valid, but there’s one potential issue: the stream subscription has to be canceled manually in the close method.
If we forget to cancel the subscription, it keeps listening after the bloc is closed and we risk a memory leak.
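To make the risk concrete, here is a minimal plain-Dart sketch that uses only dart:async and no bloc at all. The Watcher class is a hypothetical stand-in for a bloc that listens to a repository stream; it is not part of the original example.

```dart
import 'dart:async';

/// Hypothetical stand-in for a bloc that listens to a repository stream.
class Watcher {
  Watcher(Stream<int> source) {
    _subscription = source.listen(received.add);
  }

  late final StreamSubscription<int> _subscription;

  /// Every value this watcher has been handed by the source stream.
  final List<int> received = [];

  /// Releases the subscription, like canceling it in a bloc's close method.
  Future<void> close() => _subscription.cancel();
}

Future<void> main() async {
  final source = StreamController<int>.broadcast();
  final leaky = Watcher(source.stream); // abandoned without calling close()
  final tidy = Watcher(source.stream);

  await tidy.close();

  source.add(1);
  await Future<void>.delayed(Duration.zero); // let the event be delivered

  print(leaky.received); // [1]  still receiving after being abandoned
  print(tidy.received); // []
  print(source.hasListener); // true  the source still references the listener

  await leaky.close();
  await source.close();
}
```

As long as the source stream holds the listener, the abandoned object keeps receiving events and cannot be released, which is exactly what a forgotten cancel causes.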
A better approach
There’s a cleaner and safer alternative: emit.forEach.
This method allows the bloc to manage the stream subscription automatically, so we don't have to worry about canceling the subscription manually.
What is emit.forEach?
The emit.forEach method subscribes to a stream and automatically emits new states as the stream emits new data. This method handles the subscription internally, reducing the likelihood of memory leaks. From the official bloc documentation:
Subscribes to the provided [stream] and invokes the [onData] callback when the [stream] emits new data and the result of [onData] is emitted.
And from an official bloc tutorial:
Using await emit.forEach() is a newer pattern for subscribing to a stream which allows the bloc to manage the subscription internally.
Here's how we can use it following the previous example:
import 'dart:async';

import 'package:bloc/bloc.dart';

class TodoBloc extends Bloc<TodoEvent, TodoState> {
  TodoBloc({
    required TodoRepository repository,
  })  : _repository = repository,
        super(const TodoInitialState()) {
    on<TodoEvent>(
      (event, emit) => switch (event) {
        WatchTodos() => _onWatchTodos(event, emit),
      },
    );
  }

  final TodoRepository _repository;

  FutureOr<void> _onWatchTodos(
    WatchTodos event,
    Emitter<TodoState> emit,
  ) async {
    // The returned future stays pending while the subscription is active,
    // so the handler (and the subscription) lives until the bloc closes.
    await emit.forEach(
      _repository.todosStream,
      onData: TodoSuccessState.new, // Convert the data to a new state
      onError: (e, s) => TodoErrorState(e), // Convert the error to a new state
    );
  }
}
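To build intuition for what the call does, here is a simplified plain-Dart sketch of a forEach-style helper. It is an illustration only, not bloc's actual implementation: forEachSketch and its emit parameter are hypothetical names, and the real method also ties the subscription to the handler's lifecycle so that it is canceled when the bloc closes.

```dart
import 'dart:async';

/// Simplified, hypothetical stand-in for `emit.forEach`; not bloc's code.
/// Maps every data event (and optionally every error) to a state and emits it.
Future<void> forEachSketch<T, S>(
  Stream<T> stream, {
  required void Function(S state) emit,
  required S Function(T data) onData,
  S Function(Object error, StackTrace stackTrace)? onError,
}) {
  final done = Completer<void>();
  late final StreamSubscription<T> subscription;
  subscription = stream.listen(
    (data) => emit(onData(data)),
    onError: (Object error, StackTrace stackTrace) {
      if (onError != null) {
        // Convert the error to a state and keep listening.
        emit(onError(error, stackTrace));
      } else {
        // No converter supplied: stop listening and surface the error.
        subscription.cancel();
        done.completeError(error, stackTrace);
      }
    },
    onDone: done.complete,
  );
  // Completes only when the stream is finished (or fails unhandled).
  return done.future;
}

Future<void> main() async {
  final states = <String>[];
  await forEachSketch<int, String>(
    Stream.fromIterable([1, 2]),
    emit: states.add,
    onData: (n) => 'loaded $n',
  );
  print(states); // [loaded 1, loaded 2]
}
```

The point to take away is that the subscription lives inside the helper: the caller only supplies the conversion callbacks and awaits the result.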
Why is this better?
- No manual subscription management: You don’t have to manually cancel the subscription when the bloc is closed, as bloc takes care of it internally.
- Cleaner and more readable code: The code becomes more concise and less error-prone because bloc is now handling the stream management for you.
- Simple error handling: You can easily handle stream errors by providing an onError callback in emit.forEach, which converts each error into a state.
Important Note:
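Make sure to await the emit.forEach call. Its future stays pending for as long as the subscription is active, so awaiting it keeps the event handler open while the stream is being listened to. Without the await, the handler completes immediately while the subscription is still pending, and the subscription is not managed correctly.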
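The effect of a missing await can be seen in plain Dart, without bloc. In this sketch, handlerWithAwait and handlerWithoutAwait are hypothetical functions that stand in for an event handler, and Stream.forEach stands in for emit.forEach.

```dart
import 'dart:async';

/// Plays the role of an event handler that awaits the stream work.
Future<void> handlerWithAwait(Stream<int> stream, List<String> log) async {
  await stream.forEach((n) => log.add('data $n'));
  log.add('handler done');
}

/// Same handler, but the stream work is deliberately left unawaited.
Future<void> handlerWithoutAwait(Stream<int> stream, List<String> log) async {
  unawaited(stream.forEach((n) => log.add('data $n')));
  log.add('handler done');
}

Future<void> main() async {
  final awaited = <String>[];
  await handlerWithAwait(Stream.fromIterable([1, 2]), awaited);
  print(awaited); // [data 1, data 2, handler done]

  final notAwaited = <String>[];
  await handlerWithoutAwait(Stream.fromIterable([1, 2]), notAwaited);
  await Future<void>.delayed(Duration.zero); // let pending stream events run
  print(notAwaited); // [handler done, data 1, data 2]
}
```

In the second case the handler reports completion before a single value has arrived. In a bloc, that is the moment the handler's emitter is considered finished, so nothing that arrives afterwards can be emitted safely.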
Bonus: Using emit.onEach
Sometimes you want to subscribe to a stream without emitting a state for every value; instead, you might just want to trigger an event when the stream emits an error or other data.
This can be done with emit.onEach.
The method definition is similar to emit.forEach, but instead of emitting new states, it allows you to trigger events when new data is received or an error occurs.
Following the previous example, we can add a new event when the stream emits an error:
import 'dart:async';

import 'package:bloc/bloc.dart';

class TodoBloc extends Bloc<TodoEvent, TodoState> {
  TodoBloc({
    required TodoRepository repository,
  })  : _repository = repository,
        super(const TodoInitialState()) {
    on<TodoEvent>(
      (event, emit) => switch (event) {
        WatchTodos() => _onWatchTodos(event, emit),
        HandleTodoError() => _onHandleTodoError(event, emit),
      },
    );
  }

  final TodoRepository _repository;

  FutureOr<void> _onWatchTodos(
    WatchTodos event,
    Emitter<TodoState> emit,
  ) async {
    await emit.onEach(
      _repository.todosStream,
      // Data still becomes a state, but we call emit ourselves.
      onData: (todos) => emit(TodoSuccessState(todos)),
      // Errors are routed to a separate event instead of a state.
      onError: (e, _) => add(HandleTodoError(e)),
    );
  }

  FutureOr<void> _onHandleTodoError(
    HandleTodoError event,
    Emitter<TodoState> emit,
  ) async {
    // Handle the error
  }
}
Why use emit.onEach?
- Event-based handling: Instead of emitting a state, you can trigger other events in the BLoC. This is useful when you want to handle specific errors or actions without emitting a new state.
- Flexibility: It's useful when you need to react to stream changes but don’t need to update the UI immediately or directly.
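The difference in contract can be sketched in plain Dart. The onEachSketch helper below is a hypothetical, simplified stand-in for emit.onEach, not bloc's actual implementation: its callbacks return nothing and run purely for their side effects, so the caller decides whether to emit a state, queue an event, or do something else.

```dart
import 'dart:async';

/// Simplified, hypothetical stand-in for `emit.onEach`; not bloc's code.
/// The callbacks return nothing: they exist only for their side effects.
Future<void> onEachSketch<T>(
  Stream<T> stream, {
  required void Function(T data) onData,
  required void Function(Object error, StackTrace stackTrace) onError,
}) {
  final done = Completer<void>();
  stream.listen(onData, onError: onError, onDone: done.complete);
  return done.future;
}

Future<void> main() async {
  final states = <String>[]; // stands in for emitted states
  final queuedEvents = <String>[]; // stands in for events passed to add()

  final source = StreamController<int>();
  final finished = onEachSketch<int>(
    source.stream,
    onData: (n) => states.add('loaded $n'),
    onError: (error, _) => queuedEvents.add('HandleError($error)'),
  );

  source.add(1);
  source.addError('offline');
  source.add(2);
  await source.close();
  await finished;

  print(states); // [loaded 1, loaded 2]
  print(queuedEvents); // [HandleError(offline)]
}
```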
Conclusion
Incorporating emit.forEach and emit.onEach into your bloc architecture helps streamline the handling of streams, making your code easier to read, safer, and more efficient by removing manual subscription management.
It's a small change that can have a big impact, reducing the chances of memory leaks and simplifying error handling.
If you have any questions or suggestions, feel free to reach out on X (formerly Twitter) or send me an email at mail@lorensala.com.
Happy coding! 🚀