Florent Jardin   Conférences  Archives  À propos

Un assistant pour copier les données distantes

This article is available in English: An assistant to copy data from a remote server (2024-05-28)

Lors de la dernière PGSession 16, j’ai rédigé et animé un atelier de trois heures au sujet de la migration vers PostgreSQL à l’aide des Foreign Data Wrappers, ou FDW. Ce fut notamment l’occasion de présenter au grand public, l’extension db_migrator pour laquelle j’ai dédié un article sur ce blog.

Au cours de cet atelier, nous pouvons constater que la copie des données avec l’extension db_migrator n’est pas parfaitement prise en charge. En effet, bien qu’il existe une fonction de bas niveau pour répartir sur plusieurs processus le transfert table à table, de nombreuses situations devront exiger de rédiger un grand nombre de requêtes SQL pour se tirer d’affaire. Au cours des mois qui suivirent, je me suis attelé à la conception d’un assistant écrit en PL/pgSQL dont le but est de simplifier la génération de ces requêtes.


Transfert des données sans assistant

Il n’y a pas de magie dans l’opération de copie des données à travers un Foreign Data Wrapper. Tout se résume à une série de requêtes INSERT qui doivent être exécutées dans un ordre prédéfini. Prenons les 16 tables du modèle de données très connu « Sakila » (disponible à cette adresse), pour illustrer le besoin de génération des requêtes de transfert.

ERD Base Sakila

Pour la pédagogie de cet article, je dispose d’une base de données PostgreSQL avec les deux schémas suivants que j’utiliserai dans mes démonstrations :

  • mysql : le schéma source contenant la définition des tables externes via l’extension mysql_fdw ;
  • public : le schéma cible où les données seront copiées.

Chaque table externe fait l’objet d’une étude rapide pour obtenir la bonne correspondance de type de colonne, et leur définition est conservée dans un fichier SQL à destination des équipes du projet. Par exemple, la table externe rental est définie comme suit :

CREATE FOREIGN TABLE mysql.rental (
	 rental_id integer NOT NULL,
	 rental_date timestamp without time zone NOT NULL,
	 inventory_id integer NOT NULL,
	 customer_id smallint NOT NULL,
	 return_date timestamp without time zone,
	 staff_id smallint NOT NULL,
	 last_update timestamp without time zone NOT NULL
)
SERVER sakila_mysql
OPTIONS (
	 dbname 'sakila',
	 table_name 'rental'
);

Lors de la création de la table public.rental qui accueillera les données, il est opportun de décider si nous souhaitons mettre en place un partitionnement, chose que db_migrator est capable d’identifier et de mettre en place. Pour l’exemple, je reprends la structure stricte à l’aide de la syntaxe CREATE TABLE LIKE pour créer toutes mes tables cibles.

CREATE TABLE public.actor (LIKE mysql.actor);
CREATE TABLE public.address (LIKE mysql.address);
...

Avant même de mettre en place un générateur de requêtes INSERT, il est aisé d’entrevoir la forme de celles-ci. Chaque ligne de la table externe sera lue à travers un SELECT global, puis insérée dans la table cible. Le script de migration contient ainsi 16 instructions, une pour chaque table.

-- insert.sql
INSERT INTO public.actor SELECT * FROM mysql.actor;
INSERT INTO public.address SELECT * FROM mysql.address;
...
INSERT INTO public.store SELECT * FROM mysql.store;

Pour bénéficier de plusieurs processus, j’apprécie l’outil xargs qui permet de distribuer chaque ligne du fichier insert.sql sur une nouvelle session psql. Cette technique était présentée dans l’atelier de février, notamment pour paralléliser la construction des index et des clés primaires, définis dans un fichier SQL.

$ xargs -P 4 -a insert.sql -d '\n' -I % sh -c 'psql -c "%"'
INSERT 0 16
INSERT 0 603
...
INSERT 0 16044

