Table des matières

Filtrage numérique sur arduino

1. Introduction

Ce document présente un programme de filtrage numérique sur un arduino (microcontrôleur ATmega2560 ou ATmega328).

On reprend le programme exposé dans Conversion analogique-numérique rapide avec échantillonnage, qui fait une numérisation échantillonnée rapide en 8 bits, pour lui ajouter des fonctions de filtrage. Le programme transmet les signaux numériques filtrés à un programme python via le port série, ce qui permet de vérifier sur ordinateur le bon fonctionnement du filtrage. En effet, la mise au point de filtres numériques sur microcontrôleur est délicate, ce qui impose la réalisation de tests complets avant de pouvoir les implanter sur des applications embarquées.

Les principes généraux sont exposés dans Filtrage numérique sur un microcontrôleur. Nous allons faire le filtrage d'un signal numérisé en Q=8 bits par échantillons. Les calculs seront faits en entiers 16 bits, 32 bits, ou flottants 32 bits. Le filtrage en flottants donnera les meilleurs résultats en terme de qualité de filtrage, mais il sera parfois nécessaire de calculer avec des entiers pour obtenir une vitesse de traitement suffisante, surtout pour les grandes fréquences d'échantillonnage (de l'ordre du kHz) et les filtres RIF à grand nombre de coefficients.

Les réalisations de filtrage implémentées sont les réalisations directes de type I et II, et la réalisation directe transposée de type II.

Le filtrage est fait sur des échantillons 8 bits, mais pourra facilement être adapté au cas des échantillons 10 bits, en modifiant les types des tampons utilisés pour le stockage du signal.

2. Programme arduino

2.a. Variables globales

Voici tout d'abord les variables globales, dont le rôle sera expliqué au cours de leur utilisation. Il y a 4 tampons de 256 échantillons 8 bits chacun, pour stocker le signal filtré avant de le transmettre par le port série. Le nombre maximal de coefficients du filtre (a ou b) est fixé à 50. Les tampons circulaires pour le filtrage ont une taille de 256, surdimensionnée mais commode pour l'implémentation de la condition limite circulaire avec un indice 8 bits (J_tampon).

arduinoFiltrageRapide.ino
#include "Arduino.h"
#define SET_ACQUISITION 100
#define STOP_ACQUISITION 101
#define SET_FILTRAGE_16BITS 102
#define SET_FILTRAGE_32BITS 103
#define SET_FILTRAGE_FLOAT 104
#define NBUF 0x4
#define NBUF_MASK 0x3 // (NBUF-1)
#define BUF_SIZE 0x100
#define BUF_MASK 0xFF // (BUF_SIZE-1)
volatile uint8_t buf[NBUF][BUF_SIZE]; // tampon pour la transmission
volatile uint8_t compteur_buf,compteur_buf_transmis;
volatile uint16_t indice_buf;
uint16_t diviseur[6] = {0,1,8,64,256,1024};
uint32_t nblocs;
uint8_t sans_fin;
uint8_t flag;
uint8_t reduc;
uint8_t i_reduc;

void (*isrCallback)();

#define MAX_NCOEF 50
uint8_t nb; // nombre de coefficients b
int16_t b[MAX_NCOEF]; // coefficients b
int32_t bb[MAX_NCOEF];
float bbb[MAX_NCOEF];
uint8_t na; // nombre de coefficients a
int16_t a[MAX_NCOEF]; // coefficients a
int32_t aa[MAX_NCOEF];
float aaa[MAX_NCOEF];
uint8_t ab_shift;
uint8_t max_na_nb;
#define TAILLE_TAMPON 0x100 // supérieur à max(na,nb)
#define MASQUE_TAMPON 0xFF // TAILLE_TAMPON-1
int8_t x[TAILLE_TAMPON]; // tampon pour les x_k
int8_t y[TAILLE_TAMPON]; // tampon pour les y_k
int32_t ww[TAILLE_TAMPON];
float www[TAILLE_TAMPON];
int16_t v[MAX_NCOEF];
int32_t vv[MAX_NCOEF];


uint8_t J_tampon; // indice pour les tampons
uint8_t offset;
            

2.b. Configuration du convertisseur et interruptions

Les fonctions de configuration du Timer (pour l'échantillonnage) et du convertisseur analogique-numérique, sont expliquées dans Conversion analogique-numérique rapide avec échantillonnage. La première fonction configure et déclenche le Timer pour une période d'échantillonnage (en microsecondes) :

void timer1_init(uint32_t period) {
    TCCR1A = 0;
    TCCR1B = 0;
    TCCR1B |= (1 << WGM12);
    uint32_t top = (F_CPU/1000000*period);
    int clock = 1;
    while ((top>0xFFFF)&&(clock<5)) {
          clock++;
          top = (F_CPU/1000000*period/diviseur[clock]);
      }
    OCR1A = top;
    OCR1B = top >> 1;
    TIMSK1 = (1 << OCIE1B);
    TCCR1B |= clock;
    
}
              

La fonction suivante stoppe le timer et donc l'acquisition :

void stop_acquisition() {
   TCCR1B &= ~0x7; 
}
              

La fonction suivante configure le convertisseur analogique-numérique, avec le code de configuration du multiplexeur et la fonction de traitement du signal qui sera appelée pour chaque échantillon obtenu. Pour une entrée simple, le code de multiplexage est simplement le numéro de l'entrée.

void adc_init(uint8_t multiplex, uint8_t prescaler,void (*isr)()) {
  ADMUX = (1 << REFS0);
  ADMUX |= multiplex & 0b00011111;
  ADMUX |= (1 << ADLAR); //left adjust
  ADCSRA = 0;
  ADCSRA |= (1 << ADEN); // enable ADC
  ADCSRA |= (1 << ADATE); // Auto Trigger Enable
  ADCSRA |= (1 << ADIE); // interrupt enable
  ADCSRA |= prescaler ;
  ADCSRB = 0;
  ADCSRB |= (multiplex & 0b00100000) >> 2;
  ADCSRB |= (1 << ADTS2)|(1 << ADTS0);// trigger source : timer 1 comp B
  isrCallback = isr;
}
                

Voici la fonction appelée par interruption qui se déclenche dès qu'une conversion analogique-numérique est terminée. Elle appelle la fonction dont le pointeur se trouve dans la variable globale isrCallback, qui est choisie en fonction de l'opération de filtrage souhaitée.

ISR(ADC_vect) {
     isrCallback();
}
                

Le timer déclenche aussi une interruption synchrone avec l'échantillonnage. Dans cette fonction, on fait alterner le niveau de la sortie 18 (sur Aduino MEGA) ou de la sortie 3 (sur Arduino UNO). Cela permet de vérifier le bon fonctionnement de l'échantillonnage à l'oscilloscope.

ISR(TIMER1_COMPB_vect) {
    if (flag==0) {
         flag = 1;
         PORTD &= ~(1<<PORTD3); // sortie 18 sur Arduino MEGA, 3 sur UNO
       }
     else {
        flag = 0;
        PORTD |= (1<<PORTD3);   
     }   
}
                  

2.c. Acquisition sans filtrage

La première fonction de traitement ne fait pas de filtrage, et se contente de recopier la valeur de l'échantillon dans le tampon. Le résultat de la conversion est lu dans le registre 8 bits ADCH. On a ajouté une réduction de la fréquence d'échantillonnage, définie par la variable reduc, qui consiste à ne retenir qu'un échantillon pour reduc échantillons. Une des applications des filtres numériques passe-bas est en effet la réduction de la fréquence d'échantillonnage après suppression des hautes fréquences.

void adc() {
   if (--i_reduc == 0) {
    i_reduc = reduc;
    buf[compteur_buf][indice_buf] = ADCH;
    indice_buf++;
    if (indice_buf == BUF_SIZE) {
       indice_buf = 0;
       compteur_buf = (compteur_buf+1)&NBUF_MASK;
     } 
   }
}
                 

2.d. Filtrage 16 bits

Voyons à présent les différentes fonctions de filtrage. Elles sont appelées par interruption, juste après la conversion analogique-numérique.

La première fonction de filtrage est une réalisation directe de type I, qui opère avec un accumulateur 16 bits. Voir Filtrage numérique sur un microcontrôleur pour les explications détaillées. C'est en principe l'implémentation la plus rapide, non soumise au risque de débordement. La variable globale offset correspond au niveau 0 du signal, par exemple 128 si le signal est décalé à 2,5 volts. L'offset doit être retranché après la numérisation (le tampon contient des entiers signés), puis ajouté avant le stockage, car on garde la convention consistant à transmettre des valeurs non signés. Les coefficients du filtre sont des entiers 16 bits, obtenus à partir des coefficients réels par multiplication par 10s. Pour cette raison, l'accumulateur doit subir un décalage de s bits vers la droite (variable ab_shift) pour être converti en la valeur finale 8 bits. Les tampons circulaires x et y servent à mémoriser les échantillons passés de l'entrée et de la sortie. Si l'on doit implémenter le même filtrage pour des échantillons 10 bits, il suffit de changer en 16 bits les types des tampons x,y,buf (voir déclaration des variables globales). Dans le présent programme, on a choisi un type 8 bits pour stocker les échantillons 8 bits, afin d'économiser la mémoire (très faible sur l'arduino UNO).

void adc_filtrage_16bits_FD1() {
    int16_t accum;
    int16_t sortie;
    uint8_t i,k;
    x[J_tampon] = ADCH-offset;
    accum = 0;
    i = J_tampon;
    for (k=0; k<nb; k++) {
      accum += x[i]*b[k];
      i--; 
    }
    i = J_tampon-1;
    for (k=1; k<na; k++) {
      accum -= y[i]*a[k];
      i--;  
    }
    sortie = accum >> ab_shift;
    y[J_tampon] = sortie;
    J_tampon++;
    if (--i_reduc == 0) {
    i_reduc = reduc;
    buf[compteur_buf][indice_buf] = sortie+offset;
    indice_buf++;
    if (indice_buf == BUF_SIZE) {
       indice_buf = 0;
       compteur_buf = (compteur_buf+1)&NBUF_MASK;
     }
    }
}
                 

La fonction suivante est une réalisation directe de type II, toujours avec un accumulateur 16 bits. Un seul tampon circulaire 16 bits est nécessaire ww. Ce tampon contient des valeurs dépassant largement la gamme [-128,127] du signal, d'où la nécessité d'utiliser 16 bits et non pas 8.

void adc_filtrage_16bits_FD2() {
    int16_t accum;
    int16_t sortie;
    uint8_t i,k;
    accum = (ADCH-offset)*a[0];
    i = J_tampon-1;
    for (k=1; k<na; k++) {
      accum -= ww[i]*a[k];
      i--; 
    }
    ww[J_tampon] = accum >> ab_shift;
    i = J_tampon;
    accum = 0;
    for (k=0; k<nb; k++) {
      accum += ww[i]*b[k];
      i--;  
    }
    sortie = accum >> ab_shift;
    J_tampon++;
    if (--i_reduc == 0) {
    i_reduc = reduc;
    buf[compteur_buf][indice_buf] = sortie+offset;
    indice_buf++;
    if (indice_buf == BUF_SIZE) {
       indice_buf = 0;
       compteur_buf = (compteur_buf+1)&NBUF_MASK;
     }
    } 
}
                 

La fonction suivante implémente la réalisation directe transposée de type II, toujours en 16 bits. Cette réalisation utilise des variables d'état 16 bits (tableau v). L'implémentation est plus simple que pour les deux cas précédents car il n'y a pas de tampon circulaire. Les coefficients ak sont en même nombre que les coefficients ak pour simplifier l'implémentation, ce qui signifie que les coefficients non utilisés doivent être nuls. Pour une implémentation plus efficace, il faudra éviter de multiplier par ces valeurs nuls, ce qui nécessite d'écrire plusieurs boucles.

void adc_filtrage_16bits_FDT2() {
    int16_t entree;
    int16_t sortie;
    uint8_t k,i;
    i = 0;
    entree = ADCH-offset;
    sortie = (b[0]*entree+v[0]) >> ab_shift;
    for (k=1; k<max_na_nb; k++) {
      v[i] = entree*b[k] - sortie*a[k] + v[i+1];
      i++;  
    }
    if (--i_reduc == 0) {
    i_reduc = reduc;
    buf[compteur_buf][indice_buf] = sortie+offset;
    indice_buf++;
    if (indice_buf == BUF_SIZE) {
       indice_buf = 0;
       compteur_buf = (compteur_buf+1)&NBUF_MASK;
     }
    }
}
                   

2.e. Filtrage 32 bits

Le filtrage en 32 bits est moins rapide que le filtrage 16 bits, mais il permet d'encoder les coefficients du filtre avec plus de bits, ce qui réduit les erreurs d'arrondis. Pour la réalisation directe de type II, le filtrage en 32 bits s'impose pour éviter les débordements de l'accumulateur.

Voici la réalisation directe de type I. La différence avec le cas 16 bits est l'utilisation de coefficients et d'un accumulateur 32 bits au lieu de 16 bits. Les tampons sont toujours en 8 bits, car la numérisation de fait en 8 bits. Pour une numérisation en plus que 8 bits, il faudra définir des tampons 16 bits. Dans cette réalisation, il ne se produit pas de débordement et on peut donc choisir 32-8=24 bits pour les coefficients, ce qui permet de réaliser des filtrages très fins.

void adc_filtrage_32bits_FD1() {
    int32_t accum;
    int16_t sortie;
    uint8_t i,k;
    x[J_tampon] = ADCH-offset;
    accum = 0;
    i = J_tampon;
    for (k=0; k<nb; k++) {
      accum += x[i]*bb[k];
      i--; 
    }
    i = J_tampon-1;
    for (k=1; k<na; k++) {
      accum -= y[i]*aa[k];
      i--;  
    }
    sortie = accum >> ab_shift;
    y[J_tampon] = sortie;
    J_tampon++;
    if (--i_reduc == 0) {
    i_reduc = reduc;
    buf[compteur_buf][indice_buf] = sortie+offset;
    indice_buf++;
    if (indice_buf == BUF_SIZE) {
       indice_buf = 0;
       compteur_buf = (compteur_buf+1)&NBUF_MASK;
     }
     }
}
                         

Voici la réalisation directe de type II. Elle est soumise au débordement, donc il faut réduire le nombre de bits pour les coefficients, mais la marge est beaucoup plus grande que dans le cas 16 bits. On peut par exemple choisir 20 bits pour les coefficients, ce qui laisse 4 bits de sécurité pour éviter les débordements.

void adc_filtrage_32bits_FD2() {
    int32_t accum;
    int16_t sortie;
    uint8_t i,k;
    accum = (ADCH-offset)*aa[0];
    i = J_tampon-1;
    for (k=1; k<na; k++) {
      accum -= ww[i]*aa[k];
      i--; 
    }
    ww[J_tampon] = accum >> ab_shift;
    i = J_tampon;
    accum = 0;
    for (k=0; k<nb; k++) {
      accum += ww[i]*bb[k];
      i--;  
    }
    J_tampon++;
    sortie = accum >> ab_shift;
    if (--i_reduc == 0) {
    i_reduc = reduc;
    buf[compteur_buf][indice_buf] =  sortie+offset;
    indice_buf++;
    if (indice_buf == BUF_SIZE) {
       indice_buf = 0;
       compteur_buf = (compteur_buf+1)&NBUF_MASK;
     }
     }
}
                            

Voici la réalisation directe transposée de type II. Les variables d'état (tableau v), sont en 32 bits.

void adc_filtrage_32bits_FDT2() {
    int16_t in;
    int16_t out;
    uint8_t k,i;
    i = 0;
    in = ADCH-offset;
    out = (bb[0]*in+vv[0]) >> ab_shift;
    for (k=1; k<max_na_nb; k++) {
      vv[i] = in*bb[k] - out*aa[k] + vv[i+1];
      i++;  
    }
    if (--i_reduc == 0) {
    i_reduc = reduc;
    buf[compteur_buf][indice_buf] = out+offset;
    indice_buf++;
    if (indice_buf == BUF_SIZE) {
       indice_buf = 0;
       compteur_buf = (compteur_buf+1)&NBUF_MASK;
     }
    }
}
                           

2.f. Filtrage en flottants 32 bits

L'implémentation du filtrage avec des flottants 32 bits est la plus facile, car on utilise directement les coefficients sous forme de flottants. L'accumulateur est aussi un flottant. Bien sûr, il n'y a pas de débordement avec des flottants, mais l'exécution est beaucoup moins rapide qu'avec des entiers. En effet, les microprecesseurs des microcontrôleurs ATmega2560 et ATmega328 n'ont pas d'unité de calcul en virgule flottante. Le compilateur convertit les opérations sur les flottants en opérations sur des entiers 8 bits.

Voici la forme directe de type I :

void adc_filtrage_float_FD1() {
    float accum;
    uint16_t sortie;
    uint8_t i,k;
    x[J_tampon] = ADCH-offset;
    accum = 0;
    i = J_tampon;
    for (k=0; k<nb; k++) {
      accum += x[i]*bbb[k];
      i--; 
    }
    i = J_tampon-1;
    for (k=1; k<na; k++) {
      accum -= y[i]*aaa[k];
      i--;  
    }
    sortie = accum;
    y[J_tampon] = sortie;
    J_tampon++;
    if (--i_reduc == 0) {
    i_reduc = reduc;
    buf[compteur_buf][indice_buf] = sortie+offset;
    indice_buf++;
    if (indice_buf == BUF_SIZE) {
       indice_buf = 0;
       compteur_buf = (compteur_buf+1)&NBUF_MASK;
     }
     }
}
                     

La forme directe de type II :

void adc_filtrage_float_FD2() {
    float accum;
    uint8_t i,k;
    accum = (ADCH-offset)*aaa[0];
    i = J_tampon-1;
    for (k=1; k<na; k++) {
      accum -= www[i]*aaa[k];
      i--; 
    }
    www[J_tampon] = accum;
    accum = 0;
    i = J_tampon;
    for (k=0; k<nb; k++) {
      accum += www[i]*bbb[k];
      i--;  
    }
    J_tampon++;
    if (--i_reduc == 0) {
    i_reduc = reduc;
    buf[compteur_buf][indice_buf] = accum+offset;
    indice_buf++;
    if (indice_buf == BUF_SIZE) {
       indice_buf = 0;
       compteur_buf = (compteur_buf+1)&NBUF_MASK;
     }
     }
}
                     

Finalement, la forme directe transposée de type II :

void adc_filtrage_float_FDT2() {
    int16_t entree,sortie;
    uint8_t k;
    uint8_t i;
    i = 0;
    entree = ADCH-offset;
    sortie = bbb[0]*entree+www[0];
    for (k=1; k<max_na_nb; k++) {
      www[i] = bbb[k]*entree - aaa[k]*sortie + www[i+1];
      i++; 
    }
    if (--i_reduc == 0) {
    i_reduc = reduc;
    buf[compteur_buf][indice_buf] = sortie+offset;
    indice_buf++;
    if (indice_buf == BUF_SIZE) {
       indice_buf = 0;
       compteur_buf = (compteur_buf+1)&NBUF_MASK;
     }
    }
    
}
                     

2.g. Communication avec l'ordinateur

Voici tout d'abord la fonction setup, qui initialise la communication série. Le port D3 est configuré en sortie.

void setup() {
  char c;
  Serial.begin(500000);
  Serial.setTimeout(0);
  c = 0;
  Serial.write(c);
  c = 255;
  Serial.write(c);
  c = 0;
  Serial.write(c);
  compteur_buf = compteur_buf_transmis = 0;
  indice_buf = 0;
  DDRD |= 1 << PORTD3; // PD3 en sortie
  flag = 0;
}
                        

La fonction suivante permet de configurer et de déclencher une acquisition sans filtrage. Les informations envoyées par l'ordinateur sont :

  • Le code du multiplexeur (8 bits).
  • Le code du facteur diviseur de l'horloge de l'ADC (8 bits).
  • La période d'échantillonnage en microsecondes (32 bits).
  • Le nombre de blocs de 256 échantillons à transmettre (32 bits). Si cette valeur est nulle, la transmission se fait sans fin.
  • Le facteur de réduction d'échantillonnage pour les données à transmettre (8 bits).

L'appel cli() désactive les interruptions, l'appel sei() active les interruptions.

void lecture_acquisition() {
  uint32_t c1,c2,c3,c4;
  uint8_t multiplex,prescaler;
  uint32_t period;
  while (Serial.available()<2) {};
  multiplex = Serial.read();
  prescaler = Serial.read();
  while (Serial.available()<4) {};
  c1 = Serial.read();
  c2 = Serial.read();
  c3 = Serial.read();
  c4 = Serial.read();
  period = ((c1 << 24) | (c2 << 16) | (c3 << 8) | c4);
  while (Serial.available()<4) {};
  c1 = Serial.read();
  c2 = Serial.read();
  c3 = Serial.read();
  c4 = Serial.read();
  nblocs = ((c1 << 24) | (c2 << 16) | (c3 << 8) | c4);
  while (Serial.available()<1);
  reduc = Serial.read();
  i_reduc = reduc;
  if (nblocs==0) {
        sans_fin = 1;
        nblocs = 1;
   }
   else sans_fin = 0;
   compteur_buf=compteur_buf_transmis=0;
   indice_buf = 0;
  cli();
  timer1_init(period);
  adc_init(multiplex,prescaler,adc);
  sei();
}
                          

La fonction suivante configure et déclenche l'acquisition avec un filtrage en 16 bits. En plus des paramètres précédents, l'ordinateur doit fournir les suivants :

  • Le nombre de coefficients ak du filtre (8 bits).
  • Les coefficients ak (16 bits).
  • Le nombre de coefficients bk du filtre (8 bits).
  • Les coefficients bk (16 bits).
  • Le décalage s, c'est-à-dire l'exposant du facteur multiplicatif 2s appliqué aux coefficients (8 bits).
  • Le décalage (8 bits) à appliquer aux valeurs du signal avant le filtrage, correspondant au niveau 0 du signal.
  • Le type de filtrage (8 bits). 1 = forme directe I, 2 = forme directe II, 3 = forme directe transposée II.
void lecture_filtrage_16bits() {
  uint32_t c1,c2,c3,c4;
  uint8_t multiplex,prescaler;
  uint32_t period;
  uint8_t type;
  while (Serial.available()<2) {};
  multiplex = Serial.read();
  prescaler = Serial.read();
  while (Serial.available()<4) {};
  c1 = Serial.read();
  c2 = Serial.read();
  c3 = Serial.read();
  c4 = Serial.read();
  period = ((c1 << 24) | (c2 << 16) | (c3 << 8) | c4);
  while (Serial.available()<4) {};
  c1 = Serial.read();
  c2 = Serial.read();
  c3 = Serial.read();
  c4 = Serial.read();
  nblocs = ((c1 << 24) | (c2 << 16) | (c3 << 8) | c4);
  while (Serial.available()<1);
  reduc = Serial.read();
  i_reduc = reduc;
   // filtrage
   while (Serial.available()<1) {};
   na = Serial.read();
   for (int k=0; k<na; k++) {
       while (Serial.available()<2) {};
       c1 = Serial.read();
       c2 = Serial.read();
       a[k] = ((c1<<8)|c2);
   }
   while (Serial.available()<1) {};
   nb = Serial.read();
   for (int k=0; k<nb; k++) {
       while (Serial.available()<2) {};
       c1 = Serial.read();
       c2 = Serial.read();
       b[k] = ((c1<<8)|c2);
   }
   while (Serial.available()<1) {};
   ab_shift = Serial.read();
   while (Serial.available()<1) {};
   offset = Serial.read();
   while (Serial.available()<1) {};
   type = Serial.read();
   
   if (nblocs==0) {
        sans_fin = 1;
        nblocs = 1;
   }
   else sans_fin = 0;
   compteur_buf=compteur_buf_transmis=0;
   indice_buf = 0;
   for (int i=0; i<= TAILLE_TAMPON-1; i++) {
     x[i] = 0;
     y[i] = 0;
     ww[i] = 0;
   }
  J_tampon = 0;
  cli();
  timer1_init(period);
  if (type==1) adc_init(multiplex,prescaler, adc_filtrage_16bits_FD1);
  else if (type==2) adc_init(multiplex,prescaler, adc_filtrage_16bits_FD2);
  else if (type==3) {
    max_na_nb = max(na,nb);
    int k;
    for (k=na; k<max_na_nb; k++) a[k] = 0;
    for (k=nb; k<max_na_nb; k++) b[k] = 0;
    adc_init(multiplex,prescaler, adc_filtrage_16bits_FDT2);
  }
  sei();
}
                           

La fonction suivante fait la même chose pour un filtrage en 32 bits. La seule différence est la transmission de coefficients en 32 bits.

void lecture_filtrage_32bits() {
  uint32_t c1,c2,c3,c4;
  uint8_t multiplex,prescaler;
  uint32_t period;
  uint8_t type;
  while (Serial.available()<2) {};
  multiplex = Serial.read();
  prescaler = Serial.read();
  while (Serial.available()<4) {};
  c1 = Serial.read();
  c2 = Serial.read();
  c3 = Serial.read();
  c4 = Serial.read();
  period = ((c1 << 24) | (c2 << 16) | (c3 << 8) | c4);
  while (Serial.available()<4) {};
  c1 = Serial.read();
  c2 = Serial.read();
  c3 = Serial.read();
  c4 = Serial.read();
  nblocs = ((c1 << 24) | (c2 << 16) | (c3 << 8) | c4);
  while (Serial.available()<1);
  reduc = Serial.read();
  i_reduc = reduc;
   // filtrage
   while (Serial.available()<1) {};
   na = Serial.read();
   for (int k=0; k<na; k++) {
       while (Serial.available()<4) {};
       c1 = Serial.read();
       c2 = Serial.read();
       c3 = Serial.read();
       c4 = Serial.read();
       aa[k] = ((c1 << 24) | (c2 << 16) | (c3 << 8) | c4);
   }
   while (Serial.available()<1) {};
   nb = Serial.read();
   for (int k=0; k<nb; k++) {
       while (Serial.available()<4) {};
       c1 = Serial.read();
       c2 = Serial.read();
       c3 = Serial.read();
       c4 = Serial.read();
       bb[k] = ((c1 << 24) | (c2 << 16) | (c3 << 8) | c4);
   }
   while (Serial.available()<1) {};
   ab_shift = Serial.read();
   while (Serial.available()<1) {};
   offset = Serial.read();
   while (Serial.available()<1) {};
   type = Serial.read();
   
   if (nblocs==0) {
        sans_fin = 1;
        nblocs = 1;
       
   }
   else sans_fin = 0;
   compteur_buf=compteur_buf_transmis=0;
   indice_buf = 0;
   for (int i=0; i<= TAILLE_TAMPON-1; i++) {
     x[i] = 0;
     y[i] = 0;
     ww[i] = 0;
   }
  J_tampon = 0;
  cli();
  timer1_init(period);
  if (type==1) adc_init(multiplex,prescaler, adc_filtrage_32bits_FD1);
  else if (type==2) adc_init(multiplex,prescaler, adc_filtrage_32bits_FD2);
  else if (type==3) {
    max_na_nb = max(na,nb);
    int k;
    for (k=na; k<max_na_nb; k++) aa[k] = 0;
    for (k=nb; k<max_na_nb; k++) bb[k] = 0;
    adc_init(multiplex,prescaler, adc_filtrage_32bits_FDT2);
  }
  sei();
}

                            

La fonction suivante configure et déclenche le filtrage en flottants. Les coefficients flottants sont transmis sous forme d'une mantisse 32 bits et d'un exposant 8 bits.

void lecture_filtrage_float() {
  uint32_t c1,c2,c3,c4;
  int32_t m;
  int8_t e;
  uint8_t multiplex,prescaler;
  uint32_t period;
  uint8_t type;
  while (Serial.available()<2) {};
  multiplex = Serial.read();
  prescaler = Serial.read();
  while (Serial.available()<4) {};
  c1 = Serial.read();
  c2 = Serial.read();
  c3 = Serial.read();
  c4 = Serial.read();
  period = ((c1 << 24) | (c2 << 16) | (c3 << 8) | c4);
  while (Serial.available()<4) {};
  c1 = Serial.read();
  c2 = Serial.read();
  c3 = Serial.read();
  c4 = Serial.read();
  nblocs = ((c1 << 24) | (c2 << 16) | (c3 << 8) | c4);
  while (Serial.available()<1);
  reduc = Serial.read();
  i_reduc = reduc;
   // filtrage
   while (Serial.available()<1) {};
   na = Serial.read();
   for (int k=0; k<na; k++) {
       while (Serial.available()<5) {};
       c1 = Serial.read();
       c2 = Serial.read();
       c3 = Serial.read();
       c4 = Serial.read();
       e = Serial.read();
       m = ((c1 << 24) | (c2 << 16) | (c3 << 8) | c4);
       aaa[k] = ((float)m)*pow(2,e-30);
   }
   while (Serial.available()<1) {};
   nb = Serial.read();
   for (int k=0; k<nb; k++) {
       while (Serial.available()<5) {};
       c1 = Serial.read();
       c2 = Serial.read();
       c3 = Serial.read();
       c4 = Serial.read();
       e = Serial.read();
       m = ((c1 << 24) | (c2 << 16) | (c3 << 8) | c4);
       bbb[k] = ((float)m)*pow(2,e-30);
   }
   while (Serial.available()<1) {};
   offset = Serial.read();
   while (Serial.available()<1) {};
   type = Serial.read();
   
   if (nblocs==0) {
        sans_fin = 1;
        nblocs = 1;
   }
   else sans_fin = 0;
   compteur_buf=compteur_buf_transmis=0;
   indice_buf = 0;
   for (int i=0; i<= TAILLE_TAMPON-1; i++) {
     x[i] = 0;
     y[i] = 0;
     www[i] = 0.0;  
   }
  J_tampon = 0;
  cli();
  timer1_init(period);
  if (type==1) adc_init(multiplex,prescaler, adc_filtrage_float_FD1);
  else if (type==2) adc_init(multiplex,prescaler, adc_filtrage_float_FD2);
  else if (type==3) {
    max_na_nb = max(na,nb);
    int k;
    for (k=na; k<max_na_nb; k++) aaa[k] = 0.0;
    for (k=nb; k<max_na_nb; k++) bbb[k] = 0.0;
    adc_init(multiplex,prescaler, adc_filtrage_float_FDT2);
  }
  sei();
}
                             

La fonction suivante lit le port série pour détecter la présence d'un caractère de commande (les codes sont définis en entête). La fonction de configuration correspondante est appelée.

void lecture_serie() {
   char com;
   if (Serial.available()>0) {
        com = Serial.read();
        if (com==SET_ACQUISITION) lecture_acquisition();
        if (com==STOP_ACQUISITION) stop_acquisition();
        if (com==SET_FILTRAGE_16BITS) lecture_filtrage_16bits();
        if (com==SET_FILTRAGE_32BITS) lecture_filtrage_32bits();
        if (com==SET_FILTRAGE_FLOAT) lecture_filtrage_float();
   }
}
                              

La fonction loop transmet un bloc d'échantillons à l'ordinateur, lorsqu'il y a en un disponible, ou lit le port série. La taille des blocs est définie en entête (BUF_SIZE).

void loop() {
  if ((compteur_buf_transmis!=compteur_buf)&&(nblocs>0)) {
      Serial.write((uint8_t *)buf[compteur_buf_transmis],BUF_SIZE);
      compteur_buf_transmis=(compteur_buf_transmis+1)&NBUF_MASK;
      if (sans_fin==0) {
            nblocs--;
            if (nblocs==0) stop_acquisition();
      }
  }
  else lecture_serie();
}                         
                               

3. Programme python

3.a. Communication avec l'arduino

La classe suivante, similaire à celle définie dans Conversion analogique-numérique rapide avec échantillonnage permet de communiquer avec l'arduino pour configurer et déclencher les différents types de filtrage.

Les arguments des fonction de configuration du filtrage sont :

  • multiplex : le code du multiplexeur, par exemple 0 pour l'entrée A0.
  • fechant : la fréquence d'échantillonnage en Hz.
  • nblocs : le nombre de blocs à transmettre, égal à 0 pour une acquisition sans fin.
  • prescaler : le facteur diviseur de l'horloge de l'ADC.
  • reduc : le facteur de réduction de l'échantillonnage pour les données transmises.
  • a : la liste des coefficients ak du filtre.
  • b : la liste des coefficients bk du filtre.
  • offset : le décalage correspondant au niveau 0 du signal, par exemple 128 pour un niveau 0 de 2,5 volts.
  • typ : le type de réalisation 1 (forme I) ,2 (forme II) ou 3 (forme transposée II).
  • gbits : le nombre de bits de sécurité pour prévenir le débordement, dans le cas du filtrae 16 bits et 32 bits. Pour la forme directe de type I, on peut choisir ce nombre nul.
arduinoFiltrageRapide.py
# -*- coding: utf-8 -*-
import serial
import numpy
import math
import time
from matplotlib.pyplot import *
import numpy.fft
import threading
import scipy.signal

class Arduino():
    def __init__(self,port):
        self.ser = serial.Serial(port,baudrate=500000)
        c_recu = self.ser.read(1)
        while ord(c_recu)!=0:
            c_recu = self.ser.read(1)
        c_recu = self.ser.read(1)
        while ord(c_recu)!=255:
            c_recu = self.ser.read(1)
        c_recu = self.ser.read(1)
        while ord(c_recu)!=0:
            c_recu = self.ser.read(1)
        self.SET_ACQUISITION = 100
        self.STOP_ACQUISITION = 101
        self.SET_FILTRAGE_16BITS = 102
        self.SET_FILTRAGE_32BITS = 103
        self.SET_FILTRAGE_FLOAT = 104
        self.TAILLE_BLOC = 256
        self.MAX_NCOEF = 50

    def close(self):
        self.ser.close()

    def write_int8(self,v):
        v = numpy.int8(v)
        self.ser.write(chr(v & 0xFF))


    def write_int16(self,v):
        v = numpy.int16(v)
        char1 = (v & 0xFF00) >> 8
        char2 = (v & 0x00FF)
        self.ser.write(chr(char1))
        self.ser.write(chr(char2))
        
    def write_int32(self,v):
        v = numpy.int32(v)
        char1 = (v & 0xFF000000) >> 24
        char2 = (v & 0x00FF0000) >> 16
        char3 = (v & 0x0000FF00) >> 8
        char4 = (v & 0x000000FF)
        self.ser.write(chr(char1))
        self.ser.write(chr(char2))
        self.ser.write(chr(char3))
        self.ser.write(chr(char4))

    def write_float(self,v):
        if v!=0.0:
            e = math.floor(math.log(abs(v)/math.log(2)))
        else:
            e = 0
        m = numpy.int32(v*2**(30-e))
        self.write_int32(m)
        self.write_int8(e)


    def lancer_acquisition(self,multiplex,fechant,nblocs,prescaler,reduc):
        period = int(1e6*1.0/fechant)
        self.nblocs = nblocs
        self.ser.write(chr(self.SET_ACQUISITION))
        self.ser.write(chr(multiplex))
        self.ser.write(chr(prescaler))
        self.write_int32(period)
        self.write_int32(nblocs)
        self.ser.write(chr(reduc))

    def lancer_filtrage_16bits(self,multiplex,fechant,nblocs,prescaler,reduc,a,b,offset,typ,gbits):
        a = numpy.array(a)
        b = numpy.array(b)
        na = len(a)
        nb = len(b)
        if na > self.MAX_NCOEF or nb > self.MAX_NCOEF:
            raise Exception("trop de coefficients de filtrage")
        mb = numpy.max(numpy.absolute(b))
        ma = numpy.max(numpy.absolute(a))
        m = max(ma,mb)
        P = 16+1-8-gbits # nombre de bits des coefficients
        s = numpy.floor(P-1-numpy.log(m)/numpy.log(2))
        a_int16 = numpy.array(a*2**s,dtype=numpy.int16)
        b_int16 = numpy.array(b*2**s,dtype=numpy.int16)
        (zeros,poles,gain)=scipy.signal.tf2zpk(b_int16,a_int16)
        for p in poles:
            if numpy.absolute(p) >= 1:
                print "Filtre instable"
        print(b_int16)
        print(a_int16)
        period = int(1e6*1.0/fechant)
        self.nblocs = nblocs
        self.ser.write(chr(self.SET_FILTRAGE_16BITS))
        self.ser.write(chr(multiplex))
        self.ser.write(chr(prescaler))
        self.write_int32(period)
        self.write_int32(nblocs)
        self.ser.write(chr(reduc))
        self.ser.write(chr(na))
        for k in range(na):
            self.write_int16(a_int16[k])
        self.ser.write(chr(nb))
        for k in range(nb):
            self.write_int16(b_int16[k])
        self.ser.write(chr(int(s)))
        self.ser.write(chr(offset))
        self.ser.write(chr(typ))

    def lancer_filtrage_32bits(self,multiplex,fechant,nblocs,prescaler,reduc,a,b,offset,typ,gbits):
        a = numpy.array(a)
        b = numpy.array(b)
        na = len(a)
        nb = len(b)
        if na > self.MAX_NCOEF or nb > self.MAX_NCOEF:
            raise Exception("trop de coefficients de filtrage")
        mb = numpy.max(numpy.absolute(b))
        ma = numpy.max(numpy.absolute(a))
        m = max(ma,mb)
        P = 32+1-8-gbits # nombre de bits des coefficients
        s = numpy.floor(P-1-numpy.log(m)/numpy.log(2))
        a_int32 = numpy.array(a*2**s,dtype=numpy.int32)
        b_int32 = numpy.array(b*2**s,dtype=numpy.int32)
        (zeros,poles,gain)=scipy.signal.tf2zpk(b_int32,a_int32)
        for p in poles:
            if numpy.absolute(p) >= 1:
                print "Filtre instable"
        print(b_int32)
        print(a_int32)
        period = int(1e6*1.0/fechant)
        self.nblocs = nblocs
        self.ser.write(chr(self.SET_FILTRAGE_32BITS))
        self.ser.write(chr(multiplex))
        self.ser.write(chr(prescaler))
        self.write_int32(period)
        self.write_int32(nblocs)
        self.ser.write(chr(reduc))
        self.ser.write(chr(na))
        for k in range(na):
            self.write_int32(a_int32[k])
        self.ser.write(chr(nb))
        for k in range(nb):
            self.write_int32(b_int32[k])
        self.ser.write(chr(int(s)))
        self.ser.write(chr(offset))
        self.ser.write(chr(typ))


    def lancer_filtrage_float(self,multiplex,fechant,nblocs,prescaler,reduc,a,b,offset,typ):
        a = numpy.array(a)
        b = numpy.array(b)
        na = len(a)
        nb = len(b)
        if na > self.MAX_NCOEF or nb > self.MAX_NCOEF:
            raise Exception("trop de coefficients de filtrage")
        (zeros,poles,gain)=scipy.signal.tf2zpk(b,a)
        for p in poles:
            if numpy.absolute(p) >= 1:
                print "Filtre instable"
        print(b)
        print(a)
        period = int(1e6*1.0/fechant)
        self.nblocs = nblocs
        self.ser.write(chr(self.SET_FILTRAGE_FLOAT))
        self.ser.write(chr(multiplex))
        self.ser.write(chr(prescaler))
        self.write_int32(period)
        self.write_int32(nblocs)
        self.ser.write(chr(reduc))
        self.ser.write(chr(na))
        for k in range(na):
            self.write_float(a[k])
        self.ser.write(chr(nb))
        for k in range(nb):
            self.write_float(b[k])
        self.ser.write(chr(offset))
        self.ser.write(chr(typ))


    def stopper_acquisition(self):
        self.ser.write(chr(self.STOP_ACQUISITION))

    def lecture(self):
        buf = self.ser.read(self.TAILLE_BLOC)
        buf = map(ord,buf)
        return numpy.array(buf,dtype=numpy.float32)

                 

3.b. Exemple

La fonction suivante montre un exemple d'utilisation, avec un filtre RIF passe-bas à 11 coefficients. Elle fait l'acquisition de 5 blocs de 256 échantillons à la fréquence d'échantillonnage de 1 kHz, et tracer le signal et son spectre. L'analyse spectrale est un bon moyen de vérifier le bon déroulement de l'échantillonnage.

def test():
    ard = Arduino(4) # port COM5
    nblocs = 5
    fechant = 1000.0
    prescaler = 4
    reduc = 5
    N = ard.TAILLE_BLOC
    ne = N*nblocs
    x = numpy.zeros(ne,dtype=numpy.float32)
    t = numpy.arange(ne,dtype=numpy.float32)*1.0/(fechant/reduc)
    a = [1.0]
    b = scipy.signal.firwin(numtaps=11,cutoff=[0.1],window='hann',nyq=0.5) 
    offset = 128
    typ = 1
    gbits = 0
    ard.lancer_filtrage_16bits(0,fechant,nblocs,prescaler,reduc,a,b,offset,typ,gbits)
    i = 0
    for b in range(nblocs):
        data = ard.lecture()
        x[i:i+N] = data
        i += N
    time.sleep(1)
    ard.close()
    figure()
    plot(t,x)
    xlabel("t (s)")
    axis([0,t[-1],0,256])
    grid()
    figure()
    spectre = numpy.absolute(numpy.fft.fft(x))/(ne)
    f = numpy.arange(ne,dtype=numpy.float32)*(fechant/reduc/ne)
    plot(f,spectre)
    xlabel("f (Hz)")
    grid()
    show()
               

3.c. Acquisition dans un thread

La classe suivante, similaire à celle décrite dans Conversion analogique-numérique rapide avec échantillonnage, permet de lire les blocs de signal envoyé par l'arduino dans un thread secondaire. Les blocs sont regroupés en paquets.

class AcquisitionThread(threading.Thread): 
    def __init__(self,arduino,multiplex,fechant,nblocs,prescaler,reduc,a,b,offset,typ,nbits,gbits):
        threading.Thread.__init__(self)
        self.arduino = arduino
        self.multiplex = multiplex
        self.fechant = fechant
        self.a = a
        self.b = b
        self.offset = offset
        self.nbits = nbits
        self.gbits = gbits
        self.typ = typ
        self.prescaler = prescaler
        self.reduc = reduc
        self.running = False
        self.nblocs = nblocs # nombre de blocs dans un paquet
        self.npaquets = 8 # 1 paquet = nblocs*arduino.TAILLE_BLOC
        self.taille_bloc = arduino.TAILLE_BLOC
        self.buf = numpy.zeros((self.npaquets,self.nblocs*arduino.TAILLE_BLOC))
        self.compteur_paquets = 0
        self.compteur_paquets_lus = 0
            
    def run(self):
        if self.nbits==16:
            self.arduino.lancer_filtrage_16bits(self.multiplex,self.fechant,0,self.prescaler,self.reduc,self.a,self.b,self.offset,self.typ,self.gbits) # acquisition sans fin
        elif self.nbits==32:
            self.arduino.lancer_filtrage_32bits(self.multiplex,self.fechant,0,self.prescaler,self.reduc,self.a,self.b,self.offset,self.typ,self.gbits)
        elif self.nbits == "float":
            self.arduino.lancer_filtrage_float(self.multiplex,self.fechant,0,self.prescaler,self.reduc,self.a,self.b,self.offset,self.typ)
        else:
            raise Exception("16 ou 32 bits")
        self.running = True
        indice_bloc = 0
        while self.running:
            i = indice_bloc*self.taille_bloc
            j = i+self.taille_bloc
            self.buf[self.compteur_paquets,i:j] = self.arduino.lecture()
            indice_bloc += 1
            if indice_bloc==self.nblocs:
                indice_bloc = 0
                self.compteur_paquets += 1
                if self.compteur_paquets==self.npaquets:
                    self.compteur_paquets = 0
            
    def stop(self):
        self.running = False
        self.join()
        self.arduino.stopper_acquisition()
            
    def paquet(self):
        if self.compteur_paquets==self.compteur_paquets_lus:
            return None
        P = self.buf[self.compteur_paquets_lus,:]
        self.compteur_paquets_lus += 1
        if self.compteur_paquets_lus==self.npaquets:
            self.compteur_paquets_lus = 0
        return P
                  

4. Test des filtres

4.a. Programme de test

Voici le programme de test.

testAcquisitionAnimate.py
# -*- coding: utf-8 -*-
import numpy
from matplotlib.pyplot import *
import matplotlib.animation as animation
from arduinoFiltrageRapide import *
import scipy.signal

ard = Arduino(4) # numéro du port COM (4 pour COM5)
fechant = 1000
reduc = 1
n = 2
nechant = ard.TAILLE_BLOC*n
delai = nechant*1.0/(fechant/reduc)
t = numpy.arange(nechant)*1.0/(fechant/reduc)
fig,ax = subplots()
line0, = ax.plot(t,numpy.zeros(nechant))
ax.axis([0,t.max(),0,256])
ax.grid()

#Définition du filtre
#a = [1.0]
#b = scipy.signal.firwin(numtaps=21,cutoff=[0.1],window='hann',nyq=0.5)
b,a = scipy.signal.iirfilter(N=2,Wn=[0.1*2],btype="lowpass",ftype="butter")
print(b)
print(a)

w,h=scipy.signal.freqz(b,a)

figure()
subplot(211)
plot(w/(2*numpy.pi),20*numpy.log10(numpy.absolute(h)))
xlabel("f/fe")
ylabel("GdB")
grid()
subplot(212)
plot(w/(2*numpy.pi),numpy.unwrap(numpy.angle(h)))
xlabel("f/fe")
ylabel("phase")
grid()

multiplex=0
prescaler = 4
offset = 128
typ = 2
nbits = 32
gbits = 6 # bits de sécurité

acquisition = AcquisitionThread(ard,multiplex,fechant,n,prescaler,reduc,a,b,offset,typ,nbits,gbits)
acquisition.start()

def animate(i):
    global line0,acquisition
    data = acquisition.paquet()
    if data!=None:
        line0.set_ydata(data)

ani = animation.FuncAnimation(fig,animate,100,interval=delai*1000)
show()
acquisition.stop()
ard.close()
                

Avant de lancer l'acquisition, le programme trace la réponse fréquentielle du filtre. Sur l'exemple ci-dessus, il s'agit d'un filtre récursif bi-quadratique. Pendant l'acquisition, le signal est tracé au moyen d'une animation. Les valeurs tracés sont des entiers de 0 à 255, qu'on pourra facilement convertir en valeurs de tension.

Creative Commons LicenseTextes et figures sont mis à disposition sous contrat Creative Commons.