Table des matières

Conversion analogique-numérique multivoies avec échantillonnage

1. Introduction

Ce document montre comment effectuer des conversions numérique-analogiques multivoies échantillonnées avec un Arduino, et transmettre les échantillons à l'ordinateur en flot continu. Il fait suite au document Conversion analogique-numérique rapide avec échantillonnage. La numérisation sera faite en 10 bits par échantillon, avec acquisition de plusieurs voies.

L'échantillonnage est fait avec un chronomètre-compteur 16 bits (Timer 1). Le code fourni fonctionne sur l'Arduino Mega (ATmega 2560) et sur l'Arduino UNO (ATmega 328).

2. Programme arduino

Le programme est similaire à celui présenté dans Conversion analogique-numérique rapide avec échantillonnage. Les tampons sont dans un tableau d'entiers 16 bits (pour stocker les nombres 10 bits). Il y a un tampon pour chaque voie. Le maximum de voies (ici 4) est codé en dur. Avec un tampon de 256 éléments pour chaque voie, on doit se contenter d'un double tampon pour tenir dans la mémoire de l'Arduino MEGA. Pour l'Arduino UNO, il faut réduire la taille du tampon en modifiant BUF_SIZE et BUF_MASK (le premier doit être une puissance de 2).

arduino-ADC-multivoies.ino
#include "Arduino.h"
#define SET_ACQUISITION 100
#define STOP_ACQUISITION 101
#define NBUF 0x2
#define NBUF_MASK 0x1 // (NBUF-1)
#define BUF_SIZE 0x100
#define BUF_MASK 0xFF // (BUF_SIZE-1)
#define MAX_NVOIES 4
volatile uint16_t buf[MAX_NVOIES][NBUF][BUF_SIZE];
volatile uint8_t compteur_buf,compteur_buf_transmis;
volatile uint16_t indice_buf;
uint16_t diviseur[6] = {0,1,8,64,256,1024};
uint32_t nblocs;
uint8_t nvoies;
uint8_t multiplex[MAX_NVOIES];
uint8_t sans_fin;
uint8_t flag;
            

L'échantillonnage est fait avec le Timer 1, utilisé en mode CTC (clear timer on compare match), avec une valeur maximale fixée par le registre OCR1A. Le compteur est incrémenté d'une unité à chaque top d'horloge jusqu'à atteindre la valeur fixée par OCR1A. La période d'échantillonnage est égale à la période de l'horloge multipliée par la valeur maximale. Une interruption COMPA est générée lorsque le compteur TCNT1 atteint la valeur de OCR1A. La fonction suivante configure et déclenche le Timer avec la période donnée en microsecondes :

void timer1_init(uint32_t period) {
    TCCR1A = 0;
    TCCR1B = 0;
    TCCR1B |= (1 << WGM12); // mode CTC avec OCR1A pour le maximum
    uint32_t top = (F_CPU/1000000*period);
    int clock = 1;
    while ((top>0xFFFF)&&(clock<5)) {
          clock++;
          top = (F_CPU/1000000*period/diviseur[clock]);
      }
    OCR1A = top; // période
    TIMSK1 = (1 << OCIE1A); // interruption lorsque TCNT1=OCR1A
    TCCR1B |= clock;
}
               

La configuration du convertisseur analogique-numérique ADC se limite à l'activer et à choisir le facteur de division pour l'horloge.

void adc_init(uint8_t prescaler) {
  ADCSRA = 0;
  ADCSRA |= (1 << ADEN); // enable ADC
  ADCSRA |= prescaler ;
  ADCSRB = 0;
}
               

La fonction suivante est appelée par l'interruption générée par le Timer. Pour chaque voie, la configuration du multiplexeur est faite, puis le convertisseur est démarré en activant le bit 6 de ADCSRA. On doit attendre que ce bit revienne à zéro pour lire le résultat de la conversion. Le registre ADCL, contenant les 8 bits de poids faible, doit être lu en premier. Le nombre 10 bits obtenu est stocké dans le tampon en cours. Cette méthode ne permet pas d'obtenir un échantillonnage aussi précis que celui décrit dans Conversion analogique-numérique rapide avec échantillonnage, où l'ADC pour une voie est directement déclenché par le Timer. Cependant, la fréquence d'échantillonnage maximale pour 4 voies est de l'ordre du kHz, fréquence à laquelle le retard d'échantillonnage entre une voie et la précédente devrait être négligeable (à vérifier expérimentalement).

