Ce document montre comment effectuer des conversions numérique-analogiques multivoies échantillonnées avec un Arduino, et transmettre les échantillons à l'ordinateur en flot continu. Il fait suite au document Conversion analogique-numérique rapide avec échantillonnage. La numérisation sera faite en 10 bits par échantillon, avec acquisition de plusieurs voies.
L'échantillonnage est fait avec un chronomètre-compteur 16 bits (Timer 1). Le code fourni fonctionne sur l'Arduino Mega (ATmega 2560) et sur l'Arduino UNO (ATmega 328).
Le programme est similaire à celui présenté dans Conversion analogique-numérique rapide avec échantillonnage. Les tampons sont dans un tableau d'entiers 16 bits (pour stocker les nombres 10 bits). Il y a un tampon pour chaque voie. Le maximum de voies (ici 4) est codé en dur. Avec un tampon de 256 éléments pour chaque voie, on doit se contenter d'un double tampon pour tenir dans la mémoire de l'Arduino MEGA. Pour l'Arduino UNO, il faut réduire la taille du tampon en modifiant BUF_SIZE et BUF_MASK (le premier doit être une puissance de 2).
#include "Arduino.h" #define SET_ACQUISITION 100 #define STOP_ACQUISITION 101 #define NBUF 0x2 #define NBUF_MASK 0x1 // (NBUF-1) #define BUF_SIZE 0x100 #define BUF_MASK 0xFF // (BUF_SIZE-1) #define MAX_NVOIES 4 volatile uint16_t buf[MAX_NVOIES][NBUF][BUF_SIZE]; volatile uint8_t compteur_buf,compteur_buf_transmis; volatile uint16_t indice_buf; uint16_t diviseur[6] = {0,1,8,64,256,1024}; uint32_t nblocs; uint8_t nvoies; uint8_t multiplex[MAX_NVOIES]; uint8_t sans_fin; uint8_t flag;
L'échantillonnage est fait avec le Timer 1, utilisé en mode CTC (clear timer on compare match), avec une valeur maximale fixée par le registre OCR1A. Le compteur est incrémenté d'une unité à chaque top d'horloge jusqu'à atteindre la valeur fixée par OCR1A. La période d'échantillonnage est égale à la période de l'horloge multipliée par la valeur maximale. Une interruption COMPA est générée lorsque le compteur TCNT1 atteint la valeur de OCR1A. La fonction suivante configure et déclenche le Timer avec la période donnée en microsecondes :
void timer1_init(uint32_t period) { TCCR1A = 0; TCCR1B = 0; TCCR1B |= (1 << WGM12); // mode CTC avec OCR1A pour le maximum uint32_t top = (F_CPU/1000000*period); int clock = 1; while ((top>0xFFFF)&&(clock<5)) { clock++; top = (F_CPU/1000000*period/diviseur[clock]); } OCR1A = top; // période TIMSK1 = (1 << OCIE1A); // interruption lorsque TCNT1=OCR1A TCCR1B |= clock; }
La configuration du convertisseur analogique-numérique ADC se limite à l'activer et à choisir le facteur de division pour l'horloge.
void adc_init(uint8_t prescaler) { ADCSRA = 0; ADCSRA |= (1 << ADEN); // enable ADC ADCSRA |= prescaler ; ADCSRB = 0; }
La fonction suivante est appelée par l'interruption générée par le Timer. Pour chaque voie, la configuration du multiplexeur est faite, puis le convertisseur est démarré en activant le bit 6 de ADCSRA. On doit attendre que ce bit revienne à zéro pour lire le résultat de la conversion. Le registre ADCL, contenant les 8 bits de poids faible, doit être lu en premier. Le nombre 10 bits obtenu est stocké dans le tampon en cours. Cette méthode ne permet pas d'obtenir un échantillonnage aussi précis que celui décrit dans Conversion analogique-numérique rapide avec échantillonnage, où l'ADC pour une voie est directement déclenché par le Timer. Cependant, la fréquence d'échantillonnage maximale pour 4 voies est de l'ordre du kHz, fréquence à laquelle le retard d'échantillonnage entre une voie et la précédente devrait être négligeable (à vérifier expérimentalement).
ISR(TIMER1_COMPA_vect) { uint8_t v; for (v=0; v<nvoies; v++) { ADMUX = 0b01000000 | multiplex[v]; // REFS0 et multiplex //ADCSRB |= (multiplex & 0b00100000) >> 2; // pour multiplex sur 6 bits ADCSRA |= 0b01000000; // start ADC while (ADCSRA & 0b01000000); buf[v][compteur_buf][indice_buf] = ADCL | (ADCH << 8); } indice_buf++; if (indice_buf == BUF_SIZE) { indice_buf = 0; compteur_buf = (compteur_buf+1)&NBUF_MASK; } }
La fonction suivante stoppe le Timer :
void stop_acquisition() { TCCR1B &= 0b11111000; }
La fonction setup initialise la liaison série avec l'ordinateur.
void setup() { char c; Serial.begin(500000); Serial.setTimeout(0); c = 0; Serial.write(c); c = 255; Serial.write(c); c = 0; Serial.write(c); compteur_buf = compteur_buf_transmis = 0; indice_buf = 0; flag = 0; }
La fonction suivante lit les paramètres envoyés par l'ordinateur pour la configuration de l'acquisition et la démarre. Les données transmises sont :
void lecture_acquisition() { uint32_t c1,c2,c3,c4; uint8_t mult,prescaler; uint32_t period; uint8_t v; while (Serial.available()<2) {}; nvoies = Serial.read(); prescaler = Serial.read(); while (Serial.available()<nvoies) {}; for (v=0; v<nvoies; v++) { mult = Serial.read(); if (v < MAX_NVOIES) multiplex[v] = mult; } if (nvoies > MAX_NVOIES) nvoies = MAX_NVOIES; while (Serial.available()<4) {}; c1 = Serial.read(); c2 = Serial.read(); c3 = Serial.read(); c4 = Serial.read(); period = ((c1 << 24) | (c2 << 16) | (c3 << 8) | c4); while (Serial.available()<4) {}; c1 = Serial.read(); c2 = Serial.read(); c3 = Serial.read(); c4 = Serial.read(); nblocs = ((c1 << 24) | (c2 << 16) | (c3 << 8) | c4); if (nblocs==0) { sans_fin = 1; nblocs = 1; } else sans_fin = 0; compteur_buf=compteur_buf_transmis=0; indice_buf = 0; cli(); timer1_init(period); adc_init(prescaler); sei(); }
La fonction suivante lit le port série (de manière non bloquante). En cas de présence d'une des deux commandes, elle exécute l'action correspondante.
void lecture_serie() { char com; if (Serial.available()>0) { com = Serial.read(); if (com==SET_ACQUISITION) lecture_acquisition(); if (com==STOP_ACQUISITION) stop_acquisition(); } }
Voici la fonction loop, qui transmet les tampons sur le port série.
void loop() { uint8_t v; if ((compteur_buf_transmis!=compteur_buf)&&(nblocs>0)) { for (v=0; v<nvoies; v++) Serial.write((uint8_t *)buf[v][compteur_buf_transmis],BUF_SIZE*2); compteur_buf_transmis=(compteur_buf_transmis+1)&NBUF_MASK; if (sans_fin==0) { nblocs--; if (nblocs==0) stop_acquisition(); } } else lecture_serie(); }
Le programme suivant (Python 3) effectue la configuration de l'acquisition et la lecture des blocs d'échantillons.
# -*- coding: utf-8 -*- import serial import numpy import time from matplotlib.pyplot import * import numpy.fft import threading class Arduino(): def __init__(self,port): self.ser = serial.Serial(port,baudrate=500000) c_recu = self.ser.read(1) while ord(c_recu)!=0: c_recu = self.ser.read(1) c_recu = self.ser.read(1) while ord(c_recu)!=255: c_recu = self.ser.read(1) c_recu = self.ser.read(1) while ord(c_recu)!=0: c_recu = self.ser.read(1) self.SET_ACQUISITION = 100 self.STOP_ACQUISITION = 101 self.TAILLE_BLOC = 256 self.TAILLE_BLOC_INT8 = self.TAILLE_BLOC *2 self.MAX_NVOIES = 4 def close(self): self.ser.close() def write_int16(self,v): v = numpy.int16(v) char1 = (v & 0xFF00) >> 8 char2 = (v & 0x00FF) self.ser.write(chr(char1)) self.ser.write(chr(char2)) def write_int16(self,v): v = numpy.int16(v) char1 = int((v & 0xFF00) >> 8) char2 = int((v & 0x00FF)) self.ser.write((char1).to_bytes(1,byteorder='big')) self.ser.write((char2).to_bytes(1,byteorder='big')) def write_int32(self,v): v = numpy.int32(v) char1 = int((v & 0xFF000000) >> 24) char2 = int((v & 0x00FF0000) >> 16) char3 = int((v & 0x0000FF00) >> 8) char4 = int((v & 0x000000FF)) self.ser.write((char1).to_bytes(1,byteorder='big')) self.ser.write((char2).to_bytes(1,byteorder='big')) self.ser.write((char3).to_bytes(1,byteorder='big')) self.ser.write((char4).to_bytes(1,byteorder='big')) def lancer_acquisition(self,multiplex,fechant,nblocs,prescaler=4): period = int(1e6*1.0/fechant) self.nblocs = nblocs self.nvoies = len(multiplex) if self.nvoies > self.MAX_NVOIES: raise Exception("trop de voies") self.ser.write((self.SET_ACQUISITION).to_bytes(1,byteorder='big')) self.ser.write((self.nvoies).to_bytes(1,byteorder='big')) self.ser.write((prescaler).to_bytes(1,byteorder='big')) for v in range(self.nvoies): self.ser.write((multiplex[v]).to_bytes(1,byteorder='big')) self.write_int32(period) self.write_int32(nblocs) def stopper_acquisition(self): self.ser.write((self.STOP_ACQUISITION).to_bytes(1,byteorder='big')) def lecture(self): buf = self.ser.read(self.TAILLE_BLOC_INT8) data = numpy.zeros(self.TAILLE_BLOC,dtype=numpy.float32) j=0 for i in range(self.TAILLE_BLOC): data[i] = buf[j]+0x100*buf[j+1] j += 2 return data
La fonction suivante effectue l'acquisition de 10 blocs à la fréquence d'échantillonnage de 1 kHz, pour 4 voies. L'arduino est branché sur le port COM5. Les échantillons sont tracés en fonction du temps et un spectre est tracé, ce qui permet de vérifier le bon fonctionnement de l'échantillonnage.
def test(): ard = Arduino("COM8") nblocs = 10 fechant = 1000.0 prescaler = 7 N = ard.TAILLE_BLOC ne = N*nblocs voies = [0,1,2,3] nv = len(voies) x = numpy.zeros((nv,ne),dtype=numpy.float32) t = numpy.arange(ne,dtype=numpy.float32)*1.0/fechant ard.lancer_acquisition(voies,fechant,nblocs,prescaler) i = 0 for b in range(nblocs): for v in range(nv): data = ard.lecture() x[v,i:i+N] = data i += N time.sleep(1) ard.close() figure() plot(t,x[0,:]) plot(t,x[1,:]) xlabel("t (s)") axis([0,t[-1],0,1024]) grid() figure() spectre = numpy.absolute(numpy.fft.fft(x[0,:]))/(ne) f = numpy.arange(ne,dtype=numpy.float32)*fechant/ne plot(f,spectre) xlabel("f (Hz)") grid() show()
Les tests montrent que la fréquence d'échantillonnage maximale pour 4 voies est 1,7 kHz avec prescaler=7. Avec prescaler=4, on peut aller jusqu'à 5 kHz. Le débit de données pour 4 voies en 16 bits est 8 fois plus élevé que pour une voie en 8 bits. Pour la fréquence 5 kHz, il est de 400 kbits/s.
Lorsque l'acquisition est lancée, il faut faire une lecture asynchrone des données envoyées par l'arduino. On se fixe comme objectif de traiter ou d'afficher le signal par paquets, chacun étant constitué d'un certain nombre de blocs. Les blocs sont les unités transmises par l'arduino, qui contiennent ici 256 échantillons. Nous allons faire la lecture des données envoyées par l'arduino sur un fil d'exécution. On doit pour cela définir une classe qui hérite de threading.Thread. Voici le constructeur de cette classe :
class AcquisitionThread(threading.Thread): def __init__(self,arduino,voies,fechant,nblocs,prescaler=7): threading.Thread.__init__(self) self.arduino = arduino self.nvoies = len(voies) self.voies = voies self.prescaler = prescaler self.fechant = fechant self.running = False self.nblocs = nblocs # nombre de blocs dans un paquet self.npaquets = 8 # 1 paquet = nblocs*arduino.TAILLE_BLOC self.taille_bloc = arduino.TAILLE_BLOC self.buf = numpy.zeros((self.npaquets,self.nvoies,self.nblocs*arduino.TAILLE_BLOC)) self.compteur_paquets = 0 self.compteur_paquets_lus = 0
On crée un tampon circulaire qui contient 8 paquets. On pourra ainsi lire un paquet pour le traiter alors que le Thread est en train d'écrire un autre paquet. On a aussi défini self.compteur_paquets un indice pour le prochain paquet à récupérer de l'arduino et à placer dans le tampon, et self.compteur_paquets un indice pour le prochain paquet à lire dans le tampon.
La fonction run est exécutée lorsqu'on lance le Thread. Elle consiste en une boucle sans fin qui lit les données provenant de l'arduino et les stocke dans le tampon circulaire. Cette boucle incrémente self.compteur_paquets.
def run(self): self.arduino.lancer_acquisition(self.voies,self.fechant,0,self.prescaler) # acquisition sans fin self.running = True indice_bloc = 0 while self.running: i = indice_bloc*self.taille_bloc j = i+self.taille_bloc for v in range(self.nvoies): self.buf[self.compteur_paquets,v,i:j] = self.arduino.lecture() indice_bloc += 1 if indice_bloc==self.nblocs: indice_bloc = 0 self.compteur_paquets += 1 if self.compteur_paquets==self.npaquets: self.compteur_paquets = 0
La fonction suivante permet de stopper l'acquisition :
def stop(self): self.running = False self.join() self.arduino.stopper_acquisition()
L'appel self.join() permet d'attendre la sortie de la fonction run.
Voici la fonction qui permet de lire un paquet dans le tampon :
def paquet(self): if self.compteur_paquets==self.compteur_paquets_lus: return 0 P = self.buf[self.compteur_paquets_lus,:,:] self.compteur_paquets_lus += 1 if self.compteur_paquets_lus==self.npaquets: self.compteur_paquets_lus = 0 return P
Le programme de test suivant effectue une acquisition de deux voies à 5 kHz avec un tracé des deux signaux dans une fenêtre matplotlib gérée par une animation.
# -*- coding: utf-8 -*- import numpy from matplotlib.pyplot import * import matplotlib.animation as animation from arduinoADCmultivoies import * ard = Arduino(4) # numéro du port COM fechant = 5000 n = 4 nechant = ard.TAILLE_BLOC*n delai = nechant*1.0/fechant t = numpy.arange(nechant)*1.0/fechant fig,ax = subplots() line0, = ax.plot(t,numpy.zeros(nechant)) line1, = ax.plot(t,numpy.zeros(nechant)) ax.axis([0,t.max(),0,1024]) ax.grid() voies = [0,1] prescaler = 4 acquisition = AcquisitionThread(ard,voies,fechant,n,prescaler) acquisition.start() def animate(i): global line0,line_1,acquisition data = acquisition.paquet() if isinstance(data,int)==False: line0.set_ydata(data[0]) line1.set_ydata(data[1]) ani = animation.FuncAnimation(fig,animate,100,interval=delai*1000) show() acquisition.stop() ard.close()