Introduction à la programmation réactive

La programmation réactive est un outil puissant pour concevoir les applications modernes et gérer les événements asynchrones. L'utilisation de bibliothèques réactives permet un réel gain en résilience, lisibilité et performances.

Bonjour et bienvenue dans ce deuxième article de la Chronique Réactive !

Aujourd'hui nous allons rester dans la continuité du premier article sur les systèmes réactifs et aborder le sujet de la programmation réactive.

Un peu de contexte

Tout d'abord nous allons parler du contexte dans lequel nait la programmation réactive et des besoins auxquels elle répond.

Si vous êtes vous-même développeur vous savez d'ores et déjà que l'informatique de manière générale évolue dans un contexte profondément asynchrone. Sauf exceptions les logiciels modernes doivent effectuer de nombreuses actions parallèles :

  • les serveurs web doivent répondre à des milliers de requêtes simultanées et ne peuvent pas se permettre de finir de répondre à un client pour passer au suivant. Les serveurs cherchent aujourd'hui à répondre parallèlement aux requêtes qu'ils reçoivent
  • les jeux vidéos effectuent des requêtes réseau, chargent la carte au fil des déplacements du joueur, chargent et déchargent des ressources, le tout en essayant de garder un rendu fluide et stable
  • les pages web chargent de nombreuses données depuis le réseau, certaines au fil de votre lecture, tout en gardant un rendu fluide

Pour gérer ces situations les développeurs disposent d'un outil omniprésent en programmation nommé le multithreading.

Qu'est-ce que le multithreading ?

Le multithreading désigne la capacité d'un programme informatique à exécuter plusieurs parties de son code de manière simultanée, chaque "unité de traitement parallèle" étant nommée une thread.

Comme vous le savez surement déjà les processeurs modernes disposent généralement de plusieurs cœurs permettant l’exécution d'actions parallèles. Bien que les ordinateurs soient connus pour exécuter des suites d'instructions les unes après les autres, exécuter du code parallèle peut signifier une véritable simultanéité dans l’exécution de code.

Pour gérer cette simultanéité les développeurs utilisent également des méthodes de synchronisation comme les mutex et les sémaphores.

Les problématiques

Cette capacité qu'ont les programmes informatiques à exécuter du code de manière parallèle est un véritable atout pour les développeurs et une composante centrale des applications modernes, mais avec ces avantages viennent aussi complexité et erreurs.

Car un grand pouvoir implique de grandes responsabilités, un multithreading mal géré peut conduire à divers problèmes :

  • des conditions de concurrence ou race condition, désignent les comportements indéfinis provoqués par des accès en lecture/écriture simultanés par plusieurs threads parallèles sur une même variable. Par exemple lorsque deux threads tentent d'attribuer une valeur différente à une même variable de manière simultanée. Cela peut même conduire à un crash
  • des ralentissements, par exemple si une thread attend la synchronisation d'une autre
  • des deadlocks, boucles infinies dans lesquelles plusieurs threads s'attendent mutuellement

Le multithreading apportant également de la complexité il peut parfois être difficile de gérer correctement les erreurs. Et une erreur non gérée peut par exemple conduire une thread à attendre éternellement une réponse de la thread ayant reçu l'erreur.

Et c'est dans ce contexte de sensibilité aux erreurs et de complexité que nait la programmation réactive.

Définition

La programmation réactive est un paradigme de programmation visant à gérer les événements asynchrones sous forme de flux de données.

Cette approche permet de s'affranchir de la gestion bas niveau des threads et vise à permettre une gestion beaucoup plus transparente des erreurs, une plus grande résilience ainsi qu'une meilleure gestion de la mémoire au travers de concepts comme la contre-pression (ou backpressure).

Plusieurs implémentations existent, la plus connue étant ReactiveX.

ReactiveX

