Pi Pico ADC input using DMA and MicroPython

Analog data capture using DMA

This is the second part of my Web-based Pi Pico oscilloscope project. In the first part I used an Espressif ESP32 to add WiFi connectivity to the Pico, and now I’m writing code to grab analog data from the on-chip Analog-to-Digital Converter (ADC), which can potentially provide up to 500k samples/sec.

High-speed transfers like this normally require code written in C or assembly language, but I’ve decided to use MicroPython, which is considerably slower, so I need hardware acceleration to handle the data rate, specifically Direct Memory Access (DMA).

MicroPython ‘uctypes’

MicroPython does not have built-in functions to support DMA, and doesn’t provide any simple way of accessing the registers that control the ADC, DMA and I/O pins. However it does provide a way of defining these registers, using a new mechanism called ‘uctypes’. This is vaguely similar to ‘ctypes’ in standard Python, which is used to define Python interfaces for ‘foreign’ functions, but defines hardware registers, using a very compact (and somewhat obscure) syntax.

To give a specific example, the DMA controller has multiple channels, and according to the RP2040 datasheet section 2.5.7, each channel has 4 registers, with the following offsets:

0x000 READ_ADDR
0x004 WRITE_ADDR
0x008 TRANS_COUNT
0x00c CTRL_TRIG

The first three of these require simple 32-bit values, but the fourth has a complex bitfield:

Bit 31:   AHB_ERROR
Bit 30:   READ_ERROR
..and so on until..
Bits 3-2: DATA_SIZE
Bit 1:    HIGH_PRIORITY
Bit 0:    EN

With MicroPython uctypes, we can define the registers, and individual bitfields within those registers, e.g.

from uctypes import BF_POS, BF_LEN, UINT32, BFUINT32
# The bitfield layout is defined first, since DMA_CHAN_REGS refers to it
DMA_CTRL_TRIG_FIELDS = {
    "AHB_ERROR":   31<<BF_POS | 1<<BF_LEN | BFUINT32,
    "READ_ERROR":  30<<BF_POS | 1<<BF_LEN | BFUINT32,
..and so on until..
    "DATA_SIZE":    2<<BF_POS | 2<<BF_LEN | BFUINT32,
    "HIGH_PRIORITY":1<<BF_POS | 1<<BF_LEN | BFUINT32,
    "EN":           0<<BF_POS | 1<<BF_LEN | BFUINT32
}
DMA_CHAN_REGS = {
    "READ_ADDR_REG":       0x00|UINT32,
    "WRITE_ADDR_REG":      0x04|UINT32,
    "TRANS_COUNT_REG":     0x08|UINT32,
    "CTRL_TRIG_REG":       0x0c|UINT32,
    "CTRL_TRIG":          (0x0c,DMA_CTRL_TRIG_FIELDS)
}

The UINT32, BF_POS and BF_LEN entries may look strange, but they are just a way of encapsulating the data type, bit position & bit count into a single variable, and once that has been defined, you can easily read or write any element of the bitfield, e.g.

# Set DMA data source to be ADC FIFO
dma_chan.READ_ADDR_REG = ADC_FIFO_ADDR

# Set transfer size as 16-bit words
dma_chan.CTRL_TRIG.DATA_SIZE = 1

You may wonder why there are 2 definitions for one register: CTRL_TRIG and CTRL_TRIG_REG. Although it is useful to be able to manipulate individual bitfields (as in the above code) sometimes you need to write the whole register at one time, for example to clear all fields to zero:

# Clear the CTRL_TRIG register
dma_chan.CTRL_TRIG_REG = 0
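
To make the bitfield descriptors less mysterious, here is a minimal plain-Python sketch of the read-modify-write that a bitfield assignment implies. The helper names get_field and set_field are my own (they are not part of uctypes), and the code works on an ordinary integer rather than a real register; the bit positions and lengths are the ones from the CTRL_TRIG definitions above.

```python
def get_field(reg, pos, length):
    """Extract a bitfield of 'length' bits, starting at bit 'pos'."""
    mask = (1 << length) - 1
    return (reg >> pos) & mask

def set_field(reg, pos, length, value):
    """Return 'reg' with the bitfield replaced by 'value' (truncated to fit)."""
    mask = ((1 << length) - 1) << pos
    return (reg & ~mask & 0xffffffff) | ((value << pos) & mask)

ctrl = 0                          # stand-in for the 32-bit register value
ctrl = set_field(ctrl, 2, 2, 1)   # DATA_SIZE = 1 (16-bit transfers)
ctrl = set_field(ctrl, 0, 1, 1)   # EN = 1
print(hex(ctrl), get_field(ctrl, 2, 2))
```

Every bitfield assignment therefore costs a read and a write of the whole register, which is one reason to write the complete register in a single operation when many fields change together.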

An additional complication is that there are 12 DMA channels, so we need to define all 12, then select one of them to work on:

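from uctypes import struct
DMA_BASE        = 0x50000000  # DMA controller base address on the RP2040
DMA_CHAN_WIDTH  = 0x40        # Address spacing between channel register blocks
DMA_CHAN_COUNT  = 12
DMA_CHANS = [struct(DMA_BASE + n*DMA_CHAN_WIDTH, DMA_CHAN_REGS)
    for n in range(0,DMA_CHAN_COUNT)]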

DMA_CHAN = 0
dma_chan = DMA_CHANS[DMA_CHAN]
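
As a quick sanity check on the channel layout, the following plain-Python sketch works out where a given channel's registers end up. It assumes a DMA base address of 0x50000000 (taken from the RP2040 datasheet, so check it against your own definitions), and chan_reg_addr is a hypothetical helper, not something in the library file.

```python
DMA_BASE         = 0x50000000   # assumed RP2040 DMA base address
DMA_CHAN_WIDTH   = 0x40         # spacing between channel register blocks
DMA_CHAN_COUNT   = 12
CTRL_TRIG_OFFSET = 0x0c         # offset of CTRL_TRIG within a channel

def chan_reg_addr(chan, offset):
    """Absolute address of a register within the given DMA channel."""
    if not 0 <= chan < DMA_CHAN_COUNT:
        raise ValueError("DMA channel out of range")
    return DMA_BASE + chan * DMA_CHAN_WIDTH + offset

for n in (0, 1, 11):
    print(n, hex(chan_reg_addr(n, CTRL_TRIG_OFFSET)))
```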

To add even more complication, the DMA controller also has a single block of registers that are not channel specific, e.g.

DMA_REGS = {
    "INTR":               0x400|UINT32,
    "INTE0":              0x404|UINT32,
    "INTF0":              0x408|UINT32,
    "INTS0":              0x40c|UINT32,
    "INTE1":              0x414|UINT32,
..and so on until..
    "FIFO_LEVELS":        0x440|UINT32,
    "CHAN_ABORT":         0x444|UINT32
}

So to cancel all DMA transactions on all channels:

DMA_DEVICE = struct(DMA_BASE, DMA_REGS)
dma = DMA_DEVICE
dma.CHAN_ABORT = 0xffff
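
If you only want to cancel some of the channels, the value to write can be built up one bit at a time. This is a plain-Python sketch that assumes bit n of CHAN_ABORT corresponds to channel n, which is how I read the datasheet; abort_mask is a hypothetical helper name.

```python
DMA_CHAN_COUNT = 12

def abort_mask(channels):
    """Build a CHAN_ABORT value with one bit set per listed channel."""
    mask = 0
    for chan in channels:
        if not 0 <= chan < DMA_CHAN_COUNT:
            raise ValueError("DMA channel out of range")
        mask |= 1 << chan
    return mask

print(hex(abort_mask([0, 3])))                    # channels 0 and 3 only
print(hex(abort_mask(range(DMA_CHAN_COUNT))))     # all 12 channels
```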

Single ADC sample

MicroPython has a function for reading the ADC, but we’ll be using DMA to grab multiple samples very quickly, so this function can’t be used; we need to program the hardware from scratch. A useful first step is to check that we can produce sensible values for a single ADC sample. Firstly the I/O pin needs to be set as an analog input, using the uctypes definitions. The Pico brings out 3 analog input channels, numbered from 0 to 2, on GPIO pins 26 to 28:

import rp_devices as devs
ADC_CHAN = 0
ADC_PIN  = 26 + ADC_CHAN
adc = devs.ADC_DEVICE
pin = devs.GPIO_PINS[ADC_PIN]
pad = devs.PAD_PINS[ADC_PIN]
pin.GPIO_CTRL_REG = devs.GPIO_FUNC_NULL
pad.PAD_REG = 0

Then we clear down the control & status register, and the FIFO control & status register; this is only necessary if they have previously been programmed:

adc.CS_REG = adc.FCS_REG = 0

Then enable the ADC, and select the channel to be converted:

adc.CS.EN = 1
adc.CS.AINSEL = ADC_CHAN

Now trigger the ADC for one capture cycle, and read the result:

adc.CS.START_ONCE = 1
print(adc.RESULT_REG)

These two lines can be repeated to get multiple samples.

If the input pin is floating (not connected to anything) then the value returned is impossible to predict, but generally it seems to be around 50 to 80 units. The important point is that the value fluctuates between samples; if several samples have exactly the same value, then there is a problem.
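
That check is easy to automate. The sketch below is plain Python with made-up sample values (not real readings), and looks_stuck is a hypothetical helper; it simply flags a set of readings that are all identical.

```python
def looks_stuck(samples):
    """True if there are at least two samples and they are all identical."""
    return len(samples) > 1 and len(set(samples)) == 1

print(looks_stuck([61, 58, 72, 66]))   # fluctuating, as expected for a floating input
print(looks_stuck([0, 0, 0, 0]))       # identical values suggest a problem
```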

Multiple ADC samples

Since MicroPython isn’t fast enough to handle the incoming data, I’m using DMA, so that the ADC values are copied directly into memory without any software intervention.

However, we don’t always want the ADC to run at maximum speed (500k samples/sec), so we need some way of triggering it to fetch the next sample after a programmable delay. The RP2040 designers have anticipated this requirement, and have equipped it with a programmable timer, driven from a 48 MHz clock. There is also a mechanism that allows the ADC to automatically sample 2 or 3 inputs in turn; refer to the RP2040 datasheet for details.

Assuming the ADC has been set up as described above, the following additional code is required. First we define the DMA channel, the number of samples, and the rate (samples per second).

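import array, time, uctypes

DMA_CHAN = 0        # DMA channel to use
NSAMPLES = 10       # Number of samples to capture
RATE = 100000       # Sample rate (samples per second)
dma_chan = devs.DMA_CHANS[DMA_CHAN]
dma = devs.DMA_DEVICE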

We now have to enable the ADC FIFO, create a 16-bit buffer to hold the samples, and set the sample rate:

adc.FCS.EN = adc.FCS.DREQ_EN = 1
adc_buff = array.array('H', (0 for _ in range(NSAMPLES)))
adc.DIV_REG = (48000000 // RATE - 1) << 8
adc.FCS.THRESH = adc.FCS.OVER = adc.FCS.UNDER = 1
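
Because the divisor is an integer, the rate you get is not always exactly the rate you asked for. The plain-Python sketch below uses the same integer-only calculation as the DIV_REG line above, and also works out how long a capture takes and how much RAM the 16-bit buffer needs. The function names are my own, and the fractional part of the divider is ignored, so treat it as an approximation.

```python
ADC_CLOCK = 48000000   # ADC clock in Hz
MAX_RATE  = 500000     # maximum sample rate quoted above

def adc_div_reg(rate):
    """DIV register value for the requested rate (integer part only)."""
    if not 0 < rate <= MAX_RATE:
        raise ValueError("sample rate out of range")
    return (ADC_CLOCK // rate - 1) << 8

def actual_rate(rate):
    """Rate actually produced once the divisor has been truncated."""
    return ADC_CLOCK / (ADC_CLOCK // rate)

def capture_plan(nsamples, rate):
    """Capture duration in seconds, and buffer size in bytes."""
    return nsamples / actual_rate(rate), nsamples * 2

for rate in (100000, 44100):
    print(rate, adc_div_reg(rate), actual_rate(rate), capture_plan(10, rate))
```

A requested rate of 100000 divides the clock exactly, whereas 44100 does not, so the samples arrive slightly faster than requested.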

The DMA controller is configured with the source & destination addresses, and sample count:

dma_chan.READ_ADDR_REG = devs.ADC_FIFO_ADDR
dma_chan.WRITE_ADDR_REG = uctypes.addressof(adc_buff)
dma_chan.TRANS_COUNT_REG = NSAMPLES

The DMA destination is set to auto-increment, with a data size of 16 bits; the data request comes from the ADC. Then DMA is enabled, waiting for the first request.

dma_chan.CTRL_TRIG_REG = 0
dma_chan.CTRL_TRIG.CHAIN_TO = DMA_CHAN
dma_chan.CTRL_TRIG.INCR_WRITE = dma_chan.CTRL_TRIG.IRQ_QUIET = 1
dma_chan.CTRL_TRIG.TREQ_SEL = devs.DREQ_ADC
dma_chan.CTRL_TRIG.DATA_SIZE = 1
dma_chan.CTRL_TRIG.EN = 1
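
For comparison, the same settings can be packed into a single 32-bit control word. This is a plain-Python sketch, not the method used in my code: the field positions and the DREQ_ADC value are assumptions taken from my reading of the RP2040 datasheet, and ctrl_word is a hypothetical helper, so check them before relying on the result.

```python
# (bit position, length) of the CTRL_TRIG fields used here; assumed values
FIELDS = {
    "EN":         (0, 1),
    "DATA_SIZE":  (2, 2),
    "INCR_WRITE": (5, 1),
    "CHAIN_TO":   (11, 4),
    "TREQ_SEL":   (15, 6),
    "IRQ_QUIET":  (21, 1),
}
DREQ_ADC = 36   # assumed value of devs.DREQ_ADC
DMA_CHAN = 0

def ctrl_word(**values):
    """Combine named field values into one control word."""
    word = 0
    for name, value in values.items():
        pos, length = FIELDS[name]
        if value >> length:
            raise ValueError("%s value does not fit its field" % name)
        word |= value << pos
    return word

word = ctrl_word(CHAIN_TO=DMA_CHAN, INCR_WRITE=1, IRQ_QUIET=1,
                 TREQ_SEL=DREQ_ADC, DATA_SIZE=1, EN=1)
print(hex(word))
```

In principle a word like this could be written to CTRL_TRIG_REG in one assignment, but I haven’t tested that on hardware.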

Before starting the sampling, it is important to clear down the ADC FIFO, by reading out any existing samples – if this step is omitted, the data you get will be a mix of old & new, which can be very confusing.

while adc.FCS.LEVEL:
    x = adc.FIFO_REG

We can now set the START_MANY bit, and the ADC will start generating samples, which will be loaded into its FIFO, then transferred by DMA to the RAM buffer. Once the buffer is full (i.e. the DMA transfer count has been reached, and its BUSY bit is cleared) the DMA transfers will stop, but the ADC will keep trying to put samples in the FIFO until the START_MANY bit is cleared.

adc.CS.START_MANY = 1
while dma_chan.CTRL_TRIG.BUSY:
    time.sleep_ms(10)
adc.CS.START_MANY = 0
dma_chan.CTRL_TRIG.EN = 0

We can now print the results, converted into a voltage reading:

vals = [("%1.3f" % (val*3.3/4096)) for val in adc_buff]
print(vals)

As with the single-value test, the displayed values should show some dithering; if the input is floating, you might see something like:

['0.045', '0.045', '0.047', '0.046', '0.045', '0.046', '0.045', '0.046', '0.046', '0.041']
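
To put a number on the dithering, the conversion can be wrapped in a function and the spread summarised. This is a plain-Python sketch: the raw values are hypothetical, chosen to resemble the output above, and to_volts and summarize are my own helper names.

```python
ADC_VREF  = 3.3     # reference voltage
ADC_RANGE = 4096    # 12-bit converter

def to_volts(raw):
    """Convert a raw ADC reading to volts."""
    return raw * ADC_VREF / ADC_RANGE

def summarize(raw_samples):
    """Minimum, maximum and mean of the samples, in volts."""
    volts = [to_volts(v) for v in raw_samples]
    return min(volts), max(volts), sum(volts) / len(volts)

raw = [56, 56, 58, 57, 56, 57, 56, 57, 57, 51]   # hypothetical raw readings
lo, hi, mean = summarize(raw)
print("min %1.3f max %1.3f mean %1.3f" % (lo, hi, mean))
```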

Running the code

If you are unfamiliar with the process of loading MicroPython onto the Pico, or loading files into the MicroPython filesystem, I suggest you read my previous post.

The source files are available on Github here; you need to load the library file rp_devices.py onto the MicroPython filesystem, then run rp_adc_test.py; I normally run this using Thonny, as it simplifies the process of editing, running and debugging the code.

In the next part I combine the ADC sampling and the network interface to create a networked oscilloscope with a browser interface.

Copyright (c) Jeremy P Bentham 2021. Please credit this blog if you use the information or software in it.

8 thoughts on “Pi Pico ADC input using DMA and MicroPython”

  1. Very nice article. Thank you for that!
    Would it be a problem to read all 4 channels at the same time and would this have an effect on the maximum speed of 500k samples / sec?

  2. Hi, thanks for posting this series of articles. I’ve been trying to learn about DMA on the Pi Pico, and your article provided me with the perfect entrance :-)!
    BTW, when I ran the ADC example, I noticed that the first eight samples from the Read Many part are values from before the program was run (I changed the voltage at the pin before running the program, but only the ninth and tenth values corresponded to the current voltage, in spite of reading the FIFO beforehand). Any idea how to really flush the FIFO and get all values correct?
    Thanks!
    Benjamin

    1. Unfortunately I haven’t had the time to investigate why this occurs, and just use the obvious workaround of discarding the first 8 samples, which doesn’t really address the problem.

  3. Thank you for the great article! I am trying to do something very similar in microPython where I have a (working) 8b clocked input PIO that can outrun standard reads from the double RX FIFO. I’m close, I only want a 20 byte burst. I want to try DMA in microPython before I port the entire program over to C, but I am pretty lost how to set up the DMA for this condition. Do you have any suggestion how to set up the registers for the DMA ?

    1. Unfortunately I don’t have any experience of programming Pico DMA in Python, so can’t offer any help, but have you looked at the MicroPython ‘native code’ feature? I haven’t used it, but it supposedly offers a significant speed improvement.

  4. Hi,
    Thanks a lot for your script. I have found that the script works very well for short recordings (3-4s) but struggles with long ones. For a long recording, we have to record segments of 3-4s and stream them to internal storage or WiFi. I have tried both; however, after each 3s segment, the lag introduced by I/O (writing to storage or WiFi transmission) causes a significant 2-3s of audio to be missed. This is slightly better with internal storage, but that storage is also very limited. Is there any workaround? I think it’s due to slow Python, and am going to try C++. I thought it was worth posting here.

    I have tried the following code

    TOTSECS = 12 # TOTAL SECONDS OF RECORDING REQUIRED
    MEMSECS = 3 # SECONDS OF AUDIO WHICH CAN BE HELD IN MEMORY WITHOUT OVERFLOW
    RATE = 8000
    NSAMPLES = RATE*MEMSECS

    for i in range(int(TOTSECS/MEMSECS)): # This part triggers the DMA multiple times to fetch audio
        # for j in range(NSAMPLES):
        #     adc_buff[j] = 0

        dma_chan.READ_ADDR_REG = devs.ADC_FIFO_ADDR
        dma_chan.WRITE_ADDR_REG = uctypes.addressof(adc_buff)
        dma_chan.TRANS_COUNT_REG = NSAMPLES

        dma_chan.CTRL_TRIG_REG = 0
        dma_chan.CTRL_TRIG.CHAIN_TO = DMA_CHAN
        dma_chan.CTRL_TRIG.INCR_WRITE = dma_chan.CTRL_TRIG.IRQ_QUIET = 1
        dma_chan.CTRL_TRIG.TREQ_SEL = devs.DREQ_ADC
        dma_chan.CTRL_TRIG.DATA_SIZE = 1
        dma_chan.CTRL_TRIG.EN = 1

        while adc.FCS.LEVEL:
            x = adc.FIFO_REG

        adc.CS.START_MANY = 1
        while dma_chan.CTRL_TRIG.BUSY:
            time.sleep_ms(10)
        adc.CS.START_MANY = 0
        dma_chan.CTRL_TRIG.EN = 0
        # print(adc_buff)

        # SENDS DATA OVER WIFI, THE DATA IS CACHED IN RAM AT THE SERVER
        try:
            res = urequests.post(url='http://192.168.1.3:5001/login', data=adc_buff, headers={'Content-Type': 'application/octet-stream'})
        except Exception as e:
            print(e)
            pass

    # FINALLY AT THE END, DATA IS WRITTEN TO DISK IN SERVER IN WAV FILE
    res = urequests.get(url='http://192.168.1.3:5001/success')

    1. Sending the data using HTTP involves a lot of to-and-fro negotiation with the server to create a TCP link, and encapsulate the data, so I’m not surprised this punches a big hole in the data stream. The first thing I’d try is a write-only interface that sends the data as datagrams to a UDP server; if that isn’t reliable enough for your application, you can just send each datagram twice, with duplicate detection.

      If you want to stick with TCP, then it might be worth trying Websockets (or just a simple TCP server) to create a persistent TCP connection, which would avoid some of the negotiation every time a block of data is sent.

      Translating your current code into C may well not fix the problem, since I suspect the CPU time is mainly being absorbed by the lwIP C code, not your Python. I’ll be looking at the whole issue of data throughput as part of my PicoWi project in a few weeks time.
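
The send-twice-with-duplicate-detection idea mentioned in this reply can be sketched in plain Python. This only shows the framing: each datagram is prefixed with a sequence number so that the receiver can discard repeats and restore the order. The function names, the 4-byte header and the payload size are all assumptions for illustration, and the actual socket code is not shown.

```python
import struct

HEADER_FMT  = "<I"                          # 32-bit little-endian sequence number
HEADER_SIZE = struct.calcsize(HEADER_FMT)

def make_datagrams(data, payload_size=1024, first_seq=0):
    """Split 'data' (bytes) into datagrams, each prefixed with a sequence number."""
    return [struct.pack(HEADER_FMT, first_seq + n) + data[i:i + payload_size]
            for n, i in enumerate(range(0, len(data), payload_size))]

def reassemble(datagrams):
    """Rebuild the data, ignoring duplicates and tolerating reordering."""
    blocks = {}
    for dgram in datagrams:
        (seq,) = struct.unpack_from(HEADER_FMT, dgram)
        blocks.setdefault(seq, dgram[HEADER_SIZE:])   # keep the first copy only
    return b"".join(blocks[seq] for seq in sorted(blocks))

data = bytes(range(256)) * 10               # stand-in for a block of samples
sent = make_datagrams(data, payload_size=1000)
received = sent + sent                      # every datagram sent twice
print(len(sent), reassemble(received) == data)
```

A lost datagram still leaves a gap, which the receiver could detect from the missing sequence number.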