C’est un peu rude, ça manque de verbosité, les requêtes SQL sont statiques. Bref, voyons la partie suivante pour découvrir ce que mon assistant peut apporter.


Démonstration de l’assistant

Contrairement à mes autres projets en PL/pgSQL, cet assistant n’est pas une extension et s’installe comme un vulgaire script. Une fois téléchargé, il suffit de l’invoquer sur la base de données de votre choix avec la commande suivante :

$ psql -d sakila -f fdw-assistant.sql 

Le schéma par défaut se nomme assistant et contient une table de configuration sobrement appelée config. Pour chaque table à migrer, il suffit d’insérer une unique ligne qui servira d’élément de départ à la génération des requêtes de migration des données. On y retrouve dans la version actuelle les paramètres suivants :

  • source : la table externe qui contient les données à copier ;
  • target : la table cible où les données seront copiées ;
  • pkey : la colonne de clé primaire de la table cible ;
  • priority : les valeurs les plus faibles définissent les tables à traiter en premier ;
  • parts : le nombre de processus à lancer pour la copie des données ;
  • trunc : une option pour vider la table cible avant de copier les données ;
  • condition : une clause WHERE pour filtrer les données à copier ;
  • batchsize : le nombre de lignes à copier avant de réaliser un COMMIT intermédiaire.

Pour initialiser cette table en première intention, il est nécessaire de connaître a minima les colonnes de clé primaire de chaque table distante. En reportant les informations du diagramme relationnel de la base Sakila, nous pouvons remplir la table config de la façon suivante :

INSERT INTO assistant.config (source, target, pkey)
VALUES
  ('mysql.actor', 'public.actor', 'actor_id'),
  ('mysql.address', 'public.address', 'address_id'),
  ...
  ('mysql.store', 'public.store', 'store_id');

Pour chaque transfert, nous indiquons la table source et la table cible, ainsi que la colonne de clé primaire. Cette dernière est requise pour trier les lignes, découper le transfert en plusieurs lots (batchs) et redémarrer le transfert en cas d’interruption.

À l’aide de cette configuration, nous pouvons passer à la planification. Les tables stage et job sont alimentées avec de nouveaux éléments qui serviront au pilotage et au suivi des différents transferts à déclencher.

SELECT * FROM assistant.plan();
    target     |        invocation
---------------+--------------------------
 customer      | CALL assistant.copy(1);
 address       | CALL assistant.copy(2);
 ...
(16 rows)

Une vue nommée report permet de suivre l’avancement des différentes étapes en joignant les tables stage et job. Elle donne notamment des éléments très utiles pour suivre l’avancement et le débit des transferts.

SELECT target, state, rows FROM assistant.report WHERE stage_id = 1;
    target     |  state  | rows
---------------+---------+------
 rental        | pending |    0
 actor         | pending |    0
 ...
(16 rows)

Les lignes retournées par la commande plan() peuvent alors être invoquées les unes après les autres avec la méta-commande \gexec de psql, ou alors en reprenant la technique du fichier et la distribution des requêtes avec xargs.

L’appel à la méthode copy() se charge de construire l’instruction INSERT relative à la copie des données d’une table distante vers une table locale. Par exemple, pour la table customer, le résultat de l’appel sera le suivant :

CALL assistant.copy(1);
NOTICE:  Executing: TRUNCATE public.customer
NOTICE:  Executing: SELECT count(customer_id) FROM mysql.customer 
                    WHERE customer_id > 0
NOTICE:  Executing: INSERT INTO public.customer 
                    SELECT customer_id, store_id, first_name, last_name, email,
                           address_id, active, create_date, last_update 
                    FROM mysql.customer 
                    WHERE customer_id > 0 ORDER BY customer_id
CALL

À l’issue du transfert, la vue report nous fait un résumé de l’opération.

SELECT * FROM assistant.report
 WHERE stage_id = 1 AND target = 'public.customer'::regclass;