ISR(TIMER1_COMPA_vect) {
    uint8_t v;
    for (v=0; v<nvoies; v++) {
       ADMUX  = 0b01000000 | multiplex[v]; // REFS0 et multiplex
       //ADCSRB |= (multiplex & 0b00100000) >> 2; // pour multiplex sur 6 bits
       ADCSRA |= 0b01000000; // start ADC
       while (ADCSRA & 0b01000000);
       buf[v][compteur_buf][indice_buf] = ADCL | (ADCH << 8);
    }
    indice_buf++;
    if (indice_buf == BUF_SIZE) {
       indice_buf = 0;
       compteur_buf = (compteur_buf+1)&NBUF_MASK;
     }   
}
               

La fonction suivante stoppe le Timer :

void stop_acquisition() {
   TCCR1B &= 0b11111000; 
}
               

La fonction setup initialise la liaison série avec l'ordinateur.

void setup() {
  char c;
  Serial.begin(500000);
  Serial.setTimeout(0);
  c = 0;
  Serial.write(c);
  c = 255;
  Serial.write(c);
  c = 0;
  Serial.write(c);
  compteur_buf = compteur_buf_transmis = 0;
  indice_buf = 0;
  flag = 0;
}
               

La fonction suivante lit les paramètres envoyés par l'ordinateur pour la configuration de l'acquisition et la démarre. Les données transmises sont :

void lecture_acquisition() {
  uint32_t c1,c2,c3,c4;
  uint8_t mult,prescaler;
  uint32_t period;
  uint8_t v;
  while (Serial.available()<2) {};
  nvoies = Serial.read();
  prescaler = Serial.read();
  while (Serial.available()<nvoies) {};
  for (v=0; v<nvoies; v++) {
      mult = Serial.read();
      if (v < MAX_NVOIES) multiplex[v] = mult;
  }
  if (nvoies > MAX_NVOIES) nvoies = MAX_NVOIES;
  while (Serial.available()<4) {};
  c1 = Serial.read();
  c2 = Serial.read();
  c3 = Serial.read();
  c4 = Serial.read();
  period = ((c1 << 24) | (c2 << 16) | (c3 << 8) | c4);
  while (Serial.available()<4) {};
  c1 = Serial.read();
  c2 = Serial.read();
  c3 = Serial.read();
  c4 = Serial.read();
  nblocs = ((c1 << 24) | (c2 << 16) | (c3 << 8) | c4);
  if (nblocs==0) {
        sans_fin = 1;
        nblocs = 1;
   }
   else sans_fin = 0;
   compteur_buf=compteur_buf_transmis=0;
   indice_buf = 0;
  cli();
  timer1_init(period);
  adc_init(prescaler);
  sei();
}
               

La fonction suivante lit le port série (de manière non bloquante). En cas de présence d'une des deux commandes, elle exécute l'action correspondante.

void lecture_serie() {
   char com;
   if (Serial.available()>0) {
        com = Serial.read();
        if (com==SET_ACQUISITION) lecture_acquisition();
        if (com==STOP_ACQUISITION) stop_acquisition();
   }
}
               

Voici la fonction loop, qui transmet les tampons sur le port série.

void loop() {
  uint8_t v;
  if ((compteur_buf_transmis!=compteur_buf)&&(nblocs>0)) {
      for (v=0; v<nvoies; v++) Serial.write((uint8_t *)buf[v][compteur_buf_transmis],BUF_SIZE*2);
      compteur_buf_transmis=(compteur_buf_transmis+1)&NBUF_MASK;
      if (sans_fin==0) {
            nblocs--;
            if (nblocs==0) stop_acquisition();
      }
  }
  else lecture_serie();
}
               

3. Programme Python

Le programme suivant (Python 3) effectue la configuration de l'acquisition et la lecture des blocs d'échantillons.

arduinoADCmultivoies.py
# -*- coding: utf-8 -*-
import serial
import numpy
import time
from matplotlib.pyplot import *
import numpy.fft
import threading

