Blog ENI : Toute la veille numérique !
Accès illimité 24h/24 à tous nos livres & vidéos ! 
Découvrez la Bibliothèque Numérique ENI. Cliquez ici
💥 Du 22 au 24 novembre : Accès 100% GRATUIT
à la Bibliothèque Numérique ENI. Je m'inscris !
  1. Livres et vidéos
  2. Python 3
  3. Programmation distribuée
Extrait - Python 3 Traitement de données et techniques de programmation (2e édition)
Extraits du livre
Python 3 Traitement de données et techniques de programmation (2e édition) Revenir à la page d'achat du livre

Programmation distribuée

Définitions

La programmation distribuée est une solution idéale pour répondre à des problématiques de concurrence où l’on a besoin de faire exécuter des tâches en arrière-plan. La solution consiste à avoir, à côté de l’application principale, une application secondaire capable d’exécuter ces tâches. Il faut ensuite transmettre des messages d’une application à l’autre pour d’une part expliquer la tâche à exécuter et d’autre part renvoyer le résultat (cette dernière partie étant optionnelle).

Pour gérer cette problématique, on se base sur le protocole AMQP (Advanced Message Queuing Protocol) qui est un standard pour l’organisation de ces messages. Nous présenterons les principaux patrons de conception qui permettent de résoudre la plupart des besoins en les utilisant à bas niveau. Nous commencerons par présenter ØMQ dont l’interface de programmation est relativement similaire à l’utilisation de simples sockets réseau, puis nous utiliserons une solution basée sur un message broker RabbitMQ.

Dans un second temps, nous présenterons Celery qui est une solution de haut niveau pour gérer la problématique de la programmation distribuée très rapidement et efficacement....

ØMQ

1. Présentation générale

ØMQ est une solution élégante et performante qui va nous permettre de présenter les patrons de conception essentiels liés à cette problématique. Pour l’installer :

$ pip install zmq 

Lorsque l’on écrit un programme et que l’on souhaite utiliser cette bibliothèque, il y a toujours le même type de code à écrire :

>>> from zmq import Context 
>>> context = Context() 
>>> socket = context.socket(METHODE) 

Ce code se trouve dans les deux applications (ou plus) qui communiquent entre elles. Il permet de créer le socket réseau dans le cadre duquel les échanges de messages auront lieu. La méthode à utiliser dépend du patron de conception que l’on souhaite utiliser.

Une fois le socket créé, on peut visualiser ses méthodes :

>>> dir(socket) 

Comme on travaille avec un socket, on utilisera la méthode bind pour écouter sur une interface réseau et un port particulier. Les protocoles supportés sont TCP, UDP, PGM, EPGM, INPROC et IPC. Il existe également bind_to_random_port qui permet de laisser au programme le choix du port. Pour fermer une connexion, il faudra utiliser la méthode unbind. L’attribut closed permet de connaître l’état du socket.

Le programme qui réalisera l’opération de bind sera le serveur, celui qui écoute le port (et il ne peut y en avoir qu’un). L’autre programme devra utiliser connect et sera le client. Suivant le paradigme utilisé, la communication est monodirectionnelle ou bidirectionnelle. Autrement dit, dans certains protocoles, le client et le serveur peuvent à la fois envoyer et recevoir de la donnée, alors que dans d’autres, l’un ne fait que recevoir et l’autre qu’envoyer. Pour fermer une telle connexion, il faut utiliser disconnect.

Les sockets peuvent ensuite envoyer ou recevoir des données et ils supportent plusieurs formats :

Type d’objets

Méthode réception

Méthode envoi

Octets

recv

send

Objet convertible en JSON (usuellement un dictionnaire)

recv_json

send_json

Liste d’octets (essentiellement)

recv_multipart

send_multipart

Objet Python sérialisable...

AMQP avec RabbitMQ

Précédemment, nous avons montré les différents patrons que l’on peut utiliser pour écrire des applications distribuées. Ici nous allons voir comment utiliser RabbitMQ à bas niveau et par la suite nous verrons une solution de haut niveau.

Nous allons utiliser Pika pour manipuler une file d’attente de RabbitMQ et présenter les différentes manières de les utiliser.

1. Installation

Pour installer RabbitMQ, il faut utiliser un dépôt spécifique, car les paquets issus de la distribution, s’ils existent dans le dépôt officiel, sont souvent datés et souffrent de vulnérabilités :

# wget -O - "https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey" | 
apt-key add - 
# apt update 
# apt install rabbitmq-server 