-[ RECORD 1 ]-------------------------
stage_id  | 1
target    | public.customer
job_start | 2024-05-28 10:19:18.334917
state     | completed
rows      | 599
total     | 599
elapsed   | 00:00:00.081273
rate      | 7370.22
progress  | 1.00
eti       | 00:00:00
eta       | 2024-05-28 10:19:18.334917

Distribution sur plusieurs processus

Au fur et à mesure de la conception de l’assistant, j’ai éprouvé le besoin d’enrichir les requêtes sous-jacentes pour répondre à d’autres cas d’usage, récurrents dans le domaine de la migration de données. Parmi ceux-ci, on y retrouve la capacité de répartir les lignes d’une même table sur plusieurs sessions, chacune disposant d’une clause WHERE basé sur le résultat de la division euclidienne (modulo) de la clé primaire.

Pour activer cette fonctionnalité, il suffit de renseigner le paramètre parts dans la table config. Par exemple, pour la table film, nous pouvons définir :

UPDATE assistant.config SET parts = 4 WHERE target = 'public.film'::regclass;

Cette configuration prend effet lors de la prochaine planification. L’appel à la méthode plan() insère alors quatre lignes dans la table job pour la table film, chacune rattachée à une valeur comprise entre 0 et 3. La colonne condition est alors enrichie pour les lignes de la table task en vue de l’étape suivante.

SELECT invocation FROM assistant.plan('{public.film}');
        invocation
--------------------------
 CALL assistant.copy(17);
 CALL assistant.copy(18);
 CALL assistant.copy(19);
 CALL assistant.copy(20);
(4 rows)
SELECT job_id, job.target, state, part, condition
  FROM assistant.job JOIN assistant.task USING (job_id)
 WHERE stage_id = 2;
 job_id | target |  state  | part |    condition
--------+--------+---------+------+-----------------
     17 | film   | pending |    0 | film_id % 4 = 0
     18 | film   | pending |    1 | film_id % 4 = 1
     19 | film   | pending |    2 | film_id % 4 = 2
     20 | film   | pending |    3 | film_id % 4 = 3     

Lors de l’appel à la méthode copy(), l’assistant construit les requêtes INSERT sur la base des conditions précédemment définies. Par exemple, pour la première partie de la table film, la trace indique les requêtes INSERT qui ont été générées.

CALL assistant.copy(17);
NOTICE:  Executing: TRUNCATE public.film
NOTICE:  Executing: SELECT count(film_id) FROM mysql.film 
                    WHERE film_id > 0 AND film_id % 4 = 0
NOTICE:  Executing: INSERT INTO public.film 
                    SELECT film_id, title, description, release_year, 
                           language_id, original_language_id, rental_duration, 
                           rental_rate, length, replacement_cost, rating,
                           special_features, last_update 
                    FROM mysql.film WHERE film_id > 0 AND film_id % 4 = 0 
                    ORDER BY film_id
CALL

L’opération TRUNCATE intervient uniquement pour la session dont la valeur de part est égale à 0. Dans le cas nominal, cette session est lancée avant toutes les autres pour respecter le comportement attendu dans la configuration avec la colonne trunc (true par défaut).

L’intérêt évident de cette méthode est d’obtenir le meilleur débit d’insertion pour une table donnée, en reposant sur la puissance d’extraction du serveur distant et la capacité d’écriture du serveur local. Le débit peut être consulté avec la vue report, notamment pour comparer deux chargements pour une même table, comme l’exemple de la table film. La colonne rate est exprimée en nombre de lignes par seconde.

SELECT stage_id, target, state, rate FROM assistant.report
 WHERE target = 'public.film'::regclass;
 stage_id |   target    |   state   |   rate
----------+-------------+-----------+----------
        1 | public.film | completed | 19162.59
        2 | public.film | completed | 51389.37

Reprise après interruption