ReactiveX est une bibliothèque logicielle disponible dans de nombreux langages (entre autres Java, JS, Cpp, Rust, C#, ainsi que de nombreux autres) proposant une implémentation de programmation réactive au travers d'objets appelés Observables.

Les Observables

Un Observable est un flux de données asynchrones observables au travers de callbacks qu'il est possible d'enregistrer.

Trois types d'émissions sont à distinguer :

  • les émissions de valeur. Si votre Observable encapsule par exemple des nombres entiers, une callback vous permettra d'écouter l'émission de chaque nombre
  • les émissions de fin de flux. Lorsqu'un flux se termine un signal dédié est émis. Plus aucune valeur ou erreur ne sera émise par la suite
  • les émissions d'erreur. Un Observable peut être en état d'erreur, une callback permettra de récupérer l'objet de l'erreur et aucune nouvelle émission de valeur ou de fin de flux ne suivra

Les Observables mettent également à dispositions une suite d'opérateurs permettant leur manipulation, entre autres :

  • des opérateurs de création. Cela va de la simple émission de variable à celle de réels flux asynchrones, en passant par l'émission d'erreurs
  • des opérateurs de transformation, permettant d'effectuer des opérations complexes sur les valeurs émises. Ces opérateurs peuvent permettre de changer le type de retour (il est par exemple tout à fait courant qu'un opérateur retourne une chaine de caractères contenue dans un objet) mais également de retourner d'autres Observables. Si vous souhaitez par exemple effectuer une action asynchrone à partir d'une valeur émise c'est possible
  • des opérateurs de filtrage permettant de filtrer les valeurs selon une condition, de ne récupérer que les X premières valeurs, les X dernières, d'en sauter, etc...
  • des opérateurs de combinaison permettant l'enchainement, la fusion, la concaténation d'Observables
  • des opérateurs de gestion d'erreur permettant la mise en place de stratégies de repli, de retry, de timeout, etc...

et bien d'autres qui peuvent varier en fonction des implémentations ;)

Schedulers

Une notion également centrale et allant de pair avec les Observables est celle de Schedulers. Il s'agit d'objets permettant de définir où et quand exécuter les actions asynchrones des Observables. En pratique un Scheduler va permettre de spécifier sur quelle thread (ou groupe de threads) exécuter les actions et les signaux des Observables.

Spécialisations d'Observables

Certaines implémentations comme RxJava proposent des spécialisations d'Observables, notamment :

  • Completable : un Observable n'émettant aucune valeur. Il émettra uniquement un signal de complétion ou bien d'erreur
  • Single : un Observable émettant une unique valeur. Un Single ne peut pas être vide, il émettra sa valeur et son signal de complétion ou d'erreur le cas échéant
  • Maybe : un Observable pouvant émettre une unique valeur. Contrairement au Single un Maybe peut très bien émettre un signal de complétion sans avoir émis de valeur. Un signal d'erreur peut également être émis
  • Flowable : Un Observable gérant la contre-pression (ou backpressure), concept très important en programmation réactive que nous verrons un peu plus loin

Chaque spécialisation va disposer de son propre panel d'opérateurs car tous ne sont pas pertinents en fonction de la spécialisation. Par exemple un Completable n'ayant pas de valeur à gérer, aucun opérateur de transformation ou de filtrage n'est disponible. Un Maybe va également avoir des opérateurs supplémentaires pour gérer le cas où aucune valeur n'est présente.

Ces spécialisations trouvent leur utilité dans de nombreux contextes, prenons l'exemple d'une méthode permettant de récupérer un utilisateur dans une base de données en fonction de son uuid et utilisant un Observable, le prototype de cette méthode pourrait ressembler à ceci :‌

Observable<User> rxGetUser(String uuid);

‌Son alternative utilisant un Single ressemblerait à ceci :

Single<User> rxGetUser(String uuid);