Pour les autres distributions, il faudra se référer à la documentation officielle (https://www.rabbitmq.com/download.html).

2. Configuration

Lorsque l’on utilise PostgreSQL, par exemple, il faut créer un utilisateur avec son mot de passe ainsi qu’une base de données. On peut alors l’utiliser dans notre programme Python à partir d’une URL telle que :

database_url = 'postgres://user:secret@localhost:5432/db_name' 

Avec RabbitMQ, il s’agit de la même chose :

rabbit_broker_url = 'amqp://user:secret@localhost:5672/vhost' 

Il existe ensuite quelques commandes à connaître pour créer un utilisateur, un vhost et pour paramétrer les permissions :

# rabbitmqctl add_user test 
# sudo rabbitmqctl add_vhost vhost 
# sudo rabbitmqctl set_permissions -p vhost test ".*" ".*" ".*" 

Nous pouvons maintenant utiliser RabbitMQ à partir de l’URL définie plus haut.

Pour terminer, il faut également installer pika :

pip install pika 

3. Introduction

Le premier exemple de code est très basique : un producteur et un consommateur utilisant une file d’attente (queue) pour échanger des messages. Il s’agit essentiellement de voir comment créer une connexion, envoyer ou recevoir des messages.

Voici le producteur :

from pika import BlockingConnection 
from pika.connection import URLParameters 
 
connection = BlockingConnection( 
    URLParameters('amqp://test:test@localhost:5672/vhost')) ...

Kafka

1. Présentation

Kafka est un système de distribution de message en temps réel utilisé notamment dans les canaux de données mais également en tant que système d’échange dans les systèmes de micro-services. 

Au niveau de l’écosystème Kafka en Python, il existe notamment kafka-python et aiokafka, sa version asynchrone, permettant de créer des producteurs et des consommateurs.

Il existe également le projet faust qui permet de gérer l’équivalent du pattern publish/subscribe.

2. Principes généraux

Kafka est un système de distribution de messages. Ces messages sont isolés dans un topic. Ce topic est un mécanisme de stockage qui conserve l’ordre d’émission des messages et les messages peuvent être vus comme des événements.

Chaque topic peut avoir une à plusieurs partitions et l’ordre des messages est garanti au sein d’une même partition mais pas entre les différentes partitions.

Lorsqu’un producteur produit un message, ce dernier sera émis dans une partition seulement. 

Il existe également la notion de groupe. En effet, dans un système où de multiples parties échangent des messages, le message doit être lu par chaque groupe à l’écoute. Si un groupe n’a qu’un seul consommateur, ce dernier écoutera toutes les partitions....

Celery

1. Présentation

Celery est un ordonnanceur de tâche :

  • Il reçoit ses tâches via un système de message.

  • Il les distribue de manière optimisée via l’utilisation des workers.

  • Chaque tâche peut s’exécuter de manière synchrone ou asynchrone.

  • Il est conçu pour fonctionner en temps réel, mais peut aussi gérer une planification.

  • Chaque tâche peut renvoyer un résultat (optionnellement).

Il s’agit d’un outil de haut niveau offrant des fonctions et une syntaxe pour répondre facilement à des besoins d’architecture applicative complexe.

2. Dans quel cas utiliser Celery ?

Le cas classique est lorsque l’on a une fonction qui doit générer un résultat, puis qui doit faire quelque chose annexe avec ce résultat et le renvoyer à l’utilisateur. Si l’on fait ces actions annexes dans le même tunnel de traitement, alors on retarde d’autant la réponse à l’utilisateur. L’idée est donc de mettre ces actions annexes dans des tâches distribuées (exécutées à côté du processus principal). Ainsi, on lance ces tâches qui seront exécutées plus tard, mais on n’attend pas qu’elles soient terminées et l’on peut continuer le processus principal et rendre la main à l’utilisateur.

Les tâches placées dans une file d’attente sont exécutées au plus tôt. Il n’y a cependant aucune garantie de la temporalité de leur exécution ni sur le fait qu’elles réussissent ou échouent.

L’expérience utilisateur est ainsi améliorée, car l’application répond plus vite, mais il doit aussi être tenu au courant qu’il y a des tâches en attente et être notifié lorsque celles-ci se terminent.

Un exemple concret serait une tâche pour envoyer un e-mail ou générer un document à partir des résultats du processus principal.

Il est possible de prévoir un moyen pour Celery de restituer le résultat, ceci permet de résoudre toutes les situations classiques où la programmation asynchrone est efficace, sauf qu’on le fait avec une solution tierce qui permet une mise...

Crossbar

1. Présentation

Pour terminer ce chapitre sur la programmation distribuée, nous allons introduire WAMP (Web Application Messaging Protocol). Il s’agit d’un standard ouvert de WebSocket qui permet à deux applications de s’échanger des messages RPC (Remote Procedure Call, soit appel distant de procédure) ou de type Publish/Subscribe (que nous avons déjà croisé deux fois dans ce chapitre).

La conception d’origine de ce protocole a été réalisée par crossbar.io en 2012 et est relativement utile et facile à mettre en place pour des projets web.

La solution présentée ici utilise massivement la bibliothèque asynchrone de Python. Il est donc conseillé d’avoir parcouru le chapitre Programmation asynchrone : avancée avant de s’attaquer à cette partie.

Pour installer les outils nécessaires :

$ pip install autobahn 

2. WebSocket

En guise d’introduction, nous allons réaliser un WebSocket de type Echo. Ce dernier va donc recevoir de la donnée et la renvoyer telle quelle. Voici ce serveur :

import json 
import logging 
import coloredlogs 
from autobahn.asyncio.websocket import ( 
    WebSocketServerProtocol, 
    WebSocketServerFactory) 
 
 
logger = logging.getLogger("notifications") 
coloredlogs.install(level='DEBUG', logger=logger) 
 
 
class TestServerProtocol(WebSocketServerProtocol): 
 
    def onConnect(self, request): 
        logger.info("Client connecting: {0}".format(request.peer)) 
 
    def onOpen(self): 
        logger.info("WebSocket connection open.") 
 
    def onMessage(self, payload, isBinary): 
        if isBinary: 
            result = self.onMessageBinary(payload) 
        else: 
            payload = payload.decode("utf-8") 
            try: ...