La capacité de pouvoir relancer une copie par lot en cas d’interruption est une des raisons pour lesquelles la clé primaire doit être renseignée dans la configuration. L’assistant s’appuie sur la dernière valeur de clé primaire extraite (à l’aide d’une clause RETURNING) pour connaître le prochain point de reprise. Dans la conception de cette fonctionnalité, il m’a fallu arbitrer sur les limitations qu’impose ce mécanisme.

  • Les clés primaires ne doivent pas être composées, car la clause RETURNING ne peut retourner qu’une seule valeur ;
  • Les tables dont la clé primaire est composée ne peuvent pas bénéficier du traitement par lot, et donc de la reprise après interruption ;
  • Les données sont systématiquement triées lors de l’extraction, même si le traitement par lot n’est pas activé ;
  • La colonne de la clé primaire doit être de type numérique.

L’activation d’un traitement par lot consiste à mettre à jour la colonne batchsize de la table de configuration. Prenons l’exemple de la table rental :

UPDATE assistant.config SET batchsize = 1000 
 WHERE target = 'public.rental'::regclass;

SELECT invocation FROM assistant.plan('{public.rental}');
        invocation
--------------------------
 CALL assistant.copy(21);
(1 row)

Le transfert des données de la table rental est alors découpé en lots de 1000 lignes. Il est bien sûr possible de combiner cette technique avec la parallélisation, la clause WHERE réalisera l’essentiel du travail de répartition pour empêcher que la même ligne soit exportée deux fois.

CALL assistant.copy(21);
TRUNCATE public.rental
SELECT count(rental_id) FROM mysql.rental WHERE rental_id > 0
INSERT INTO public.rental SELECT rental_id, ... 
  FROM mysql.rental WHERE rental_id > 0  ORDER BY rental_id LIMIT 1000
...
...
INSERT INTO public.rental SELECT rental_id, ... 
  FROM mysql.rental WHERE rental_id > 16005  ORDER BY rental_id LIMIT 1000
INSERT INTO public.rental SELECT rental_id, ... 
  FROM mysql.rental WHERE rental_id > 16049  ORDER BY rental_id LIMIT 1000
CALL

Dès qu’une requête ne retourne plus aucune ligne, l’assistant considère que le transfert est terminé. La table job est mise à jour à chaque itération pour suivre la dernière valeur de la séquence de clé primaire.

SELECT target, lastseq FROM assistant.job 
 WHERE stage_id = 3 AND target = 'public.rental'::regclass;
    target     | lastseq
---------------+---------
 public.rental |   16049

En cas d’interruption, il est possible de relancer le transfert en appelant la méthode copy() avec le même identifiant de tâche. L’assistant se charge de reprendre le transfert à partir de la dernière valeur de clé primaire connue.

CALL assistant.copy(21);
NOTICE:  Executing: SELECT count(rental_id) FROM mysql.rental
                    WHERE rental_id > 16049
NOTICE:  Executing: INSERT INTO public.rental SELECT rental_id, ...
                    FROM mysql.rental WHERE rental_id > 16049  
                    ORDER BY rental_id LIMIT 1000
CALL

Conclusion

La conception d’un tel outil était un petit défi personnel dans la droite lignée de mes recherches autours de la migration vers PostgreSQL avec l’aide exclusive des Foreign Data Wrappers. Ma principale source d’inspiration reste le projet Ora2Pg, l’un des outils open-source le plus complet à ce jour.

J’ai conscience des limites techniques de cet assistant, et du bricolage qu’il reste à mettre en place pour faciliter la vie d’un consultant comme moi. Dans un autre article, j’aimerais présenter un autre outil nommé dispatch que je maintiens depuis quelque temps et avec lequel je réponds aux questions d’orchestration et traçabilité des étapes de la migration.

En prenant un peu de recul, les concepts de base sont là, n’importe quel autre outil dans d’autres langages pourrait parfaitement émerger et enrichir l’écosystème open-source dans la quête de la migration vers PostgreSQL.