ESP32とガベージコレクション
次のPythonプログラムはESP32開発ボード(ESP32-C3M)に接続されたAHT21で5分毎に温度と湿度を測定して、IoTクラウドのThingSpeakにアップロードしてグラフを描くプログラムである。
プログラムを動かすと4回目のアップロードで次のエラーが出た。
-10368, 'MBEDTLS_ERR_X509_ALLOC_FAILED'
これはMbed TLSライブラリにおいて、メモリの割り当てに失敗したことを示すエラーであり、必要なメモリを確保できなかった場合に返される。
# AHT21 and ThingSpeak
# Mar. 7th 2025
import requests
from machine import Pin, I2C
import network
import time
from aht21 import AHT21
import json
import gc
DEBUG = True
WIFI_SSID = 'SSID'
WIFI_PASS = 'パスワード'
# ThingSpeakのAPIキーとURL
API_KEY = 'APIキー'
THINGSPEAK_URL = 'https://api.thingspeak.com/update.json'
headers = {
'Content-Type' : 'application/json'
}
i2c = I2C(0)
aht21 = AHT21(i2c)
wifi = network.WLAN(network.STA_IF)
if not wifi.isconnected():
print("Connecting to WiFi...")
wifi.active(True)
wifi.connect(WIFI_SSID, WIFI_PASS)
while not wifi.isconnected():
pass
print('IP:', wifi.ifconfig()[0])
gc.enable()
gc.collect()
mem_free = str(gc.mem_free())
print("mem_free:", mem_free)
try:
n = 0
while True:
temperature = '{:.1f}'.format(aht21.temperature)
humidity = '{:.1f}'.format(aht21.relative_humidity)
print('TEMP:',temperature, 'HUM:', humidity)
# アップロードするデータ
data = {
"api_key": API_KEY,
"field1": str(temperature), # フィールド1のデータ
"field2": str(humidity), # フィールド2のデータ
"field3": mem_free
}
# 辞書型からJSONに変換
data = json.dumps(data).encode('utf-8')
if DEBUG == True:
print(data)
# POSTリクエストを送信
response = requests.post(THINGSPEAK_URL, headers=headers, data=data)
print(response.status_code)
# レスポンスの確認
if response.status_code == 200:
print("データが正常にアップロードされました。")
else:
print("データのアップロードに失敗しました:", response.status_code)
time.sleep(60 * 5) # 5分に1回測定、アップロードするためにお休み
# メモリ解放
# gc.collect()
n += 1
mem_free = str(gc.mem_free())
print("n:", str(n), "mem_free:", mem_free)
except OSError as e:
print("mem_free:", str(gc.mem_free()))
print('エラーが発生しました。: {}'.format(e))
ちなみに、アップロードするたびに取得しているgc.mem_free()
は次のようになった。
実行直後 mem_free: 162144
n: 1 mem_free: 123136
n: 2 mem_free: 94384
n: 3 mem_free: 60512
4回目のアップロードでメモリ不足のエラーが発生した。
エラー発生時 mem_free: 56496
上のプログラムの79行目にあるgc.collect()
のコメントを外して、アップロードするたびに実行するようにしてみると次のようになり、メモリ不足のエラーが発生しなくなった。
実行直後 91488
n: 1 mem_free: 88256
n: 2 mem_free: 88256
n: 3 mem_free: 88256
n: 4 mem_free: 88256
:
:
n: 9 mem_free: 88256
n: 10 mem_free: 88256
n: 11 mem_free: 88256
:
:
gc.mem_free() の値をThingSpeakで表示させると次のようになり、メモリ解放が行われていることが分かる。
ちなみに、温度、湿度は次のように表示されている。
追記
上のプログラムでgc.collect()を有効にして放置していたところ、32回目で
n: 32 mem_free: 72896
mem_free: 70000
[Errno 12] ENOMEM
のエラーが発生し、プログラムが停止してしまった。
またもやメモリー不足である。
そこでimportしているモジュールから必要なクラスだけをインポートするように、次のようにダイエットし、プログラムをESP32開発ボードにmain.pyのファイル名で転送し、PCを接続せず、ボードだけで動かしててみた。
from requests import post
from machine import Pin, I2C
from network import WLAN
from time import sleep
from aht21 import AHT21
from json import dumps
import gc
9時過ぎから18時過ぎまでのグラフを以下に示す。
メモリ不足のエラーなしに、約9時間、5分に1回、計100回以上アップロードできているのが分かる。
24時間経過後のグラフ。今のところ、問題なく動いている。
ダイエット版のサンプルコード
# AHT21 and ThingSpeak
# Mar. 7th 2025
from requests import post
from machine import Pin, I2C
from network import WLAN
from time import sleep
from aht21 import AHT21
from json import dumps
import gc
DEBUG = True
WIFI_SSID = 'WiFiのSSID'
WIFI_PASS = 'パスワード'
# ThingSpeakのAPIキーとURL
API_KEY = 'APIキー'
THINGSPEAK_URL = 'https://api.thingspeak.com/update.json'
headers = {
'Content-Type' : 'application/json'
}
i2c = I2C(0)
aht21 = AHT21(i2c)
wifi = WLAN(0) # network.STA_IF = 0
if not wifi.isconnected():
print("Connecting to WiFi...")
wifi.active(True)
wifi.connect(WIFI_SSID, WIFI_PASS)
while not wifi.isconnected():
pass
print('IP:', wifi.ifconfig()[0])
gc.enable()
gc.collect()
mem_free = str(gc.mem_free())
print("mem_free:", mem_free)
try:
n = 0
while True:
temperature = '{:.1f}'.format(aht21.temperature)
humidity = '{:.1f}'.format(aht21.relative_humidity)
print('TEMP:',temperature, 'HUM:', humidity)
# アップロードするデータ
data = {
"api_key": API_KEY,
"field1": str(temperature), # フィールド1のデータ
"field2": str(humidity), # フィールド2のデータ
"field3": mem_free
}
# 辞書型からJSONに変換
data = dumps(data).encode('utf-8')
if DEBUG == True:
print(data)
# POSTリクエストを送信
response = post(THINGSPEAK_URL, headers=headers, data=data)
print(response.status_code)
# レスポンスの確認
if response.status_code == 200:
print("データが正常にアップロードされました。")
else:
print("データのアップロードに失敗しました:", response.status_code)
sleep(60 * 5) # 5分に1回測定、アップロードするためにお休み
# メモリ解放
gc.collect()
n += 1
mem_free = str(gc.mem_free())
print("n:", str(n), "mem_free:", mem_free)
except OSError as e:
print("mem_free:", str(gc.mem_free()))
print('エラーが発生しました。: {}'.format(e))
さらに Micropythonのドキュメント に
gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
とすれば良いとの記述があったので試してみた。
これは、ヒープサイズの25%に現在使用中のメモリサイズを加算し、この値を超えるとガベージコレクションが自動的に行われるというもの。
ついでに、オンボードLEDを点灯させるようにし、アップロードに失敗した場合はLEDを消灯するコードを追加してみた。
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())を追加したコード
# AHT21 and ThingSpeak
# Mar. 7th 2025
# Last modify: Mar. 10th 2025
from requests import post
from machine import Pin, I2C
from network import WLAN
from time import sleep
from aht21 import AHT21
from json import dumps
import gc
DEBUG = True
ONBOARD_LED = 0
WIFI_SSID = 'WiFiのSSID'
WIFI_PASS = 'WiFiのパスワード'
# ThingSpeakのAPIキーとURL
API_KEY = 'APIキー'
THINGSPEAK_URL = 'https://api.thingspeak.com/update.json'
headers = {
'Content-Type' : 'application/json'
}
i2c = I2C(0)
aht21 = AHT21(i2c)
led = Pin(ONBOARD_LED, Pin.OUT)
# オンボードLEDをn回点滅させる関数
def blink(n):
for i in range(n):
led.on()
sleep(0.1)
led.off()
sleep(0.1)
wifi = WLAN(0) # network.STA_IF = 0
if not wifi.isconnected():
print("Connecting to WiFi...")
wifi.active(True)
wifi.connect(WIFI_SSID, WIFI_PASS)
while not wifi.isconnected():
blink(1)
pass
print('IP:', wifi.ifconfig()[0])
gc.enable()
gc.collect()
mem_free = str(gc.mem_free())
print("mem_free:", mem_free)
try:
n = 0
while True:
temperature = '{:.1f}'.format(aht21.temperature)
humidity = '{:.1f}'.format(aht21.relative_humidity)
print('TEMP:',temperature, 'HUM:', humidity)
# アップロードするデータ
data = {
"api_key": API_KEY,
"field1": str(temperature), # フィールド1のデータ
"field2": str(humidity), # フィールド2のデータ
"field3": mem_free
}
# 辞書型からJSONに変換
data = dumps(data).encode('utf-8')
if DEBUG == True:
print(data)
# POSTリクエストを送信
response = post(THINGSPEAK_URL, headers=headers, data=data)
print(response.status_code)
# レスポンスの確認
if response.status_code == 200:
print("データが正常にアップロードされました。")
blink(5)
led.on()
else:
print("データのアップロードに失敗しました:", response.status_code)
blink(10)
led.off()
# gc.thresholdを設定
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
sleep(60 * 5) # 5分に1回測定、アップロードするためにお休み
# メモリ解放
gc.collect()
n += 1
mem_free = str(gc.mem_free())
print("n:", str(n), "mem_free:", mem_free)
except OSError as e:
print("mem_free:", str(gc.mem_free()))
print('エラーが発生しました。: {}'.format(e))
上のコードで19時30分過ぎから23時40分過ぎまで約50回の計測結果を下図に示す。
空きメモリが85Kbytesを大きく下回ることはなくなった。
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())追加後のグラフ
Discussion