class Arduino():
    def __init__(self,port):
        self.ser = serial.Serial(port,baudrate=500000)
        c_recu = self.ser.read(1)
        while ord(c_recu)!=0:
            c_recu = self.ser.read(1)
        c_recu = self.ser.read(1)
        while ord(c_recu)!=255:
            c_recu = self.ser.read(1)
        c_recu = self.ser.read(1)
        while ord(c_recu)!=0:
            c_recu = self.ser.read(1)
        self.SET_ACQUISITION = 100
        self.STOP_ACQUISITION = 101
        self.TAILLE_BLOC = 256
        self.TAILLE_BLOC_INT8 = self.TAILLE_BLOC *2
        self.MAX_NVOIES = 4

    def close(self):
        self.ser.close()

    def write_int16(self,v):
        v = numpy.int16(v)
        char1 = (v & 0xFF00) >> 8
        char2 = (v & 0x00FF)
        self.ser.write(chr(char1))
        self.ser.write(chr(char2))
        
    def write_int16(self,v):
        v = numpy.int16(v)
        char1 = int((v & 0xFF00) >> 8)
        char2 = int((v & 0x00FF))
        self.ser.write((char1).to_bytes(1,byteorder='big'))
        self.ser.write((char2).to_bytes(1,byteorder='big'))
        
    def write_int32(self,v):
        v = numpy.int32(v)
        char1 = int((v & 0xFF000000) >> 24)
        char2 = int((v & 0x00FF0000) >> 16)
        char3 = int((v & 0x0000FF00) >> 8)
        char4 = int((v & 0x000000FF))
        self.ser.write((char1).to_bytes(1,byteorder='big'))
        self.ser.write((char2).to_bytes(1,byteorder='big'))
        self.ser.write((char3).to_bytes(1,byteorder='big'))
        self.ser.write((char4).to_bytes(1,byteorder='big'))

    def lancer_acquisition(self,multiplex,fechant,nblocs,prescaler=4):
        period = int(1e6*1.0/fechant)
        self.nblocs = nblocs
        self.nvoies = len(multiplex)
        if self.nvoies > self.MAX_NVOIES:
            raise Exception("trop de voies")
        self.ser.write((self.SET_ACQUISITION).to_bytes(1,byteorder='big'))
        self.ser.write((self.nvoies).to_bytes(1,byteorder='big'))
        self.ser.write((prescaler).to_bytes(1,byteorder='big'))
        for v in range(self.nvoies):
            self.ser.write((multiplex[v]).to_bytes(1,byteorder='big'))
        self.write_int32(period)
        self.write_int32(nblocs)

    def stopper_acquisition(self):
        self.ser.write((self.STOP_ACQUISITION).to_bytes(1,byteorder='big'))

    def lecture(self):
        buf = self.ser.read(self.TAILLE_BLOC_INT8)
        data = numpy.zeros(self.TAILLE_BLOC,dtype=numpy.float32)
        j=0
        for i in range(self.TAILLE_BLOC):
            data[i] = buf[j]+0x100*buf[j+1]
            j += 2
        return data
            

La fonction suivante effectue l'acquisition de 10 blocs à la fréquence d'échantillonnage de 1 kHz, pour 4 voies. L'arduino est branché sur le port COM5. Les échantillons sont tracés en fonction du temps et un spectre est tracé, ce qui permet de vérifier le bon fonctionnement de l'échantillonnage.

def test():
    ard = Arduino("COM8")
    nblocs = 10
    fechant = 1000.0
    prescaler = 7
    N = ard.TAILLE_BLOC
    ne = N*nblocs
    voies = [0,1,2,3]
    nv = len(voies)
    x = numpy.zeros((nv,ne),dtype=numpy.float32)
    t = numpy.arange(ne,dtype=numpy.float32)*1.0/fechant
    
    ard.lancer_acquisition(voies,fechant,nblocs,prescaler)
    i = 0
    for b in range(nblocs):
        for v in range(nv):
            data = ard.lecture()
            x[v,i:i+N] = data
        i += N
        
    time.sleep(1)
    ard.close()
    figure()
    plot(t,x[0,:])
    plot(t,x[1,:])
    xlabel("t (s)")
    axis([0,t[-1],0,1024])
    grid()
    figure()
    spectre = numpy.absolute(numpy.fft.fft(x[0,:]))/(ne)
    f = numpy.arange(ne,dtype=numpy.float32)*fechant/ne
    plot(f,spectre)
    xlabel("f (Hz)")
    grid()
    show()
            

Les tests montrent que la fréquence d'échantillonnage maximale pour 4 voies est 1,7 kHz avec prescaler=7. Avec prescaler=4, on peut aller jusqu'à 5 kHz. Le débit de données pour 4 voies en 16 bits est 8 fois plus élevé que pour une voie en 8 bits. Pour la fréquence 5 kHz, il est de 400 kbits/s.