‌Un Observable sera tout à fait fonctionnel et permettra de récupérer l'objet "User", ses opérateurs de filtrage pourront également permettre de s'assurer qu'il y a bien une unique valeur. Mais il est pourtant préférable d'utiliser un Single dans cette situation pour plusieurs raisons :

  • la lisibilité : le Single est plus explicite sur le nombre d'objets retournés, un autre développeur devant lire ce code comprendra plus facilement ce dont il s'agit.
  • la simplification du code : si vous souhaitez vous assurer du nombre de valeurs un type spécifique est préférable car il évite l'ajout d'opérateurs de filtrage
  • la réduction du risque d'erreur : une lisibilité accrue, une simplification du code et une réduction du nombre d'opérateurs possible induit naturellement une meilleure maintenabilité et une réduction du risque d'erreur

Cas pratique

Pour expliciter mon propos et vous donner une meilleure idée de l'apparence d'un code rédigé avec ReactiveX, prenons l'exemple simple de l'écriture dans un fichier.

Bien que cette situation soit d'apparence terriblement simple et commune, elle est en réalité sensible à de nombreuses erreurs qu'il est nécessaire de gérer. Une implémentation classique en Java cherchant à gérer les différentes erreurs pouvant arriver pourrait par exemple ressembler à ceci :‌

FileWriter fileWriter = null;
try {
	fileWriter = new FileWriter("fileName.txt");
	fileWriter.write("Hello, World!");
} catch (IOException e) {
	e.printStackTrace();
} finally {
	try {
		if (fileWriter != null) {
			fileWriter.close();
		}
	} catch (IOException e) {
		e.printStackTrace();
	}
}

Son alternative ReactiveX pourrait ressembler à ceci :‌

fileSystem.rxOpen(filePath)
	.flatMap(file -> {
		return file.rxWrite("Hello, world!")
			.doOnTerminate(file.rxClose(file));
	})
	.doOnError(e -> e.printStackTrace());

Ces deux implémentations comportent chacune l'ouverture d'un fichier, l'écriture dans ce même fichier, sa fermeture ainsi que la gestion d'erreur associée à ces actions.

Le premier constat que nous pouvons faire est que la gestion d'erreur de l'approche réactive apporte un gain conséquent en taille de code. En effet deux try-catch-finally imbriqués sont maintenant réduits à une seule lambda de gestion d'erreur appelée par la méthode doOnError, cette réduction de la taille du code induit également une meilleure lisibilité (même si ce  n'est pas forcément évidement si vous n'êtes pas habitué à la programmation réactive).