Lorsque l'acquisition est lancée, il faut faire une lecture asynchrone des données envoyées par l'arduino. On se fixe comme objectif de traiter ou d'afficher le signal par paquets, chacun étant constitué d'un certain nombre de blocs. Les blocs sont les unités transmises par l'arduino, qui contiennent ici 256 échantillons. Nous allons faire la lecture des données envoyées par l'arduino sur un fil d'exécution. On doit pour cela définir une classe qui hérite de threading.Thread. Voici le constructeur de cette classe :

class AcquisitionThread(threading.Thread): 
    def __init__(self,arduino,voies,fechant,nblocs,prescaler=7):
        threading.Thread.__init__(self)
        self.arduino = arduino
        self.nvoies = len(voies)
        self.voies = voies
        self.prescaler = prescaler
        self.fechant = fechant
        self.running = False
        self.nblocs = nblocs # nombre de blocs dans un paquet
        self.npaquets = 8 # 1 paquet = nblocs*arduino.TAILLE_BLOC
        self.taille_bloc = arduino.TAILLE_BLOC
        self.buf = numpy.zeros((self.npaquets,self.nvoies,self.nblocs*arduino.TAILLE_BLOC))
        self.compteur_paquets = 0
        self.compteur_paquets_lus = 0

              

On crée un tampon circulaire qui contient 8 paquets. On pourra ainsi lire un paquet pour le traiter alors que le Thread est en train d'écrire un autre paquet. On a aussi défini self.compteur_paquets un indice pour le prochain paquet à récupérer de l'arduino et à placer dans le tampon, et self.compteur_paquets un indice pour le prochain paquet à lire dans le tampon.

La fonction run est exécutée lorsqu'on lance le Thread. Elle consiste en une boucle sans fin qui lit les données provenant de l'arduino et les stocke dans le tampon circulaire. Cette boucle incrémente self.compteur_paquets.

    def run(self):
        self.arduino.lancer_acquisition(self.voies,self.fechant,0,self.prescaler) # acquisition sans fin
        self.running = True
        indice_bloc = 0
        while self.running:
            i = indice_bloc*self.taille_bloc
            j = i+self.taille_bloc
            for v in range(self.nvoies):
                self.buf[self.compteur_paquets,v,i:j] = self.arduino.lecture()
            indice_bloc += 1
            if indice_bloc==self.nblocs:
                indice_bloc = 0
                self.compteur_paquets += 1
                if self.compteur_paquets==self.npaquets:
                    self.compteur_paquets = 0
               

La fonction suivante permet de stopper l'acquisition :

    def stop(self):
        self.running = False
        self.join()
        self.arduino.stopper_acquisition()
               

L'appel self.join() permet d'attendre la sortie de la fonction run.

Voici la fonction qui permet de lire un paquet dans le tampon :

    def paquet(self):
        if self.compteur_paquets==self.compteur_paquets_lus:
            return 0
        P = self.buf[self.compteur_paquets_lus,:,:]
        self.compteur_paquets_lus += 1
        if self.compteur_paquets_lus==self.npaquets:
            self.compteur_paquets_lus = 0
        return P
               

4. Test avec tracé des signaux

Le programme de test suivant effectue une acquisition de deux voies à 5 kHz avec un tracé des deux signaux dans une fenêtre matplotlib gérée par une animation.

testAcquisitionAnimate.py
# -*- coding: utf-8 -*-
import numpy
from matplotlib.pyplot import *
import matplotlib.animation as animation
from arduinoADCmultivoies import *

ard = Arduino(4) # numéro du port COM
fechant = 5000
n = 4
nechant = ard.TAILLE_BLOC*n
delai = nechant*1.0/fechant
t = numpy.arange(nechant)*1.0/fechant
fig,ax = subplots()
line0, = ax.plot(t,numpy.zeros(nechant))
line1, = ax.plot(t,numpy.zeros(nechant))
ax.axis([0,t.max(),0,1024])
ax.grid()

voies = [0,1]
prescaler = 4
acquisition = AcquisitionThread(ard,voies,fechant,n,prescaler)
acquisition.start()

def animate(i):
    global line0,line_1,acquisition
    data = acquisition.paquet()
    if isinstance(data,int)==False:
        line0.set_ydata(data[0])
        line1.set_ydata(data[1])

ani = animation.FuncAnimation(fig,animate,100,interval=delai*1000)
show()
acquisition.stop()
ard.close() 
             
Creative Commons LicenseTextes et figures sont mis à disposition sous contrat Creative Commons.