Le deuxième constat moins évident cette fois, est que l'approche réactive apporte un gain en performances. En effet là où se trouvaient des appels bloquants comme l'ouverture, l'écriture et la fermeture d'un fichier (ce qui peut par exemple entrainer une baisse de fluidité d'une application), se trouvent maintenant des appels asynchrones comme rxOpen, rxWrite et rxClose. Certains pourraient objecter qu'il est possible d'effectuer ces actions de manière asynchrone sans s'aider de programmation réactive, mais les développeurs ne s’embarrassent pas toujours de pareilles considérations lorsqu'ils cherchent par exemple à lire un bête fichier de configuration.

S'aider d'une librairie réactive adéquate c'est donc bénéficier constamment de ce gain en performances sans se préoccuper de problématiques asynchrones bas niveau tout en gardant une écriture simple et une très bonne gestion d'erreurs.

Reactive Streams

A la différence de ReactiveX, Reactive Streams n'est pas une bibliothèque de programmation réactive à proprement parler mais une spécification définissant des interfaces et des règles pour faciliter l'interopérabilité entre les librairies réactives.

Cette spécification se base autour de 4 interfaces clés :

  • le Publisher : un objet émetteur d'événements auquel il est possible de s'abonner. Comme en ReactiveX cet objet signalera chaque valeur, erreur, ou fin de flux
  • le Subscriber : un objet pouvant s'abonner à un Publisher et recevoir ses différents signaux
  • le Processor : un objet héritant à la fois de Publisher et de Subscriber. Il représente une étape de traitement des données comme une action de filtre ou de transformation par exemple
  • la Subscription : un objet partagé par le Publisher et le Subscriber permettant au Subscriber de demander ou d'annuler l'envoi de données

Cette spécification n'est pas incompatible avec ReactiveX, certaines implémentations de ReactiveX sont même basées sur la norme Reactive Streams. Mais cette norme apporte une notion clé dans la programmation réactive moderne manquant à mon sens terriblement à la norme ReactiveX et à certaines de ses implémentations : la notion de contre-pression ou backpressure.

Contre-pression

La contre-pression ou backpressure est un concept relatif à la gestion de flux d'objets ou d'événements, et fait référence à la situation où un flux d'entrée émet plus rapidement que son flux de sortie ne peut lire. Cette situation entraine naturellement un entassement des données (et donc un problème de gestion de la mémoire et de résilience), d'où la notion assez visuelle de "pression".

L'approche Reactive Streams permet la gestion de cette pression, il est possible d'implémenter soi même les interfaces Reactive Streams (Publisher, Subscriber, Subscription, Processor) en incorporant une stratégie de contre-pression, ou bien d'utiliser une implémentation pré-existante. Ces stratégies peuvent être :

  • la mise en pause du flux d'entrée. Lorsque le nombre de données repassera sous un certain seuil le flux d'entrée pourra être remis en marche. Cette approche n'est pas toujours possible, mais lorsque pouvant être mise en place elle permet de conserver un débit important de données tout en conservant un grand contrôle sur la mémoire et en préservant l'intégralité des données
  • l'abandon des données les plus anciennes. Dans un contexte où il n'est pas possible de mettre en pause le flux d'entrée et où la perte de certaines données est acceptable par exemple lorsque le flux provient d'un capteur physique, l'approche de la suppression de certaines données est tout à fait pertinente pour garantir un débit stable et une mémoire sous contrôle
  • l'abandon des données les plus récentes
  • l'envoi d'un signal d'erreur

Ce concept est profondément ancré dans l'approche réactive qui cherche à garantir la résilience, la stabilité et la disponibilité des programmes informatiques. Certains types de flux ne peuvent pas être simplement mis en pause (je pense par exemple à la réception de données de capteurs, aux technologies de websockets ou de communication inter-services comme Kafka), c'est pourquoi il est primordial de définir des stratégies de gestion de ces flux.

Conclusion

Pour récapituler, la programmation réactive est un outil puissant pour concevoir les applications modernes et gérer les événements asynchrones. L'utilisation de bibliothèques réactives permet un réel gain en résilience, lisibilité et performances.

ReactiveX est la librairie de programmation réactive la plus connue et propose un standard centré autour du concept d'Observable et disponible au travers de plusieurs langages. Malheureusement ReactiveX n'est pas uniforme, et certaines classes et fonctionnalités ne sont pas disponibles dans toutes les implémentations.

Un concept également manquant à ReactiveX est celui de gestion de la contre-pression. Un standard nommé Reactive Streams est disponible et permet cette gestion.

Des spécialisations d'Observables sont également disponibles dans certaines implémentations de ReactiveX, je vous conseille vivement leur utilisation dès que possible car elles permettent une simplification du code ainsi qu'une meilleure lisibilité.

Bien que nous n'ayons qu'effleuré la surface de la programmation réactive au travers de cette introduction j'espère avoir su susciter votre intérêt. La programmation réactive est un sujet vaste et complexe qu'il est difficile de synthétiser, n'hésitez donc pas à jeter un oeil aux sources et liens en fin de post si vous souhaitez en apprendre davantage sur le sujet.

Vous y découvrirez de nouveaux concepts et problématiques que je n'ai malheureusement pas eu le temps d'aborder ici.

De futurs articles sont à venir en rapport avec l'univers réactif, n'hésitez pas à me donner votre avis et à me faire savoir si le sujet vous intéresse ;)

Merci pour votre attention et à bientôt !

Références